import abc
import asyncio
import typing
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
no_type_check,
)
from mode import Seconds, ServiceT
from mode.utils.trees import NodeT
from .channels import ChannelT
from .core import K
from .events import EventT
from .models import FieldDescriptorT, ModelArg
from .topics import TopicT
from .tuples import TP
if typing.TYPE_CHECKING:
    # Real types: only imported while a static type checker is running.
    # NOTE(review): presumably guarded to avoid import cycles -- confirm.
    from .app import AppT as _AppT
    from .join import JoinT as _JoinT
    from .serializers import SchemaT as _SchemaT
else:
    # Runtime placeholders: empty classes so that annotations naming
    # ``_AppT``, ``_JoinT`` and ``_SchemaT`` still resolve when this module
    # is imported without the real types.

    class _AppT:
        ...  # noqa

    class _JoinT:
        ...  # noqa

    class _SchemaT:
        ...  # noqa
# Names exported by a star-import of this module.
# NOTE(review): ``JoinableT`` is defined in this module but is not listed
# here -- confirm that omission is intentional.
__all__ = ["Processor", "GroupByKeyArg", "StreamT", "T", "T_co", "T_contra"]
# Type variables used to parametrize streams, e.g. ``StreamT[Withdrawal]``.
# One invariant, one covariant and one contravariant variant are provided.
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)

#: A processor is a callable that takes a value and returns either a value
#: of the same type or an awaitable producing one.
Processor = Callable[
    [T],
    Union[T, Awaitable[T]],
]
#: Type of the `key` argument to `Stream.group_by()`: either a field
#: descriptor, or a callable that takes a value and returns its key.
GroupByKeyArg = Union[
    FieldDescriptorT,
    Callable[[T], K],
]
class JoinableT(abc.ABC):
    """Abstract interface for objects that can be joined or combined.

    Every method is abstract; a concrete subclass must implement all of
    them before it can be instantiated.  The join and combine methods all
    return a ``StreamT``.
    """

    @abc.abstractmethod
    def combine(self, *nodes: "JoinableT", **kwargs: Any) -> "StreamT":
        """Return a stream combining this object with ``nodes``.

        Extra keyword arguments are implementation-defined.
        """
        ...

    @abc.abstractmethod
    def join(self, *fields: FieldDescriptorT) -> "StreamT":
        """Return a stream joined on the given field descriptors."""
        ...

    @abc.abstractmethod
    def left_join(self, *fields: FieldDescriptorT) -> "StreamT":
        """Return a stream left-joined on the given field descriptors."""
        ...

    @abc.abstractmethod
    def inner_join(self, *fields: FieldDescriptorT) -> "StreamT":
        """Return a stream inner-joined on the given field descriptors."""
        ...

    @abc.abstractmethod
    def outer_join(self, *fields: FieldDescriptorT) -> "StreamT":
        """Return a stream outer-joined on the given field descriptors."""
        ...

    @abc.abstractmethod
    def __and__(self, other: Any) -> Any:
        """Implement the ``&`` operator (e.g. ``s1 & s2``)."""
        ...

    @abc.abstractmethod
    def contribute_to_stream(self, active: "StreamT") -> None:
        """Contribute this object to the ``active`` stream (synchronous)."""
        ...

    @abc.abstractmethod
    async def remove_from_stream(self, stream: "StreamT") -> None:
        """Remove this object from ``stream`` (coroutine)."""
        ...

    @abc.abstractmethod
    def _human_channel(self) -> str:
        """Return a string describing the channel.

        NOTE(review): presumably a human-readable label -- confirm against
        the concrete implementations.
        """
        ...
