faust.transport.drivers.aiokafka

Message transport using https://pypi.org/project/aiokafka/.

class faust.transport.drivers.aiokafka.Consumer(*args: Any, **kwargs: Any)[source]

Kafka consumer using https://pypi.org/project/aiokafka/.

logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
RebalanceListener

alias of ConsumerRebalanceListener

consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (<class 'aiokafka.errors.ConsumerStoppedError'>,)

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]

Create/declare topic on server.

Return type:

None

async on_stop() None[source]

Call when consumer is stopping.

Return type:

None

verify_event_path(now: float, tp: TP) None[source]
Return type:

None

class faust.transport.drivers.aiokafka.Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]

Kafka producer using https://pypi.org/project/aiokafka/.

logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
allow_headers: bool = True
create_threaded_producer()[source]
async begin_transaction(transactional_id: str) None[source]

Begin transaction by id.

Return type:

None

async commit_transaction(transactional_id: str) None[source]

Commit transaction by id.

Return type:

None

async abort_transaction(transactional_id: str) None[source]

Abort and rollback transaction by id.

Return type:

None

async stop_transaction(transactional_id: str) None[source]

Stop transaction by id.

Return type:

None

async maybe_begin_transaction(transactional_id: str) None[source]

Begin transaction (if one does not already exist).

Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None[source]

Commit transactions.

Return type:

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 20.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]

Create/declare topic on server.

Return type:

None

async on_start() None[source]

Call when producer starts.

Return type:

None

async on_stop() None[source]

Call when producer stops.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata][source]

Schedule message to be transmitted by producer.

Return type:

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata[source]

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

async flush() None[source]

Wait for producer to finish transmitting all buffered messages.

Return type:

None

key_partition(topic: str, key: bytes) TP[source]

Hash key to determine partition destination.

Return type:

TP

supports_headers() bool[source]

Return True if message headers are supported.

Return type:

bool

class faust.transport.drivers.aiokafka.Transport(*args: Any, **kwargs: Any)[source]

Kafka transport using https://pypi.org/project/aiokafka/.

class Consumer(*args: Any, **kwargs: Any)

Kafka consumer using https://pypi.org/project/aiokafka/.

RebalanceListener

alias of ConsumerRebalanceListener

consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (<class 'aiokafka.errors.ConsumerStoppedError'>,)

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None

Create/declare topic on server.

Return type:

None

logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
async on_stop() None

Call when consumer is stopping.

Return type:

None

verify_event_path(now: float, tp: TP) None
Return type:

None

app: AppT
can_resume_flow: Event
suspend_flow: Event
not_waiting_next_records: Event
log: CompositeLogger
transport: TransportT

The transport that created this Consumer.

transactions: TransactionManagerT
commit_interval: float

How often we commit topic offsets. See broker_commit_interval.

randomly_assigned_topics: Set[str]

Set of topic names that are considered “randomly assigned”. This means we don’t crash if one of these topics is not part of our assignment. Used by e.g. the leader assignor service.

in_transaction: bool
scheduler: SchedulingStrategyT
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)

Kafka producer using https://pypi.org/project/aiokafka/.

async abort_transaction(transactional_id: str) None

Abort and rollback transaction by id.

Return type:

None

allow_headers: bool = True
async begin_transaction(transactional_id: str) None

Begin transaction by id.

Return type:

None

async commit_transaction(transactional_id: str) None

Commit transaction by id.

Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None

Commit transactions.

Return type:

None

create_threaded_producer()
async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 20.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None

Create/declare topic on server.

Return type:

None

async flush() None

Wait for producer to finish transmitting all buffered messages.

Return type:

None

key_partition(topic: str, key: bytes) TP

Hash key to determine partition destination.

Return type:

TP

logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
async maybe_begin_transaction(transactional_id: str) None

Begin transaction (if one does not already exist).

Return type:

None

async on_start() None

Call when producer starts.

Return type:

None

async on_stop() None

Call when producer stops.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata]

Schedule message to be transmitted by producer.

Return type:

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

async stop_transaction(transactional_id: str) None

Stop transaction by id.

Return type:

None

supports_headers() bool

Return True if message headers are supported.

Return type:

bool

app: AppT
log: CompositeLogger
transport: TransportT

The transport that created this Producer.

buffer: ProducerBufferT
client_id: str
linger_ms: int
max_batch_size: int
acks: int
max_request_size: int
compression_type: Optional[str]
ssl_context: Optional[ssl.SSLContext]
partitioner: Optional[PartitionerT]
request_timeout: float
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
default_port = 9092
driver_version: str = 'aiokafka=0.12.0'

String identifying the underlying driver used for this transport. E.g. for https://pypi.org/project/aiokafka/ this could be aiokafka=0.4.1.