faust.agents.replies
¶
Agent replies: waiting for replies, sending them, etc.
- class faust.agents.replies.ReplyPromise(reply_to: str, correlation_id: str = '', **kwargs: Any)[source]¶
Reply promise can be
await
-ed to wait until result ready.
- class faust.agents.replies.BarrierState(reply_to: str, correlation_id: str = '', **kwargs: Any)[source]¶
State of pending/complete barrier.
A barrier is a synchronization primitive that will wait until a group of coroutines have completed.
- size: int = 0¶
This is the size while the messages are being sent. (it’s a tentative total, added to until the total is finalized).
- total: int = 0¶
This is the actual total when all messages have been sent. It’s set by
finalize()
.
- pending: MutableSet[ReplyPromise]¶
Set of pending replies that this barrier is composed of.
- add(p: ReplyPromise) None [source]¶
Add promise to barrier.
Note
You can only add promises before the barrier is finalized using
finalize()
.- Return type:
None
- finalize() None [source]¶
Finalize this barrier.
After finalization you can not grow or shrink the size of the barrier.
- Return type:
None
- fulfill(correlation_id: str, value: Any) None [source]¶
Fulfill one of the promises in this barrier.
Once all promises in this barrier is fulfilled, the barrier will be ready.
- Return type:
None
- get_nowait() ReplyTuple [source]¶
Return next reply, or raise
asyncio.QueueEmpty
.- Return type:
ReplyTuple
- async iterate() AsyncIterator[ReplyTuple] [source]¶
Iterate over results as they arrive.
- Return type:
_GenericAlias
[ReplyTuple
]
- class faust.agents.replies.ReplyConsumer(app: AppT, **kwargs: Any)[source]¶
Consumer responsible for redelegation of replies received.
- async add(correlation_id: str, promise: ReplyPromise) None [source]¶
Register promise to start tracking when it arrives.
- Return type:
None
- logger: logging.Logger = <Logger faust.agents.replies (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