faust.transport.base

Base message transport implementation.

The Transport is responsible for:

  • Holds reference to the app that created it.

  • Creates new consumers/producers.

To see a reference transport implementation go to: faust/transport/drivers/aiokafka.py

class faust.transport.base.Conductor(app: AppT, **kwargs: Any)[source]

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.

  • Forwards messages to all channels subscribing to a topic.

logger: logging.Logger = <Logger faust.transport.conductor (WARNING)>
async commit(topics: AbstractSet[Union[str, TP]]) bool[source]

Commit offsets in topics.

Return type:

bool

acks_enabled_for(topic: str) bool[source]

Return True if acks are enabled for topic by name.

Return type:

bool

async wait_for_subscriptions() None[source]

Wait for consumer to be subscribed.

Return type:

None

async maybe_wait_for_subscriptions() None[source]
Return type:

None

async on_partitions_assigned(assigned: Set[TP]) None[source]

Call when cluster is rebalancing and partitions are assigned.

Return type:

None

async on_client_only_start() None[source]
Return type:

None

clear() None[source]

Clear all subscriptions.

Return type:

None

add(topic: TopicT) None[source]

Register topic to be subscribed.

Return type:

None

discard(topic: Any) None[source]

Unregister topic from conductor.

Return type:

None

property label: str

Return label for use in logs. :rtype: str

property shortlabel: str

Return short label for use in logs. :rtype: str

property acking_topics: Set[str]
Return type:

_GenericAlias[str]

class faust.transport.base.Consumer(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, commit_livelock_soft_timeout: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]

Base Consumer.

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

flow_active: bool = True
app: AppT
can_resume_flow: Event
suspend_flow: Event
not_waiting_next_records: Event
on_init_dependencies() Iterable[ServiceT][source]

Return list of services this consumer depends on.

Return type:

_GenericAlias[ServiceT]

async on_restart() None[source]

Call when the consumer is restarted.

Return type:

None

on_buffer_full(tp: TP) None[source]
Return type:

None

on_buffer_drop(tp: TP) None[source]
Return type:

None

async perform_seek() None[source]

Seek all partitions to their current committed position.

Return type:

None

abstract async seek_to_committed() Mapping[TP, int][source]

Seek all partitions to their committed offsets.

Return type:

_GenericAlias[TP, int]

async seek(partition: TP, offset: int) None[source]

Seek partition to specific offset.

Return type:

None

stop_flow() None[source]

Block consumer from processing any more messages.

Return type:

None

resume_flow() None[source]

Allow consumer to process messages.

Return type:

None

async wait_for_stopped_flow() None[source]

Wait until the consumer is not waiting on any newly fetched records.

Useful for scenarios where the consumer needs to be stopped to change the position of the fetcher to something other than the committed offset. There is a chance that getmany forces a seek to the committed offsets if the fetcher returns while the consumer is stopped. This can be prevented by waiting for the fetcher to finish (by default every second).

Return type:

None

pause_partitions(tps: Iterable[TP]) None[source]

Pause fetching from partitions.

Return type:

None

resume_partitions(tps: Iterable[TP]) None[source]

Resume fetching from partitions.

Return type:

None

async on_partitions_revoked(revoked: Set[TP]) None[source]

Call during rebalancing when partitions are being revoked.

Return type:

None

async on_partitions_assigned(assigned: Set[TP], generation_id: int = 0) None[source]

Call during rebalancing when partitions are being assigned.

Return type:

None

async getmany(timeout: float) AsyncIterator[Tuple[TP, Message]][source]

Fetch batch of messages from server.

Return type:

_GenericAlias[_GenericAlias[TP, Message]]

track_message(message: Message) None[source]

Track message and mark it as pending ack.

Return type:

None

ack(message: Message) bool[source]

Mark message as being acknowledged by stream.

Return type:

bool

async wait_empty() None[source]

Wait for all messages that started processing to be acked.

Return type:

None

async commit_and_end_transactions() None[source]

Commit all safe offsets and end transaction.

Return type:

None

async on_stop() None[source]

Call when consumer is stopping.

Return type:

None

async verify_all_partitions_active() None[source]
Return type:

None

verify_event_path(now: float, tp: TP) None[source]
Return type:

None

verify_recovery_event_path(now: float, tp: TP) None[source]
Return type:

None

async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) bool[source]

Maybe commit the offset for all or specific topics.

Parameters:

topics (_UnionGenericAlias[_GenericAlias[_UnionGenericAlias[str, TP]], None]) – Set containing topics and/or TopicPartitions to commit.

Return type:

bool

async maybe_wait_for_commit_to_finish() bool[source]

Wait for any existing commit operation to finish.

Return type:

bool

async force_commit(topics: AbstractSet[Union[str, TP]] = None, start_new_transaction: bool = True) bool[source]

Force offset commit.

Return type:

bool

async on_task_error(exc: BaseException) None[source]

Call when processing a message failed.

Return type:

None

close() None[source]

Close consumer for graceful shutdown.

Return type:

None

property unacked: Set[Message]

