faust.types.app

class faust.types.app.AppT(id: str, *, monitor: _Monitor, config_source: Optional[Any] = None, **options: Any)[source]

Abstract type for the Faust application.

See also

faust.App.

Settings: ClassVar[Type[_Settings]]
BootStrategy: ClassVar[Type[BootStrategyT]]
boot_strategy: BootStrategyT
finalized: bool = False

Set to true when the app is finalized (can read configuration).

configured: bool = False

Set to true when the app has read configuration.

rebalancing: bool = False

Set to true if the worker is currently rebalancing.

rebalancing_count: int = 0
in_recovery: bool = False

Set to true when the worker is in recovery

consumer_generation_id: int = 0
unassigned: bool = False
in_worker: bool = False
on_configured: SyncSignal[_Settings] = <SyncSignal: AppT.on_configured>
on_before_configured: SyncSignal = <SyncSignal: AppT.on_before_configured>
on_after_configured: SyncSignal = <SyncSignal: AppT.on_after_configured>
on_partitions_assigned: Signal[Set[TP]] = <Signal: AppT.on_partitions_assigned>
on_partitions_revoked: Signal[Set[TP]] = <Signal: AppT.on_partitions_revoked>
on_rebalance_complete: Signal = <Signal: AppT.on_rebalance_complete>
on_before_shutdown: Signal = <Signal: AppT.on_before_shutdown>
on_worker_init: SyncSignal = <SyncSignal: AppT.on_worker_init>
on_produce_message: SyncSignal = <SyncSignal: AppT.on_produce_message>
client_only: bool
producer_only: bool
agents: AgentManagerT
sensors: SensorDelegateT
fixups: MutableSequence[FixupT]
tracer: Optional[TracerT] = None
abstract config_from_object(obj: Any, *, silent: bool = False, force: bool = False) None[source]
Return type:

None

abstract finalize() None[source]
Return type:

None

abstract main() NoReturn[source]
Return type:

_SpecialForm

abstract worker_init() None[source]
Return type:

None

abstract worker_init_post_autodiscover() None[source]
Return type:

None

abstract discover(*extra_modules: str, categories: Iterable[str] = ('a', 'b', 'c'), ignore: Iterable[Any] = ('foo', 'bar')) None[source]
Return type:

None

abstract topic(*topics: str, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, maxsize: Optional[int] = None, allow_empty: bool = False, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None) TopicT[source]
Return type:

TopicT

abstract channel(*, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, maxsize: Optional[int] = None, loop: Optional[AbstractEventLoop] = None) ChannelT[source]
Return type:

ChannelT

abstract agent(channel: Optional[Union[str, ChannelT[_T]]] = None, *, name: Optional[str] = None, concurrency: int = 1, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = True, **kwargs: Any) Callable[[Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]]], AgentT[_T]][source]
Return type:

_GenericAlias[_GenericAlias[_GenericAlias[~_T], _GenericAlias[_GenericAlias[Any, Any, None], _GenericAlias[None], _GenericAlias[+T_co]]], _GenericAlias[~_T]]

abstract task(fun: Union[Callable[[AppT], Awaitable], Callable[[], Awaitable]], *, on_leader: bool = False, traced: bool = True) Callable[source]
abstract timer(interval: Union[timedelta, float, str], on_leader: bool = False, traced: bool = True, name: Optional[str] = None, max_drift_correction: float = 0.1) Callable[source]
Return type:

_VariadicGenericAlias

abstract crontab(cron_format: str, *, timezone: Optional[tzinfo] = None, on_leader: bool = False, traced: bool = True) Callable[source]
Return type:

_VariadicGenericAlias

abstract service(cls: Type[ServiceT]) Type[ServiceT][source]
Return type:

_GenericAlias[ServiceT]

abstract stream(channel: AsyncIterable, beacon: Optional[NodeT] = None, **kwargs: Any) StreamT[source]
Return type:

StreamT

abstract Table(name: str, *, default: Optional[Callable[[], Any]] = None, window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, **kwargs: Any) TableT[source]
Return type:

TableT

abstract GlobalTable(name: str, *, default: Optional[Callable[[], Any]] = None, window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, **kwargs: Any) TableT[source]
Return type:

TableT

abstract SetTable(name: str, *, window: Optional[WindowT] = None, partitions: Optional[int] = None, start_manager: bool = False, help: Optional[str] = None, **kwargs: Any) TableT[source]
Return type:

TableT

abstract SetGlobalTable(name: str, *, window: Optional[WindowT] = None, partitions: Optional[int] = None, start_manager: bool = False, help: Optional[str] = None, **kwargs: Any) TableT[source]
Return type:

TableT

abstract page(path: str, *, base: ~typing.Type[~faust.types.web.View] = <class 'faust.types.web.View'>, cors_options: ~typing.Optional[~typing.Mapping[str, ~faust.types.web.ResourceOptions]] = None, name: ~typing.Optional[str] = None) Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]][source]
Return type:

_GenericAlias[_GenericAlias[_GenericAlias[View], _GenericAlias[View, Request, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _GenericAlias[View, Request, Any, Any, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]], _GenericAlias[View]]

abstract table_route(table: CollectionT, shard_param: Optional[str] = None, *, query_param: Optional[str] = None, match_info: Optional[str] = None, exact_key: Optional[str] = None) Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]][source]
Return type:

_GenericAlias[_GenericAlias[_GenericAlias[View, Request, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _GenericAlias[View, Request, Any, Any, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]], _GenericAlias[_GenericAlias[View, Request, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _GenericAlias[View, Request, Any, Any, _GenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]]]

abstract command(*options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any) Callable[[Callable], Type[_AppCommand]][source]
Return type:

_GenericAlias[_VariadicGenericAlias, _GenericAlias[_AppCommand]]

abstract create_event(key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message) _EventT[source]
Return type:

_EventT

abstract async start_client() None[source]
Return type:

None

abstract async maybe_start_client() None[source]
Return type:

None

abstract trace(name: str, trace_enabled: bool = True, **extra_context: Any) AbstractContextManager[source]
Return type:

_GenericAlias[+T_co]

abstract async send(channel: Union[ChannelT, str], key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

abstract LiveCheck(**kwargs: Any) _LiveCheck[source]
Return type:

_LiveCheck

maybe_start_producer() ProducerT[source]
Return type:

ProducerT

abstract is_leader() bool[source]
Return type:

bool

abstract FlowControlQueue(maxsize: Optional[int] = None, *, clear_on_resume: bool = False) ThrowableQueue[source]
Return type:

ThrowableQueue

abstract Worker(**kwargs: Any) _Worker[source]
Return type:

_Worker

abstract on_webserver_init(web: Web) None[source]
Return type:

None

abstract on_rebalance_start() None[source]
Return type:

None

abstract on_rebalance_return() None[source]
Return type:

None

abstract on_rebalance_end() None[source]
Return type:

None

property conf: _Settings
Return type:

_Settings

abstract property transport: TransportT
Return type:

TransportT

abstract property producer_transport: TransportT
Return type:

TransportT

abstract property cache: CacheBackendT
Return type:

CacheBackendT

abstract property producer: ProducerT
Return type:

ProducerT

abstract property consumer: ConsumerT
Return type:

ConsumerT

tables[source]
topics[source]
abstract property monitor: _Monitor
Return type:

_Monitor

flow_control[source]
abstract property http_client: ClientSession
Return type:

ClientSession

assignor[source]
router[source]
serializers[source]
web[source]
in_transaction[source]