faust.transport.drivers.aiokafka
¶
Message transport using https://pypi.org/project/aiokafka/.
- class faust.transport.drivers.aiokafka.Consumer(*args: Any, **kwargs: Any)[source]¶
Kafka consumer using https://pypi.org/project/aiokafka/.
- logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>¶
- RebalanceListener¶
alias of
ConsumerRebalanceListener
- consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (<class 'aiokafka.errors.ConsumerStoppedError'>,)¶
Tuple of exception types that may be raised when the underlying consumer driver is stopped.
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None [source]¶
Create/declare topic on server.
- Return type:
None
- class faust.transport.drivers.aiokafka.Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]¶
Kafka producer using https://pypi.org/project/aiokafka/.
- logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>¶
- async begin_transaction(transactional_id: str) None [source]¶
Begin transaction by id.
- Return type:
None
- async commit_transaction(transactional_id: str) None [source]¶
Commit transaction by id.
- Return type:
None
- async abort_transaction(transactional_id: str) None [source]¶
Abort and rollback transaction by id.
- Return type:
None
- async stop_transaction(transactional_id: str) None [source]¶
Stop transaction by id.
- Return type:
None
- async maybe_begin_transaction(transactional_id: str) None [source]¶
Begin transaction (if one does not already exist).
- Return type:
None
- async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None [source]¶
Commit transactions.
- Return type:
None
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 20.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None [source]¶
Create/declare topic on server.
- Return type:
None
- async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata] [source]¶
Schedule message to be transmitted by producer.
- Return type:
_GenericAlias
[RecordMetadata
]
- async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata [source]¶
Send message and wait for it to be transmitted.
- Return type:
- async flush() None [source]¶
Wait for producer to finish transmitting all buffered messages.
- Return type:
None
- class faust.transport.drivers.aiokafka.Transport(*args: Any, **kwargs: Any)[source]¶
Kafka transport using https://pypi.org/project/aiokafka/.
- class Consumer(*args: Any, **kwargs: Any)¶
Kafka consumer using https://pypi.org/project/aiokafka/.
- RebalanceListener¶
alias of
ConsumerRebalanceListener
- consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (<class 'aiokafka.errors.ConsumerStoppedError'>,)¶
Tuple of exception types that may be raised when the underlying consumer driver is stopped.
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None ¶
Create/declare topic on server.
- Return type:
None
- logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>¶
- log: CompositeLogger¶
- transport: TransportT¶
The transport that created this Consumer.
- transactions: TransactionManagerT¶
- commit_interval: float¶
How often we commit topic offsets. See
broker_commit_interval
.
- randomly_assigned_topics: Set[str]¶
Set of topic names that are considered “randomly assigned”. This means we don’t crash if it’s not part of our assignment. Used by e.g. the leader assignor service.
- scheduler: SchedulingStrategyT¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)¶
Kafka producer using https://pypi.org/project/aiokafka/.
- async abort_transaction(transactional_id: str) None ¶
Abort and rollback transaction by id.
- Return type:
None
- async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None ¶
Commit transactions.
- Return type:
None
- create_threaded_producer()¶
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 20.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None ¶
Create/declare topic on server.
- Return type:
None
- async flush() None ¶
Wait for producer to finish transmitting all buffered messages.
- Return type:
None
- key_partition(topic: str, key: bytes) TP ¶
Hash key to determine partition destination.
- Return type:
- logger: logging.Logger = <Logger faust.transport.drivers.aiokafka (WARNING)>¶
- async maybe_begin_transaction(transactional_id: str) None ¶
Begin transaction (if one does not already exist).
- Return type:
None
- async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata] ¶
Schedule message to be transmitted by producer.
- Return type:
_GenericAlias
[RecordMetadata
]
- async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata ¶
Send message and wait for it to be transmitted.
- Return type:
- log: CompositeLogger¶
- transport: TransportT¶
The transport that created this Producer.
- buffer: ProducerBufferT¶
- ssl_context: Optional[ssl.SSLContext]¶
- partitioner: Optional[PartitionerT]¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- default_port = 9092¶
- driver_version: str = 'aiokafka=0.12.0'¶
String identifying the underlying driver used for this transport. E.g. for https://pypi.org/project/aiokafka/ this could be
aiokafka 0.4.1
.