Return the set of currently unacknowledged messages. :rtype: _GenericAlias[Message]

class faust.transport.base.Fetcher(app: AppT, **kwargs: Any)[source]

Service fetching messages from Kafka.

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
app: AppT
async on_stop() None[source]

Call when the fetcher is stopping.

Return type:

None

class faust.transport.base.Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]

Base Producer.

app: AppT
threaded_producer: Optional[ServiceThread] = None
async on_start() None[source]

Service is starting.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata][source]

Schedule message to be sent by producer.

Return type:

_GenericAlias[RecordMetadata]

send_soon(fut: FutureMessage) None[source]
Return type:

None

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata[source]

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

async flush() None[source]

Flush all in-flight messages.

Return type:

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]

Create/declare topic on server.

Return type:

None

key_partition(topic: str, key: bytes) TP[source]

Hash key to determine partition.

Return type:

TP

async begin_transaction(transactional_id: str) None[source]

Begin transaction by id.

Return type:

None

async commit_transaction(transactional_id: str) None[source]

Commit transaction by id.

Return type:

None

async abort_transaction(transactional_id: str) None[source]

Abort and rollback transaction by id.

Return type:

None

async stop_transaction(transactional_id: str) None[source]

Stop transaction by id.

Return type:

None

async maybe_begin_transaction(transactional_id: str) None[source]

Begin transaction by id, if not already started.

Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None[source]

Commit transactions.

Return type:

None

logger: logging.Logger = <Logger faust.transport.producer (WARNING)>
supports_headers() bool[source]

Return True if headers are supported by this transport.

Return type:

bool

class faust.transport.base.Transport(url: List[URL], app: AppT, loop: Optional[AbstractEventLoop] = None)[source]

Message transport implementation.

class Consumer(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, commit_livelock_soft_timeout: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)

Base Consumer.

ack(message: Message) bool

Mark message as being acknowledged by stream.

Return type:

bool

close() None

Close consumer for graceful shutdown.

Return type:

None

async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) bool

Maybe commit the offset for all or specific topics.

Parameters:

topics (_UnionGenericAlias[_GenericAlias[_UnionGenericAlias[str, TP]], None]) – Set containing topics and/or TopicPartitions to commit.

Return type:

bool

async commit_and_end_transactions() None

Commit all safe offsets and end transaction.

Return type:

None

consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

flow_active: bool = True
async force_commit(topics: AbstractSet[Union[str, TP]] = None, start_new_transaction: bool = True) bool

Force offset commit.

Return type:

bool

async getmany(timeout: float) AsyncIterator[Tuple[TP, Message]]

Fetch batch of messages from server.

Return type:

_GenericAlias[_GenericAlias[TP, Message]]

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
async maybe_wait_for_commit_to_finish() bool

Wait for any existing commit operation to finish.

Return type:

bool

on_buffer_drop(tp: TP) None
Return type:

None

on_buffer_full(tp: TP) None
Return type:

None

on_init_dependencies() Iterable[ServiceT]

Return list of services this consumer depends on.

Return type:

_GenericAlias[ServiceT]

async on_partitions_assigned(assigned: Set[TP], generation_id: int = 0) None

Call during rebalancing when partitions are being assigned.

Return type:

None

async on_partitions_revoked(revoked: Set[TP]) None

Call during rebalancing when partitions are being revoked.

Return type:

None

async on_restart() None

Call when the consumer is restarted.

Return type:

None

async on_stop() None

Call when consumer is stopping.

Return type:

None

async on_task_error(exc: BaseException) None

Call when processing a message failed.

Return type:

None

pause_partitions(tps: Iterable[TP]) None

Pause fetching from partitions.

Return type:

None

async perform_seek() None

Seek all partitions to their current committed position.

Return type:

None

resume_flow() None

Allow consumer to process messages.

Return type:

None

resume_partitions(tps: Iterable[TP]) None

Resume fetching from partitions.

Return type:

None

async seek(partition: TP, offset: int) None

Seek partition to specific offset.

Return type:

None

abstract async seek_to_committed() Mapping[TP, int]

Seek all partitions to their committed offsets.

Return type:

_GenericAlias[TP, int]

stop_flow() None

Block consumer from processing any more messages.

Return type:

None

track_message(message: Message) None

Track message and mark it as pending ack.

Return type:

None

property unacked: Set[Message]

Return the set of currently unacknowledged messages. :rtype: _GenericAlias[Message]

async verify_all_partitions_active() None
Return type:

None

verify_event_path(now: float, tp: TP) None
Return type:

None

verify_recovery_event_path(now: float, tp: TP) None
Return type:

None

async wait_empty() None

Wait for all messages that started processing to be acked.

Return type:

None

async wait_for_stopped_flow() None

Wait until the consumer is not waiting on any newly fetched records.

Useful for scenarios where the consumer needs to be stopped to change the position of the fetcher to something other than the committed offset. There is a chance that getmany forces a seek to the committed offsets if the fetcher returns while the consumer is stopped. This can be prevented by waiting for the fetcher to finish (by default every second).

Return type:

None

