Source code for faust.types.streams

import abc
import asyncio
import typing
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Awaitable,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    no_type_check,
)

from mode import Seconds, ServiceT
from mode.utils.trees import NodeT

from .channels import ChannelT
from .core import K
from .events import EventT
from .models import FieldDescriptorT, ModelArg
from .topics import TopicT
from .tuples import TP

if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .join import JoinT as _JoinT
    from .serializers import SchemaT as _SchemaT
else:

    class _AppT:
        ...  # noqa

    class _JoinT:
        ...  # noqa

    class _SchemaT:
        ...  # noqa


__all__ = [
    "Processor",
    "GroupByKeyArg",
    "StreamT",
    "T",
    "T_co",
    "T_contra",
]

# Used for typing StreamT[Withdrawal]
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)

Processor = Callable[[T], Union[T, Awaitable[T]]]

#: Type of the `key` argument to `Stream.group_by()`
GroupByKeyArg = Union[FieldDescriptorT, Callable[[T], K]]


class JoinableT(abc.ABC):
    @abc.abstractmethod
    def combine(self, *nodes: "JoinableT", **kwargs: Any) -> "StreamT":
        ...

    @abc.abstractmethod
    def join(self, *fields: FieldDescriptorT) -> "StreamT":
        ...

    @abc.abstractmethod
    def left_join(self, *fields: FieldDescriptorT) -> "StreamT":
        ...

    @abc.abstractmethod
    def inner_join(self, *fields: FieldDescriptorT) -> "StreamT":
        ...

    @abc.abstractmethod
    def outer_join(self, *fields: FieldDescriptorT) -> "StreamT":
        ...

    @abc.abstractmethod
    def __and__(self, other: Any) -> Any:
        ...

    @abc.abstractmethod
    def contribute_to_stream(self, active: "StreamT") -> None:
        ...

    @abc.abstractmethod
    async def remove_from_stream(self, stream: "StreamT") -> None:
        ...

    @abc.abstractmethod
    def _human_channel(self) -> str:
        ...


[docs]class StreamT(AsyncIterable[T_co], JoinableT, ServiceT): app: _AppT channel: AsyncIterator[T_co] outbox: Optional[asyncio.Queue] = None join_strategy: Optional[_JoinT] = None task_owner: Optional[asyncio.Task] = None current_event: Optional[EventT] = None active_partitions: Optional[Set[TP]] = None concurrency_index: Optional[int] = None enable_acks: bool = True prefix: str = "" # List of combined streams/tables after ret = (s1 & s2) combined them. # AFter this ret.combined == [s1, s2] combined: List[JoinableT] # group_by, through, etc. sets this, and it means the # active stream (the one the agent would be reading from) can be found # by walking the path of links:: # >>> node = stream # >>> while node._next: # ... node = node._next # which is also what .get_active_stream() gives _next: Optional["StreamT"] = None _prev: Optional["StreamT"] = None @abc.abstractmethod def __init__( self, channel: AsyncIterator[T_co] = None, *, app: Optional[_AppT] = None, processors: Iterable[Processor[T]] = None, combined: List[JoinableT] = None, on_start: Optional[Callable] = None, join_strategy: _JoinT = None, beacon: Optional[NodeT] = None, concurrency_index: Optional[int] = None, prev: "StreamT" = None, active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = "", loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: ...
[docs] @abc.abstractmethod def get_active_stream(self) -> "StreamT": ...
[docs] @abc.abstractmethod def add_processor(self, processor: Processor[T]) -> None: ...
[docs] @abc.abstractmethod def info(self) -> Mapping[str, Any]: ...
[docs] @abc.abstractmethod def clone(self, **kwargs: Any) -> "StreamT": ...
[docs] @abc.abstractmethod @no_type_check async def items(self) -> AsyncIterator[Tuple[K, T_co]]: ...
[docs] @abc.abstractmethod @no_type_check async def events(self) -> AsyncIterable[EventT]: ...
[docs] @abc.abstractmethod @no_type_check async def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]]: ...
[docs] @abc.abstractmethod def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]: ...
[docs] @abc.abstractmethod def through(self, channel: Union[str, ChannelT]) -> "StreamT": ...
[docs] @abc.abstractmethod def echo(self, *channels: Union[str, ChannelT]) -> "StreamT": ...
[docs] @abc.abstractmethod def group_by( self, key: GroupByKeyArg, *, name: Optional[str] = None, topic: Optional[TopicT] = None ) -> "StreamT": ...
[docs] @abc.abstractmethod def derive_topic( self, name: str, *, schema: Optional[_SchemaT] = None, key_type: ModelArg = None, value_type: ModelArg = None, prefix: str = "", suffix: str = "" ) -> TopicT: ...
[docs] @abc.abstractmethod async def throw(self, exc: BaseException) -> None: ...
@abc.abstractmethod def __copy__(self) -> "StreamT": ... @abc.abstractmethod def __iter__(self) -> Any: ... @abc.abstractmethod def __next__(self) -> T: ... @abc.abstractmethod def __aiter__(self) -> AsyncIterator[T_co]: ...
[docs] @abc.abstractmethod async def ack(self, event: EventT) -> bool: ...