faust.types.agents
- class faust.types.agents.ActorT(agent: AgentT, stream: StreamT, it: _T, active_partitions: Optional[Set[TP]] = None, **kwargs: Any)
- it: _T
- class faust.types.agents.AgentManagerT(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)
- app: _AppT
- abstract async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) → None
- Return type: None
- Diag: Type[DiagT]
- diag: DiagT
- async_exit_stack: AsyncExitStack
- exit_stack: ExitStack
- data: MutableMapping[KT, VT]
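The `revoked` and `newly_assigned` arguments of `on_rebalance` are sets of `TP`. The sketch below shows one way such sets could be derived from a previous and a current partition assignment. It assumes `TP` is a `(topic, partition)` named tuple; the local `TP` definition and the `rebalance_diff` helper are illustrative stand-ins, not part of Faust.

```python
from typing import NamedTuple, Set, Tuple


class TP(NamedTuple):
    """Local stand-in for a topic partition (assumption: topic + partition)."""

    topic: str
    partition: int


def rebalance_diff(previous: Set[TP], current: Set[TP]) -> Tuple[Set[TP], Set[TP]]:
    """Hypothetical helper: split an assignment change into two sets."""
    revoked = previous - current          # owned before, not owned now
    newly_assigned = current - previous   # owned now, not owned before
    return revoked, newly_assigned


previous = {TP("orders", 0), TP("orders", 1)}
current = {TP("orders", 1), TP("orders", 2)}
revoked, newly_assigned = rebalance_diff(previous, current)
```

Partitions present in both assignments (here `orders` partition 1) appear in neither set.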
- class faust.types.agents.AgentT(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, name: Optional[str] = None, app: Optional[_AppT] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, **kwargs: Any)
- app: _AppT
- supervisor_strategy: Optional[Type[SupervisorStrategyT]]
- fun: AgentFun
- abstract test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, **kwargs: Any) → AgentTestWrapperT
- Return type: AgentTestWrapperT
- abstract add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) → None
- Return type: None
- abstract async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) → None
- Return type: None
- abstract async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) → Any
- Return type: Any
- abstract async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) → Awaitable[RecordMetadata]
- Return type: Awaitable[RecordMetadata]
- abstract async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Union[AgentT, ChannelT, str] = None) → AsyncIterator
- abstract async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Union[AgentT, ChannelT, str] = None) → AsyncIterator[str]
- abstract async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) → List[Any]
- Return type: List[Any]
- abstract async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) → List[Any]
- Return type: List[Any]
- abstract property channel_iterator: AsyncIterator
- Return type: AsyncIterator
- Diag: Type[DiagT]
- diag: DiagT
- async_exit_stack: AsyncExitStack
- exit_stack: ExitStack
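The return types listed for `AgentT` hint at three calling styles: `cast` returns `None` (send without expecting a reply), `ask` returns the reply for one value, and `join` returns a list of replies. The sketch below imitates those shapes with a purely in-memory stand-in. `InMemoryAgent` and `double` are hypothetical names; nothing here touches Faust, a channel, or Kafka, and the sketch assumes `join` returns replies in input order.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable, List


class InMemoryAgent:
    """Hypothetical stand-in mirroring the shape of cast/ask/join.

    Not Faust code: values are processed by calling `fun` directly.
    """

    def __init__(self, fun: Callable[[Any], Awaitable[Any]]) -> None:
        self.fun = fun
        self.processed: List[Any] = []

    async def cast(self, value: Any = None) -> None:
        # Process the value and discard the result.
        await self.fun(value)
        self.processed.append(value)

    async def ask(self, value: Any = None) -> Any:
        # Process the value and hand the result back to the caller.
        result = await self.fun(value)
        self.processed.append(value)
        return result

    async def join(self, values: Iterable[Any]) -> List[Any]:
        # Ask once per value and collect the replies in input order.
        return [await self.ask(v) for v in values]


async def double(value: int) -> int:
    return value * 2


async def main() -> Dict[str, Any]:
    agent = InMemoryAgent(double)
    return {
        "cast": await agent.cast(1),
        "ask": await agent.ask(2),
        "join": await agent.join([3, 4]),
    }


results = asyncio.run(main())
```

In a real application the agent would be created by the app and would consume from its channel; the stand-in only illustrates what each call hands back to the caller.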
- class faust.types.agents.AgentTestWrapperT(*args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any)
- new_value_processed: Condition
- results: MutableMapping[int, Any]
- abstract async put(value: Optional[Union[bytes, _ModelT, Any]] = None, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, *, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, wait: bool = True) → EventT
- Return type: EventT
- abstract to_message(key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], *, partition: int = 0, offset: int = 0, timestamp: Optional[float] = None, timestamp_type: int = 0, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) → Message
- Return type: Message
- abstract async throw(exc: BaseException) → None
- Return type: None
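`AgentTestWrapperT` pairs a `results` mapping keyed by integers with a `new_value_processed` condition, and `put` takes a `wait` flag. The sketch below shows how those three pieces could fit together, assuming `results` is keyed by message offset and `wait=True` means `put` returns only after the value has been processed. `FakeTestWrapper`, `wait_for_offset`, and `shout` are hypothetical, and `put` here returns the offset directly rather than an `EventT`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class FakeTestWrapper:
    """Hypothetical stand-in mirroring results / new_value_processed / put."""

    def __init__(self, fun: Callable[[Any], Awaitable[Any]]) -> None:
        self.fun = fun
        self.results: Dict[int, Any] = {}
        self.new_value_processed = asyncio.Condition()
        self._next_offset = 0
        self._tasks: List["asyncio.Future[Any]"] = []  # keep tasks referenced

    async def put(self, value: Any = None, *, wait: bool = True) -> int:
        offset = self._next_offset
        self._next_offset += 1
        # Schedule processing in the background, like a running agent would.
        self._tasks.append(asyncio.ensure_future(self._process(offset, value)))
        if wait:
            await self.wait_for_offset(offset)
        return offset

    async def wait_for_offset(self, offset: int) -> None:
        # Block until the result for this offset has been recorded.
        async with self.new_value_processed:
            await self.new_value_processed.wait_for(lambda: offset in self.results)

    async def _process(self, offset: int, value: Any) -> None:
        result = await self.fun(value)
        async with self.new_value_processed:
            self.results[offset] = result
            self.new_value_processed.notify_all()


async def shout(value: str) -> str:
    return value.upper()


async def main() -> Tuple[bool, bool, Dict[int, Any]]:
    wrapper = FakeTestWrapper(shout)
    first = await wrapper.put("hello")                # waits for processing
    ready_after_wait = first in wrapper.results
    second = await wrapper.put("world", wait=False)   # returns immediately
    ready_without_wait = second in wrapper.results
    await wrapper.wait_for_offset(second)
    return ready_after_wait, ready_without_wait, dict(wrapper.results)


outcome = asyncio.run(main())
```

With `wait=False` the caller has to synchronize on the condition itself before reading `results`, which is what the last step of `main` does.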
- class faust.types.agents.AsyncIterableActorT(agent: AgentT, stream: StreamT, it: _T, active_partitions: Optional[Set[TP]] = None, **kwargs: Any)
Used for agent functions that yield.
- it: _T
- Diag: Type[DiagT]
- diag: DiagT
- async_exit_stack: AsyncExitStack
- exit_stack: ExitStack
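An agent function may return either an awaitable or an async iterable (see the `fun` parameter of `AgentT`), and `AsyncIterableActorT` covers the second case. The sketch below shows how the two kinds of return value can be told apart with the standard library. `fake_stream`, `yielding_agent`, `awaiting_agent`, and `classify` are hypothetical names, and the check is an illustration, not necessarily how Faust selects an actor type.

```python
import asyncio
from collections.abc import AsyncIterable, Awaitable
from typing import Any, AsyncIterator, Iterable, List, Tuple


async def fake_stream(values: Iterable[int]) -> AsyncIterator[int]:
    """Hypothetical stand-in for a stream of values."""
    for value in values:
        yield value


async def yielding_agent(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    # Contains `yield`, so calling it returns an async generator.
    async for value in stream:
        yield value + 1


async def awaiting_agent(stream: AsyncIterator[int]) -> None:
    # No `yield`, so calling it returns a coroutine.
    async for _ in stream:
        pass


def classify(it: Any) -> str:
    """Report which kind of object an agent function returned."""
    if isinstance(it, AsyncIterable):
        return "async-iterable"
    if isinstance(it, Awaitable):
        return "awaitable"
    raise TypeError(f"unsupported agent return value: {it!r}")


async def main() -> Tuple[Tuple[str, str], List[int]]:
    yielding = yielding_agent(fake_stream([1, 2, 3]))
    awaiting = awaiting_agent(fake_stream([1, 2, 3]))
    kinds = (classify(yielding), classify(awaiting))
    yielded = [value async for value in yielding]  # drain the generator
    await awaiting                                 # run the coroutine to completion
    return kinds, yielded


kinds, yielded = asyncio.run(main())
```

Only the yielding variant produces values that a caller or sink could consume; the awaiting variant simply runs to completion.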