Agent replies: waiting for replies, sending them, etc.

class faust.agents.replies.ReplyPromise(reply_to: str, correlation_id: str = '', **kwargs: Any)

A reply promise can be awaited to wait until the result is ready.
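The awaiting behaviour can be illustrated with a minimal, self-contained sketch. It assumes the promise behaves like an asyncio.Future that carries routing information; SketchReplyPromise, demo, the topic name and the correlation id are hypothetical names used only for illustration, and this is not Faust's implementation.

```python
import asyncio
from typing import Any


class SketchReplyPromise(asyncio.Future):
    """Hypothetical stand-in for a reply promise (assumption: Future-like)."""

    def __init__(self, reply_to: str, correlation_id: str = "") -> None:
        super().__init__()
        self.reply_to = reply_to              # topic the reply is expected on
        self.correlation_id = correlation_id  # id matching request to reply

    def fulfill(self, correlation_id: str, value: Any) -> None:
        # Reject replies that belong to a different request.
        if correlation_id != self.correlation_id:
            raise ValueError("reply does not match this promise")
        self.set_result(value)


async def demo() -> Any:
    # Create the promise inside a running event loop.
    promise = SketchReplyPromise(reply_to="replies-topic", correlation_id="req-1")
    loop = asyncio.get_running_loop()
    # Simulate a reply arriving shortly after the request was sent.
    loop.call_later(0.01, promise.fulfill, "req-1", {"status": "ok"})
    # Awaiting the promise suspends until fulfill() has been called.
    return await promise
```

Calling asyncio.run(demo()) returns the value passed to fulfill() once the simulated reply arrives.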

reply_to: str
correlation_id: str
fulfill(correlation_id: str, value: Any) -> None

Fulfill promise: a reply was received.

Return type: None

class faust.agents.replies.BarrierState(reply_to: str, correlation_id: str = '', **kwargs: Any)

State of pending/complete barrier.

A barrier is a synchronization primitive that will wait until a group of coroutines have completed.
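The barrier life cycle (grow, finalize, collect replies) can be sketched with plain asyncio. This is an illustrative model only, not Faust's implementation: SketchBarrier, its finalized and ready attributes, and demo are hypothetical, and the sketch assumes replies may arrive before the barrier is finalized.

```python
import asyncio
from typing import Any, List, Set, Tuple


class SketchBarrier:
    """Hypothetical model of the barrier counters."""

    def __init__(self) -> None:
        self.size = 0           # tentative total while requests are being sent
        self.total = 0          # actual total, fixed by finalize()
        self.fulfilled = 0      # number of replies received so far
        self.finalized = False  # hypothetical flag, not part of the documented API
        self.pending: Set[str] = set()
        self._results: asyncio.Queue = asyncio.Queue()

    def add(self, correlation_id: str) -> None:
        if self.finalized:
            raise RuntimeError("cannot add to a finalized barrier")
        self.pending.add(correlation_id)
        self.size += 1

    def finalize(self) -> None:
        self.total = self.size
        self.finalized = True

    def fulfill(self, correlation_id: str, value: Any) -> None:
        self.pending.discard(correlation_id)
        self.fulfilled += 1
        self._results.put_nowait((correlation_id, value))

    @property
    def ready(self) -> bool:
        # Ready only once the total is known and every reply has arrived.
        return self.finalized and self.fulfilled >= self.total

    def get_nowait(self) -> Tuple[str, Any]:
        # Raises asyncio.QueueEmpty when no reply is buffered.
        return self._results.get_nowait()


async def demo() -> Tuple[bool, bool, List[Tuple[str, Any]]]:
    barrier = SketchBarrier()
    for cid in ("a", "b", "c"):
        barrier.add(cid)
    barrier.fulfill("a", 1)       # a reply arriving before finalization
    barrier.finalize()
    barrier.fulfill("b", 2)
    ready_midway = barrier.ready  # one reply is still outstanding
    barrier.fulfill("c", 3)
    replies = []
    while True:
        try:
            replies.append(barrier.get_nowait())
        except asyncio.QueueEmpty:
            break
    return ready_midway, barrier.ready, replies
```

Keeping size and total separate lets the sketch accept replies while requests are still being sent without reporting completion too early.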

size: int = 0

This is the size while the messages are being sent. It’s a tentative total that is added to until the total is finalized.

total: int = 0

This is the actual total when all messages have been sent. It’s set by finalize().

fulfilled: int = 0

The number of results we have received.

pending: MutableSet[ReplyPromise]

Set of pending replies that this barrier is composed of.

add(p: ReplyPromise) -> None

Add promise to barrier.

You can only add promises before the barrier is finalized using finalize().

Return type: None

finalize() -> None

Finalize this barrier.

After finalization you cannot grow or shrink the size of the barrier.

Return type: None

fulfill(correlation_id: str, value: Any) -> None

Fulfill one of the promises in this barrier.

Once all promises in this barrier are fulfilled, the barrier will be ready.

Return type: None

get_nowait() -> ReplyTuple

Return next reply, or raise asyncio.QueueEmpty.

Return type: ReplyTuple

async iterate() -> AsyncIterator[ReplyTuple]

Iterate over results as they arrive.

Return type: AsyncIterator[ReplyTuple]

class faust.agents.replies.ReplyConsumer(app: AppT, **kwargs: Any)

Consumer responsible for redelegating received replies.
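The general idea of routing replies by correlation id can be sketched as follows. This is a simplified model under stated assumptions, not Faust's implementation: SketchReplyRouter, on_reply and demo are hypothetical names, and plain asyncio futures stand in for reply promises.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, Set, Tuple


class SketchReplyRouter:
    """Hypothetical registry mapping correlation ids to waiting futures."""

    def __init__(self) -> None:
        self._waiting: DefaultDict[str, Set[asyncio.Future]] = defaultdict(set)

    async def add(self, correlation_id: str, promise: asyncio.Future) -> None:
        # Start tracking the promise until its reply shows up.
        self._waiting[correlation_id].add(promise)

    def on_reply(self, correlation_id: str, value: Any) -> int:
        # Hand the reply to every promise registered under this id.
        promises = self._waiting.pop(correlation_id, set())
        for promise in promises:
            if not promise.done():
                promise.set_result(value)
        return len(promises)


async def demo() -> Tuple[int, int, Any]:
    router = SketchReplyRouter()
    promise = asyncio.get_running_loop().create_future()
    await router.add("req-42", promise)
    unmatched = router.on_reply("unknown", "ignored")  # nobody is waiting
    matched = router.on_reply("req-42", "pong")        # resolves the promise
    return unmatched, matched, await promise
```

Replies with an unknown correlation id are dropped in this sketch; a real consumer may handle them differently.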

async on_start() -> None

Call when reply consumer starts.

Return type: None

async add(correlation_id: str, promise: ReplyPromise) -> None

Register a promise so that its reply is tracked when it arrives.

Return type: None

logger: logging.Logger = <Logger faust.agents.replies (WARNING)>
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack