"""Message transport using :pypi:`aiokafka`."""
import asyncio
import typing
from asyncio import Lock, QueueEmpty
from collections import deque
from functools import partial
from time import monotonic
from typing import (
Any,
Awaitable,
Callable,
ClassVar,
Deque,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
Type,
cast,
no_type_check,
)
import aiokafka
import aiokafka.abc
import opentracing
from aiokafka import TopicPartition
from aiokafka.consumer.group_coordinator import OffsetCommitRequest
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.errors import (
CommitFailedError,
ConsumerStoppedError,
IllegalStateError,
KafkaError,
NotControllerError,
ProducerFenced,
TopicAlreadyExistsError as TopicExistsError,
for_code,
)
from aiokafka.partitioner import DefaultPartitioner, murmur2
from aiokafka.protocol.metadata import MetadataRequest_v1
from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition
from aiokafka.util import parse_kafka_version
from mode import Service, get_logger
from mode.threads import ServiceThread, WorkerThread
from mode.utils import text
from mode.utils.futures import StampedeWrapper
from mode.utils.objects import cached_property
from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds
from opentracing.ext import tags
from yarl import URL
from faust.auth import (
GSSAPICredentials,
OAuthCredentials,
SASLCredentials,
SSLCredentials,
)
from faust.exceptions import (
ConsumerNotStarted,
ImproperlyConfigured,
NotReady,
ProducerSendError,
)
from faust.transport import base
from faust.transport.consumer import (
ConsumerThread,
RecordMap,
ThreadDelegateConsumer,
ensure_TPset,
)
from faust.types import (
TP,
ConsumerMessage,
FutureMessage,
HeadersArg,
PendingMessage,
RecordMetadata,
)
from faust.types.auth import CredentialsT
from faust.types.transports import ConsumerT, PartitionerT, ProducerT
from faust.utils.kafka.protocol.admin import CreateTopicsRequest
from faust.utils.tracing import noop_span, set_current_span, traced_from_parent_span
__all__ = ["Consumer", "Producer", "Transport"]
# if not hasattr(aiokafka, '__robinhood__'): # pragma: no cover
# raise RuntimeError(
# 'Please install robinhood-aiokafka, not aiokafka')
logger = get_logger(__name__)
# Sentinel generation id used by aiokafka before a group generation is known.
DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID
# Maximum topic name length accepted (enforced in create_topic below).
TOPIC_LENGTH_MAX = 249
# --- Diagnostic message templates -------------------------------------------
# The templates below are logged by the event-path verification code in
# AIOKafkaConsumerThread when processing appears stalled.  Most take the
# topic partition and a humanized "how long ago" string as %-args.
SLOW_PROCESSING_CAUSE_AGENT = """
The agent processing the stream is hanging (waiting for network, I/O or \
infinite loop).
""".strip()
SLOW_PROCESSING_CAUSE_STREAM = """
The stream has stopped processing events for some reason.
""".strip()
SLOW_PROCESSING_CAUSE_COMMIT = """
The commit handler background thread has stopped working (report as bug).
""".strip()
# Appended to the slow-processing errors; expects a mapping with
# ``setting`` and ``current_value`` keys (see _make_slow_processing_error).
SLOW_PROCESSING_EXPLAINED = """
There are multiple possible explanations for this:
1) The processing of a single event in the stream
is taking too long.
The timeout for this is defined by the %(setting)s setting,
currently set to %(current_value)r. If you expect the time
required to process an event, to be greater than this then please
increase the timeout.
"""
SLOW_PROCESSING_NO_FETCH_SINCE_START = """
Aiokafka has not sent fetch request for %r since start (started %s)
""".strip()
SLOW_PROCESSING_NO_RESPONSE_SINCE_START = """
Aiokafka has not received fetch response for %r since start (started %s)
""".strip()
SLOW_PROCESSING_NO_RECENT_FETCH = """
Aiokafka stopped fetching from %r (last done %s)
""".strip()
SLOW_PROCESSING_NO_RECENT_RESPONSE = """
Broker stopped responding to fetch requests for %r (last responded %s)
""".strip()
SLOW_PROCESSING_NO_HIGHWATER_SINCE_START = """
Highwater not yet available for %r (started %s).
""".strip()
SLOW_PROCESSING_STREAM_IDLE_SINCE_START = """
Stream has not started processing %r (started %s).
""".strip()
SLOW_PROCESSING_STREAM_IDLE = """
Stream stopped processing, or is slow for %r (last inbound %s).
""".strip()
SLOW_PROCESSING_NO_COMMIT_SINCE_START = """
Has not committed %r at all since worker start (started %s).
""".strip()
SLOW_PROCESSING_NO_RECENT_COMMIT = """
Has not committed %r (last commit %s).
""".strip()
def __canon_host(host, default):
    """Return *host* in the form :pypi:`aiokafka` expects.

    Falls back to *default* when the host is empty, and wraps IPv6
    addresses (detected by a ``:`` in the host) in square brackets.
    """
    if not host:
        return default
    return f"[{host}]" if ":" in host else host
def server_list(urls: List[URL], default_port: int) -> List[str]:
    """Convert list of urls to list of servers accepted by :pypi:`aiokafka`."""
    servers = []
    for url in urls:
        # Yarl strips [] from IPv6 addresses, and aiokafka expects them.
        host = __canon_host(url.host, "127.0.0.1")
        servers.append(f"{host}:{url.port or default_port}")
    return servers
class ConsumerRebalanceListener(aiokafka.abc.ConsumerRebalanceListener): # type: ignore
    """Bridge aiokafka rebalance callbacks to the faust consumer thread."""

    # kafka's ridiculous class based callback interface makes this hacky.
    def __init__(self, thread: ConsumerThread) -> None:
        # The ConsumerThread all callbacks are forwarded to.
        self._thread: ConsumerThread = thread

    def on_partitions_revoked(self, revoked: Iterable[_TopicPartition]) -> Awaitable:
        """Call when partitions are being revoked."""
        thread = self._thread
        # XXX Must call app.on_rebalance_start as early as possible.
        # we call this in the sync method, this way when we know
        # that it will be called even if await never returns to the coroutine.
        thread.app.on_rebalance_start()
        # this way we should also get a warning if the coroutine
        # is never awaited.
        return thread.on_partitions_revoked(ensure_TPset(revoked))

    async def on_partitions_assigned(self, assigned: Iterable[_TopicPartition]) -> None:
        """Call when partitions are being assigned."""
        # The coordinator's current generation id is published on the app
        # so messages can be tagged with the generation they belong to.
        generation = self._thread._ensure_consumer()._coordinator.generation
        # set the generation on the app
        self._thread.app.consumer_generation_id = generation
        await self._thread.on_partitions_assigned(ensure_TPset(assigned), generation)
