faust.streams

Streams.

class faust.streams.Stream(channel: AsyncIterator[T_co], *, app: AppT, processors: Optional[Iterable[Callable[[T], Union[T, Awaitable[T]]]]] = None, combined: Optional[List[JoinableT]] = None, on_start: Optional[Callable] = None, join_strategy: Optional[JoinT] = None, beacon: Optional[NodeT] = None, concurrency_index: Optional[int] = None, prev: Optional[StreamT] = None, active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = '', loop: Optional[AbstractEventLoop] = None)[source]

A stream: async iterator processing events in channels/topics.

logger: logging.Logger = <Logger faust.streams (WARNING)>
mundane_level = 'debug'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

events_total: int = 0

Number of events processed by this instance so far.

app: _AppT
channel: AsyncIterator[T_co]
combined: List[JoinableT]
get_active_stream() StreamT[source]

Return the currently active stream.

A stream can be derived using Stream.group_by() and similar methods. If this stream was used to create another derived stream, this method returns the stream that is actively being consumed from. For example:

>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...

The return value of a.get_active_stream() would be c.

Notes

The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:

>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type:

StreamT

get_root_stream() StreamT[source]

Get the root stream that this stream was derived from.

Return type:

StreamT
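The linked-list traversal described for get_active_stream() can be illustrated with a small stand-alone sketch. The Node class below is hypothetical and is not Faust's Stream; in particular, the _prev attribute used to walk back to the root is an assumption made for this example (the documentation above only names _next).

```python
from typing import Optional


class Node:
    """Hypothetical stand-in for a stream; only the links matter here."""

    def __init__(self, name: str, prev: Optional["Node"] = None) -> None:
        self.name = name
        self._prev = prev  # assumed back-link, not documented above
        self._next: Optional["Node"] = None
        if prev is not None:
            prev._next = self  # the derived node supersedes its parent

    def get_active_stream(self) -> "Node":
        # Follow _next until we reach the node nothing was derived from.
        node = self
        while node._next:
            node = node._next
        return node

    def get_root_stream(self) -> "Node":
        # Follow _prev until we reach the node that has no parent.
        node = self
        while node._prev:
            node = node._prev
        return node


a = Node("a")
b = Node("b", prev=a)  # plays the role of a.group_by(...)
c = Node("c", prev=b)  # plays the role of b.through(...)
```

With this chain, calling get_active_stream() on a or b yields c, and calling get_root_stream() on c yields a.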

add_processor(processor: Callable[[T], Union[T, Awaitable[T]]]) None[source]

Add processor callback executed whenever a new event is received.

Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.

For example a processor handling a stream of numbers may modify the value:

def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type:

None
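As a rough illustration of how sync and async processors can be applied in sequence, the sketch below awaits a processor's result only when it is awaitable. The helper apply_processors and the add_one processor are hypothetical names for this example; this is not Faust's internal implementation.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List, Union

Processor = Callable[[Any], Union[Any, Awaitable[Any]]]


def double(value: int) -> int:
    return value * 2


async def add_one(value: int) -> int:
    return value + 1


async def apply_processors(value: Any, processors: List[Processor]) -> Any:
    # Run each processor in order, feeding it the previous result.
    for processor in processors:
        value = processor(value)
        if inspect.isawaitable(value):
            # Async processors return a coroutine that must be awaited.
            value = await value
    return value


result = asyncio.run(apply_processors(5, [double, add_one]))
```

Here the value 5 is doubled to 10 by the sync processor and then incremented to 11 by the async one.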

info() Mapping[str, Any][source]

Return stream settings as a dictionary.

Return type:

Mapping[str, Any]

clone(**kwargs: Any) StreamT[source]

Create a clone of this stream.

Notes

If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.

Return type:

StreamT

noack() StreamT[source]

Create new stream where acks are manual.

Return type:

StreamT

async items() AsyncIterator[Tuple[Optional[Union[bytes, _ModelT, Any]], T_co]][source]

Iterate over the stream as key, value pairs.

Examples

@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type:

AsyncIterator[Tuple[Optional[Union[bytes, _ModelT, Any]], T_co]]

async events() AsyncIterable[EventT][source]

Iterate over the stream as events exclusively.

This means the stream must be iterating over a channel, or at least an iterable of event objects.

Return type:

AsyncIterable[EventT]

async take(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[T_co]][source]

Buffer up to max_ values at a time and yield a list of buffered values.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

AsyncIterable[Sequence[T_co]]
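The buffering behaviour described for take() can be approximated with plain asyncio. This is a simplified sketch under several assumptions, not Faust's implementation: the input is an asyncio.Queue rather than a stream, take_from_queue and the _DONE sentinel are hypothetical names, the buffer is flushed as soon as it holds max_ values, and the timeout is applied to each wait for the next value.

```python
import asyncio
from typing import Any, AsyncIterator, List

_DONE = object()  # hypothetical end-of-input marker for this sketch


async def take_from_queue(
    queue: asyncio.Queue, max_: int, within: float
) -> AsyncIterator[List[Any]]:
    buffer: List[Any] = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=within)
        except asyncio.TimeoutError:
            # Gave up waiting for another value: flush what we have.
            if buffer:
                yield buffer
                buffer = []
            continue
        if item is _DONE:
            # Input finished: flush any remainder and stop.
            if buffer:
                yield buffer
            return
        buffer.append(item)
        if len(buffer) >= max_:
            # Buffer is full: flush immediately.
            yield buffer
            buffer = []


async def demo() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    queue.put_nowait(_DONE)
    return [batch async for batch in take_from_queue(queue, max_=2, within=0.05)]


batches = asyncio.run(demo())
```

Five queued values with max_=2 come out as two full batches followed by one partial batch.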

async take_events(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[EventT]][source]

Buffer up to max_ events at a time and yield a list of buffered events.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

AsyncIterable[Sequence[EventT]]

async take_with_timestamp(max_: int, within: Union[timedelta, float, str], timestamp_field_name: str) AsyncIterable[Sequence[T_co]][source]

Buffer up to max_ values at a time and yield a list of buffered values, each with the timestamp of when the message was added to Kafka.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

  • timestamp_field_name (str) – Name of the field, added to each value, that holds the Kafka timestamp.

Return type:

AsyncIterable[Sequence[T_co]]

enumerate(start: int = 0) AsyncIterable[Tuple[int, T_co]][source]

Enumerate values received on this stream.

Unlike Python’s built-in enumerate, this works with async generators.

Return type:

AsyncIterable[Tuple[int, T_co]]
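To show what enumerating an async iterable looks like, here is a minimal stand-alone sketch. The aenumerate function and the letters generator are hypothetical names for this example and do not use Faust itself.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def aenumerate(
    values: AsyncIterable[T], start: int = 0
) -> AsyncIterator[Tuple[int, T]]:
    # Same contract as the built-in enumerate, but driven by "async for".
    index = start
    async for value in values:
        yield index, value
        index += 1


async def letters() -> AsyncIterator[str]:
    for ch in "abc":
        yield ch


async def demo() -> List[Tuple[int, str]]:
    return [pair async for pair in aenumerate(letters(), start=1)]


pairs = asyncio.run(demo())
```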

async noack_take(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[T_co]][source]

Buffer up to max_ values at a time and yield a list of buffered values.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

AsyncIterable[Sequence[T_co]]

through(channel: Union[str, ChannelT]) StreamT[source]

Forward values in this stream to channel.

Send messages received on this stream to another channel, and return a new stream that consumes from that channel.

Notes

The messages are forwarded after any processors have been applied.

Example

topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type:

StreamT

echo(*channels: Union[str, ChannelT]) StreamT[source]

Forward values to one or more channels.

Unlike through(), we don’t consume from these channels.

Return type:

StreamT
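The difference between through() and echo() can be sketched with in-memory queues standing in for channels. Everything below is a simplified illustration under stated assumptions: the functions are plain async generators rather than Stream methods, asyncio.Queue plays the role of a channel, and _END is a hypothetical end-of-input marker.

```python
import asyncio
from typing import Any, AsyncIterator

_END = object()  # hypothetical end-of-input marker for this sketch


async def numbers() -> AsyncIterator[int]:
    for n in (1, 2, 3):
        yield n


async def echo(source: AsyncIterator[Any], *channels: asyncio.Queue) -> AsyncIterator[Any]:
    # Copy each value to every channel, but keep consuming from the source.
    async for value in source:
        for channel in channels:
            await channel.put(value)
        yield value


async def through(source: AsyncIterator[Any], channel: asyncio.Queue) -> AsyncIterator[Any]:
    async def forward() -> None:
        async for value in source:
            await channel.put(value)
        await channel.put(_END)

    # Forward in the background, then consume from the channel instead.
    task = asyncio.create_task(forward())
    while True:
        value = await channel.get()
        if value is _END:
            break
        yield value
    await task


async def demo():
    audit: asyncio.Queue = asyncio.Queue()
    echoed = [v async for v in echo(numbers(), audit)]
    copies = [audit.get_nowait() for _ in range(audit.qsize())]
    forwarded = [v async for v in through(numbers(), asyncio.Queue())]
    return echoed, copies, forwarded


echoed, copies, forwarded = asyncio.run(demo())
```

In the echo case the values read by the caller come straight from the source and the queue merely holds copies; in the through case every value the caller sees has passed through the queue first.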

group_by(key: Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]], *, name: Optional[str] = None, topic: Optional[TopicT] = None, partitions: Optional[int] = None) StreamT[source]

Create new stream that repartitions the stream using a new key.

Parameters:
  • key (Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]]) –

    The key argument decides how the new key is generated; it can be a field descriptor, a callable, or an async callable.

    Note: The name argument must be provided if the key argument is a callable.

  • name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.

Examples

Using a field descriptor to use a field in the event as the new key:

s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...

Using an async callable to extract a new key:

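import aiohttp

s = withdrawals_topic.stream()

async def get_key(withdrawal):
    # Resolve the new key over HTTP; the URL is illustrative.
    url = f'http://e.com/resolve_account/{withdrawal.account_id}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# name is required because key is a callable
async for event in s.group_by(get_key, name='resolved_account'):
    ...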

Using a regular callable to extract a new key:

s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key, name='account_id_upper'):
    ...
Return type:

StreamT
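Repartitioning by a new key boils down to two steps: derive the key from the value, then map the key to a partition. The sketch below illustrates both under explicit assumptions: resolve_key and partition_for are hypothetical helpers, a plain attribute name stands in for a field descriptor, async key callables are left out, and CRC32 is used only because it is deterministic (the partitioner actually used by the broker client may differ).

```python
import zlib
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class Withdrawal:
    account_id: str
    amount: float


def resolve_key(key: Union[str, Callable[[Withdrawal], str]], value: Withdrawal) -> str:
    # A callable computes the key; otherwise read the named field.
    if callable(key):
        return key(value)
    return getattr(value, key)


def partition_for(key: str, partitions: int) -> int:
    # Deterministic hash so equal keys always land on the same partition.
    return zlib.crc32(key.encode("utf-8")) % partitions


w1 = Withdrawal(account_id="acc-1", amount=10.0)
w2 = Withdrawal(account_id="acc-1", amount=99.0)

p1 = partition_for(resolve_key("account_id", w1), partitions=4)
p2 = partition_for(resolve_key("account_id", w2), partitions=4)
upper_key = resolve_key(lambda w: w.account_id.upper(), w1)
```

Because both withdrawals share an account_id, p1 and p2 are equal, which is the property that lets a single worker see every event for a given key.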

filter(fun: Callable[[T], Union[T, Awaitable[T]]]) StreamT[source]

Filter values from stream using callback.

The callback may be a traditional function, lambda function, or an async def function.

This method is useful for filtering events before repartitioning a stream.

Examples

>>> async for v in stream.filter(lambda v: v > 1000).group_by(...):
...     ...  # do something
Return type:

StreamT
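A filter that accepts a regular function, a lambda, or an async def function can be sketched as follows. The afilter helper and the sample data are hypothetical and independent of Faust; the point is only that the predicate's result is awaited when it is awaitable.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable


async def afilter(predicate: Callable[[Any], Any], source: AsyncIterator[Any]) -> AsyncIterator[Any]:
    async for value in source:
        keep = predicate(value)
        if inspect.isawaitable(keep):
            # Async predicates return a coroutine that must be awaited.
            keep = await keep
        if keep:
            yield value


async def amounts() -> AsyncIterator[int]:
    for amount in (500, 1500, 2000):
        yield amount


async def is_round_thousand(value: int) -> bool:
    return value % 1000 == 0


async def demo():
    large = [v async for v in afilter(lambda v: v > 1000, amounts())]
    rounded = [v async for v in afilter(is_round_thousand, amounts())]
    return large, rounded


large, rounded = asyncio.run(demo())
```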

derive_topic(name: str, *, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, prefix: str = '', suffix: str = '') TopicT[source]

Create Topic description derived from the K/V type of this stream.

Parameters:
  • name (str) – Topic name.

  • key_type (Optional[Union[Type[ModelT], Type[bytes], Type[str]]]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.

  • value_type (Optional[Union[Type[ModelT], Type[bytes], Type[str]]]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.

Raises:

ValueError – if the stream channel is not a topic.

Return type:

TopicT

async throw(exc: BaseException) None[source]

Send exception to stream iteration.

Return type:

None

combine(*nodes: JoinableT, **kwargs: Any) StreamT[source]

Combine streams and tables into joined stream.

Return type:

StreamT

contribute_to_stream(active: StreamT) None[source]

Add stream as node in joined stream.

Return type:

None

async remove_from_stream(stream: StreamT) None[source]

Remove stream as node in a joined stream.

Return type:

None

join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined.

Return type:

StreamT

left_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by LEFT JOIN.

Return type:

StreamT

inner_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by INNER JOIN.

Return type:

StreamT

outer_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by OUTER JOIN.

Return type:

StreamT

async on_merge(value: Optional[T] = None) Optional[T][source]

Signal called when an event is to be joined.

Return type:

Optional[T]

async on_start() None[source]

Signal called when the stream starts.

Return type:

None

async stop() None[source]

Stop this stream.

Return type:

None

async on_stop() None[source]

Signal that the stream is stopping.

Return type:

None

async ack(event: EventT) bool[source]

Ack event.

This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.

Parameters:

event (EventT) – Event to ack.

Return type:

bool
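The reference-counting idea behind ack() can be shown with a toy model. The Message class, the committed list, and the choice to return True only for the ack that brings the count to zero are all assumptions made for this sketch; they are not taken from Faust's implementation.

```python
from typing import List


class Message:
    """Hypothetical message with a reference count per consuming stream."""

    def __init__(self, offset: int, refcount: int) -> None:
        self.offset = offset
        self.refcount = refcount
        self.acked = False


committed: List[int] = []  # offsets that would be committed by the worker


def ack(message: Message) -> bool:
    # Drop one reference, never going below zero.
    message.refcount = max(message.refcount - 1, 0)
    if message.refcount == 0 and not message.acked:
        # Last reference released: the offset can now be committed.
        message.acked = True
        committed.append(message.offset)
        return True
    return False


message = Message(offset=42, refcount=2)  # e.g. consumed by two streams
first = ack(message)
second = ack(message)
third = ack(message)  # acking again has no further effect
```

Only the second call releases the last reference, so the offset is recorded exactly once.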

property label: str

Return description of stream, used in graphs and logs.

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
property shortlabel: str

Return short description of stream.

faust.streams.current_event() Optional[EventT][source]

Return the event currently being processed, or None.

Return type:

Optional[EventT]
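One common way to expose "the thing currently being processed, or None" is a context variable. The sketch below uses the standard contextvars module to illustrate the pattern; the process helper, the use of a plain string as the event, and the storage mechanism itself are assumptions for this example rather than a description of Faust's internals.

```python
import contextvars
from typing import Optional

# Holds the event being processed in the current context; None when idle.
_current_event = contextvars.ContextVar("current_event", default=None)


def current_event() -> Optional[str]:
    return _current_event.get()


def process(event: str) -> Optional[str]:
    # Mark the event as current only for the duration of its processing.
    token = _current_event.set(event)
    try:
        return current_event()
    finally:
        _current_event.reset(token)


before = current_event()
during = process("event-1")
after = current_event()
```

Outside of processing the lookup yields None; inside process() it yields the event that was set.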