faust.transport.producer

Producer.

The Producer is responsible for:

  • Holds reference to the transport that created it

  • … and the app via self.transport.app.

  • Sending messages.

class faust.transport.producer.Producer(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]

Base Producer.

transport: TransportT

The transport that created this Producer.

app: AppT
client_id: str
linger_ms: int
max_batch_size: int
acks: int
max_request_size: int
compression_type: Optional[str]
request_timeout: float
ssl_context: Optional[ssl.SSLContext]
partitioner: Optional[PartitionerT]
buffer: ProducerBufferT
threaded_producer: Optional[ServiceThread] = None
async on_start() None[source]

Service is starting.

Return type:

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata][source]

Schedule message to be sent by producer.

Return type:

_GenericAlias[RecordMetadata]

send_soon(fut: FutureMessage) None[source]
Return type:

None

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata[source]

Send message and wait for it to be transmitted.

Return type:

RecordMetadata

async flush() None[source]

Flush all in-flight messages.

Return type:

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]

Create/declare topic on server.

Return type:

None

key_partition(topic: str, key: bytes) TP[source]

Hash key to determine partition.

Return type:

TP

async begin_transaction(transactional_id: str) None[source]

Begin transaction by id.

Return type:

None

async commit_transaction(transactional_id: str) None[source]

Commit transaction by id.

Return type:

None

async abort_transaction(transactional_id: str) None[source]

Abort and rollback transaction by id.

Return type:

None

async stop_transaction(transactional_id: str) None[source]

Stop transaction by id.

Return type:

None

async maybe_begin_transaction(transactional_id: str) None[source]

Begin transaction by id, if not already started.

Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None[source]

Commit transactions.

Return type:

None

logger: logging.Logger = <Logger faust.transport.producer (WARNING)>
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
supports_headers() bool[source]

Return True if headers are supported by this transport.

Return type:

bool