faust.types.agents

class faust.types.agents.ActorT(agent: AgentT, stream: StreamT, it: _T, active_partitions: Optional[Set[TP]] = None, **kwargs: Any)[source]
agent: AgentT
stream: StreamT
it: _T
actor_task: Optional[Task]
active_partitions: Optional[Set[TP]]
index: Optional[int] = None

If multiple instances are started for concurrency, this is the index of this instance.

abstract cancel() None[source]
Return type:

None

abstract async on_isolated_partition_revoked(tp: TP) None[source]
Return type:

None

abstract async on_isolated_partition_assigned(tp: TP) None[source]
Return type:

None

abstract traceback() str[source]
Return type:

str

class faust.types.agents.AgentManagerT(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]
app: _AppT
abstract async wait_until_agents_started() None[source]
Return type:

None

abstract async on_rebalance(revoked: Set[TP], newly_assigned: Set[TP]) None[source]
Return type:

None

abstract actor_tracebacks() Mapping[str, List[str]][source]
Return type:

Mapping[str, List[str]]

abstract human_tracebacks() str[source]
Return type:

str

Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
data: MutableMapping[KT, VT]
class faust.types.agents.AgentT(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, name: Optional[str] = None, app: Optional[_AppT] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, **kwargs: Any)[source]
name: str
app: _AppT
concurrency: int
help: str
supervisor_strategy: Optional[Type[SupervisorStrategyT]]
isolated_partitions: bool
fun: AgentFun
abstract actor_tracebacks() List[str][source]
Return type:

List[str]

abstract test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, **kwargs: Any) AgentTestWrapperT[source]
Return type:

AgentTestWrapperT

abstract add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None[source]
Return type:

None

abstract stream(**kwargs: Any) StreamT[source]
Return type:

StreamT

abstract async on_partitions_assigned(assigned: Set[TP]) None[source]
Return type:

None

abstract async on_partitions_revoked(revoked: Set[TP]) None[source]
Return type:

None

abstract async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None[source]
Return type:

None

abstract async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any[source]
Return type:

Any

abstract async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Awaitable[RecordMetadata][source]
Return type:

Awaitable[RecordMetadata]

abstract async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator[source]
abstract async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Union[AgentT, ChannelT, str] = None) AsyncIterator[str][source]
abstract async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]
Return type:

List[Any]

abstract async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]
Return type:

List[Any]

abstract info() Mapping[source]
Return type:

Mapping

abstract clone(*, cls: Optional[Type[AgentT]] = None, **kwargs: Any) AgentT[source]
Return type:

AgentT

abstract get_topic_names() Iterable[str][source]
Return type:

Iterable[str]

abstract property channel: ChannelT
Return type:

ChannelT

abstract property channel_iterator: AsyncIterator
Return type:

AsyncIterator

Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
class faust.types.agents.AgentTestWrapperT(*args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any)[source]
new_value_processed: Condition
original_channel: ChannelT
results: MutableMapping[int, Any]
sent_offset: int = 0
processed_offset: int = 0
abstract async put(value: Optional[Union[bytes, _ModelT, Any]] = None, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, *, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, wait: bool = True) EventT[source]
Return type:

EventT

abstract to_message(key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], *, partition: int = 0, offset: int = 0, timestamp: Optional[float] = None, timestamp_type: int = 0, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) Message[source]
Return type:

Message

abstract async throw(exc: BaseException) None[source]
Return type:

None

class faust.types.agents.AsyncIterableActorT(agent: AgentT, stream: StreamT, it: _T, active_partitions: Optional[Set[TP]] = None, **kwargs: Any)[source]

Used for agent functions that yield.

agent: AgentT
stream: StreamT
it: _T
actor_task: Optional[Task]
active_partitions: Optional[Set[TP]]
Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
class faust.types.agents.AwaitableActorT(agent: AgentT, stream: StreamT, it: _T, active_partitions: Optional[Set[TP]] = None, **kwargs: Any)[source]

Used for agent functions that do not yield.

agent: AgentT
stream: StreamT
it: _T
actor_task: Optional[Task]
active_partitions: Optional[Set[TP]]
Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
faust.types.agents.SinkT

A sink can be an Agent, a Channel, or a callable/async callable taking the value as argument.

alias of Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]