class Consumer(ThreadDelegateConsumer):
    """Kafka consumer using :pypi:`aiokafka`."""

    logger = logger

    # Rebalance-listener class instantiated by the consumer thread.
    RebalanceListener: ClassVar[Type[ConsumerRebalanceListener]]
    RebalanceListener = ConsumerRebalanceListener

    # Errors raised by aiokafka once the consumer has been stopped.
    consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (
        ConsumerStoppedError,
    )

    def _new_consumer_thread(self) -> ConsumerThread:
        """Create the thread that owns the underlying aiokafka consumer."""
        return AIOKafkaConsumerThread(self, loop=self.loop, beacon=self.beacon)

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication: int,
        *,
        config: Optional[Mapping[str, Any]] = None,
        timeout: Seconds = 30.0,
        retention: Optional[Seconds] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        ensure_created: bool = False,
    ) -> None:
        """Create/declare topic on server.

        Delegates to the consumer thread; logs a warning and does
        nothing when the ``topic_allow_declare`` setting is disabled.
        """
        if self.app.conf.topic_allow_declare:
            await self._thread.create_topic(
                topic,
                partitions,
                replication,
                config=config,
                timeout=timeout,
                retention=retention,
                compacting=compacting,
                deleting=deleting,
                ensure_created=ensure_created,
            )
        else:
            logger.warning(f"Topic creation disabled! Can't create topic {topic}")

    def _new_topicpartition(self, topic: str, partition: int) -> TP:
        """Wrap topic/partition in aiokafka's ``TopicPartition`` type."""
        return cast(TP, _TopicPartition(topic, partition))

    def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:
        """Convert an aiokafka consumer record to a :class:`ConsumerMessage`."""
        timestamp: Optional[int] = record.timestamp
        timestamp_s: float = cast(float, None)
        if timestamp is not None:
            # Kafka record timestamps are in milliseconds; faust uses seconds.
            timestamp_s = timestamp / 1000.0
        return ConsumerMessage(
            record.topic,
            record.partition,
            record.offset,
            timestamp_s,
            record.timestamp_type,
            record.headers,
            record.key,
            record.value,
            record.checksum,
            record.serialized_key_size,
            record.serialized_value_size,
            tp,
            generation_id=self.app.consumer_generation_id,
        )

    async def on_stop(self) -> None:
        """Call when consumer is stopping."""
        await super().on_stop()
        # Topic creation futures cached on the transport are no longer
        # valid once the consumer stops.
        transport = cast(Transport, self.transport)
        transport._topic_waiters.clear()

    def verify_event_path(self, now: float, tp: TP) -> None:
        """Delegate event-path health check to the consumer thread."""
        return self._thread.verify_event_path(now, tp)
class ThreadedProducer(ServiceThread):
    """Producer that runs on a dedicated worker thread.

    Messages are enqueued on :attr:`event_queue` and published to Kafka
    from this thread's event loop by :meth:`push_events`.
    """

    # aiokafka producer owned by this thread (created in on_start).
    _producer: Optional[aiokafka.AIOKafkaProducer] = None
    # Queue of pending FutureMessage objects (created in on_start).
    event_queue: Optional[asyncio.Queue] = None
    # NOTE(review): despite the annotation this holds the faust
    # ``Producer`` passed to __init__ (its ``_settings_*`` methods are
    # called in _new_producer), not an AIOKafkaProducer -- confirm.
    _default_producer: Optional[aiokafka.AIOKafkaProducer] = None
    # Background task draining event_queue (created in on_start).
    _push_events_task: Optional[asyncio.Task] = None
    app: Any
    # Set False in on_start, True in on_thread_stop; read by push_events.
    stopped: bool

    def __init__(
        self,
        default_producer,
        app,
        *,
        executor: Any = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        thread_loop: Optional[asyncio.AbstractEventLoop] = None,
        Worker: Optional[Type[WorkerThread]] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            executor=executor,
            thread_loop=thread_loop,
            Worker=Worker,
            **kwargs,
        )
        self._default_producer = default_producer
        self.app = app

    async def flush(self) -> None:
        """Wait for producer to finish transmitting all buffered messages."""
        # Drain any messages still waiting in the queue before flushing
        # the underlying producer's internal buffers.
        while True:
            try:
                msg = self.event_queue.get_nowait()
            except QueueEmpty:
                break
            else:
                await self.publish_message(msg)
        if self._producer is not None:
            await self._producer.flush()

    def _new_producer(
        self, transactional_id: Optional[str] = None
    ) -> aiokafka.AIOKafkaProducer:
        """Create an aiokafka producer cloned from the default producer's settings."""
        return aiokafka.AIOKafkaProducer(
            loop=self.thread_loop,
            **{
                **self._default_producer._settings_default(),
                **self._default_producer._settings_auth(),
                **self._default_producer._settings_extra(),
            },
            transactional_id=transactional_id,
        )

    async def on_start(self) -> None:
        """Start the thread-local producer and the queue-draining task."""
        self.event_queue = asyncio.Queue()
        producer = self._producer = self._new_producer()
        await producer.start()
        self.stopped = False
        self._push_events_task = self.thread_loop.create_task(self.push_events())

    async def on_thread_stop(self) -> None:
        """Call when producer thread is stopping."""
        logger.info("Stopping producer thread")
        await super().on_thread_stop()
        self.stopped = True
        # when method queue is stopped, we can stop the consumer
        if self._producer is not None:
            await self.flush()
            await self._producer.stop()
        # Wait for push_events to notice ``stopped`` and exit its loop.
        if self._push_events_task is not None:
            while not self._push_events_task.done():
                await asyncio.sleep(0.1)

    async def push_events(self) -> None:
        """Continuously publish queued messages until the thread stops."""
        while not self.stopped:
            try:
                # Short timeout so the ``stopped`` flag is re-checked
                # regularly even when the queue is empty.
                event = await asyncio.wait_for(self.event_queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            self.app.sensors.on_threaded_producer_buffer_processed(
                app=self.app, size=self.event_queue.qsize()
            )
            await self.publish_message(event)

    async def publish_message(
        self, fut_other: FutureMessage, wait: bool = False
    ) -> Awaitable[RecordMetadata]:
        """Fulfill promise to publish message to topic.

        With ``wait=True`` the send is awaited and the resolved future
        is returned; otherwise the aiokafka send future is returned with
        a callback attached to resolve the channel promise.
        """
        fut = FutureMessage(fut_other.message)
        message: PendingMessage = fut.message
        topic = message.channel.get_topic_name()
        key: bytes = cast(bytes, message.key)
        value: bytes = cast(bytes, message.value)
        partition: Optional[int] = message.partition
        timestamp: float = cast(float, message.timestamp)
        headers: Optional[HeadersArg] = message.headers
        logger.debug(
            "send: topic=%r k=%r v=%r timestamp=%r partition=%r",
            topic,
            key,
            value,
            timestamp,
            partition,
        )
        producer = self._producer
        state = self.app.sensors.on_send_initiated(
            producer,
            topic,
            message=message,
            keysize=len(key) if key else 0,
            valsize=len(value) if value else 0,
        )
        # Kafka expects millisecond timestamps; falsy values pass through.
        timestamp_ms = int(timestamp * 1000.0) if timestamp else timestamp
        if headers is not None:
            if isinstance(headers, Mapping):
                headers = list(headers.items())
        if wait:
            ret: RecordMetadata = await producer.send_and_wait(
                topic=topic,
                key=key,
                value=value,
                partition=partition,
                timestamp_ms=timestamp_ms,
                headers=headers,
            )
            fut.message.channel._on_published(
                message=fut, state=state, producer=producer
            )
            fut.set_result(ret)
            return fut
        else:
            fut2 = cast(
                asyncio.Future,
                await producer.send(
                    topic=topic,
                    key=key,
                    value=value,
                    partition=partition,
                    timestamp_ms=timestamp_ms,
                    headers=headers,
                ),
            )
            callback = partial(
                fut.message.channel._on_published,
                message=fut,
                state=state,
                producer=producer,
            )
            fut2.add_done_callback(cast(Callable, callback))
            return fut2
class AIOKafkaConsumerThread(ConsumerThread):
    """Thread hosting the actual :pypi:`aiokafka` consumer.

    All interaction with the underlying ``AIOKafkaConsumer`` happens on
    this thread's event loop (via ``call_thread``).
    """

    # The wrapped aiokafka consumer (``None`` until on_start runs).
    _consumer: Optional[aiokafka.AIOKafkaConsumer] = None

    # Rebalancing spans waiting for the generation id to become known.
    _pending_rebalancing_spans: Deque[opentracing.Span]

    # Monotonic timestamp of the last commit per topic partition.
    tp_last_committed_at: MutableMapping[TP, float]
    # Monotonic clock value captured when the consumer started.
    time_started: float

    # Timeouts (seconds) used by the verify_event_path diagnostics below.
    tp_fetch_request_timeout_secs: float
    tp_fetch_response_timeout_secs: float
    tp_stream_timeout_secs: float
    tp_commit_timeout_secs: float

    def __post_init__(self) -> None:
        """Initialize partitioner, rebalance listener and stall timeouts."""
        consumer = cast(Consumer, self.consumer)
        self._partitioner: PartitionerT = (
            self.app.conf.producer_partitioner or DefaultPartitioner()
        )
        self._rebalance_listener = consumer.RebalanceListener(self)
        self._pending_rebalancing_spans = deque()
        self.tp_last_committed_at = {}
        app = self.consumer.app
        # Fetch/response/stream stalls share one configured timeout.
        stream_processing_timeout = app.conf.stream_processing_timeout
        self.tp_fetch_request_timeout_secs = stream_processing_timeout
        self.tp_fetch_response_timeout_secs = stream_processing_timeout
        self.tp_stream_timeout_secs = stream_processing_timeout
        commit_livelock_timeout = app.conf.broker_commit_livelock_soft_timeout
        self.tp_commit_timeout_secs = commit_livelock_timeout

    async def on_start(self) -> None:
        """Call when consumer starts."""
        self._consumer = self._create_consumer(loop=self.thread_loop)
        self.time_started = monotonic()
        await self._consumer.start()

    async def on_thread_stop(self) -> None:
        """Call when consumer thread is stopping."""
        # super stops thread method queue (QueueServiceThread.method_queue)
        await super().on_thread_stop()
        # when method queue is stopped, we can stop the consumer
        if self._consumer is not None:
            await self._consumer.stop()

    def _create_consumer(
        self, loop: asyncio.AbstractEventLoop
    ) -> aiokafka.AIOKafkaConsumer:
        """Build the aiokafka consumer appropriate for this app mode."""
        # NOTE(review): the ``loop`` argument is not used in this body.
        transport = cast(Transport, self.transport)
        if self.app.client_only:
            return self._create_client_consumer(transport)
        else:
            return self._create_worker_consumer(transport)

    def _create_worker_consumer(
        self, transport: "Transport"
    ) -> aiokafka.AIOKafkaConsumer:
        """Build the group-managed consumer used by full worker instances."""
        isolation_level: str = "read_uncommitted"
        conf = self.app.conf
        if self.consumer.in_transaction:
            # Exactly-once semantics: only read committed transactions.
            isolation_level = "read_committed"
        # Standby replicas require the faust assignor; otherwise plain
        # round-robin assignment is sufficient.
        self._assignor = (
            self.app.assignor
            if self.app.conf.table_standby_replicas > 0
            else RoundRobinPartitionAssignor
        )
        auth_settings = credentials_to_aiokafka_auth(
            conf.broker_credentials, conf.ssl_context
        )
        max_poll_interval = conf.broker_max_poll_interval or 0
        request_timeout = conf.broker_request_timeout
        session_timeout = conf.broker_session_timeout
        rebalance_timeout = conf.broker_rebalance_timeout
        if session_timeout > request_timeout:
            raise ImproperlyConfigured(
                f"Setting broker_session_timeout={session_timeout} "
                f"cannot be greater than "
                f"broker_request_timeout={request_timeout}"
            )
        return aiokafka.AIOKafkaConsumer(
            api_version=conf.consumer_api_version,
            client_id=conf.broker_client_id,
            group_id=conf.id,
            group_instance_id=conf.consumer_group_instance_id,
            bootstrap_servers=server_list(transport.url, transport.default_port),
            partition_assignment_strategy=[self._assignor],
            enable_auto_commit=False,
            auto_offset_reset=conf.consumer_auto_offset_reset,
            max_poll_records=conf.broker_max_poll_records,
            max_poll_interval_ms=int(max_poll_interval * 1000.0),
            max_partition_fetch_bytes=conf.consumer_max_fetch_size,
            fetch_max_wait_ms=1500,
            request_timeout_ms=int(request_timeout * 1000.0),
            check_crcs=conf.broker_check_crcs,
            session_timeout_ms=int(session_timeout * 1000.0),
            rebalance_timeout_ms=int(rebalance_timeout * 1000.0),
            heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0),
            isolation_level=isolation_level,
            metadata_max_age_ms=conf.consumer_metadata_max_age_ms,
            connections_max_idle_ms=conf.consumer_connections_max_idle_ms,
            # traced_from_parent_span=self.traced_from_parent_span,
            # start_rebalancing_span=self.start_rebalancing_span,
            # start_coordinator_span=self.start_coordinator_span,
            # on_generation_id_known=self.on_generation_id_known,
            # flush_spans=self.flush_spans,
            **auth_settings,
        )

    def _create_client_consumer(
        self, transport: "Transport"
    ) -> aiokafka.AIOKafkaConsumer:
        """Build the stand-alone consumer used in ``client_only`` mode."""
        conf = self.app.conf
        auth_settings = credentials_to_aiokafka_auth(
            conf.broker_credentials, conf.ssl_context
        )
        max_poll_interval = conf.broker_max_poll_interval or 0
        return aiokafka.AIOKafkaConsumer(
            client_id=conf.broker_client_id,
            bootstrap_servers=server_list(transport.url, transport.default_port),
            request_timeout_ms=int(conf.broker_request_timeout * 1000.0),
            enable_auto_commit=True,
            max_poll_records=conf.broker_max_poll_records,
            max_poll_interval_ms=int(max_poll_interval * 1000.0),
            auto_offset_reset=conf.consumer_auto_offset_reset,
            check_crcs=conf.broker_check_crcs,
            **auth_settings,
        )

    @cached_property
    def trace_category(self) -> str:
        """Category name used for opentracing spans created here."""
        return f"{self.app.conf.name}-_aiokafka"

    def start_rebalancing_span(self) -> opentracing.Span:
        """Start a (lazily finished) span covering a rebalance."""
        return self._start_span("rebalancing", lazy=True)

    def start_coordinator_span(self) -> opentracing.Span:
        """Start a span covering coordinator activity."""
        return self._start_span("coordinator")

    def _start_span(self, name: str, *, lazy: bool = False) -> opentracing.Span:
        """Start a span, or a no-op span if no tracer is configured."""
        tracer = self.app.tracer
        if tracer is not None:
            span = tracer.get_tracer(self.trace_category).start_span(
                operation_name=name,
            )
            span.set_tag(tags.SAMPLING_PRIORITY, 1)
            self.app._span_add_default_tags(span)
            set_current_span(span)
            if lazy:
                # Defer finishing until the generation id is known.
                self._transform_span_lazy(span)
            return span
        else:
            return noop_span()

    @no_type_check
    def _transform_span_lazy(self, span: opentracing.Span) -> None:
        """Patch ``span.finish`` so finishing is routed through _span_finish."""
        # XXX slow
        consumer = self
        if typing.TYPE_CHECKING:
            # MyPy completely disallows the statements below
            # claiming it is an illegal dynamic baseclass.
            # We know mypy, but do it anyway :D
            pass
        else:
            cls = span.__class__

            class LazySpan(cls):
                def finish(self) -> None:
                    consumer._span_finish(span)

            # Keep the original finish as _real_finish for later.
            span._real_finish, span.finish = span.finish, LazySpan.finish

    def _span_finish(self, span: opentracing.Span) -> None:
        """Finish a lazy span now, or park it until the generation is known."""
        assert self._consumer is not None
        if self._consumer._coordinator.generation == DEFAULT_GENERATION_ID:
            self._on_span_generation_pending(span)
        else:
            self._on_span_generation_known(span)

    def _on_span_generation_pending(self, span: opentracing.Span) -> None:
        """Queue span until the group generation id becomes known."""
        self._pending_rebalancing_spans.append(span)

    def _on_span_generation_known(self, span: opentracing.Span) -> None:
        """Tag span with coordinator/generation info and really finish it."""
        if self._consumer:
            coordinator = self._consumer._coordinator
            coordinator_id = coordinator.coordinator_id
            app_id = self.app.conf.id
            generation = coordinator.generation
            member_id = coordinator.member_id
            try:
                op_name = span.operation_name
                set_tag = span.set_tag
            except AttributeError: # pragma: no cover
                pass # not a real span
            else:
                # Derive a deterministic trace id from app id + generation
                # so all spans of one rebalance share a trace.
                trace_id_str = f"reb-{app_id}-{generation}"
                trace_id = murmur2(trace_id_str.encode())
                span.context.trace_id = trace_id
                if op_name.endswith(".REPLACE_WITH_MEMBER_ID"):
                    span.set_operation_name(f"rebalancing node {member_id}")
                set_tag("kafka_generation", generation)
                set_tag("kafka_member_id", member_id)
                set_tag("kafka_coordinator_id", coordinator_id)
                self.app._span_add_default_tags(span)
                span._real_finish()

    def _on_span_cancelled_early(self, span: opentracing.Span) -> None:
        """Finish a queued span whose rebalance never completed."""
        try:
            op_name = span.operation_name
        except AttributeError:
            return
        else:
            span.set_operation_name(f"{op_name} (CANCELLED)")
            span._real_finish()

    def traced_from_parent_span(
        self, parent_span: opentracing.Span, lazy: bool = False, **extra_context: Any
    ) -> Callable:
        """Return decorator tracing callables from ``parent_span``."""
        return traced_from_parent_span(
            parent_span,
            callback=self._transform_span_lazy if lazy else None,
            **extra_context,
        )

    def flush_spans(self) -> None:
        """Cancel and finish all spans still waiting for a generation id."""
        while self._pending_rebalancing_spans:
            span = self._pending_rebalancing_spans.popleft()
            self._on_span_cancelled_early(span)

    def on_generation_id_known(self) -> None:
        """Finish all queued spans now that the generation id is known."""
        while self._pending_rebalancing_spans:
            span = self._pending_rebalancing_spans.popleft()
            self._on_span_generation_known(span)

    def close(self) -> None:
        """Close consumer for graceful shutdown."""
        if self._consumer is not None:
            self._consumer._closed = True
            # The consumer lives on another thread's loop, so schedule
            # the close coroutines thread-safely on the app loop.
            asyncio.run_coroutine_threadsafe(
                self._consumer._client.close(), self.app.loop
            )
            asyncio.run_coroutine_threadsafe(
                self._consumer._coordinator.close(), self.app.loop
            )

    async def subscribe(self, topics: Iterable[str]) -> None:
        """Reset subscription (requires rebalance)."""
        # XXX pattern does not work :/
        await self.call_thread(
            self._ensure_consumer().subscribe,
            topics=set(topics),
            listener=self._rebalance_listener,
        )

    async def seek_to_committed(self) -> Mapping[TP, int]:
        """Seek partitions to the last committed offset."""
        return await self.call_thread(self._ensure_consumer().seek_to_committed)

    async def commit(self, offsets: Mapping[TP, int]) -> bool:
        """Commit topic offsets."""
        return await self.call_thread(self._commit, offsets)

    async def _commit(self, offsets: Mapping[TP, int]) -> bool:
        """Commit ``offsets`` on the consumer thread.

        Returns :const:`False` (after crashing the service for
        unexpected errors) when the commit did not succeed.
        """
        consumer = self._ensure_consumer()
        now = monotonic()
        # Only commit partitions we are still assigned to.
        commitable_offsets = {
            tp: offset for tp, offset in offsets.items() if tp in self.assignment()
        }
        try:
            aiokafka_offsets = {
                ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "")
                for tp, offset in commitable_offsets.items()
            }
            self.tp_last_committed_at.update({tp: now for tp in commitable_offsets})
            await consumer.commit(aiokafka_offsets)
        except CommitFailedError as exc:
            if "already rebalanced" in str(exc):
                # Expected during rebalance; not a crash-worthy error.
                return False
            self.log.exception("Committing raised exception: %r", exc)
            await self.crash(exc)
            return False
        except IllegalStateError as exc:
            self.log.exception(
                "Got exception: %r\nCurrent assignment: %r", exc, self.assignment()
            )
            await self.crash(exc)
            return False
        except Exception as exc:
            self.log.exception(
                "Got exception: %r\nCurrent assignment: %r", exc, self.assignment()
            )
            await self.crash(exc)
            return False
        return True

    def verify_event_path(self, now: float, tp: TP) -> None:
        """Log diagnostics when event processing for ``tp`` appears stalled."""
        # long function ahead, but not difficult to test
        # as it always returns as soon as some condition is met.
        if self._verify_aiokafka_event_path(now, tp):
            # already logged error.
            return None
        parent = cast(Consumer, self.consumer)
        app = parent.app
        monitor = app.monitor
        acks_enabled_for = app.topics.acks_enabled_for
        secs_since_started = now - self.time_started
        if monitor is not None: # need for .stream_inbound_time
            aiotp = TopicPartition(tp.topic, tp.partition)
            tp_state = self._ensure_consumer()._fetcher._subscriptions.subscription.assignment.state_value( # noqa: E501
                aiotp
            )
            highwater = tp_state.highwater
            committed_offset = tp_state.position
            has_acks = acks_enabled_for(tp.topic)
            if highwater is None:
                if secs_since_started >= self.tp_stream_timeout_secs:
                    # AIOKAFKA HAS NOT UPDATED HIGHWATER SINCE STARTING
                    self.log.error(
                        SLOW_PROCESSING_NO_HIGHWATER_SINCE_START,
                        tp,
                        humanize_seconds_ago(secs_since_started),
                    )
                return None
            if has_acks and committed_offset is not None:
                if highwater > committed_offset:
                    inbound = monitor.stream_inbound_time.get(tp)
                    if inbound is None:
                        if secs_since_started >= self.tp_stream_timeout_secs:
                            # AIOKAFKA IS FETCHING BUT STREAM IS NOT
                            # PROCESSING EVENTS (no events at all since
                            # start).
                            self._log_slow_processing_stream(
                                SLOW_PROCESSING_STREAM_IDLE_SINCE_START,
                                tp,
                                humanize_seconds_ago(secs_since_started),
                            )
                        return None
                    secs_since_stream = now - inbound
                    if secs_since_stream >= self.tp_stream_timeout_secs:
                        # AIOKAFKA IS FETCHING, AND STREAM WAS WORKING
                        # BEFORE BUT NOW HAS STOPPED PROCESSING
                        # (or processing of an event in the stream takes
                        # longer than tp_stream_timeout_secs).
                        self._log_slow_processing_stream(
                            SLOW_PROCESSING_STREAM_IDLE,
                            tp,
                            humanize_seconds_ago(secs_since_stream),
                        )
                        return None
                    last_commit = self.tp_last_committed_at.get(tp)
                    if last_commit is None:
                        if secs_since_started >= self.tp_commit_timeout_secs:
                            # AIOKAFKA IS FETCHING AND STREAM IS PROCESSING
                            # BUT WE HAVE NOT COMMITTED ANYTHING SINCE WORKER
                            # START.
                            self._log_slow_processing_commit(
                                SLOW_PROCESSING_NO_COMMIT_SINCE_START,
                                tp,
                                humanize_seconds_ago(secs_since_started),
                            )
                            return None
                    else:
                        secs_since_commit = now - last_commit
                        if secs_since_commit >= self.tp_commit_timeout_secs:
                            # AIOKAFKA IS FETCHING AND STREAM IS PROCESSING
                            # BUT WE HAVE NOT COMITTED ANYTHING IN A WHILE
                            # (commit offset is not advancing).
                            self._log_slow_processing_commit(
                                SLOW_PROCESSING_NO_RECENT_COMMIT,
                                tp,
                                humanize_seconds_ago(secs_since_commit),
                            )
                            return None

    def verify_recovery_event_path(self, now: float, tp: TP) -> None:
        """Verify the aiokafka fetch path only (used during recovery)."""
        self._verify_aiokafka_event_path(now, tp)

    def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
        """Verify that :pypi:`aiokafka` event path is working.

        Returns :const:`True` if any error was logged.
        """
        consumer = self._ensure_consumer()
        secs_since_started = now - self.time_started
        aiotp = TopicPartition(tp.topic, tp.partition)
        assignment = consumer._fetcher._subscriptions.subscription.assignment
        if not assignment or not assignment.active:
            self.log.error(f"No active partitions for {tp}")
            return True
        poll_at = None
        aiotp_state = assignment.state_value(aiotp)
        if aiotp_state and aiotp_state.timestamp:
            # aiokafka timestamps are in milliseconds.
            poll_at = aiotp_state.timestamp / 1000
        if poll_at is None:
            if secs_since_started >= self.tp_fetch_request_timeout_secs:
                # NO FETCH REQUEST SENT AT ALL SINCE WORKER START
                self.log.error(
                    SLOW_PROCESSING_NO_FETCH_SINCE_START,
                    tp,
                    humanize_seconds_ago(secs_since_started),
                )
                return True
            return False
        secs_since_request = now - poll_at
        if secs_since_request >= self.tp_fetch_request_timeout_secs:
            # NO REQUEST SENT BY AIOKAFKA IN THE LAST n SECONDS
            self.log.error(
                SLOW_PROCESSING_NO_RECENT_FETCH,
                tp,
                humanize_seconds_ago(secs_since_request),
            )
            return True
        return False

    def _log_slow_processing_stream(self, msg: str, *args: Any) -> None:
        """Log a stream-stall error with standard causes/setting context."""
        app = self.consumer.app
        self._log_slow_processing(
            msg,
            *args,
            causes=[
                SLOW_PROCESSING_CAUSE_STREAM,
                SLOW_PROCESSING_CAUSE_AGENT,
            ],
            setting="stream_processing_timeout",
            current_value=app.conf.stream_processing_timeout,
        )

    def _log_slow_processing_commit(self, msg: str, *args: Any) -> None:
        """Log a commit-stall error with standard causes/setting context."""
        app = self.consumer.app
        self._log_slow_processing(
            msg,
            *args,
            causes=[SLOW_PROCESSING_CAUSE_COMMIT],
            setting="broker_commit_livelock_soft_timeout",
            current_value=app.conf.broker_commit_livelock_soft_timeout,
        )

    def _make_slow_processing_error(
        self, msg: str, causes: Iterable[str], setting: str, current_value: float
    ) -> str:
        """Compose the full slow-processing error message text."""
        return " ".join(
            [
                msg,
                SLOW_PROCESSING_EXPLAINED
                % {"setting": setting, "current_value": current_value},
                text.enumeration(causes, start=2, sep="\n\n"),
            ]
        )

    def _log_slow_processing(
        self,
        msg: str,
        *args: Any,
        causes: Iterable[str],
        setting: str,
        current_value: float,
    ) -> None:
        """Emit a composed slow-processing error to the service log."""
        return self.log.error(
            self._make_slow_processing_error(msg, causes, setting, current_value),
            *args,
        )

    async def position(self, tp: TP) -> Optional[int]:
        """Return the current position for topic partition."""
        return await self.call_thread(self._ensure_consumer().position, tp)

    async def seek_to_beginning(self, *partitions: _TopicPartition) -> None:
        """Seek list of offsets to the first available offset."""
        await self.call_thread(self._ensure_consumer().seek_to_beginning, *partitions)

    async def seek_wait(self, partitions: Mapping[TP, int]) -> None:
        """Seek partitions to specific offset and wait for operation."""
        consumer = self._ensure_consumer()
        await self.call_thread(self._seek_wait, consumer, partitions)

    async def _seek_wait(
        self, consumer: Consumer, partitions: Mapping[TP, int]
    ) -> None:
        """Perform the seeks, then wait until positions are confirmed."""
        for tp, offset in partitions.items():
            self.log.dev("SEEK %r -> %r", tp, offset)
            consumer.seek(tp, offset)
            # Keep the delegating consumer's read offsets in sync.
            if offset > 0:
                self.consumer._read_offset[tp] = offset
            elif tp in self.consumer._read_offset.keys():
                del self.consumer._read_offset[tp]
        # position() blocks until the seek has taken effect.
        await asyncio.wait_for(
            asyncio.gather(*[consumer.position(tp) for tp in partitions]),
            timeout=self.app.conf.broker_request_timeout,
        )

    def seek(self, partition: TP, offset: int) -> None:
        """Seek partition to specific offset."""
        self._ensure_consumer().seek(partition, offset)

    def assignment(self) -> Set[TP]:
        """Return the current assignment."""
        return ensure_TPset(self._ensure_consumer().assignment())

    def highwater(self, tp: TP) -> int:
        """Return the last offset in a specific partition."""
        if self.consumer.in_transaction:
            # Transactions: only count offsets below the last stable offset.
            return self._ensure_consumer().last_stable_offset(tp)
        else:
            return self._ensure_consumer().highwater(tp)

    def topic_partitions(self, topic: str) -> Optional[int]:
        """Return the number of partitions configured for topic by name."""
        if self._consumer is not None:
            return self._consumer._coordinator._metadata_snapshot.get(topic)
        return None

    async def earliest_offsets(self, *partitions: TP) -> Mapping[TP, int]:
        """Return the earliest offsets for a list of partitions."""
        return await self.call_thread(
            self._ensure_consumer().beginning_offsets, partitions
        )

    async def highwaters(self, *partitions: TP) -> Mapping[TP, int]:
        """Return the last offsets for a list of partitions."""
        return await self.call_thread(self._highwaters, partitions)

    async def _highwaters(self, partitions: List[TP]) -> Mapping[TP, int]:
        """Resolve highwaters on the consumer thread (transaction-aware)."""
        consumer = self._ensure_consumer()
        if self.consumer.in_transaction:
            return {tp: consumer.last_stable_offset(tp) for tp in partitions}
        else:
            return cast(Mapping[TP, int], await consumer.end_offsets(partitions))

    def _ensure_consumer(self) -> aiokafka.AIOKafkaConsumer:
        """Return the aiokafka consumer, raising if not yet started."""
        if self._consumer is None:
            raise ConsumerNotStarted("Consumer thread not yet started")
        return self._consumer

    async def getmany(
        self, active_partitions: Optional[Set[TP]], timeout: float
    ) -> RecordMap:
        """Fetch batch of messages from server."""
        # Implementation for the Fetcher service.
        _consumer = self._ensure_consumer()
        # NOTE: Since we are enqueing the fetch request,
        # we need to check when dequeued that we are not in a rebalancing
        # state at that point to return early, or we
        # will create a deadlock (fetch request starts after flow stopped)
        return await self.call_thread(
            self._fetch_records,
            _consumer,
            active_partitions,
            timeout=timeout,
            max_records=_consumer._max_poll_records,
        )

    async def _fetch_records(
        self,
        consumer: aiokafka.AIOKafkaConsumer,
        active_partitions: Set[TP],
        timeout: Optional[float] = None,
        max_records: Optional[int] = None,
    ) -> RecordMap:
        """Fetch records on the consumer thread (see getmany for caveats)."""
        if not self.consumer.flow_active:
            # Flow was stopped (rebalance in progress); see NOTE in getmany.
            return {}
        fetcher = consumer._fetcher
        if consumer._closed or fetcher._closed:
            raise ConsumerStoppedError()
        with fetcher._subscriptions.fetch_context():
            try:
                return await fetcher.fetched_records(
                    active_partitions,
                    timeout=timeout,
                    max_records=max_records,
                )
            finally:
                pass

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication: int,
        *,
        config: Optional[Mapping[str, Any]] = None,
        timeout: Seconds = 30.0,
        retention: Optional[Seconds] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        ensure_created: bool = False,
    ) -> None:
        """Create/declare topic on server."""
        if self.app.conf.topic_allow_declare:
            transport = cast(Transport, self.consumer.transport)
            _consumer = self._ensure_consumer()
            # Broker expects retention in milliseconds.
            _retention = int(want_seconds(retention) * 1000.0) if retention else None
            if len(topic) > TOPIC_LENGTH_MAX:
                raise ValueError(
                    f"Topic name {topic!r} is too long (max={TOPIC_LENGTH_MAX})"
                )
            await self.call_thread(
                transport._create_topic,
                self,
                _consumer._client,
                topic,
                partitions,
                replication,
                config=config,
                timeout=int(want_seconds(timeout) * 1000.0),
                retention=_retention,
                compacting=compacting,
                deleting=deleting,
                ensure_created=ensure_created,
            )
        else:
            logger.warning(f"Topic creation disabled! Can't create topic {topic}")

    def key_partition(
        self, topic: str, key: Optional[bytes], partition: Optional[int] = None
    ) -> Optional[int]:
        """Hash key to determine partition destination."""
        consumer = self._ensure_consumer()
        metadata = consumer._client.cluster
        partitions_for_topic = metadata.partitions_for_topic(topic)
        if partitions_for_topic is None:
            # Topic metadata not available yet.
            return None
        if partition is not None:
            assert partition >= 0
            assert partition in partitions_for_topic, "Unrecognized partition"
            return partition
        all_partitions = list(partitions_for_topic)
        available = list(metadata.available_partitions_for_topic(topic))
        return self._partitioner(key, all_partitions, available)
class Producer(base.Producer):
    """Kafka producer using :pypi:`aiokafka`."""

    logger = logger

    #: Message headers need broker api version >= 0.11;
    #: disabled in ``__post_init__`` for older versions.
    allow_headers: bool = True

    #: Underlying non-transactional producer, created in :meth:`on_start`.
    _producer: Optional[aiokafka.AIOKafkaProducer] = None

    #: One dedicated aiokafka producer per transactional id
    #: (per-instance, see ``__post_init__``).
    _transaction_producers: typing.Dict[str, aiokafka.AIOKafkaProducer]

    #: Per-transaction locks serializing operations on each
    #: transactional producer.
    _trn_locks: typing.Dict[str, Lock]

    def create_threaded_producer(self):
        """Return a new producer that runs in a dedicated thread."""
        return ThreadedProducer(default_producer=self, app=self.app)

    def __post_init__(self) -> None:
        # FIX: these were previously initialized as mutable *class*
        # attributes, so all Producer instances (and subclasses) silently
        # shared the same dicts of transactional producers and locks.
        # They must be per-instance state.
        self._transaction_producers = {}
        self._trn_locks = {}
        self._send_on_produce_message = self.app.on_produce_message.send
        if self.partitioner is None:
            self.partitioner = DefaultPartitioner()
        if self._api_version != "auto":
            wanted_api_version = parse_kafka_version(self._api_version)
            if wanted_api_version < (0, 11):
                # Broker too old for the record-header API.
                self.allow_headers = False

    def _settings_default(self) -> Mapping[str, Any]:
        """Return default aiokafka producer settings from app configuration."""
        transport = cast(Transport, self.transport)
        return {
            "bootstrap_servers": server_list(transport.url, transport.default_port),
            "client_id": self.client_id,
            "acks": self.acks,
            "linger_ms": self.linger_ms,
            "max_batch_size": self.max_batch_size,
            "max_request_size": self.max_request_size,
            "compression_type": self.compression_type,
            "security_protocol": "SSL" if self.ssl_context else "PLAINTEXT",
            "partitioner": self.partitioner,
            "request_timeout_ms": int(self.request_timeout * 1000),
            "api_version": self._api_version,
            "metadata_max_age_ms": self.app.conf.producer_metadata_max_age_ms,
            "connections_max_idle_ms": self.app.conf.producer_connections_max_idle_ms,
        }

    def _settings_auth(self) -> Mapping[str, Any]:
        """Return security/authentication related aiokafka settings."""
        return credentials_to_aiokafka_auth(self.credentials, self.ssl_context)

    async def begin_transaction(self, transactional_id: str) -> None:
        """Begin transaction by id.

        Lazily creates (and starts) a dedicated transactional producer
        plus its lock on first use of ``transactional_id``.
        """
        try:
            transaction_producer = self._transaction_producers.get(transactional_id)
            if transaction_producer is None:
                transaction_producer = self._new_producer(
                    transactional_id=transactional_id
                )
                await transaction_producer.start()
                self._transaction_producers[transactional_id] = transaction_producer
                self._trn_locks[transactional_id] = asyncio.Lock()
            async with self._trn_locks[transactional_id]:
                self._ensure_producer()
                await self._transaction_producers[transactional_id].begin_transaction()
        except ProducerFenced:
            # A newer producer with the same transactional id exists
            # (e.g. after a rebalance): discard this one.
            logger.info(f"Current Producer for transaction {transactional_id} fenced")
            tp = self._transaction_producers.pop(transactional_id)
            await tp.stop()
        except Exception as ex:
            logger.warning(
                "Exception in begin_transaction for transaction "
                f"{transactional_id} exception {ex}"
            )

    async def commit_transaction(self, transactional_id: str) -> None:
        """Commit transaction by id."""
        try:
            async with self._trn_locks[transactional_id]:
                transaction_producer = self._transaction_producers.get(transactional_id)
                if transaction_producer:
                    await transaction_producer.commit_transaction()
                    logger.debug(f"Done committing transaction {transactional_id}")
                else:
                    logger.warning(
                        f"Commit invoked for unknown transaction {transactional_id}"
                    )
        except ProducerFenced:
            logger.info(f"Current Producer for transaction {transactional_id} fenced")
            tp = self._transaction_producers.pop(transactional_id)
            await tp.stop()
        except Exception as ex:
            logger.warning(
                "Exception in commit_transaction for transaction "
                f"{transactional_id} exception {ex}"
            )

    async def abort_transaction(self, transactional_id: str) -> None:
        """Abort and rollback transaction by id."""
        try:
            async with self._trn_locks[transactional_id]:
                transaction_producer = self._transaction_producers.get(transactional_id)
                if transaction_producer:
                    await transaction_producer.abort_transaction()
                else:
                    logger.warning(
                        f"Abort invoked for unknown transaction {transactional_id}"
                    )
        except ProducerFenced:
            logger.info(f"Current Producer for transaction {transactional_id} fenced")
            tp = self._transaction_producers.pop(transactional_id)
            await tp.stop()
        except Exception as ex:
            logger.warning(
                "Exception in abort_transaction for transaction "
                f"{transactional_id} exception {ex}"
            )

    async def stop_transaction(self, transactional_id: str) -> None:
        """Stop transaction by id."""
        # Let Kafka Manage the transactions. On rebalance a new producer with the
        # same transaction id will fence off this producer
        # NOTE: the lock in ``_trn_locks`` is intentionally kept so that
        # coroutines already waiting on it can finish cleanly.
        tp = self._transaction_producers.pop(transactional_id, None)
        if tp:
            await tp.stop()

    async def maybe_begin_transaction(self, transactional_id: str) -> None:
        """Begin transaction (if one does not already exist)."""
        # begin_transaction already creates the producer only on first use.
        await self.begin_transaction(transactional_id)

    async def commit_transactions(
        self,
        tid_to_offset_map: Mapping[str, Mapping[TP, int]],
        group_id: str,
        start_new_transaction: bool = True,
    ) -> None:
        """Commit offsets to, and finalize, multiple transactions.

        Arguments:
            tid_to_offset_map: per-transaction mapping of topic partition
                to committable offset.
            group_id: consumer group the offsets are committed for.
            start_new_transaction: immediately begin a fresh transaction
                on each producer after committing.
        """
        for transactional_id, offsets in tid_to_offset_map.items():
            # get the producer
            async with self._trn_locks[transactional_id]:
                transaction_producer = self._transaction_producers.get(transactional_id)
                if transaction_producer:
                    logger.debug(
                        f"Sending offsets {offsets} to transaction {transactional_id}"
                    )
                    await transaction_producer.send_offsets_to_transaction(
                        offsets, group_id
                    )
                    await transaction_producer.commit_transaction()
                    logger.debug(f"Done committing transaction {transactional_id}")
                    if start_new_transaction:
                        logger.debug(f"Starting transaction {transactional_id}")
                        await transaction_producer.begin_transaction()
                        logger.debug(f"Started transaction {transactional_id}")
                else:
                    logger.warning(
                        f"Commit invoked for unknown transaction {transactional_id}"
                    )

    def _settings_extra(self) -> Mapping[str, Any]:
        """Return extra settings required when the app uses transactions."""
        if self.app.in_transaction:
            # Exactly-once semantics require acks=all + idempotence.
            return {"acks": "all", "enable_idempotence": True}
        return {}

    def _new_producer(
        self, transactional_id: Optional[str] = None
    ) -> aiokafka.AIOKafkaProducer:
        """Build a new underlying aiokafka producer instance."""
        return self._producer_type(
            loop=self.loop,
            **{
                **self._settings_default(),
                **self._settings_auth(),
                **self._settings_extra(),
            },
            transactional_id=transactional_id,
        )

    @property
    def _producer_type(self) -> Type[aiokafka.AIOKafkaProducer]:
        # Overridable hook for tests/subclasses.
        return aiokafka.AIOKafkaProducer

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication: int,
        *,
        config: Optional[Mapping[str, Any]] = None,
        timeout: Seconds = 20.0,
        retention: Optional[Seconds] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        ensure_created: bool = False,
    ) -> None:
        """Create/declare topic on server."""
        if self.app.conf.topic_allow_declare:
            # Kafka expects milliseconds for retention/timeout values.
            _retention = int(want_seconds(retention) * 1000.0) if retention else None
            producer = self._ensure_producer()
            await cast(Transport, self.transport)._create_topic(
                self,
                producer.client,
                topic,
                partitions,
                replication,
                config=config,
                timeout=int(want_seconds(timeout) * 1000.0),
                retention=_retention,
                compacting=compacting,
                deleting=deleting,
                ensure_created=ensure_created,
            )
            await producer.client.force_metadata_update()  # Fixes #499
        else:
            logger.warning(f"Topic creation disabled! Can't create topic {topic}")

    def _ensure_producer(self) -> aiokafka.AIOKafkaProducer:
        """Return the running producer, raising :exc:`NotReady` otherwise."""
        if self._producer is None:
            raise NotReady("Producer service not yet started")
        return self._producer

    async def on_start(self) -> None:
        """Call when producer starts."""
        await super().on_start()
        producer = self._producer = self._new_producer()
        self.beacon.add(producer)
        await producer.start()

    async def on_stop(self) -> None:
        """Call when producer stops."""
        await super().on_stop()
        cast(Transport, self.transport)._topic_waiters.clear()
        producer, self._producer = self._producer, None
        if producer is not None:
            await producer.stop()
        # Also shut down every transactional producer we started.
        for transaction_producer in self._transaction_producers.values():
            await transaction_producer.stop()
        self._transaction_producers.clear()

    async def send(
        self,
        topic: str,
        key: Optional[bytes],
        value: Optional[bytes],
        partition: Optional[int],
        timestamp: Optional[float],
        headers: Optional[HeadersArg],
        *,
        transactional_id: Optional[str] = None,
    ) -> Awaitable[RecordMetadata]:
        """Schedule message to be transmitted by producer.

        Returns an awaitable resolving to the record metadata once the
        broker acknowledges the message.

        Raises:
            ProducerSendError: if no producer exists for
                ``transactional_id``, or aiokafka raised a
                :exc:`KafkaError` while sending.
        """
        producer = self._ensure_producer()
        if transactional_id:
            producer = self._transaction_producers.get(transactional_id)
            if producer is None:
                raise ProducerSendError(
                    f"No transaction producer found for : {transactional_id}"
                )
        if headers is not None:
            if isinstance(headers, Mapping):
                # aiokafka expects headers as a list of (key, value) pairs.
                headers = list(headers.items())
        self._send_on_produce_message(
            key=key,
            value=value,
            partition=partition,
            timestamp=timestamp,
            headers=headers,
        )
        if headers is not None and not self.allow_headers:
            # Broker too old for headers: strip them rather than fail.
            headers = None
        timestamp_ms = int(timestamp * 1000.0) if timestamp else timestamp
        try:
            if transactional_id:
                # Serialize sends on the transactional producer.
                async with self._trn_locks[transactional_id]:
                    return cast(
                        Awaitable[RecordMetadata],
                        await producer.send(
                            topic,
                            value,
                            key=key,
                            partition=partition,
                            timestamp_ms=timestamp_ms,
                            headers=headers,
                        ),
                    )
            else:
                return cast(
                    Awaitable[RecordMetadata],
                    await producer.send(
                        topic,
                        value,
                        key=key,
                        partition=partition,
                        timestamp_ms=timestamp_ms,
                        headers=headers,
                    ),
                )
        except KafkaError as exc:
            raise ProducerSendError(f"Error while sending: {exc!r}") from exc

    async def send_and_wait(
        self,
        topic: str,
        key: Optional[bytes],
        value: Optional[bytes],
        partition: Optional[int],
        timestamp: Optional[float],
        headers: Optional[HeadersArg],
        *,
        transactional_id: Optional[str] = None,
    ) -> RecordMetadata:
        """Send message and wait for it to be transmitted."""
        fut = await self.send(
            topic,
            key=key,
            value=value,
            partition=partition,
            timestamp=timestamp,
            headers=headers,
            transactional_id=transactional_id,
        )
        return await fut

    async def flush(self) -> None:
        """Wait for producer to finish transmitting all buffered messages."""
        await self.buffer.flush()
        if self._producer is not None:
            await self._producer.flush()
        for transaction_producer in self._transaction_producers.values():
            await transaction_producer.flush()

    def key_partition(self, topic: str, key: bytes) -> TP:
        """Hash key to determine partition destination."""
        producer = self._ensure_producer()
        # Delegate to aiokafka's internal partitioner so the result matches
        # what send() would actually do.
        partition = producer._partition(
            topic,
            partition=None,
            key=None,
            value=None,
            serialized_key=key,
            serialized_value=None,
        )
        return TP(topic, partition)
