faust.agents
¶
Agents.
- class faust.agents.Agent(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, app: AppT, name: Optional[str] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, use_reply_headers: Optional[bool] = None, **kwargs: Any)[source]¶
Agent.
This is the type of object returned by the
@app.agent
decorator.- supervisor: SupervisorStrategyT = None¶
- app: _AppT¶
- fun: AgentFun¶
- supervisor_strategy: Optional[Type[SupervisorStrategyT]]¶
- on_init_dependencies() Iterable[ServiceT] [source]¶
Return list of services dependencies required to start agent.
- Return type:
_GenericAlias
[ServiceT
]
- cancel() None [source]¶
Cancel agent and its actor instances running in this process.
- Return type:
None
- async on_partitions_revoked(revoked: Set[TP]) None [source]¶
Call when partitions are revoked.
- Return type:
None
- async on_partitions_assigned(assigned: Set[TP]) None [source]¶
Call when partitions are assigned.
- Return type:
None
- async on_isolated_partitions_revoked(revoked: Set[TP]) None [source]¶
Call when isolated partitions are revoked.
- Return type:
None
- async on_isolated_partitions_assigned(assigned: Set[TP]) None [source]¶
Call when isolated partitions are assigned.
- Return type:
None
Call when non-isolated partitions are revoked.
- Return type:
None
Call when non-isolated partitions are assigned.
- Return type:
None
- clone(*, cls: Optional[Type[AgentT]] = None, **kwargs: Any) AgentT [source]¶
Create clone of this agent object.
Keyword arguments can be passed to override any argument supported by
Agent.__init__
.- Return type:
- test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, **kwargs: Any) AgentTestWrapperT [source]¶
Create new unit-testing wrapper for this agent.
- Return type:
- actor_from_stream(stream: Optional[StreamT], *, index: Optional[int] = None, active_partitions: Optional[Set[TP]] = None, channel: Optional[ChannelT] = None) ActorT[Union[AsyncIterable, Awaitable]] [source]¶
Create new actor from stream.
- Return type:
- add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None [source]¶
Add new sink to further handle results from this agent.
- Return type:
None
- stream(channel: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, **kwargs: Any) StreamT [source]¶
Create underlying stream used by this agent.
- Return type:
- async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None [source]¶
RPC operation: like
ask()
but do not expect reply.Cast here is like “casting a spell”, and will not expect a reply back from the agent.
- Return type:
None
- async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any [source]¶
RPC operation: ask agent for result of processing value.
This version will wait until the result is available and return the processed value.
- Return type:
- async ask_nowait(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) ReplyPromise [source]¶
RPC operation: ask agent for result of processing value.
This version does not wait for the result to arrive, but instead returns a promise of future evaluation.
- Return type:
- async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) Awaitable[RecordMetadata] [source]¶
Send message to topic used by agent.
- Return type:
_GenericAlias
[RecordMetadata
]
- async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator [source]¶
RPC map operation on a list of values.
A map operation iterates over results as they arrive. See
join()
andkvjoin()
if you want them in order.- Return type:
_SpecialGenericAlias
- async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator[str] [source]¶
RPC map operation on a list of
(key, value)
pairs.A map operation iterates over results as they arrive. See
join()
andkvjoin()
if you want them in order.- Return type:
_GenericAlias
[str
]
- async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any] [source]¶
RPC map operation on a list of values.
A join returns the results in order, and only returns once all values have been processed.
- Return type:
_GenericAlias
[Any
]
- async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any] [source]¶
RPC map operation on list of
(key, value)
pairs.A join returns the results in order, and only returns once all values have been processed.
- Return type:
_GenericAlias
[Any
]
- get_topic_names() Iterable[str] [source]¶
Return list of topic names this agent subscribes to.
- Return type:
_GenericAlias
[str
]
- property channel_iterator: AsyncIterator¶
Return channel agent iterates over. :rtype:
_SpecialGenericAlias
- logger: logging.Logger = <Logger faust.agents.agent (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class faust.agents.AgentT(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, name: Optional[str] = None, app: Optional[_AppT] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, **kwargs: Any)[source]¶
-
- app: _AppT¶
- supervisor_strategy: Optional[Type[SupervisorStrategyT]]¶
- abstract test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, **kwargs: Any) AgentTestWrapperT [source]¶
- Return type:
- abstract add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None [source]¶
- Return type:
None
- abstract async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None [source]¶
- Return type:
None
- abstract async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any [source]¶
- Return type:
- abstract async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Awaitable[RecordMetadata] [source]¶
- Return type:
_GenericAlias
[RecordMetadata
]
- abstract async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator [source]¶
- abstract async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator[str] [source]¶
- abstract async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any] [source]¶
- Return type:
_GenericAlias
[Any
]
- abstract async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any] [source]¶
- Return type:
_GenericAlias
[Any
]
- abstract property channel_iterator: AsyncIterator¶
- Return type:
_SpecialGenericAlias
- class faust.agents.AgentManager(app: AppT, **kwargs: Any)[source]¶
Agent manager.
- traceback_header: str = '\n=======================================\n TRACEBACK OF ALL RUNNING AGENT ACTORS\n=======================================\n'¶
- traceback_format: str = '\n* {name} ----->\n============================================================\n{traceback}\n\n'¶
- async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) None [source]¶
Call when a rebalance is needed.
- Return type:
None
- logger: logging.Logger = <Logger faust.agents.manager (WARNING)>¶
- class faust.agents.AgentManagerT(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]¶
- app: _AppT¶
- abstract async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) None [source]¶
- Return type:
None
- class faust.agents.ReplyConsumer(app: AppT, **kwargs: Any)[source]¶
Consumer responsible for redelegation of replies received.
- async add(correlation_id: str, promise: ReplyPromise) None [source]¶
Register promise to start tracking when it arrives.
- Return type:
None
- logger: logging.Logger = <Logger faust.agents.replies (WARNING)>¶