class StreamT(AsyncIterable[T_co], JoinableT, ServiceT):
    """Abstract type for streams.

    A stream is an async iterable of values of type ``T_co`` that also
    implements the ``JoinableT`` and ``ServiceT`` interfaces.  All methods
    declared here are abstract; this class only fixes the attribute names,
    defaults and method signatures that concrete streams must provide.
    """

    app: _AppT
    channel: AsyncIterator[T_co]
    outbox: Optional[asyncio.Queue] = None
    join_strategy: Optional[_JoinT] = None
    task_owner: Optional[asyncio.Task] = None
    current_event: Optional[EventT] = None
    active_partitions: Optional[Set[TP]] = None
    concurrency_index: Optional[int] = None
    enable_acks: bool = True
    prefix: str = ""

    # List of combined streams/tables after ret = (s1 & s2) combined them.
    # After this ret.combined == [s1, s2]
    combined: List[JoinableT]

    # group_by, through, etc. sets this, and it means the
    # active stream (the one the agent would be reading from) can be found
    # by walking the path of links::
    #    >>> node = stream
    #    >>> while node._next:
    #    ...     node = node._next
    # which is also what .get_active_stream() gives
    _next: Optional["StreamT"] = None
    _prev: Optional["StreamT"] = None

    @abc.abstractmethod
    def __init__(
        self,
        channel: Optional[AsyncIterator[T_co]] = None,
        *,
        app: Optional[_AppT] = None,
        processors: Optional[Iterable[Processor[T]]] = None,
        combined: Optional[List[JoinableT]] = None,
        on_start: Optional[Callable] = None,
        join_strategy: Optional[_JoinT] = None,
        beacon: Optional[NodeT] = None,
        concurrency_index: Optional[int] = None,
        prev: Optional["StreamT"] = None,
        active_partitions: Optional[Set[TP]] = None,
        enable_acks: bool = True,
        prefix: str = "",
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Declare the constructor signature for streams.

        Only ``channel`` may be passed positionally; every other argument
        is keyword-only.  Arguments that default to ``None`` are annotated
        as ``Optional`` explicitly.
        """
        ...

    @abc.abstractmethod
    def get_active_stream(self) -> "StreamT":
        """Return the active stream.

        This is the node reached by following ``_next`` links from this
        stream until a node has no ``_next``.
        """
        ...

    @abc.abstractmethod
    def add_processor(self, processor: Processor[T]) -> None:
        """Add ``processor`` to this stream.

        A processor is a callable taking a value and returning a value or
        an awaitable of one.
        """
        ...

    @abc.abstractmethod
    def info(self) -> Mapping[str, Any]:
        """Return a mapping of information about this stream.

        The keys are implementation-defined.
        """
        ...

    @abc.abstractmethod
    def clone(self, **kwargs: Any) -> "StreamT":
        """Return a ``StreamT`` clone of this stream.

        NOTE(review): ``kwargs`` presumably override settings on the clone
        -- confirm against the concrete implementation.
        """
        ...

    @abc.abstractmethod
    @no_type_check
    async def items(self) -> AsyncIterator[Tuple[K, T_co]]:
        """Iterate asynchronously over ``(key, value)`` tuples."""
        ...

    @abc.abstractmethod
    @no_type_check
    async def events(self) -> AsyncIterable[EventT]:
        """Iterate asynchronously over ``EventT`` objects."""
        ...

    @abc.abstractmethod
    @no_type_check
    async def take(
        self, max_: int, within: Seconds
    ) -> AsyncIterable[Sequence[T_co]]:
        """Iterate asynchronously over sequences of values.

        NOTE(review): ``max_`` presumably caps the size of each sequence
        and ``within`` the time to wait for it -- confirm.
        """
        ...

    @abc.abstractmethod
    def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
        """Return an async iterable of ``(int, value)`` tuples.

        NOTE(review): presumably counts from ``start`` like the built-in
        ``enumerate`` -- confirm.
        """
        ...

    @abc.abstractmethod
    def through(self, channel: Union[str, ChannelT]) -> "StreamT":
        """Return a stream for ``channel`` (a name or a ``ChannelT``).

        NOTE(review): presumably values are forwarded through the channel
        before being consumed from the returned stream -- confirm.
        """
        ...

    @abc.abstractmethod
    def echo(self, *channels: Union[str, ChannelT]) -> "StreamT":
        """Return a stream given channel names or ``ChannelT`` objects.

        NOTE(review): presumably values are also sent to ``channels`` --
        confirm.
        """
        ...

    @abc.abstractmethod
    def group_by(
        self,
        key: GroupByKeyArg,
        *,
        name: Optional[str] = None,
        topic: Optional[TopicT] = None,
    ) -> "StreamT":
        """Return a stream grouped by ``key``.

        ``key`` is either a field descriptor or a callable that takes a
        value and returns its key.  ``name`` and ``topic`` are optional and
        keyword-only.
        """
        ...

    @abc.abstractmethod
    def derive_topic(
        self,
        name: str,
        *,
        schema: Optional[_SchemaT] = None,
        key_type: Optional[ModelArg] = None,
        value_type: Optional[ModelArg] = None,
        prefix: str = "",
        suffix: str = "",
    ) -> TopicT:
        """Return a ``TopicT`` derived for ``name``.

        NOTE(review): presumably ``prefix`` and ``suffix`` are applied to
        the topic name -- confirm.
        """
        ...

    @abc.abstractmethod
    async def throw(self, exc: BaseException) -> None:
        """Accept exception ``exc`` (coroutine).

        NOTE(review): presumably raises ``exc`` inside the running stream
        -- confirm.
        """
        ...

    @abc.abstractmethod
    def __copy__(self) -> "StreamT":
        """Support ``copy.copy()``; returns a ``StreamT``."""
        ...

    @abc.abstractmethod
    def __iter__(self) -> Any:
        """Declare the synchronous iteration hook."""
        ...

    @abc.abstractmethod
    def __next__(self) -> T_co:
        """Declare the synchronous ``next()`` hook."""
        # Annotated with the class type variable so the return type is
        # bound to the stream's value type.
        ...

    @abc.abstractmethod
    def __aiter__(self) -> AsyncIterator[T_co]:
        """Return an async iterator over the stream's values."""
        ...

    @abc.abstractmethod
    async def ack(self, event: EventT) -> bool:
        """Acknowledge ``event`` (coroutine) and return a bool.

        The meaning of the returned flag is implementation-defined.
        """
        ...