faust.agents

Agents.

class faust.agents.Agent(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, app: AppT, name: Optional[str] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, use_reply_headers: Optional[bool] = None, **kwargs: Any)[source]

Agent.

This is the type of object returned by the @app.agent decorator.

supervisor: SupervisorStrategyT = None
app: _AppT
fun: AgentFun
name: str
concurrency: int
isolated_partitions: bool
help: str
supervisor_strategy: Optional[Type[SupervisorStrategyT]]
on_init_dependencies() Iterable[ServiceT][source]

Return the list of service dependencies required to start the agent.

Return type:

Iterable[ServiceT]

actor_tracebacks() List[str][source]
Return type:

List[str]

async on_start() None[source]

Call when an agent starts.

Return type:

None

async on_stop() None[source]

Call when an agent stops.

Return type:

None

cancel() None[source]

Cancel agent and its actor instances running in this process.

Return type:

None

async on_partitions_revoked(revoked: Set[TP]) None[source]

Call when partitions are revoked.

Return type:

None

async on_partitions_assigned(assigned: Set[TP]) None[source]

Call when partitions are assigned.

Return type:

None

async on_isolated_partitions_revoked(revoked: Set[TP]) None[source]

Call when isolated partitions are revoked.

Return type:

None

async on_isolated_partitions_assigned(assigned: Set[TP]) None[source]

Call when isolated partitions are assigned.

Return type:

None

async on_shared_partitions_revoked(revoked: Set[TP]) None[source]

Call when non-isolated partitions are revoked.

Return type:

None

async on_shared_partitions_assigned(assigned: Set[TP]) None[source]

Call when non-isolated partitions are assigned.

Return type:

None

info() Mapping[source]

Return agent attributes as a dictionary.

Return type:

Mapping

clone(*, cls: Optional[Type[AgentT]] = None, **kwargs: Any) AgentT[source]

Create clone of this agent object.

Keyword arguments can be passed to override any argument supported by Agent.__init__.

Return type:

AgentT

test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, **kwargs: Any) AgentTestWrapperT[source]

Create new unit-testing wrapper for this agent.

Return type:

AgentTestWrapperT

actor_from_stream(stream: Optional[StreamT], *, index: Optional[int] = None, active_partitions: Optional[Set[TP]] = None, channel: Optional[ChannelT] = None) ActorT[Union[AsyncIterable, Awaitable]][source]

Create new actor from stream.

Return type:

ActorT

add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None[source]

Add new sink to further handle results from this agent.

Return type:

None

stream(channel: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, **kwargs: Any) StreamT[source]

Create underlying stream used by this agent.

Return type:

StreamT

async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None[source]

RPC operation: like ask() but do not expect reply.

Cast here is like “casting a spell”, and will not expect a reply back from the agent.

Return type:

None

async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any[source]

RPC operation: ask agent for result of processing value.

This version will wait until the result is available and return the processed value.

Return type:

Any

async ask_nowait(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) ReplyPromise[source]

RPC operation: ask agent for result of processing value.

This version does not wait for the result to arrive, but instead returns a promise of future evaluation.

Return type:

ReplyPromise

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to topic used by agent.

Return type:

Awaitable[RecordMetadata]

async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator[source]

RPC map operation on a list of values.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type:

AsyncIterator

async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator[str][source]

RPC map operation on a list of (key, value) pairs.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type:

AsyncIterator[str]

async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]

RPC map operation on a list of values.

A join returns the results in order, and only returns once all values have been processed.

Return type:

List[Any]

async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]

RPC map operation on a list of (key, value) pairs.

A join returns the results in order, and only returns once all values have been processed.

Return type:

List[Any]

get_topic_names() Iterable[str][source]

Return list of topic names this agent subscribes to.

Return type:

Iterable[str]

property channel: ChannelT

Return channel used by agent.

Return type:

ChannelT

property channel_iterator: AsyncIterator

Return channel agent iterates over.

Return type:

AsyncIterator

property label: str

Return human-readable description of agent.

Return type:

str

property shortlabel: str

Return short description of agent.

Return type:

str

logger: logging.Logger = <Logger faust.agents.agent (WARNING)>
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class faust.agents.AgentT(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, name: Optional[str] = None, app: Optional[_AppT] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, **kwargs: Any)[source]
name: str
app: _AppT
concurrency: int
help: str
supervisor_strategy: Optional[Type[SupervisorStrategyT]]
isolated_partitions: bool
abstract actor_tracebacks() List[str][source]
Return type:

List[str]

abstract test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, **kwargs: Any) AgentTestWrapperT[source]
Return type:

AgentTestWrapperT

abstract add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None[source]
Return type:

None

abstract stream(**kwargs: Any) StreamT[source]
Return type:

StreamT

abstract async on_partitions_assigned(assigned: Set[TP]) None[source]
Return type:

None

abstract async on_partitions_revoked(revoked: Set[TP]) None[source]
Return type:

None

abstract async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None[source]
Return type:

None

abstract async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any[source]
Return type:

Any

abstract async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Awaitable[RecordMetadata][source]
Return type:

Awaitable[RecordMetadata]

abstract async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator[source]
abstract async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator[str][source]
abstract async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]
Return type:

List[Any]

abstract async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]
Return type:

List[Any]

abstract info() Mapping[source]
Return type:

Mapping

abstract clone(*, cls: Optional[Type[AgentT]] = None, **kwargs: Any) AgentT[source]
Return type:

AgentT

abstract get_topic_names() Iterable[str][source]
Return type:

Iterable[str]

abstract property channel: ChannelT
Return type:

ChannelT

abstract property channel_iterator: AsyncIterator
Return type:

AsyncIterator

class faust.agents.AgentManager(app: AppT, **kwargs: Any)[source]

Agent manager.

traceback_header: str = '\n=======================================\n TRACEBACK OF ALL RUNNING AGENT ACTORS\n=======================================\n'
traceback_format: str = '\n* {name} ----->\n============================================================\n{traceback}\n\n'
async on_start() None[source]

Call when agents are being started.

Return type:

None

actor_tracebacks() Mapping[str, List[str]][source]
Return type:

Mapping[str, List[str]]

human_tracebacks() str[source]
Return type:

str

async wait_until_agents_started() None[source]
Return type:

None

service_reset() None[source]

Reset service state on restart.

Return type:

None

async on_stop() None[source]

Call when agents are being stopped.

Return type:

None

async stop() None[source]

Stop all running agents.

Return type:

None

cancel() None[source]

Cancel all running agents.

Return type:

None

update_topic_index() None[source]

Update indices.

Return type:

None

async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) None[source]

Call when a rebalance is needed.

Return type:

None

logger: logging.Logger = <Logger faust.agents.manager (WARNING)>
class faust.agents.AgentManagerT(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]
app: _AppT
abstract async wait_until_agents_started() None[source]
Return type:

None

abstract async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) None[source]
Return type:

None

abstract actor_tracebacks() Mapping[str, List[str]][source]
Return type:

Mapping[str, List[str]]

abstract human_tracebacks() str[source]
Return type:

str

class faust.agents.ReplyConsumer(app: AppT, **kwargs: Any)[source]

Consumer responsible for redelegation of replies received.

async on_start() None[source]

Call when reply consumer starts.

Return type:

None

async add(correlation_id: str, promise: ReplyPromise) None[source]

Register a promise so that its reply is tracked when the reply arrives.

Return type:

None

logger: logging.Logger = <Logger faust.agents.replies (WARNING)>
faust.agents.current_agent() Optional[AgentT][source]
Return type:

Optional[AgentT]