"""Channel.
A channel is used to send values to streams.
The stream will iterate over incoming events in the channel.
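Example:
    A minimal sketch, assuming an existing :class:`faust.App`
    instance named ``app``::

        channel = app.channel()

        @app.agent(channel)
        async def process(stream):
            async for value in stream:
                print(value)

        # later, from any async function:
        await channel.send(value='hello')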
"""
import asyncio
from typing import (
Any,
Awaitable,
Callable,
Mapping,
MutableSet,
Optional,
Set,
Tuple,
TypeVar,
cast,
no_type_check,
)
from weakref import WeakSet
from mode import Seconds, get_logger, want_seconds
from mode.utils.futures import maybe_async, stampede
from mode.utils.queues import ThrowableQueue
from .types import (
TP,
AppT,
ChannelT,
CodecArg,
EventT,
FutureMessage,
K,
Message,
MessageSentCallback,
ModelArg,
PendingMessage,
RecordMetadata,
SchemaT,
StreamT,
V,
)
from .types.core import HeadersArg, OpenHeadersArg, prepare_headers
from .types.tuples import _PendingMessage_to_Message
__all__ = ["Channel"]
logger = get_logger(__name__)
T = TypeVar("T")
T_contra = TypeVar("T_contra", contravariant=True)
class Channel(ChannelT[T]):
"""Create new channel.
Arguments:
app: The app that created this channel (``app.channel()``)
schema: Schema used for serialization/deserialization
key_type: The Model used for keys in this channel.
(overrides schema if one is defined)
value_type: The Model used for values in this channel.
(overrides schema if one is defined)
maxsize: The maximum number of messages this channel can hold.
If exceeded, any new ``put`` call will block until a message
is removed from the channel.
is_iterator: When streams iterate over a channel they will call
``channel.clone(is_iterator=True)``, so this attribute
denotes that this channel instance is currently being iterated
over.
active_partitions: Set of active topic partitions this
channel instance is assigned to.
loop: The :mod:`asyncio` event loop to use.
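Example:
    Channels are normally created through the app; a sketch,
    assuming ``import faust`` and an existing ``app``
    (``Point`` is a hypothetical model)::

        class Point(faust.Record):
            x: int
            y: int

        channel = app.channel(value_type=Point, maxsize=1000)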
"""
app: AppT
schema: SchemaT
key_type: Optional[ModelArg]
value_type: Optional[ModelArg]
is_iterator: bool
_queue: Optional[ThrowableQueue]
_root: Optional["Channel"]
_subscribers: MutableSet["Channel"]
def __init__(
self,
app: AppT,
*,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
is_iterator: bool = False,
queue: Optional[ThrowableQueue] = None,
maxsize: Optional[int] = None,
root: Optional[ChannelT] = None,
active_partitions: Optional[Set[TP]] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
self.app = app
self.loop = loop
self.is_iterator = is_iterator
self._queue = queue
self.maxsize = maxsize
self.deliver = self._compile_deliver() # type: ignore
self._root = cast(Channel, root)
self.active_partitions = active_partitions
self._subscribers = WeakSet()
if schema is None:
self.schema = self._get_default_schema(key_type, value_type)
else:
self.schema = schema
self.schema.update(key_type=key_type, value_type=value_type)
self.key_type = self.schema.key_type
self.value_type = self.schema.value_type
def _get_default_schema(
self, key_type: ModelArg = None, value_type: ModelArg = None
) -> SchemaT:
return cast(
SchemaT,
self.app.conf.Schema( # type: ignore
key_type=key_type,
value_type=value_type,
),
)
@property
def queue(self) -> ThrowableQueue:
"""Return the underlying queue/buffer backing this channel."""
if self._queue is None:
# this should only be set after clone = channel.__aiter__()
# which means the loop is not accessed by merely defining
# a channel at module scope.
maxsize = self.maxsize
if maxsize is None:
maxsize = self.app.conf.stream_buffer_maxsize
self._queue = self.app.FlowControlQueue(
maxsize=maxsize,
clear_on_resume=True,
)
return self._queue
def clone(
self, *, is_iterator: Optional[bool] = None, **kwargs: Any
) -> ChannelT[T]:
"""Create clone of this channel.
Arguments:
is_iterator: Set to True if this is now a channel
that is being iterated over.
Keyword Arguments:
**kwargs: Any keyword arguments passed will override any
of the arguments supported by
:class:`Channel.__init__ <Channel>`.
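Example:
    A sketch of the subscription mechanism, assuming an existing
    in-memory ``channel``::

        it = channel.clone(is_iterator=True)  # registers a subscriber
        await channel.send(value=42)          # fans out to subscribers
        event = await it.get()
        assert event.value == 42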
"""
is_it = is_iterator if is_iterator is not None else self.is_iterator
subchannel: ChannelT = self._clone(is_iterator=is_it, **kwargs)
if is_it:
(self._root or self)._subscribers.add(cast(Channel, subchannel))
# make sure queue is created at this point
# ^ accessing the property lazily creates the queue
subchannel.queue
return subchannel
def clone_using_queue(self, queue: asyncio.Queue) -> ChannelT[T]:
"""Create clone of this channel using specific queue instance."""
return self.clone(queue=queue, is_iterator=True)
def _clone(self, **kwargs: Any) -> ChannelT[T]:
return type(self)(**{**self._clone_args(), **kwargs})
def _clone_args(self) -> Mapping:
# How to create a copy of this channel.
return {
"app": self.app,
"loop": self.loop,
"schema": self.schema,
"key_type": self.key_type,
"value_type": self.value_type,
"maxsize": self.maxsize,
"root": self._root if self._root is not None else self,
"queue": None,
"active_partitions": self.active_partitions,
}
def stream(self, **kwargs: Any) -> StreamT[T]:
"""Create stream reading from this channel."""
return self.app.stream(self, **kwargs)
def get_topic_name(self) -> str:
"""Get the topic name, or raise if this is not a named channel."""
raise NotImplementedError("Channels are unnamed topics")
async def send(
self,
*,
key: K = None,
value: V = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: Optional[MessageSentCallback] = None,
force: bool = False,
) -> Awaitable[RecordMetadata]:
"""Send message to channel."""
return await self._send_now(
key,
value,
partition=partition,
timestamp=timestamp,
headers=headers,
schema=schema,
key_serializer=key_serializer,
value_serializer=value_serializer,
callback=callback,
)
def send_soon(
self,
*,
key: K = None,
value: V = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: Optional[MessageSentCallback] = None,
force: bool = False,
eager_partitioning: bool = False,
) -> FutureMessage:
"""Produce message by adding to buffer.
This method is only supported by :class:`~faust.Topic`.
Raises:
NotImplementedError: always for in-memory channel.
"""
raise NotImplementedError()
def as_future_message(
self,
key: K = None,
value: V = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: Optional[MessageSentCallback] = None,
eager_partitioning: bool = False,
) -> FutureMessage:
"""Create promise that message will be transmitted."""
open_headers = self.prepare_headers(headers)
final_key, open_headers = self.prepare_key(
key, key_serializer, schema, open_headers
)
final_value, open_headers = self.prepare_value(
value, value_serializer, schema, open_headers
)
if partition is None and eager_partitioning:
# Note: raises NotImplementedError if used on unnamed channel.
partition = self.app.producer.key_partition(
self.get_topic_name(), final_key
).partition
return FutureMessage(
PendingMessage(
self,
final_key,
final_value,
key_serializer=key_serializer,
value_serializer=value_serializer,
partition=partition,
timestamp=timestamp,
headers=open_headers,
callback=callback,
# Python 3.6.0: NamedTuple doesn't support optional fields
# [ask]
topic=None,
offset=None,
generation_id=self.app.consumer_generation_id,
),
)
async def _send_now(
self,
key: K = None,
value: V = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: Optional[MessageSentCallback] = None,
) -> Awaitable[RecordMetadata]:
return await self.publish_message(
self.as_future_message(
key,
value,
partition,
timestamp,
headers,
schema,
key_serializer,
value_serializer,
callback,
)
)
async def publish_message(
self, fut: FutureMessage, wait: bool = True
) -> Awaitable[RecordMetadata]:
"""Publish message to channel.
This is the interface used by ``topic.send()``, etc.,
to actually publish the message on the channel
after any buffering has taken place.
It takes a :class:`~faust.types.FutureMessage` object,
which contains all the information required to send
the message, and acts as a promise that is resolved
once the message has been fully transmitted.
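Example:
    A sketch of calling this directly, assuming an existing
    ``channel``::

        fut = channel.as_future_message(key='k', value='v')
        await channel.publish_message(fut)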
"""
event = self._future_message_to_event(fut)
await self.put(event)
topic, partition = tp = TP(
fut.message.topic or "<anon>", fut.message.partition or -1
)
return await self._finalize_message(
fut,
RecordMetadata(
topic=topic,
partition=partition,
topic_partition=tp,
offset=-1,
timestamp=fut.message.timestamp,
timestamp_type=1,
),
)
def _future_message_to_event(self, fut: FutureMessage) -> EventT:
return self._create_event(
fut.message.key,
fut.message.value,
fut.message.headers,
message=_PendingMessage_to_Message(fut.message),
)
async def _finalize_message(
self, fut: FutureMessage, result: RecordMetadata
) -> FutureMessage:
fut.set_result(result)
if fut.message.callback:
await maybe_async(fut.message.callback(fut))
return fut
@stampede
async def maybe_declare(self) -> None:
"""Declare/create this channel, but only if it doesn't exist."""
...
async def declare(self) -> None:
"""Declare/create this channel.
This is used to create this channel on a server,
if that is required to operate it.
"""
...
def prepare_key(
self,
key: K,
key_serializer: CodecArg,
schema: Optional[SchemaT] = None,
headers: OpenHeadersArg = None,
) -> Tuple[Any, OpenHeadersArg]:
"""Prepare key before it is sent to this channel.
:class:`~faust.Topic` uses this to implement serialization of keys
sent to the channel.
"""
return key, headers
def prepare_value(
self,
value: V,
value_serializer: CodecArg,
schema: Optional[SchemaT] = None,
headers: OpenHeadersArg = None,
) -> Tuple[Any, OpenHeadersArg]:
"""Prepare value before it is sent to this channel.
:class:`~faust.Topic` uses this to implement serialization of values
sent to the channel.
"""
return value, headers
async def decode(self, message: Message, *, propagate: bool = False) -> EventT[T]:
"""Decode :class:`~faust.types.Message` into :class:`~faust.Event`."""
return self._create_event(
message.key, message.value, message.headers, message=message
)
async def deliver(self, message: Message) -> None:  # pragma: no cover
"""Deliver message to queue from consumer.
This is called by the consumer to deliver the message
to the channel.
"""
... # closure compiled at __init__
def _compile_deliver(self) -> Callable[[Message], Awaitable[None]]:
put = None
async def deliver(message: Message) -> None:
nonlocal put
if put is None:
# NOTE circumvents self.put, using queue directly
put = self.queue.put
event = await self.decode(message)
await put(event)
return deliver
def _create_event(
self, key: K, value: V, headers: Optional[HeadersArg], message: Message
) -> EventT[T]:
return self.app.create_event(key, value, headers, message)
async def put(self, value: EventT[T_contra]) -> None:
"""Put event onto this channel."""
root = self._root if self._root is not None else self
for subscriber in root._subscribers:
await subscriber.queue.put(value)
async def get(self, *, timeout: Optional[Seconds] = None) -> EventT[T]:
"""Get the next :class:`~faust.Event` received on this channel."""
timeout_: float = want_seconds(timeout)
if timeout_:
return await asyncio.wait_for(self.queue.get(), timeout=timeout_)
return await self.queue.get()
def empty(self) -> bool:
"""Return :const:`True` if the queue is empty."""
return self.queue.empty()
async def on_key_decode_error(self, exc: Exception, message: Message) -> None:
"""Unable to decode the key of an item in the queue.
See Also:
:meth:`on_decode_error`
"""
await self.on_decode_error(exc, message)
await self.throw(exc)
async def on_value_decode_error(self, exc: Exception, message: Message) -> None:
"""Unable to decode the value of an item in the queue.
See Also:
:meth:`on_decode_error`
"""
await self.on_decode_error(exc, message)
await self.throw(exc)
async def on_decode_error(self, exc: Exception, message: Message) -> None:
"""Signal that there was an error reading an event in the queue.
When a message in the channel needs to be deserialized
to reconstruct its original form, we will sometimes see
decoding/deserialization errors raised, caused by missing
fields, malformed payloads, and so on.
We will log the exception, but you can also override this
to perform additional actions.
Admonition: Kafka
In the event a deserialization error occurs, we
HAVE to commit the offset of the source message to continue
processing the stream.
For this reason it is important that you keep a close eye on
error logs. For ease of use, we suggest using log aggregation
software, such as Sentry, to surface these errors to your
operations team.
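Example:
    A sketch of overriding this hook in a channel subclass::

        class MyChannel(Channel):

            async def on_decode_error(
                    self, exc: Exception, message: Message) -> None:
                logger.warning('Failed to decode %r: %r', message, exc)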
"""
...
def on_stop_iteration(self) -> None:
"""Signal that iteration over this channel was stopped.
Tip:
Remember to call ``super`` when overriding this method.
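Example:
    A sketch of an override that remembers to call ``super``::

        class MyChannel(Channel):

            def on_stop_iteration(self) -> None:
                super().on_stop_iteration()
                logger.info('Stopped iterating over %r', self)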
"""
...
def derive(self, **kwargs: Any) -> ChannelT[T]:
"""Derive new channel from this channel, using new configuration.
See :class:`faust.Topic.derive`.
For local channels this will simply return the same channel.
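Example:
    >>> channel.derive() is channel  # assuming ``channel`` exists
    True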
"""
return self
def __aiter__(self) -> ChannelT[T]:
return self if self.is_iterator else self.clone(is_iterator=True)
async def __anext__(self) -> EventT[T]:
if not self.is_iterator:
raise RuntimeError("Need to call channel.__aiter__()")
return await self.queue.get()
async def throw(self, exc: BaseException) -> None:
"""Throw exception to be received by channel subscribers.
Tip:
When you find yourself having to call this from
a regular, non-``async def`` function, you can use :meth:`_throw`
instead.
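Example:
    A sketch, assuming an existing ``channel``; the next ``get()``
    on this channel instance will raise the exception::

        await channel.throw(KeyError('foo'))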
"""
self.queue._throw(exc)
def _throw(self, exc: BaseException) -> None:
"""Non-async version of :meth:`throw`."""
self.queue._throw(exc)
def __repr__(self) -> str:
s = f"<{self.label}@{self._object_id_as_hex()}"
if self.active_partitions is not None:
# if we are restricted to a specific set of topic partitions,
# then include that in repr(channel).
if self.active_partitions:
active = (
"{"
+ ", ".join(
sorted(
f"{tp.topic}:{tp.partition}"
for tp in self.active_partitions
)
)
+ "}"
)
else:
# a defined but empty .active_partitions signifies
# that we are still waiting for an assignment
# from the Consumer.
active = "{<pending for assignment>}"
s += f" active={active}"
s += ">"
return s
def _object_id_as_hex(self) -> str:
# hexadecimal version of id(self)
return f"{id(self):#x}"
def __str__(self) -> str:
# subclasses should override this
return "<ANON>"
@property
def subscriber_count(self) -> int:
"""Return number of active subscribers to local channel."""
return len(self._subscribers)
@property
def label(self) -> str:
"""Short textual description of channel."""
sym = "(*)" if self.is_iterator else ""
return f"{sym}{type(self).__name__}: {self}"
class SerializedChannel(Channel[T]):
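"""Channel that serializes keys and values using a schema.
Used as a base class for channels that write to transports,
such as :class:`faust.Topic`.
"""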
def __init__(
self,
app: AppT,
*,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
allow_empty: Optional[bool] = None,
**kwargs: Any,
) -> None:
self.app = app # need to set early
if schema is not None:
self._contribute_to_schema(
schema,
key_type=key_type,
value_type=value_type,
key_serializer=key_serializer,
value_serializer=value_serializer,
allow_empty=allow_empty,
)
else:
schema = self._get_default_schema(
key_type, value_type, key_serializer, value_serializer, allow_empty
)
super().__init__(
app,
schema=schema,
key_type=key_type,
value_type=value_type,
**kwargs,
)
self.key_serializer = self.schema.key_serializer
self.value_serializer = self.schema.value_serializer
self.allow_empty = self.schema.allow_empty
def _contribute_to_schema(
self,
schema: SchemaT,
*,
key_type: ModelArg = None,
value_type: ModelArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
allow_empty: Optional[bool] = None,
) -> None:
# Update schema and take compat attributes
# from passed schema.
schema.update(
key_type=key_type,
value_type=value_type,
key_serializer=key_serializer,
value_serializer=value_serializer,
allow_empty=allow_empty,
)
def _get_default_schema(
self,
key_type: ModelArg = None,
value_type: ModelArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
allow_empty: Optional[bool] = None,
) -> SchemaT:
return cast(
SchemaT,
self.app.conf.Schema( # type: ignore
key_type=key_type,
value_type=value_type,
key_serializer=key_serializer,
value_serializer=value_serializer,
allow_empty=allow_empty,
),
)
@no_type_check # incompatible with base class, but OK
def _clone_args(self) -> Mapping:
return {
**super()._clone_args(),
**{
"key_serializer": self.key_serializer,
"value_serializer": self.value_serializer,
},
}
def prepare_key(
self,
key: K,
key_serializer: CodecArg,
schema: Optional[SchemaT] = None,
headers: OpenHeadersArg = None,
) -> Tuple[Any, OpenHeadersArg]:
"""Serialize key to format suitable for transport."""
if key is not None:
schema = schema or self.schema
assert schema is not None
return schema.dumps_key(
self.app, key, serializer=key_serializer, headers=headers
)
return None, headers
def prepare_value(
self,
value: V,
value_serializer: CodecArg,
schema: Optional[SchemaT] = None,
headers: OpenHeadersArg = None,
) -> Tuple[Any, OpenHeadersArg]:
"""Serialize value to format suitable for transport."""
schema = schema or self.schema
assert schema is not None
return schema.dumps_value(
self.app, value, serializer=value_serializer, headers=headers
)