faust.worker

Worker.

A “worker” starts a single instance of a Faust application.

See also

Starting the App: for more information.

class faust.worker.Worker(app: AppT, *services: ServiceT, sensors: Optional[Iterable[SensorT]] = None, debug: bool = False, quiet: bool = False, loglevel: Optional[Union[str, int]] = None, logfile: Optional[Union[str, IO]] = None, stdout: IO = sys.stdout, stderr: IO = sys.stderr, blocking_timeout: Optional[float] = None, workdir: Optional[Union[Path, str]] = None, console_port: int = 50101, loop: Optional[AbstractEventLoop] = None, redirect_stdouts: Optional[bool] = None, redirect_stdouts_level: Optional[Union[str, int]] = None, logging_config: Optional[Dict] = None, **kwargs: Any)

Worker.

See also

This is a subclass of mode.Worker.

Usage:

You can start a worker using:

  1. the faust worker program.

  2. instantiating Worker programmatically and calling execute_from_commandline():

    >>> worker = Worker(app)
    >>> worker.execute_from_commandline()
    
  3. or, if you already have an event loop, awaiting worker.start(). In that case you are responsible for gracefully shutting down the event loop:

    import asyncio

    async def start_worker(worker: Worker) -> None:
        await worker.start()

    def manage_loop():
        loop = asyncio.get_event_loop_policy().get_event_loop()
        worker = Worker(app, loop=loop)
        try:
            loop.run_until_complete(start_worker(worker))
        finally:
            # Stop the worker and close the loop, even on error.
            worker.stop_and_shutdown_loop()
    
Parameters:
  • app (AppT) – The Faust app to start.

  • *services – Services to start with the worker. This includes application instances to start.

  • sensors (Iterable[SensorT]) – List of sensors to include.

  • debug (bool) – Enables debugging mode [disabled by default].

  • quiet (bool) – Do not output anything to console [disabled by default].

  • loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.

  • logfile (Union[str, IO]) – Name of file or a stream to log to.

  • stdout (IO) – Standard output stream.

  • stderr (IO) – Standard error stream.

  • blocking_timeout (float) – When debug is enabled, this sets the timeout (in seconds) for detecting that the event loop is blocked.

  • workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.

  • loop (asyncio.AbstractEventLoop) – Custom event loop object.
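As a rough illustration of how these parameters fit together, the sketch below collects a few of them in a dictionary and passes them to Worker. The keys are parameters documented above, but the values, the WORKER_OPTIONS name, and the make_worker helper are illustrative assumptions rather than part of Faust. The import is deferred so that the options can be inspected without Faust installed.

```python
import sys
from pathlib import Path
from typing import Any, Dict

# Hypothetical option set: every key is a documented Worker parameter,
# but the values are examples only.
WORKER_OPTIONS: Dict[str, Any] = {
    "debug": True,             # enable debugging mode
    "quiet": False,            # keep console output
    "loglevel": "INFO",        # string level name or an integer
    "logfile": None,           # file name or stream; None is the default
    "stdout": sys.stdout,
    "stderr": sys.stderr,
    "blocking_timeout": 10.0,  # only relevant when debug is enabled
    "workdir": Path("."),      # the worker changes into this directory
}


def make_worker(app: Any, **overrides: Any) -> Any:
    """Build a Worker for ``app`` (hypothetical helper, not a Faust API)."""
    from faust.worker import Worker  # deferred import; requires Faust

    options = {**WORKER_OPTIONS, **overrides}
    return Worker(app, **options)
```

With Faust installed, make_worker(app, loglevel="DEBUG") would return a worker that can then be run with execute_from_commandline().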

logger: logging.Logger = <Logger faust.worker (WARNING)>
app: AppT

The Faust app started by this worker.

sensors: Set[SensorT]

Additional sensors to add to the Faust app.

workdir: Path

Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.
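The process-wide effect of a working directory change can be seen with the standard library alone. This is a minimal sketch that does not use Faust and does not claim to reproduce how Worker changes directory internally; the change_to helper is a hypothetical name.

```python
import os
import tempfile
from pathlib import Path


def change_to(path: Path) -> Path:
    """Change the process working directory and return the previous one."""
    previous = Path.cwd()
    os.chdir(path)
    return previous


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp).resolve()
    previous = change_to(target)
    # Relative paths now resolve against the new directory for the
    # whole process, not just for the code that made the change.
    resolved = Path("data.db").resolve()
    cwd_after = Path.cwd().resolve()
    # Restore the old directory so the temporary one can be removed.
    os.chdir(previous)
```

Because the change affects every relative path in the process, it is usually safer to build absolute paths before the worker starts if other code depends on the original directory.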

spinner: Optional[Spinner]

Class that displays a terminal progress spinner (see https://pypi.org/project/progress/).

async on_start() → None

Signal called every time the worker starts.

Return type:

None

async maybe_start_blockdetection() → None

Start blocking detector service if enabled.

Return type:

None
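To give an intuition for what "the event loop is blocked" means, the toy sketch below measures how late a short asyncio sleep wakes up while another coroutine makes a blocking call. It uses only the standard library, is not Faust's detector, and the actual blocking detector service may work quite differently. The function names and the 0.1 second threshold are assumptions for illustration.

```python
import asyncio
import time


async def measure_max_lag(interval: float, rounds: int) -> float:
    """Return the worst wake-up delay (seconds) beyond ``interval``."""
    worst = 0.0
    for _ in range(rounds):
        started = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - started - interval
        worst = max(worst, lag)
    return worst


async def blocking_task() -> None:
    await asyncio.sleep(0.02)
    time.sleep(0.3)  # deliberately blocks the event loop


async def demo(timeout: float = 0.1) -> bool:
    # The heartbeat runs concurrently and notices when it wakes up late.
    heartbeat = asyncio.create_task(measure_max_lag(0.01, 20))
    await blocking_task()
    worst = await heartbeat
    return worst > timeout


blocked = asyncio.run(demo())
```

Here blocked is true because the synchronous time.sleep call prevents the heartbeat from waking on schedule, which is the kind of stall that blocking_timeout is meant to bound.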

async on_startup_finished() → None

Signal called when the worker has started.

Return type:

None

on_init_dependencies() → Iterable[ServiceT]

Return service dependencies that must start with the worker.

Return type:

Iterable[ServiceT]

async on_first_start() → None

Signal called the first time the worker starts.

“First time” means this callback is not called if the worker is restarted after an exception is raised.

Return type:

None
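The difference between on_first_start and on_start can be sketched with a small mixin that counts calls. StubWorker below is a stand-in so the example runs without Faust; in real code the mixin would be combined with faust.worker.Worker instead. The mixin, the stub, and the way the hooks are driven by hand are all assumptions for illustration, not Faust's own start-up logic.

```python
import asyncio


class StubWorker:
    """Stand-in for faust.worker.Worker (hypothetical, hooks only)."""

    async def on_first_start(self) -> None:
        pass

    async def on_start(self) -> None:
        pass


class StartCounterMixin:
    """Hypothetical mixin that counts how often each hook runs."""

    first_starts = 0
    starts = 0

    async def on_first_start(self) -> None:
        self.first_starts += 1
        await super().on_first_start()

    async def on_start(self) -> None:
        self.starts += 1
        await super().on_start()


class CountingWorker(StartCounterMixin, StubWorker):
    pass


async def simulate_start_then_restart(worker: CountingWorker) -> None:
    # Initial start: both hooks run.
    await worker.on_first_start()
    await worker.on_start()
    # Restart after an error: only on_start runs again.
    await worker.on_start()


worker = CountingWorker()
asyncio.run(simulate_start_then_restart(worker))
```

After the simulated start and restart, worker.first_starts is 1 and worker.starts is 2, which mirrors the documented rule that on_first_start is skipped on restarts.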

change_workdir(path: Path) → None

Change the current working directory (CWD).

Return type:

None

autodiscover() → None

Autodiscover modules and files to find @agent decorators, etc.

Return type:

None

async on_execute() → None

Signal called when the worker is about to start.

Return type:

None

on_worker_shutdown() → None

Signal called before the worker shuts down.

Return type:

None

stdout: IO
stderr: IO
debug: bool
quiet: bool
blocking_timeout: Seconds
logging_config: Optional[Dict]
loglevel: Optional[Union[str, int]]
logfile: Optional[Union[str, IO]]
console_port: int
loghandlers: List[Handler]
redirect_stdouts: bool
redirect_stdouts_level: int
services: Iterable[ServiceT]
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
on_setup_root_logger(logger: Logger, level: int) → None

Signal called when the root logger is being configured.

Return type:

None
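One possible use of this hook is to attach an extra handler when the root logger is configured. The sketch below keeps that logic in a plain function so it can be tried with the standard library alone. The add_buffer_handler name, the in-memory buffer, and the "worker_demo" logger are assumptions for illustration; the commented subclass shows how the function might be wired into a Worker when Faust is installed.

```python
import io
import logging


def add_buffer_handler(logger: logging.Logger, level: int) -> io.StringIO:
    """Attach an in-memory handler to ``logger`` and return its buffer."""
    buffer = io.StringIO()
    handler = logging.StreamHandler(buffer)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    return buffer


# In a Worker subclass (requires Faust), the hook could delegate like this:
#
#     class MyWorker(Worker):
#         def on_setup_root_logger(self, logger, level):
#             super().on_setup_root_logger(logger, level)
#             self.log_buffer = add_buffer_handler(logger, level)

# Stand-alone demonstration on a named logger instead of the root logger.
demo_logger = logging.getLogger("worker_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
buffer = add_buffer_handler(demo_logger, logging.WARNING)
demo_logger.info("ignored by the handler")
demo_logger.warning("disk almost full")
captured = buffer.getvalue()
```

Only the warning reaches the buffer, because the handler's own level filters out the INFO record even though the logger accepts it.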