faust.events
¶
Events received in streams.
- class faust.events.Event(app: AppT, key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message)[source]¶
An event received on a channel.
Notes
Events have a key and a value:
event.key, event.value
They also have a reference to the original message (if available), such as a Kafka record:
event.message.offset
Iterating over channels/topics yields Event:
- async for event in channel:
…
Iterating over a stream (that in turn iterate over channel) yields Event.value:
async for value in channel.stream() # value is event.value ...
If you only have a Stream object, you can also access underlying events by using
Stream.events
.For example:
async for event in channel.stream.events(): ...
Also commonly used for finding the “current event” related to a value in the stream:
stream = channel.stream() async for event in stream.events(): event = stream.current_event message = event.message topic = event.message.topic
You can retrieve the current event in a stream to:
Get access to the serialized key+value.
Get access to message properties like, what topic+partition the value was received on, or its offset.
If you want access to both key and value, you should use
stream.items()
instead.async for key, value in stream.items(): ...
stream.current_event
can also be accessed but you must take extreme care you are using the correct stream object. Methods such as.group_by(key)
and.through(topic)
returns cloned stream objects, so in the example:The best way to access the current_event in an agent is to use the
ContextVar
:from faust import current_event @app.agent(topic) async def process(stream): async for value in stream: event = current_event()
- app: _AppT¶
- async send(channel: ~typing.Union[str, ~faust.types.channels.ChannelT], key: ~typing.Optional[~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any]] = <object object>, value: ~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any] = <object object>, partition: ~typing.Optional[int] = None, timestamp: ~typing.Optional[float] = None, headers: ~typing.Any = <object object>, schema: ~typing.Optional[~faust.types.serializers.SchemaT] = None, key_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, value_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, callback: ~typing.Optional[~typing.Callable[[~faust.types.tuples.FutureMessage], ~typing.Union[None, ~typing.Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata] [source]¶
Send object to channel.
- Return type:
_GenericAlias
[RecordMetadata
]
- async forward(channel: ~typing.Union[str, ~faust.types.channels.ChannelT], key: ~typing.Optional[~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any]] = <object object>, value: ~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any] = <object object>, partition: ~typing.Optional[int] = None, timestamp: ~typing.Optional[float] = None, headers: ~typing.Any = <object object>, schema: ~typing.Optional[~faust.types.serializers.SchemaT] = None, key_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, value_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, callback: ~typing.Optional[~typing.Callable[[~faust.types.tuples.FutureMessage], ~typing.Union[None, ~typing.Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata] [source]¶
Forward original message (will not be reserialized).
- Return type:
_GenericAlias
[RecordMetadata
]