class Transport(base.Transport):
    """Kafka transport using :pypi:`aiokafka`."""

    #: Consumer/Producer implementations used by this transport.
    Consumer: ClassVar[Type[ConsumerT]]
    Consumer = Consumer
    Producer: ClassVar[Type[ProducerT]]
    Producer = Producer

    #: Default Kafka broker port.
    default_port = 9092
    driver_version = f"aiokafka={aiokafka.__version__}"

    #: Deduplicates concurrent topic-creation requests per topic name.
    _topic_waiters: MutableMapping[str, StampedeWrapper]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._topic_waiters = {}

    def _topic_config(
        self,
        retention: Optional[int] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
    ) -> MutableMapping[str, Any]:
        """Translate retention/cleanup flags into Kafka topic config keys."""
        topic_config: MutableMapping[str, Any] = {}
        policies: Set[str] = set()
        if compacting:
            policies.add("compact")
        if deleting:
            policies.add("delete")
        if policies:
            topic_config["cleanup.policy"] = ",".join(sorted(policies))
        if retention:
            topic_config["retention.ms"] = retention
        return topic_config

    async def _create_topic(
        self,
        owner: Service,
        client: aiokafka.AIOKafkaClient,
        topic: str,
        partitions: int,
        replication: int,
        **kwargs: Any,
    ) -> None:
        """Create topic once, collapsing concurrent requests for the same name."""
        assert topic is not None
        waiter = self._topic_waiters.get(topic)
        if waiter is None:
            waiter = self._topic_waiters[topic] = StampedeWrapper(
                self._really_create_topic,
                owner,
                client,
                topic,
                partitions,
                replication,
                loop=self.loop,
                **kwargs,
            )
        try:
            await waiter()
        except Exception:
            # Drop the cached waiter so a later retry can attempt again.
            self._topic_waiters.pop(topic, None)
            raise

    async def _get_controller_node(
        self, owner: Service, client: aiokafka.AIOKafkaClient, timeout: int = 30000
    ) -> Optional[int]:  # pragma: no cover
        """Ask a connected broker which node is the cluster controller.

        Returns ``None`` when the owner service is shutting down.
        """
        broker_ids = [broker.nodeId for broker in client.cluster.brokers()]
        for broker_id in broker_ids:
            if broker_id is None:
                raise NotReady("Not connected to Kafka Broker")
            metadata_request = MetadataRequest_v1([])
            wait_result = await owner.wait(
                client.send(broker_id, metadata_request),
                timeout=timeout,
            )
            if wait_result.stopped:
                owner.log.info("Shutting down - skipping creation.")
                return None
            return wait_result.result.controller_id
        raise Exception("Controller node not found")

    async def _really_create_topic(
        self,
        owner: Service,
        client: aiokafka.AIOKafkaClient,
        topic: str,
        partitions: int,
        replication: int,
        *,
        config: Optional[Mapping[str, Any]] = None,
        timeout: int = 30000,
        retention: Optional[int] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        ensure_created: bool = False,
    ) -> None:  # pragma: no cover
        """Send a CreateTopics request for *topic* to the controller broker."""
        owner.log.info("Creating topic %r", topic)

        if topic in client.cluster.topics():
            owner.log.debug("Topic %r exists, skipping creation.", topic)
            return

        protocol_version = 1
        # Explicitly passed config entries win over the derived flags.
        topic_config = self._topic_config(retention, compacting, deleting)
        topic_config.update(config or {})

        controller_node = await self._get_controller_node(
            owner, client, timeout=timeout
        )
        owner.log.debug("Found controller: %r", controller_node)

        if controller_node is None:
            if owner.should_stop:
                owner.log.info("Shutting down hence controller not found")
                return
            else:
                raise Exception("Controller node is None")

        request = CreateTopicsRequest[protocol_version](
            [(topic, partitions, replication, [], list(topic_config.items()))],
            timeout,
            False,
        )
        wait_result = await owner.wait(
            client.send(controller_node, request),
            timeout=timeout,
        )
        if wait_result.stopped:
            owner.log.debug("Shutting down - skipping creation.")
            return

        response = wait_result.result
        assert len(response.topic_error_codes), "single topic"
        _, code, reason = response.topic_error_codes[0]

        if code != 0:
            if not ensure_created and code == TopicExistsError.errno:
                owner.log.debug("Topic %r exists, skipping creation.", topic)
                return
            elif code == NotControllerError.errno:
                raise RuntimeError(f"Invalid controller: {controller_node}")
            else:
                raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
        else:
            owner.log.info("Topic %r created.", topic)
            return