app: AppT
can_resume_flow: Event
suspend_flow: Event
not_waiting_next_records: Event
log: CompositeLogger
transport: TransportT

The transport that created this Consumer.

transactions: TransactionManagerT
commit_interval: float

How often we commit topic offsets. See broker_commit_interval.

randomly_assigned_topics: Set[str]

Set of topic names that are considered “randomly assigned”. This means we don’t crash if it’s not part of our assignment. Used by e.g. the leader assignor service.

in_transaction: bool
scheduler: SchedulingStrategyT
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)

Base Producer.

async abort_transaction(transactional_id: str) None

Abort and rollback transaction by id.

Return type:

None

async begin_transaction(transactional_id: str) None

Begin transaction by id.

Return type:

None

async commit_transaction(transactional_id: str) None

Commit transaction by id.

Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None

Commit transactions.

Return type:

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None

Create/declare topic on server.

Return type:

None

async flush() None

Flush all in-flight messages.

Return type:

None

key_partition(topic: str, key: bytes) TP

Hash key to determine partition.

Return type:

TP

logger: logging.Logger = <Logger faust.transport.producer (WARNING)>
async maybe_begin_transaction(transactional_id: str) None

Begin transaction by id, if not already started.

Return type:

None

async on_start() None

Service is starting.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata]

Schedule message to be sent by producer.

Return type:

_GenericAlias[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

send_soon(fut: FutureMessage) None
Return type:

None

async stop_transaction(transactional_id: str) None

Stop transaction by id.

Return type:

None

supports_headers() bool

Return True if headers are supported by this transport.

Return type:

bool

threaded_producer: Optional[ServiceThread] = None
app: AppT
log: CompositeLogger
transport: TransportT

The transport that created this Producer.

buffer: ProducerBufferT
client_id: str
linger_ms: int
max_batch_size: int
acks: int
max_request_size: int
compression_type: Optional[str]
ssl_context: Optional[ssl.SSLContext]
partitioner: Optional[PartitionerT]
request_timeout: float
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class TransactionManager(transport: TransportT, *, consumer: ConsumerT, producer: ProducerT, **kwargs: Any)

Manage producer transactions.

async commit(offsets: Mapping[TP, int], start_new_transaction: bool = True) bool

Commit offsets for partitions.

Return type:

bool

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 30.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None

Create/declare topic on server.

Return type:

None

async flush() None

Wait for producer to transmit all pending messages.

Return type:

None

key_partition(topic: str, key: bytes) TP
Return type:

TP

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
async on_partitions_revoked(revoked: Set[TP]) None

Call when the cluster is rebalancing and partitions are revoked.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]) None

Call when the cluster is rebalancing.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata]

Schedule message to be sent by producer.

Return type:

_GenericAlias[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

send_soon(fut: FutureMessage) None
Return type:

None

supports_headers() bool

Return True if the Kafka server supports headers.

Return type:

bool

transactional_id_format = '{group_id}-{tpg.group}-{tpg.partition}'
app: AppT
class Conductor(app: AppT, **kwargs: Any)

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.

  • Forwards messages to all channels subscribing to a topic.

property acking_topics: Set[str]
Return type:

_GenericAlias[str]

acks_enabled_for(topic: str) bool

Return True if acks are enabled for topic by name.

Return type:

bool

add(topic: TopicT) None

Register topic to be subscribed.

Return type:

None

clear() None

Clear all subscriptions.

Return type:

None

async commit(topics: AbstractSet[Union[str, TP]]) bool

Commit offsets in topics.

Return type:

bool

discard(topic: Any) None

Unregister topic from conductor.

Return type:

None

property label: str

Return label for use in logs. :rtype: str

logger: logging.Logger = <Logger faust.transport.conductor (WARNING)>
async maybe_wait_for_subscriptions() None
Return type:

None

async on_client_only_start() None
Return type:

None

async on_partitions_assigned(assigned: Set[TP]) None

Call when cluster is rebalancing and partitions are assigned.

Return type:

None

property shortlabel: str

Return short label for use in logs. :rtype: str

async wait_for_subscriptions() None

Wait for consumer to be subscribed.

Return type:

None

on_message: ConsumerCallback
app: _AppT
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class Fetcher(app: AppT, **kwargs: Any)

Service fetching messages from Kafka.

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
async on_stop() None

Call when the fetcher is stopping.

Return type:

None

app: AppT
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
driver_version: str

String identifying the underlying driver used for this transport. E.g. for https://pypi.org/project/aiokafka/ this could be aiokafka 0.4.1.

create_consumer(callback: Callable[[Message], Awaitable], **kwargs: Any) ConsumerT[source]

Create new consumer.

Return type:

ConsumerT

create_producer(**kwargs: Any) ProducerT[source]

Create new producer.

Return type:

ProducerT

create_transaction_manager(consumer: ConsumerT, producer: ProducerT, **kwargs: Any) TransactionManagerT[source]

Create new transaction manager.

Return type:

TransactionManagerT

create_conductor(**kwargs: Any) ConductorT[source]

Create new consumer conductor.

Return type:

ConductorT