faust.cli.faust
¶
Program faust
(umbrella command).
- class faust.cli.faust.agents(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
List agents.
- title = 'Agents'¶
- headers = ['name', 'topic', 'help']¶
- sortkey = operator.attrgetter('name')¶
- options: Optional[OptionList] = [option('--local/--no-local', help='Include agents using a local channel')]¶
- async run(local: bool) None [source]¶
Dump list of available agents in this application.
- Return type:
None
- agents(*, local: bool = False) Sequence[AgentT] [source]¶
Convert list of agents to terminal table rows.
- Return type:
_GenericAlias
[AgentT
]
- agent_to_row(agent: AgentT) Sequence[str] [source]¶
Convert agent fields to terminal table row.
- Return type:
_GenericAlias
[str
]
- key_serializer: CodecArg¶
The codec used to serialize keys. Taken from instance parameters or
key_serializer
.
- value_serialier: CodecArg¶
The codec used to serialize values. Taken from instance parameters or
value_serializer
.
- stdout: IO¶
- stderr: IO¶
- args: Tuple¶
- faust.cli.faust.call_command(command: str, args: Optional[List[str]] = None, stdout: Optional[IO] = None, stderr: Optional[IO] = None, side_effects: bool = False, **kwargs: Any) Tuple[int, IO, IO] [source]¶
- class faust.cli.faust.clean_versions(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Delete old version directories.
Warning
This command will result in the destruction of the following files:
Table data for previous versions of the app.
- class faust.cli.faust.completion(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Output shell completion to be evaluated by the shell.
- require_app = False¶
- key_serializer: CodecArg¶
The codec used to serialize keys. Taken from instance parameters or
key_serializer
.
- value_serialier: CodecArg¶
The codec used to serialize values. Taken from instance parameters or
value_serializer
.
- stdout: IO¶
- stderr: IO¶
- args: Tuple¶
- class faust.cli.faust.model(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Show model detail.
- headers = ['field', 'type', 'default']¶
- options: Optional[OptionList] = [argument('name')]¶
- model_fields(model: Type[ModelT]) Sequence[Sequence[str]] [source]¶
Convert model fields to terminal table rows.
- Return type:
_GenericAlias
[_GenericAlias
[str
]]
- class faust.cli.faust.models(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
List all available models as a tabulated list.
- title = 'Models'¶
- headers = ['name', 'help']¶
- sortkey = operator.attrgetter('_options.namespace')¶
- options: Optional[OptionList] = [option('--builtins/--no-builtins', default=False)]¶
- async run(*, builtins: bool) None [source]¶
Dump list of available models in this application.
- Return type:
None
- class faust.cli.faust.reset(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Delete local table state.
Warning
This command will result in the destruction of the following files:
The local database directories/files backing tables (does not apply if an in-memory store like memory:// is used).
Notes
This data is technically recoverable from the Kafka cluster (if intact), but it’ll take a long time to get the data back as you need to consume each changelog topic in total.
It’d be faster to copy the data from any standbys that happen to have the topic partitions you require.
- class faust.cli.faust.send(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Send message to agent/topic.
- key_serializer: Optional[Union[CodecT, str]]¶
The codec used to serialize keys. Taken from instance parameters or
key_serializer
.
- options: Optional[OptionList] = [option('--key-type', '-K', help='Name of model to serialize key into.'), option('--key-serializer', help='Override default serializer for key.'), option('--value-type', '-V', help='Name of model to serialize value into.'), option('--value-serializer', help='Override default serializer for value.'), option('--key', '-k', help='String value for key (use json if model).'), option('--partition', type=<class 'int'>, help='Specific partition to send to.'), option('--repeat', '-r', type=<class 'int'>, default=1, help='Send message n times.'), option('--min-latency', type=<class 'float'>, default=0.0, help='Minimum delay between sending.'), option('--max-latency', type=<class 'float'>, default=0.0, help='Maximum delay between sending.'), argument('entity'), argument('value', default=None, required=False)]¶
- async run(entity: str, value: str, *args: Any, key: Optional[str] = None, key_type: Optional[str] = None, key_serializer: Optional[str] = None, value_type: Optional[str] = None, value_serializer: Optional[str] = None, partition: int = 1, timestamp: Optional[float] = None, repeat: int = 1, min_latency: float = 0.0, max_latency: float = 0.0, **kwargs: Any) Any [source]¶
Send message to topic/agent/channel.
- Return type:
- class faust.cli.faust.tables(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
List available tables.
- title = 'Tables'¶
- class faust.cli.faust.worker(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]¶
Start worker instance for given app.
- worker_options = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=<TCPPort 1<=x<=65535>, help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default=None, type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)')]¶
- options: Optional[OptionList] = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=<TCPPort 1<=x<=65535>, help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default=None, type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)'), option('--logfile', '-f', callback=<function compat_option.<locals>._callback>, expose_value=False, default=None, type=<click.types.Path object>, help='Path to logfile (default is <stderr>).'), option('--loglevel', '-l', callback=<function compat_option.<locals>._callback>, expose_value=False, default='WARN', type=Choice(['crit', 'error', 'warn', 'info', 'debug']), help='Logging level to use.'), option('--blocking-timeout', callback=<function compat_option.<locals>._callback>, expose_value=False, default=None, type=<class 'float'>, help='when --debug: Blocking detector timeout.'), option('--console-port', callback=<function compat_option.<locals>._callback>, expose_value=False, default=50101, type=<TCPPort 1<=x<=65535>, help='when --debug: Port to run debugger console on.')]¶
- as_service(loop: AbstractEventLoop, *args: Any, **kwargs: Any) ServiceT [source]¶
Return the service this command should execute.
For the worker we simply start the application itself.
Note
The application will be started using a
faust.Worker
.- Return type: