faust.events

Events received in streams.

class faust.events.Event(app: AppT, key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message)

An event received on a channel.

Notes

  • Events have a key and a value:

    event.key, event.value
    
  • They also have a reference to the original message (if available), such as a Kafka record:

    event.message.offset

  • Iterating over channels/topics yields Event:

    async for event in channel:

  • Iterating over a stream (which in turn iterates over a channel) yields Event.value:

    async for value in channel.stream():  # value is event.value
        ...
    
  • If you only have a Stream object, you can also access the underlying events by using Stream.events().

    For example:

    async for event in channel.stream().events():
        ...
    

    The stream's current_event attribute is also commonly used to find the “current event” related to a value in the stream:

    stream = channel.stream()
    async for value in stream:
        event = stream.current_event
        message = event.message
        topic = event.message.topic
    

    You can retrieve the current event in a stream to:

    • Get access to the serialized key+value.

    • Get access to message properties, such as the topic and partition the value was received on, or its offset.

    If you want access to both key and value, you should use stream.items() instead.

    async for key, value in stream.items():
        ...
    

    stream.current_event can also be accessed directly, but you must take extreme care that you are using the correct stream object. Methods such as .group_by(key) and .through(topic) return cloned stream objects, so the stream you end up iterating over may not be the object you started with.

    The best way to access the current event in an agent is to use the current_event() function, which is backed by a ContextVar:

    from faust import current_event
    
    @app.agent(topic)
    async def process(stream):
        async for value in stream:
            event = current_event()
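
The ContextVar approach described in the notes can be illustrated without a running Faust app. The following is a minimal standard-library sketch, not Faust's implementation: FakeEvent, values(), and the module-level _current_event variable are hypothetical stand-ins, assumed only to show how an iterator that yields plain values can still expose the full event through a context variable.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Optional, Tuple


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an event: a key, a value, and an offset."""
    key: Any
    value: Any
    offset: int


# Hypothetical context variable holding the event currently being processed.
_current_event: "ContextVar[Optional[FakeEvent]]" = ContextVar(
    "current_event", default=None
)


def current_event() -> Optional[FakeEvent]:
    """Return the event being processed in the current context, if any."""
    return _current_event.get()


async def values(events: List[FakeEvent]) -> AsyncIterator[Any]:
    """Yield only the values, publishing each event via the ContextVar first."""
    for event in events:
        _current_event.set(event)
        yield event.value


async def process(events: List[FakeEvent]) -> List[Tuple[Any, Any, int]]:
    """Consume values while recovering key and offset from the current event."""
    seen = []
    async for value in values(events):
        event = current_event()
        seen.append((event.key, value, event.offset))
    return seen


def run_demo() -> List[Tuple[Any, Any, int]]:
    events = [FakeEvent("a", 1, 0), FakeEvent("b", 2, 1)]
    return asyncio.run(process(events))
```

Because the consumer never touches a stream object here, there is no way to pick the wrong one, which is the property that makes the ContextVar route attractive when streams may be cloned.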
    
app: _AppT
key: Optional[Union[bytes, _ModelT, Any]]
value: Union[bytes, _ModelT, Any]
message: Message
headers: Mapping
acked: bool
async send(channel: Union[str, ChannelT], key: Optional[Union[bytes, _ModelT, Any]] = <object object>, value: Union[bytes, _ModelT, Any] = <object object>, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Any = <object object>, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) -> Awaitable[RecordMetadata]

Send object to channel.

Return type:

Awaitable[RecordMetadata]

async forward(channel: Union[str, ChannelT], key: Optional[Union[bytes, _ModelT, Any]] = <object object>, value: Union[bytes, _ModelT, Any] = <object object>, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Any = <object object>, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) -> Awaitable[RecordMetadata]

Forward original message (will not be reserialized).

Return type:

Awaitable[RecordMetadata]
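
The difference between sending and forwarding, and the role of the `<object object>` defaults, can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: it assumes the defaults are sentinel objects meaning "reuse what the event already has", that send starts from the deserialized key and value, and that forward reuses the original message bytes. The names _UNSET, RawMessage, SketchEvent, send_payload, forward_payload, and the JSON encode helper are all hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Any, Tuple

# Hypothetical sentinel: distinguishes "argument not passed" from an explicit None.
_UNSET = object()


def encode(obj: Any) -> bytes:
    """Hypothetical serializer: canonical JSON with sorted keys."""
    return json.dumps(obj, sort_keys=True).encode("utf-8")


@dataclass
class RawMessage:
    """Stand-in for the original record: key and value as received bytes."""
    key: bytes
    value: bytes


@dataclass
class SketchEvent:
    key: Any             # deserialized key
    value: Any           # deserialized value
    message: RawMessage  # original, still-serialized message

    def send_payload(self, key: Any = _UNSET, value: Any = _UNSET) -> Tuple[bytes, bytes]:
        """Send-like behaviour: take the deserialized data and serialize it again."""
        k = self.key if key is _UNSET else key
        v = self.value if value is _UNSET else value
        return encode(k), encode(v)

    def forward_payload(self, key: Any = _UNSET, value: Any = _UNSET) -> Tuple[bytes, bytes]:
        """Forward-like behaviour: reuse the original bytes unless overridden."""
        k = self.message.key if key is _UNSET else encode(key)
        v = self.message.value if value is _UNSET else encode(value)
        return k, v


def demo() -> Tuple[SketchEvent, RawMessage]:
    # The raw value uses compact, unsorted JSON, so re-encoding changes the bytes.
    raw = RawMessage(key=b'"user-1"', value=b'{"b":2,"a":1}')
    event = SketchEvent(
        key=json.loads(raw.key), value=json.loads(raw.value), message=raw
    )
    return event, raw
```

In this sketch, forward_payload() returns the received bytes untouched, while send_payload() produces a freshly encoded value whose bytes differ from the original. The sentinel also lets a caller pass key=None deliberately, which a plain None default could not express.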

ack() -> bool

Acknowledge event as being processed by stream.

When the last stream processor acks the message, the offset in the source topic will be marked as safe-to-commit, and the worker will commit and advance the committed offset.

Return type:

bool
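
The "last stream processor acks" rule can be pictured as reference counting. The sketch below is a simplified model under that assumption, not the library's implementation: TrackedMessage, SketchEvent, attach(), and the committable list are hypothetical names, and a real worker would commit offsets through its consumer rather than appending to a list.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TrackedMessage:
    """Stand-in for a source message shared by several stream processors."""
    offset: int
    refcount: int = 0
    acked: bool = False


@dataclass
class SketchEvent:
    message: TrackedMessage
    committable: List[int]  # offsets marked as safe to commit

    def ack(self) -> bool:
        """Drop one reference; mark the offset committable on the last ack."""
        msg = self.message
        msg.refcount = max(msg.refcount - 1, 0)
        if msg.refcount == 0 and not msg.acked:
            msg.acked = True
            self.committable.append(msg.offset)
            return True
        return False


def attach(message: TrackedMessage, committable: List[int]) -> SketchEvent:
    """Register one more processor for the message and hand it an event."""
    message.refcount += 1
    return SketchEvent(message=message, committable=committable)


def demo() -> List[object]:
    committable: List[int] = []
    message = TrackedMessage(offset=42)
    first = attach(message, committable)
    second = attach(message, committable)
    results = [first.ack(), list(committable)]    # first ack: not yet committable
    results += [second.ack(), list(committable)]  # last ack: offset is released
    return results
```

With two processors attached, only the second ack() returns True and releases the offset, which matches the description that the offset becomes safe to commit once the last processor has acknowledged the message.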