Source code for faust.joins

"""Join strategies."""

from typing import Any, Optional, Tuple

from .types import EventT, FieldDescriptorT, JoinableT, JoinT

__all__ = [
    "Join",
    "RightJoin",
    "LeftJoin",
    "InnerJoin",
    "OuterJoin",
]


[docs]class Join(JoinT): """Base class for join strategies.""" def __init__( self, *, stream: JoinableT, fields: Tuple[FieldDescriptorT, ...] ) -> None: self.fields = {field.model: field for field in fields} self.stream = stream
[docs] async def process(self, event: EventT) -> Optional[EventT]: """Process event to be joined with another event.""" raise NotImplementedError()
def __eq__(self, other: Any) -> bool: if isinstance(other, type(self)): return other.fields == self.fields and other.stream is self.stream return False def __ne__(self, other: Any) -> bool: return not self.__eq__(other)
[docs]class RightJoin(Join): """Right-join strategy."""
[docs]class LeftJoin(Join): """Left-join strategy."""
[docs]class InnerJoin(Join): """Inner-join strategy."""
[docs]class OuterJoin(Join): """Outer-join strategy."""