faust.channels
¶
Channel.
A channel is used to send values to streams.
The stream will iterate over incoming events in the channel.
- class faust.channels.Channel(app: AppT, *, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, queue: Optional[ThrowableQueue] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, loop: Optional[AbstractEventLoop] = None)[source]¶
Create new channel.
- Parameters:
app (
AppT
) – The app that created this channel (app.channel()
)schema (
_GenericAlias
[SchemaT
,None
]) – Schema used for serialization/deserializationkey_type (
_GenericAlias
[_GenericAlias
[ModelT
],_GenericAlias
[bytes
],_GenericAlias
[str
],None
]) – The Model used for keys in this channel. (overrides schema if one is defined)value_type (
_GenericAlias
[_GenericAlias
[ModelT
],_GenericAlias
[bytes
],_GenericAlias
[str
],None
]) – The Model used for values in this channel. (overrides schema if one is defined)maxsize (
_GenericAlias
[int
,None
]) – The maximum number of messages this channel can hold. If exceeded any newput
call will block until a message is removed from the channel.is_iterator (
bool
) – When streams iterate over a channel they will callstream.clone(is_iterator=True)
so this attribute denotes that this channel instance is currently being iterated over.active_partitions (
_GenericAlias
[_GenericAlias
[TP
],None
]) – Set of active topic partitions this channel instance is assigned to.loop (
_GenericAlias
[AbstractEventLoop
,None
]) – Theasyncio
event loop to use.
- property queue: ThrowableQueue¶
Return the underlying queue/buffer backing this channel. :rtype:
ThrowableQueue
- clone(*, is_iterator: Optional[bool] = None, **kwargs: Any) ChannelT[T] [source]¶
Create clone of this channel.
- Parameters:
is_iterator (
_GenericAlias
[bool
,None
]) – Set to True if this is now a channel that is being iterated over.- Keyword Arguments:
**kwargs – Any keyword arguments passed will override any of the arguments supported by
Channel.__init__
.- Return type:
_GenericAlias
[~T]
- clone_using_queue(queue: Queue) ChannelT[T] [source]¶
Create clone of this channel using specific queue instance.
- Return type:
_GenericAlias
[~T]
- stream(**kwargs: Any) StreamT[T] [source]¶
Create stream reading from this channel.
- Return type:
_GenericAlias
[~T]
- get_topic_name() str [source]¶
Get the topic name, or raise if this is not a named channel.
- Return type:
- async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata] [source]¶
Send message to channel.
- Return type:
_GenericAlias
[RecordMetadata
]
- send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage [source]¶
Produce message by adding to buffer.
This method is only supported by
Topic
.- Raises:
NotImplementedError – always for in-memory channel.
- Return type:
- as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) FutureMessage [source]¶
Create promise that message will be transmitted.
- Return type:
- prepare_headers(headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]]) Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] [source]¶
Prepare
headers
passed before publishing.
- async publish_message(fut: FutureMessage, wait: bool = True) Awaitable[RecordMetadata] [source]¶
Publish message to channel.
This is the interface used by
topic.send()
, etc. to actually publish the message on the channel after being buffered up or similar.It takes a
FutureMessage
object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.- Return type:
_GenericAlias
[RecordMetadata
]
- maybe_declare() None [source]¶
Declare/create this channel, but only if it doesn’t exist.
- Return type:
None
- async declare() None [source]¶
Declare/create this channel.
This is used to create this channel on a server, if that is required to operate it.
- Return type:
None
- prepare_key(key: Optional[Union[bytes, _ModelT, Any]], key_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]] [source]¶
Prepare key before it is sent to this channel.
Topic
uses this to implement serialization of keys sent to the channel.
- prepare_value(value: Union[bytes, _ModelT, Any], value_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]] [source]¶
Prepare value before it is sent to this channel.
Topic
uses this to implement serialization of values sent to the channel.
- async decode(message: Message, *, propagate: bool = False) EventT[T] [source]¶
Decode
Message
intoEvent
.- Return type:
_GenericAlias
[~T]
- async deliver(message: Message) None [source]¶
Deliver message to queue from consumer.
This is called by the consumer to deliver the message to the channel.
- Return type:
None
- async get(*, timeout: Optional[Union[timedelta, float, str]] = None) EventT[T] [source]¶
Get the next
Event
received on this channel.- Return type:
_GenericAlias
[~T]
- async on_key_decode_error(exc: Exception, message: Message) None [source]¶
Unable to decode the key of an item in the queue.
See also
- Return type:
None
- async on_value_decode_error(exc: Exception, message: Message) None [source]¶
Unable to decode the value of an item in the queue.
See also
- Return type:
None
- async on_decode_error(exc: Exception, message: Message) None [source]¶
Signal that there was an error reading an event in the queue.
When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.
We will log the exception, but you can also override this to perform additional actions.
- Admonition: Kafka
In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.
For this reason it is important that you keep a close eye on error logs. For easy of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.
- Return type:
None
- on_stop_iteration() None [source]¶
Signal that iteration over this channel was stopped.
Tip
Remember to call
super
when overriding this method.- Return type:
None
- derive(**kwargs: Any) ChannelT[T] [source]¶
Derive new channel from this channel, using new configuration.
See
faust.Topic.derive
.For local channels this will simply return the same channel.
- Return type:
_GenericAlias
[~T]
- async throw(exc: BaseException) None [source]¶
Throw exception to be received by channel subscribers.
Tip
When you find yourself having to call this from a regular, non-
async def
function, you can use_throw()
instead.- Return type:
None