faust.transport.base
¶
Base message transport implementation.
The Transport is responsible for:
Holds reference to the app that created it.
Creates new consumers/producers.
To see a reference transport implementation go to:
faust/transport/drivers/aiokafka.py
- class faust.transport.base.Conductor(app: AppT, **kwargs: Any)[source]¶
Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
- logger: logging.Logger = <Logger faust.transport.conductor (WARNING)>¶
- async commit(topics: AbstractSet[Union[str, TP]]) bool [source]¶
Commit offsets in topics.
- Return type:
- acks_enabled_for(topic: str) bool [source]¶
Return
True
if acks are enabled for topic by name.- Return type:
- class faust.transport.base.Consumer(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, commit_livelock_soft_timeout: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]¶
Base Consumer.
- logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>¶
- consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()¶
Tuple of exception types that may be raised when the underlying consumer driver is stopped.
- can_resume_flow: Event¶
- suspend_flow: Event¶
- not_waiting_next_records: Event¶
- on_init_dependencies() Iterable[ServiceT] [source]¶
Return list of services this consumer depends on.
- Return type:
_GenericAlias
[ServiceT
]
- async perform_seek() None [source]¶
Seek all partitions to their current committed position.
- Return type:
None
- abstract async seek_to_committed() Mapping[TP, int] [source]¶
Seek all partitions to their committed offsets.
- async seek(partition: TP, offset: int) None [source]¶
Seek partition to specific offset.
- Return type:
None
- async wait_for_stopped_flow() None [source]¶
Wait until the consumer is not waiting on any newly fetched records.
Useful for scenarios where the consumer needs to be stopped to change the position of the fetcher to something other than the committed offset. There is a chance that getmany forces a seek to the committed offsets if the fetcher returns while the consumer is stopped. This can be prevented by waiting for the fetcher to finish (by default every second).
- Return type:
None
- pause_partitions(tps: Iterable[TP]) None [source]¶
Pause fetching from partitions.
- Return type:
None
- resume_partitions(tps: Iterable[TP]) None [source]¶
Resume fetching from partitions.
- Return type:
None
- async on_partitions_revoked(revoked: Set[TP]) None [source]¶
Call during rebalancing when partitions are being revoked.
- Return type:
None
- async on_partitions_assigned(assigned: Set[TP], generation_id: int = 0) None [source]¶
Call during rebalancing when partitions are being assigned.
- Return type:
None
- async getmany(timeout: float) AsyncIterator[Tuple[TP, Message]] [source]¶
Fetch batch of messages from server.
- track_message(message: Message) None [source]¶
Track message and mark it as pending ack.
- Return type:
None
- async wait_empty() None [source]¶
Wait for all messages that started processing to be acked.
- Return type:
None
- async commit_and_end_transactions() None [source]¶
Commit all safe offsets and end transaction.
- Return type:
None
- async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) bool [source]¶
Maybe commit the offset for all or specific topics.
- async maybe_wait_for_commit_to_finish() bool [source]¶
Wait for any existing commit operation to finish.
- Return type:
- async force_commit(topics: AbstractSet[Union[str, TP]] = None, start_new_transaction: bool = True) bool [source]¶
Force offset commit.
- Return type:
- async on_task_error(exc: BaseException) None [source]¶
Call when processing a message failed.
- Return type:
None
- class faust.transport.base.Fetcher(app: AppT, **kwargs: Any)[source]¶
Service fetching messages from Kafka.
- logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>¶
- class faust.transport.base.Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]¶
Base Producer.
- threaded_producer: Optional[ServiceThread] = None¶
- async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata] [source]¶
Schedule message to be sent by producer.
- Return type:
_GenericAlias
[RecordMetadata
]
- send_soon(fut: FutureMessage) None [source]¶
- Return type:
None
- async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata [source]¶
Send message and wait for it to be transmitted.
- Return type:
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None [source]¶
Create/declare topic on server.
- Return type:
None
- async begin_transaction(transactional_id: str) None [source]¶
Begin transaction by id.
- Return type:
None
- async commit_transaction(transactional_id: str) None [source]¶
Commit transaction by id.
- Return type:
None
- async abort_transaction(transactional_id: str) None [source]¶
Abort and rollback transaction by id.
- Return type:
None
- async stop_transaction(transactional_id: str) None [source]¶
Stop transaction by id.
- Return type:
None
- async maybe_begin_transaction(transactional_id: str) None [source]¶
Begin transaction by id, if not already started.
- Return type:
None
- async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None [source]¶
Commit transactions.
- Return type:
None
- logger: logging.Logger = <Logger faust.transport.producer (WARNING)>¶
- class faust.transport.base.Transport(url: List[URL], app: AppT, loop: Optional[AbstractEventLoop] = None)[source]¶
Message transport implementation.
- class Consumer(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, commit_livelock_soft_timeout: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)¶
Base Consumer.
- async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) bool ¶
Maybe commit the offset for all or specific topics.
- async commit_and_end_transactions() None ¶
Commit all safe offsets and end transaction.
- Return type:
None
- consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()¶
Tuple of exception types that may be raised when the underlying consumer driver is stopped.
- async force_commit(topics: AbstractSet[Union[str, TP]] = None, start_new_transaction: bool = True) bool ¶
Force offset commit.
- Return type:
- async getmany(timeout: float) AsyncIterator[Tuple[TP, Message]] ¶
Fetch batch of messages from server.
- logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>¶
- async maybe_wait_for_commit_to_finish() bool ¶
Wait for any existing commit operation to finish.
- Return type:
- on_init_dependencies() Iterable[ServiceT] ¶
Return list of services this consumer depends on.
- Return type:
_GenericAlias
[ServiceT
]
- async on_partitions_assigned(assigned: Set[TP], generation_id: int = 0) None ¶
Call during rebalancing when partitions are being assigned.
- Return type:
None
- async on_partitions_revoked(revoked: Set[TP]) None ¶
Call during rebalancing when partitions are being revoked.
- Return type:
None
- async on_task_error(exc: BaseException) None ¶
Call when processing a message failed.
- Return type:
None
- async perform_seek() None ¶
Seek all partitions to their current committed position.
- Return type:
None
- abstract async seek_to_committed() Mapping[TP, int] ¶
Seek all partitions to their committed offsets.
- property unacked: Set[Message]¶
Return the set of currently unacknowledged messages. :rtype:
_GenericAlias
[Message
]
- async wait_empty() None ¶
Wait for all messages that started processing to be acked.
- Return type:
None
- async wait_for_stopped_flow() None ¶
Wait until the consumer is not waiting on any newly fetched records.
Useful for scenarios where the consumer needs to be stopped to change the position of the fetcher to something other than the committed offset. There is a chance that getmany forces a seek to the committed offsets if the fetcher returns while the consumer is stopped. This can be prevented by waiting for the fetcher to finish (by default every second).
- Return type:
None
- can_resume_flow: Event¶
- suspend_flow: Event¶
- not_waiting_next_records: Event¶
- log: CompositeLogger¶
- transport: TransportT¶
The transport that created this Consumer.
- transactions: TransactionManagerT¶
- commit_interval: float¶
How often we commit topic offsets. See
broker_commit_interval
.
- randomly_assigned_topics: Set[str]¶
Set of topic names that are considered “randomly assigned”. This means we don’t crash if it’s not part of our assignment. Used by e.g. the leader assignor service.
- scheduler: SchedulingStrategyT¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)¶
Base Producer.
- async abort_transaction(transactional_id: str) None ¶
Abort and rollback transaction by id.
- Return type:
None
- async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None ¶
Commit transactions.
- Return type:
None
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None ¶
Create/declare topic on server.
- Return type:
None
- logger: logging.Logger = <Logger faust.transport.producer (WARNING)>¶
- async maybe_begin_transaction(transactional_id: str) None ¶
Begin transaction by id, if not already started.
- Return type:
None
- async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata] ¶
Schedule message to be sent by producer.
- Return type:
_GenericAlias
[RecordMetadata
]
- async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata ¶
Send message and wait for it to be transmitted.
- Return type:
- send_soon(fut: FutureMessage) None ¶
- Return type:
None
- threaded_producer: Optional[ServiceThread] = None¶
- log: CompositeLogger¶
- transport: TransportT¶
The transport that created this Producer.
- buffer: ProducerBufferT¶
- ssl_context: Optional[ssl.SSLContext]¶
- partitioner: Optional[PartitionerT]¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class TransactionManager(transport: TransportT, *, consumer: ConsumerT, producer: ProducerT, **kwargs: Any)¶
Manage producer transactions.
- async commit(offsets: Mapping[TP, int], start_new_transaction: bool = True) bool ¶
Commit offsets for partitions.
- Return type:
- async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None ¶
Create/declare topic on server.
- Return type:
None
- logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>¶
- async on_partitions_revoked(revoked: Set[TP]) None ¶
Call when the cluster is rebalancing and partitions are revoked.
- Return type:
None
- async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]) None ¶
Call when the cluster is rebalancing.
- Return type:
None
- async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata] ¶
Schedule message to be sent by producer.
- Return type:
_GenericAlias
[RecordMetadata
]
- async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata ¶
Send message and wait for it to be transmitted.
- Return type:
- send_soon(fut: FutureMessage) None ¶
- Return type:
None
- transactional_id_format = '{group_id}-{tpg.group}-{tpg.partition}'¶
- class Conductor(app: AppT, **kwargs: Any)¶
Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
- logger: logging.Logger = <Logger faust.transport.conductor (WARNING)>¶
- async on_partitions_assigned(assigned: Set[TP]) None ¶
Call when cluster is rebalancing and partitions are assigned.
- Return type:
None
- on_message: ConsumerCallback¶
- app: _AppT¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class Fetcher(app: AppT, **kwargs: Any)¶
Service fetching messages from Kafka.
- logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- driver_version: str¶
String identifying the underlying driver used for this transport. E.g. for https://pypi.org/project/aiokafka/ this could be
aiokafka 0.4.1
.
- create_consumer(callback: Callable[[Message], Awaitable], **kwargs: Any) ConsumerT [source]¶
Create new consumer.
- Return type:
- create_transaction_manager(consumer: ConsumerT, producer: ProducerT, **kwargs: Any) TransactionManagerT [source]¶
Create new transaction manager.
- Return type:
- create_conductor(**kwargs: Any) ConductorT [source]¶
Create new consumer conductor.
- Return type: