import abc
import asyncio
import typing
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Generic,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
no_type_check,
)
from mode import ServiceT, SupervisorStrategyT
from mode.utils.collections import ManagedUserDict
from .codecs import CodecArg
from .core import HeadersArg, K, V
from .events import EventT
from .models import ModelArg
from .serializers import SchemaT
from .streams import StreamT
from .topics import ChannelT
from .tuples import TP, Message, RecordMetadata
if typing.TYPE_CHECKING:
from .app import AppT as _AppT
else:
class _AppT:
... # noqa
__all__ = [
"AgentErrorHandler",
"AgentFun",
"ActorT",
"ActorRefT",
"AgentManagerT",
"AgentT",
"AgentTestWrapperT",
"AsyncIterableActorT",
"AwaitableActorT",
"ReplyToArg",
"SinkT",
]
_T = TypeVar("_T")
AgentErrorHandler = Callable[["AgentT", BaseException], Awaitable]
AgentFun = Callable[
[StreamT[_T]],
Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable],
]
#: A sink can be: Agent, Channel
#: or callable/async callable taking value as argument.
SinkT = Union["AgentT", ChannelT, Callable[[Any], Union[Awaitable, None]]]
ReplyToArg = Union["AgentT", ChannelT, str]
[docs]class ActorT(ServiceT, Generic[_T]):
agent: "AgentT"
stream: StreamT
it: _T
actor_task: Optional[asyncio.Task]
active_partitions: Optional[Set[TP]]
#: If multiple instance are started for concurrency, this is its index.
index: Optional[int] = None
@abc.abstractmethod
def __init__(
self,
agent: "AgentT",
stream: StreamT,
it: _T,
active_partitions: Optional[Set[TP]] = None,
**kwargs: Any
) -> None:
...
[docs] @abc.abstractmethod
def cancel(self) -> None:
...
[docs] @abc.abstractmethod
async def on_isolated_partition_revoked(self, tp: TP) -> None:
...
[docs] @abc.abstractmethod
async def on_isolated_partition_assigned(self, tp: TP) -> None:
...
[docs] @abc.abstractmethod
def traceback(self) -> str:
...
[docs]class AsyncIterableActorT(ActorT[AsyncIterable], AsyncIterable):
"""Used for agent function that yields."""
[docs]class AwaitableActorT(ActorT[Awaitable], Awaitable):
"""Used for agent function that do not yield."""
ActorRefT = ActorT[Union[AsyncIterable, Awaitable]]
[docs]class AgentT(ServiceT, Generic[_T]):
name: str
app: _AppT
concurrency: int
help: str
supervisor_strategy: Optional[Type[SupervisorStrategyT]]
isolated_partitions: bool
@abc.abstractmethod
def __init__(
self,
fun: AgentFun,
*,
name: Optional[str] = None,
app: Optional[_AppT] = None,
channel: Union[str, ChannelT] = None,
concurrency: int = 1,
sink: Iterable[SinkT] = None,
on_error: AgentErrorHandler = None,
supervisor_strategy: Type[SupervisorStrategyT] = None,
help: Optional[str] = None,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
isolated_partitions: bool = False,
**kwargs: Any
) -> None:
self.fun: AgentFun = fun
[docs] @abc.abstractmethod
def actor_tracebacks(self) -> List[str]:
...
@abc.abstractmethod
def __call__(
self,
*,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None
) -> ActorRefT:
...
[docs] @abc.abstractmethod
def test_context(
self,
channel: Optional[ChannelT] = None,
supervisor_strategy: SupervisorStrategyT = None,
**kwargs: Any
) -> "AgentTestWrapperT":
...
[docs] @abc.abstractmethod
def add_sink(self, sink: SinkT) -> None:
...
[docs] @abc.abstractmethod
def stream(self, **kwargs: Any) -> StreamT:
...
[docs] @abc.abstractmethod
async def on_partitions_assigned(self, assigned: Set[TP]) -> None:
...
[docs] @abc.abstractmethod
async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
...
[docs] @abc.abstractmethod
async def cast(
self,
value: V = None,
*,
key: K = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None
) -> None:
...
[docs] @abc.abstractmethod
async def ask(
self,
value: V = None,
*,
key: K = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: Optional[str] = None
) -> Any:
...
[docs] @abc.abstractmethod
async def send(
self,
*,
key: K = None,
value: V = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
reply_to: ReplyToArg = None,
correlation_id: Optional[str] = None
) -> Awaitable[RecordMetadata]:
...
[docs] @abc.abstractmethod
@no_type_check # XXX mypy bugs out on this
async def map(
self,
values: Union[AsyncIterable, Iterable],
key: K = None,
reply_to: ReplyToArg = None,
) -> AsyncIterator:
...
[docs] @abc.abstractmethod
@no_type_check # XXX mypy bugs out on this
async def kvmap(
self,
items: Union[AsyncIterable[Tuple[K, V]], Iterable[Tuple[K, V]]],
reply_to: ReplyToArg = None,
) -> AsyncIterator[str]:
...
[docs] @abc.abstractmethod
async def join(
self,
values: Union[AsyncIterable[V], Iterable[V]],
key: K = None,
reply_to: ReplyToArg = None,
) -> List[Any]:
...
[docs] @abc.abstractmethod
async def kvjoin(
self,
items: Union[AsyncIterable[Tuple[K, V]], Iterable[Tuple[K, V]]],
reply_to: ReplyToArg = None,
) -> List[Any]:
...
[docs] @abc.abstractmethod
def info(self) -> Mapping:
...
[docs] @abc.abstractmethod
def clone(self, *, cls: Type["AgentT"] = None, **kwargs: Any) -> "AgentT":
...
[docs] @abc.abstractmethod
def get_topic_names(self) -> Iterable[str]:
...
@property
@abc.abstractmethod
def channel(self) -> ChannelT:
...
@channel.setter
def channel(self, channel: ChannelT) -> None:
...
@property
@abc.abstractmethod
def channel_iterator(self) -> AsyncIterator:
...
@channel_iterator.setter
def channel_iterator(self, channel: AsyncIterator) -> None:
...
@abc.abstractmethod
def _agent_label(self, name_suffix: str = "") -> str:
...
[docs]class AgentManagerT(ServiceT, ManagedUserDict[str, AgentT]):
app: _AppT
[docs] @abc.abstractmethod
async def wait_until_agents_started(self) -> None:
...
[docs] @abc.abstractmethod
async def on_rebalance(self, revoked: Set[TP], newly_assigned: Set[TP]) -> None:
...
[docs] @abc.abstractmethod
def actor_tracebacks(self) -> Mapping[str, List[str]]:
...
[docs] @abc.abstractmethod
def human_tracebacks(self) -> str:
...
[docs]class AgentTestWrapperT(AgentT, AsyncIterable):
new_value_processed: asyncio.Condition
original_channel: ChannelT
results: MutableMapping[int, Any]
sent_offset: int = 0
processed_offset: int = 0
@abc.abstractmethod
def __init__(
self, *args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any
) -> None:
...
[docs] @abc.abstractmethod
async def put(
self,
value: V = None,
key: K = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
*,
reply_to: ReplyToArg = None,
correlation_id: Optional[str] = None,
wait: bool = True
) -> EventT:
...
[docs] @abc.abstractmethod
def to_message(
self,
key: K,
value: V,
*,
partition: int = 0,
offset: int = 0,
timestamp: Optional[float] = None,
timestamp_type: int = 0,
headers: HeadersArg = None
) -> Message:
...
[docs] @abc.abstractmethod
async def throw(self, exc: BaseException) -> None:
...