faust.topics
¶
Topic - Named channel using Kafka.
- class faust.topics.Topic(app: AppT, *, topics: Optional[Sequence[str]] = None, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, queue: Optional[ThrowableQueue] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, allow_empty: Optional[bool] = None, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None)[source]¶
Define new topic description.
- Parameters:
app (
AppT
) – App instance used to create this topic description.topics (
_GenericAlias
[_GenericAlias
[str
],None
]) – List of topic names.partitions (
_GenericAlias
[int
,None
]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, andautoCreateTopics
is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.retention (
_GenericAlias
[timedelta
,float
,str
,None
]) – Number of seconds (as float/timedelta
) to keep messages in the topic before they can be expired by the server.pattern (
_GenericAlias
[str
,_GenericAlias
[AnyStr
],None
]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.schema (
_GenericAlias
[SchemaT
,None
]) – Schema used for serialization/deserialization.key_type (
_GenericAlias
[_GenericAlias
[ModelT
],_GenericAlias
[bytes
],_GenericAlias
[str
],None
]) – How to deserialize keys for messages in this topic. Can be afaust.Model
type,str
,bytes
, orNone
for “autodetect” (Overrides schema if one is defined).value_type (
_GenericAlias
[_GenericAlias
[ModelT
],_GenericAlias
[bytes
],_GenericAlias
[str
],None
]) – How to deserialize values for messages in this topic. Can be afaust.Model
type,str
,bytes
, orNone
for “autodetect” (Overrides schema if ones is defined).active_partitions (
_GenericAlias
[_GenericAlias
[TP
],None
]) – Set offaust.types.tuples.TP
that this topic should be restricted to.
- Raises:
TypeError – if both topics and pattern is provided.
- async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata] [source]¶
Send message to topic.
- Return type:
_GenericAlias
[RecordMetadata
]
- send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage [source]¶
Produce message by adding to buffer.
Notes
This method can be used by non-async def functions to produce messages.
- Return type:
- async put(event: EventT) None [source]¶
Put event directly onto the underlying queue of this topic.
This will only affect subscribers to a particular instance, in a particular process.
- Return type:
None
- property pattern: Optional[Pattern]¶
Regular expression used by this topic (if any). :rtype:
_GenericAlias
[_GenericAlias
[AnyStr
],None
]
- property partitions: Optional[int]¶
Return the number of configured partitions for this topic.
Notes
This is only active for internal topics, fully owned and managed by Faust itself.
We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (
num.partitions
).Always make sure your topics have the correct number of partitions. :rtype:
_GenericAlias
[int
,None
]
- derive(**kwargs: Any) ChannelT [source]¶
Create topic derived from the configuration of this topic.
Configuration will be copied from this topic, but any parameter overridden as a keyword argument.
See also
derive_topic()
: for a list of supported keyword arguments.- Return type:
- derive_topic(*, topics: Optional[Sequence[str]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, internal: Optional[bool] = None, config: Optional[Mapping[str, Any]] = None, prefix: str = '', suffix: str = '', **kwargs: Any) TopicT [source]¶
Create new topic with configuration derived from this topic.
- Return type:
- get_topic_name() str [source]¶
Return the main topic name of this topic description.
As topic descriptions can have multiple topic names, this will only return when the topic has a singular topic name in the description.
- Raises:
TypeError – if configured with a regular expression pattern.
ValueError – if configured with multiple topic names.
TypeError – if not configured with any names or patterns.
- Return type:
- async publish_message(fut: FutureMessage, wait: bool = False) Awaitable[RecordMetadata] [source]¶
Fulfill promise to publish message to topic.
- Return type:
_GenericAlias
[RecordMetadata
]