faust.types.streams

faust.types.streams.GroupByKeyArg

Type of the key argument to Stream.group_by().

alias of Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]]

class faust.types.streams.StreamT(channel: Optional[AsyncIterator[T_co]] = None, *, app: Optional[_AppT] = None, processors: Optional[Iterable[Callable[[T], Union[T, Awaitable[T]]]]] = None, combined: Optional[List[JoinableT]] = None, on_start: Optional[Callable] = None, join_strategy: Optional[_JoinT] = None, beacon: Optional[NodeT] = None, concurrency_index: Optional[int] = None, prev: Optional[StreamT] = None, active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = '', loop: Optional[AbstractEventLoop] = None)[source]
app: _AppT
channel: AsyncIterator[T_co]
outbox: Optional[Queue] = None
join_strategy: Optional[_JoinT] = None
task_owner: Optional[Task] = None
current_event: Optional[EventT] = None
active_partitions: Optional[Set[TP]] = None
concurrency_index: Optional[int] = None
enable_acks: bool = True
prefix: str = ''
combined: List[JoinableT]
abstract get_active_stream() → StreamT[source]
Return type:

StreamT

abstract add_processor(processor: Callable[[T], Union[T, Awaitable[T]]]) → None[source]
Return type:

None

abstract info() → Mapping[str, Any][source]
Return type:

Mapping[str, Any]

abstract clone(**kwargs: Any) → StreamT[source]
Return type:

StreamT

abstract async items() → AsyncIterator[Tuple[Optional[Union[bytes, _ModelT, Any]], T_co]][source]
abstract async events() → AsyncIterable[EventT][source]
abstract async take(max_: int, within: Union[timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]
abstract enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]
Return type:

AsyncIterable[Tuple[int, T_co]]

abstract through(channel: Union[str, ChannelT]) → StreamT[source]
Return type:

StreamT

abstract echo(*channels: Union[str, ChannelT]) → StreamT[source]
Return type:

StreamT

abstract group_by(key: Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]], *, name: Optional[str] = None, topic: Optional[TopicT] = None) → StreamT[source]
Return type:

StreamT

abstract derive_topic(name: str, *, schema: Optional[_SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, prefix: str = '', suffix: str = '') → TopicT[source]
Return type:

TopicT

abstract async throw(exc: BaseException) → None[source]
Return type:

None

Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
abstract async ack(event: EventT) → bool[source]
Return type:

bool