faust.agents.replies¶
Agent replies: waiting for replies, sending them, etc.
- class faust.agents.replies.ReplyPromise(reply_to: str, correlation_id: str = '', **kwargs: Any)[source]¶
A reply promise can be await-ed to wait until the result is ready.
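As a rough illustration of awaiting a promise until its result is ready, the sketch below uses only the standard library. `SketchPromise` is a hypothetical stand-in that assumes a future-like object carrying `reply_to` and `correlation_id`; it is not Faust's `ReplyPromise`, and the topic name and ID are made up.

```python
import asyncio


class SketchPromise(asyncio.Future):
    """Hypothetical stand-in for a reply promise (not Faust's class)."""

    def __init__(self, reply_to: str, correlation_id: str = "") -> None:
        super().__init__()
        self.reply_to = reply_to              # where the reply is expected
        self.correlation_id = correlation_id  # ties the reply to the request


async def demo_promise() -> str:
    promise = SketchPromise("reply-topic", correlation_id="req-1")
    # Simulate a reply arriving a little later.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, promise.set_result, "pong")
    # Awaiting suspends this coroutine until the result has been set.
    return await promise


result = asyncio.run(demo_promise())
```

The awaiting coroutine does not poll; it resumes only once something else sets the result.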
- class faust.agents.replies.BarrierState(reply_to: str, correlation_id: str = '', **kwargs: Any)[source]¶
State of pending/complete barrier.
A barrier is a synchronization primitive that waits until a group of coroutines has completed.
- size: int = 0¶
This is the size while the messages are being sent (a tentative total that is added to until the total is finalized).
- total: int = 0¶
This is the actual total when all messages have been sent. It’s set by finalize().
- pending: MutableSet[ReplyPromise]¶
Set of pending replies that this barrier is composed of.
- add(p: ReplyPromise) None[source]¶
Add promise to barrier.
Note
You can only add promises before the barrier is finalized using finalize().
- Return type:
None
- finalize() None[source]¶
Finalize this barrier.
After finalization you cannot grow or shrink the size of the barrier.
- Return type:
None
- fulfill(correlation_id: str, value: Any) None[source]¶
Fulfill one of the promises in this barrier.
Once all promises in this barrier are fulfilled, the barrier will be ready.
- Return type:
None
- get_nowait() ReplyTuple[source]¶
Return next reply, or raise asyncio.QueueEmpty.
- Return type:
ReplyTuple
- async iterate() AsyncIterator[ReplyTuple][source]¶
Iterate over results as they arrive.
- Return type:
AsyncIterator[ReplyTuple]
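The add / finalize / fulfill / iterate lifecycle described for the barrier can be sketched with a plain `asyncio.Queue`. This is a minimal stand-in under stated assumptions, not Faust's implementation: `SketchBarrier`, `Reply`, the `finalized` and `fulfilled` attributes, and the `None` wake-up sentinel are all hypothetical, and promises are represented by their correlation IDs only.

```python
import asyncio
from typing import Any, AsyncIterator, List, NamedTuple, Set


class Reply(NamedTuple):
    correlation_id: str
    value: Any


class SketchBarrier:
    """Hypothetical barrier: ready once every added promise is fulfilled."""

    def __init__(self) -> None:
        self.size = 0        # tentative total while promises are being added
        self.total = 0       # actual total, set by finalize()
        self.fulfilled = 0
        self.finalized = False
        self.pending: Set[str] = set()
        self._results: asyncio.Queue = asyncio.Queue()

    def add(self, correlation_id: str) -> None:
        if self.finalized:
            raise RuntimeError("cannot add after finalize()")
        self.pending.add(correlation_id)
        self.size += 1

    def finalize(self) -> None:
        self.total = self.size
        self.finalized = True
        self._maybe_ready()

    def fulfill(self, correlation_id: str, value: Any) -> None:
        self.pending.discard(correlation_id)
        self.fulfilled += 1
        self._results.put_nowait(Reply(correlation_id, value))
        self._maybe_ready()

    def _maybe_ready(self) -> None:
        # Ready only when the total is final and every reply has arrived.
        if self.finalized and self.fulfilled >= self.total:
            self._results.put_nowait(None)  # sentinel: wakes up iterate()

    async def iterate(self) -> AsyncIterator[Reply]:
        while True:
            item = await self._results.get()
            if item is None:
                return
            yield item


async def demo_barrier() -> List[Reply]:
    barrier = SketchBarrier()
    for cid in ("a", "b", "c"):
        barrier.add(cid)
    barrier.finalize()
    loop = asyncio.get_running_loop()
    # Replies arrive out of order; iterate() yields them as they arrive.
    for delay, cid in ((0.03, "a"), (0.01, "b"), (0.02, "c")):
        loop.call_later(delay, barrier.fulfill, cid, cid.upper())
    return [reply async for reply in barrier.iterate()]


replies = asyncio.run(demo_barrier())
```

Keeping `size` separate from `total` lets replies arrive while requests are still being sent, without the barrier reporting ready too early.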
- class faust.agents.replies.ReplyConsumer(app: AppT, **kwargs: Any)[source]¶
Consumer responsible for redelegation of replies received.
- async add(correlation_id: str, promise: ReplyPromise) None[source]¶
Register a promise so that its reply is tracked when it arrives.
- Return type:
None
- logger: logging.Logger = <Logger faust.agents.replies (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
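The idea of registering a promise under a correlation ID and redelegating an incoming reply to it can be sketched as below. This is only an assumption about the general pattern, using standard-library futures: `SketchReplyRegistry` and its `deliver()` method are hypothetical names, not part of `ReplyConsumer`, and no topic consumption is modelled.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, Set, Tuple


class SketchReplyRegistry:
    """Hypothetical registry mapping correlation IDs to waiting promises."""

    def __init__(self) -> None:
        self._waiting: DefaultDict[str, Set[asyncio.Future]] = defaultdict(set)

    async def add(self, correlation_id: str, promise: asyncio.Future) -> None:
        # Start tracking: replies carrying this ID go to this promise.
        self._waiting[correlation_id].add(promise)

    def deliver(self, correlation_id: str, value: Any) -> int:
        # Hand the reply to every promise registered for the ID.
        promises = self._waiting.pop(correlation_id, set())
        for promise in promises:
            if not promise.done():
                promise.set_result(value)
        return len(promises)


async def demo_registry() -> Tuple[int, int, Any]:
    registry = SketchReplyRegistry()
    promise = asyncio.get_running_loop().create_future()
    await registry.add("req-42", promise)
    unmatched = registry.deliver("unknown-id", "ignored")  # nobody waiting
    matched = registry.deliver("req-42", {"status": "ok"})
    return unmatched, matched, await promise


outcome = asyncio.run(demo_registry())
```

A reply whose correlation ID was never registered is simply dropped in this sketch; how the real consumer treats such replies is not stated here.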