Source code for faust.assignor.cluster_assignment
"""Cluster assignment."""
from typing import List, MutableMapping, Sequence, Set, cast
from faust.models import Record
from .client_assignment import ClientAssignment, ClientMetadata, CopartitionedAssignment
__all__ = ["CopartMapping", "ClusterAssignment"]
CopartMapping = MutableMapping[str, CopartitionedAssignment]
[docs]class ClusterAssignment(
Record, serializer="json", include_metadata=False, namespace="@ClusterAssignment"
):
"""Cluster assignment state."""
# These are optional, but should never be set to None
subscriptions: MutableMapping[str, Sequence[str]] = cast(
MutableMapping[str, Sequence[str]], None
)
assignments: MutableMapping[str, ClientAssignment] = cast(
MutableMapping[str, ClientAssignment], None
)
def __post_init__(self) -> None:
if self.subscriptions is None:
self.subscriptions = {}
if self.assignments is None:
self.assignments = {}
[docs] def topics(self) -> Set[str]:
# All topics subscribed to in the cluster
return {topic for sub in self.subscriptions.values() for topic in sub}
[docs] def add_client(
self, client: str, subscription: List[str], metadata: ClientMetadata
) -> None:
self.subscriptions[client] = list(subscription)
self.assignments[client] = metadata.assignment
[docs] def copartitioned_assignments(
self, copartitioned_topics: Set[str]
) -> CopartMapping:
# We only pick clients that subscribe to all copartitioned topics
subscribed_clis = {
cli
for cli, sub in self.subscriptions.items()
if copartitioned_topics.issubset(sub)
}
return {
cli: assignment.copartitioned_assignment(copartitioned_topics)
for cli, assignment in self.assignments.items()
if cli in subscribed_clis
}