Source code for faust.assignor.cluster_assignment

"""Cluster assignment."""

from typing import List, MutableMapping, Sequence, Set, cast

from faust.models import Record

from .client_assignment import ClientAssignment, ClientMetadata, CopartitionedAssignment

# Names exported by ``from faust.assignor.cluster_assignment import *``.
__all__ = ["CopartMapping", "ClusterAssignment"]

# Mapping keyed by client id whose values are that client's
# ``CopartitionedAssignment``; the return type of
# ``ClusterAssignment.copartitioned_assignments``.
CopartMapping = MutableMapping[str, CopartitionedAssignment]


class ClusterAssignment(
    Record,
    serializer="json",
    include_metadata=False,
    namespace="@ClusterAssignment",
):
    """Cluster assignment state.

    Holds two mappings keyed by client id: the topics each client
    subscribes to, and each client's assignment.
    """

    # Both fields are optional, but must never remain ``None``: the
    # ``None`` default is only a placeholder that ``__post_init__``
    # replaces with an empty dict.
    subscriptions: MutableMapping[str, Sequence[str]] = cast(
        MutableMapping[str, Sequence[str]], None
    )
    assignments: MutableMapping[str, ClientAssignment] = cast(
        MutableMapping[str, ClientAssignment], None
    )

    def __post_init__(self) -> None:
        """Replace any field left at ``None`` with a fresh empty dict."""
        if self.subscriptions is None:
            self.subscriptions = {}
        if self.assignments is None:
            self.assignments = {}

    def topics(self) -> Set[str]:
        """Return every topic subscribed to by any client in the cluster."""
        all_topics: Set[str] = set()
        for client_topics in self.subscriptions.values():
            all_topics.update(client_topics)
        return all_topics

    def add_client(
        self, client: str, subscription: List[str], metadata: ClientMetadata
    ) -> None:
        """Register ``client`` with its subscription and assignment.

        A list copy of ``subscription`` is stored under ``client`` in
        ``subscriptions``, and ``metadata.assignment`` is stored under
        ``client`` in ``assignments``.
        """
        self.subscriptions[client] = list(subscription)
        self.assignments[client] = metadata.assignment

    def copartitioned_assignments(
        self, copartitioned_topics: Set[str]
    ) -> CopartMapping:
        """Return per-client copartitioned assignments for the given topics.

        Only clients whose subscription contains every topic in
        ``copartitioned_topics`` are included. Each included client maps to
        the result of calling ``copartitioned_assignment`` on its
        assignment with ``copartitioned_topics``.
        """
        # Work out which clients subscribe to *all* of the topics first.
        eligible = set()
        for client_id, client_topics in self.subscriptions.items():
            if copartitioned_topics.issubset(client_topics):
                eligible.add(client_id)

        # Then build the result in the iteration order of ``assignments``.
        result: CopartMapping = {}
        for client_id, client_assignment in self.assignments.items():
            if client_id in eligible:
                result[client_id] = client_assignment.copartitioned_assignment(
                    copartitioned_topics
                )
        return result