def credentials_to_aiokafka_auth(
    credentials: Optional[CredentialsT] = None, ssl_context: Any = None
) -> Mapping:
    """Translate Faust credentials into aiokafka auth/security settings.

    Raises :exc:`~faust.exceptions.ImproperlyConfigured` for credential
    types that aiokafka does not support.
    """
    if credentials is None:
        # No credentials: fall back to bare SSL when a context is given,
        # otherwise plaintext.
        if ssl_context is not None:
            return {
                "security_protocol": "SSL",
                "ssl_context": ssl_context,
            }
        return {"security_protocol": "PLAINTEXT"}
    if isinstance(credentials, SSLCredentials):
        return {
            "security_protocol": credentials.protocol.value,
            "ssl_context": credentials.context,
        }
    if isinstance(credentials, OAuthCredentials):
        return {
            "security_protocol": credentials.protocol.value,
            "sasl_mechanism": credentials.mechanism.value,
            "sasl_oauth_token_provider": credentials.oauth_cb,
            "ssl_context": credentials.ssl_context,
        }
    if isinstance(credentials, SASLCredentials):
        return {
            "security_protocol": credentials.protocol.value,
            "sasl_mechanism": credentials.mechanism.value,
            "sasl_plain_username": credentials.username,
            "sasl_plain_password": credentials.password,
            "ssl_context": credentials.ssl_context,
        }
    if isinstance(credentials, GSSAPICredentials):
        return {
            "security_protocol": credentials.protocol.value,
            "sasl_mechanism": credentials.mechanism.value,
            "sasl_kerberos_service_name": credentials.kerberos_service_name,
            "sasl_kerberos_domain_name": credentials.kerberos_domain_name,
            "ssl_context": credentials.ssl_context,
        }
    raise ImproperlyConfigured(f"aiokafka does not support {credentials}")
def ensure_aiokafka_TP(tp: TP) -> _TopicPartition:
    """Convert Faust ``TP`` to aiokafka ``TopicPartition``."""
    if isinstance(tp, _TopicPartition):
        # Already the aiokafka type: pass through unchanged.
        return tp
    return _TopicPartition(tp.topic, tp.partition)
def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]:
    """Convert set of Faust ``TP`` to aiokafka ``TopicPartition``."""
    return set(map(ensure_aiokafka_TP, tps))