"""Events received in streams."""
import typing
from types import TracebackType
from typing import Any, Awaitable, Optional, Type, Union, cast
from faust.types import (
    AppT,
    ChannelT,
    CodecArg,
    EventT,
    HeadersArg,
    K,
    Message,
    MessageSentCallback,
    RecordMetadata,
    SchemaT,
    V,
)

if typing.TYPE_CHECKING:  # pragma: no cover
    from .app.base import App as _App
else:
    class _App: ...  # noqa


USE_EXISTING_KEY = object()
USE_EXISTING_VALUE = object()
USE_EXISTING_HEADERS = object()
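
# The three sentinels above are the defaults for ``Event.send()`` and
# ``Event.forward()``: they mark "argument not supplied", so the event's own
# key/value/headers are reused, while still allowing callers to pass an
# explicit ``None``.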


class Event(EventT):
    """An event received on a channel.

    Notes:
        - Events have a key and a value::

            event.key, event.value

        - They also have a reference to the original message
          (if available), such as a Kafka record::

            event.message.offset

        - Iterating over channels/topics yields ``Event``::

            async for event in channel:
                ...

        - Iterating over a stream (which in turn iterates over a channel)
          yields ``Event.value``::

            async for value in channel.stream():  # value is event.value
                ...

        - If you only have a ``Stream`` object, you can also access the
          underlying events by using ``Stream.events``.

          For example:

          .. sourcecode:: python

              async for event in channel.stream().events():
                  ...

          This is also commonly used for finding the "current event"
          related to a value in the stream:

          .. sourcecode:: python

              stream = channel.stream()
              async for event in stream.events():
                  event = stream.current_event
                  message = event.message
                  topic = event.message.topic

          You can retrieve the current event in a stream to:

          - Get access to the serialized key and value.
          - Get access to message properties, such as which topic and
            partition the value was received on, or its offset.

          If you want access to both key and value, you should use
          ``stream.items()`` instead:

          .. sourcecode:: python

              async for key, value in stream.items():
                  ...

          ``stream.current_event`` can also be accessed, but you must take
          extreme care that you are using the correct stream object: methods
          such as ``.group_by(key)`` and ``.through(topic)`` return cloned
          stream objects, so the current event must be read from the stream
          you are actually iterating over.

          The safest way to access the current event in an agent is the
          :func:`faust.current_event` helper, which is backed by a
          :class:`~contextvars.ContextVar`:

          .. sourcecode:: python

              from faust import current_event

              @app.agent(topic)
              async def process(stream):
                  async for value in stream:
                      event = current_event()
    """

    def __init__(
        self,
        app: AppT,
        key: K,
        value: V,
        headers: Optional[HeadersArg],
        message: Message,
    ) -> None:
        self.app: AppT = app
        self.key: K = key
        self.value: V = value
        self.message: Message = message
        if headers is not None:
            if not isinstance(headers, dict):
                self.headers = dict(headers)
            else:
                self.headers = headers
        else:
            self.headers = {}
        self.acked: bool = False
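
    # Note: ``headers`` may arrive as a list of ``(key, value)`` pairs (as the
    # Kafka client delivers them); ``__init__`` above normalizes that form to
    # a dict, and falls back to an empty dict when no headers were received.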

    async def send(
        self,
        channel: Union[str, ChannelT],
        key: K = USE_EXISTING_KEY,
        value: V = USE_EXISTING_VALUE,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Any = USE_EXISTING_HEADERS,
        schema: Optional[SchemaT] = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: Optional[MessageSentCallback] = None,
        force: bool = False,
    ) -> Awaitable[RecordMetadata]:
        """Send object to channel."""
        if key is USE_EXISTING_KEY:
            key = self.key
        if value is USE_EXISTING_VALUE:
            value = self.value
        if headers is USE_EXISTING_HEADERS:
            headers = self.headers
        return await self._send(
            channel,
            key,
            value,
            partition,
            timestamp,
            headers,
            schema,
            key_serializer,
            value_serializer,
            callback,
            force=force,
        )
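
    # A minimal usage sketch for ``send()`` (the ``destination_topic`` name,
    # the agent and ``transform()`` below are hypothetical, not part of this
    # module).  Arguments left at their ``USE_EXISTING_*`` defaults, such as
    # the key and headers here, are taken from the event itself:
    #
    #     @app.agent(source_topic)
    #     async def relay(stream):
    #         async for value in stream:
    #             event = stream.current_event
    #             await event.send(destination_topic, value=transform(value))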

    async def forward(
        self,
        channel: Union[str, ChannelT],
        key: K = USE_EXISTING_KEY,
        value: V = USE_EXISTING_VALUE,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: Any = USE_EXISTING_HEADERS,
        schema: Optional[SchemaT] = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: Optional[MessageSentCallback] = None,
        force: bool = False,
    ) -> Awaitable[RecordMetadata]:
        """Forward original message (will not be reserialized)."""
        if key is USE_EXISTING_KEY:
            key = self.message.key
        if value is USE_EXISTING_VALUE:
            value = self.message.value
        if headers is USE_EXISTING_HEADERS:
            headers = self.message.headers
        if not headers:
            headers = None
        return await self._send(
            channel,
            key,
            value,
            partition,
            timestamp,
            headers,
            schema,
            key_serializer,
            value_serializer,
            callback,
            force=force,
        )
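
    # A minimal usage sketch for ``forward()``, assuming a hypothetical
    # ``dead_letter_topic`` and ``handle()`` helper.  Unlike ``send()``, the
    # already-serialized ``message.key``/``message.value`` are passed on
    # untouched:
    #
    #     @app.agent(source_topic)
    #     async def process(stream):
    #         async for event in stream.events():
    #             try:
    #                 await handle(event.value)
    #             except Exception:
    #                 await event.forward(dead_letter_topic)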

    async def _send(
        self,
        channel: Union[str, ChannelT],
        key: K = None,
        value: V = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: HeadersArg = None,
        schema: Optional[SchemaT] = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: Optional[MessageSentCallback] = None,
        force: bool = False,
    ) -> Awaitable[RecordMetadata]:
        return await cast(_App, self.app)._attachments.maybe_put(
            channel,
            key,
            value,
            partition,
            timestamp,
            headers,
            schema,
            key_serializer,
            value_serializer,
            callback,
            force=force,
        )
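
    # Note: ``maybe_put`` above sends the message immediately unless the app's
    # ``stream_publish_on_commit`` setting is enabled, in which case it is
    # buffered (attached to the current source offset, as ``_attach()`` below
    # does unconditionally) and only published after that offset is committed.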

    def _attach(
        self,
        channel: Union[ChannelT, str],
        key: K = None,
        value: V = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: HeadersArg = None,
        schema: Optional[SchemaT] = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: Optional[MessageSentCallback] = None,
    ) -> Awaitable[RecordMetadata]:
        return cast(_App, self.app)._attachments.put(
            self.message,
            channel,
            key,
            value,
            partition=partition,
            timestamp=timestamp,
            headers=headers,
            schema=schema,
            key_serializer=key_serializer,
            value_serializer=value_serializer,
            callback=callback,
        )

    def ack(self) -> bool:
        """Acknowledge event as being processed by stream.

        When the last stream processor acks the message, the
        offset in the source topic will be marked as safe-to-commit,
        and the worker will commit and advance the committed offset.
        """
        return self.message.ack(self.app.consumer)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: k={self.key!r} v={self.value!r}>"

    async def __aenter__(self) -> EventT:
        return self

    async def __aexit__(
        self,
        _exc_type: Optional[Type[BaseException]] = None,
        _exc_val: Optional[BaseException] = None,
        _exc_tb: Optional[TracebackType] = None,
    ) -> Optional[bool]:
        self.ack()
        return None
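

# A minimal sketch of the async context-manager protocol above, assuming a
# hypothetical channel passed in by the caller.  When iterating a channel
# directly (not via a stream), events are not acked automatically, so the
# context manager is used to ack on exit:
#
#     async def drain(channel):
#         async for event in channel:
#             async with event:  # Event.__aexit__() calls self.ack()
#                 print(event.key, event.value, event.message.offset)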