"""Copartitioned Assignor."""
from itertools import cycle
from math import ceil, floor
from typing import Counter, Iterable, Iterator, MutableMapping, Optional, Sequence, Set
from .client_assignment import CopartitionedAssignment
__all__ = ["CopartitionedAssignor"]
class CopartitionedAssignor:
"""Copartitioned Assignor.
All copartitioned topics must have the same number of partitions
The assignment is sticky which uses the following heuristics:
- Maintain existing assignments as long as within max_capacity for each client
- Assign actives to standbys when possible (within max_capacity)
- Assign in order to fill max_capacity of the clients
We optimize for not over utilizing resources instead of under-utilizing
resources. This results in a balanced assignment when max_capacity is the
default value which is ``ceil(num partitions / num clients)``
Notes:
Currently we raise an exception if number of clients is not enough
for the desired `replication`.
"""
max_capacity: int
min_capacity: int
num_partitions: int
replicas: int
topics: Set[str]
_num_clients: int
_client_assignments: MutableMapping[str, CopartitionedAssignment]
def __init__(
self,
topics: Iterable[str],
cluster_asgn: MutableMapping[str, CopartitionedAssignment],
num_partitions: int,
replicas: int,
capacity: Optional[int] = None,
) -> None:
self._num_clients = len(cluster_asgn)
assert self._num_clients, "Should assign to at least 1 client"
self.num_partitions = num_partitions
self.replicas = min(replicas, self._num_clients - 1)
self.max_capacity = (
ceil(self.num_partitions / self._num_clients)
if capacity is None
else capacity
)
self.min_capacity = (
floor(self.num_partitions / self._num_clients)
if capacity is None
else capacity
)
self.topics = set(topics)
assert (
self.max_capacity * self._num_clients >= self.num_partitions
), "Not enough max_capacity"
self._client_assignments = cluster_asgn
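# Note on the clamp above: replicas are capped at num_clients - 1,
# since a standby colocated with its own active adds no fault
# tolerance. E.g. (illustrative) replicas=3 with 2 clients is silently
# reduced to 1.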
def get_assignment(self) -> MutableMapping[str, CopartitionedAssignment]:
# If there are no replicas required, try to ensure a balanced distribution with
# at least one partition per client by removing partitions down to the
# min_capacity for each client
capacity = (
self.min_capacity
if (
self.replicas == 0
and self.num_partitions > 0
and self.num_partitions >= self._num_clients
)
else self.max_capacity
)
for copartitioned in self._client_assignments.values():
copartitioned.unassign_extras(capacity, self.replicas)
self._assign(active=True)
self._assign(active=False)
return self._client_assignments
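# Illustrative trace of the min_capacity branch above: with 4
# partitions, 3 clients and replicas=0, capacity = floor(4 / 3) = 1,
# so each client is first trimmed to at most one active; the freed
# partition is then handed back to _assign(), which round robins it to
# a client that still has room under max_capacity = ceil(4 / 3) = 2.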
def _all_assigned(self, active: bool) -> bool:
assigned_counts = self._assigned_partition_counts(active)
total_assigns = self._total_assigns_per_partition(active)
return all(
assigned_counts[partition] == total_assigns
for partition in range(self.num_partitions)
)
def _assign(self, active: bool) -> None:
self._unassign_overassigned(active)
unassigned = self._get_unassigned(active)
self._assign_round_robin(unassigned, active)
assert self._all_assigned(active)
def _assigned_partition_counts(self, active: bool) -> Counter[int]:
return Counter(
partition
for copartitioned in self._client_assignments.values()
for partition in copartitioned.get_assigned_partitions(active)
)
def _get_client_limit(self, active: bool) -> int:
return self.max_capacity * self._total_assigns_per_partition(active)
def _total_assigns_per_partition(self, active: bool) -> int:
return 1 if active else self.replicas
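# Example of the client limit (illustrative numbers): with
# max_capacity=3 and replicas=2, the per-client limit is 3 * 1 = 3 for
# actives but 3 * 2 = 6 for standbys, since every partition needs
# `replicas` standby copies in total.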
def _unassign_overassigned(self, active: bool) -> None:
# There are cases when multiple clients could have the same
# assignment (zombies). We need to handle that appropriately.
partition_counts = self._assigned_partition_counts(active)
total_assigns = self._total_assigns_per_partition(active=active)
for partition in range(self.num_partitions):
extras = partition_counts[partition] - total_assigns
for _ in range(extras):
assgn = next(
assgn
for assgn in self._client_assignments.values()
if assgn.partition_assigned(partition, active=active)
)
assgn.unassign_partition(partition, active=active)
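# Illustrative zombie case: if clients A and B both report partition 0
# as active while total_assigns == 1, extras == 1 and the copy on
# whichever client the generator yields first is unassigned.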
def _get_unassigned(self, active: bool) -> Sequence[int]:
partition_counts = self._assigned_partition_counts(active)
total_assigns = self._total_assigns_per_partition(active=active)
assert all(
partition_counts[partition] <= total_assigns
for partition in range(self.num_partitions)
)
return [
partition
for partition in range(self.num_partitions)
for _ in range(total_assigns - partition_counts[partition])
]
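# Example of the deficit list above (illustrative): for standbys with
# replicas=2 (total_assigns=2), a partition currently held by one
# standby appears once in the result and a fully unassigned partition
# appears twice, so each occurrence is one pending assignment.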
def _can_assign(
self, assignment: CopartitionedAssignment, partition: int, active: bool
) -> bool:
return not self._client_exhausted(assignment, active) and assignment.can_assign(
partition, active
)
def _client_exhausted(
self,
assignment: CopartitionedAssignment,
active: bool,
client_limit: Optional[int] = None,
) -> bool:
if client_limit is None:
client_limit = self._get_client_limit(active)
return assignment.num_assigned(active) == client_limit
def _find_promotable_standby(
self,
partition: int,
candidates: Iterator[CopartitionedAssignment],
) -> Optional[CopartitionedAssignment]:
# Round robin over clients to find a promotable standby until we make a full cycle
for _ in range(self._num_clients):
assignment = next(candidates)
can_assign = assignment.partition_assigned(
partition, active=False
) and self._can_assign(assignment, partition, active=True)
if can_assign:
return assignment
return None
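# E.g. (illustrative): when assigning partition 5 as an active, a
# client already holding partition 5 as a standby is preferred, since
# it presumably has the partition's state locally and promotion avoids
# a recovery.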
def _find_round_robin_assignable(
self,
partition: int,
candidates: Iterator[CopartitionedAssignment],
active: bool,
) -> Optional[CopartitionedAssignment]:
# Round robin and assign until we make a full cycle
for _ in range(self._num_clients):
assignment = next(candidates)
if self._can_assign(assignment, partition, active):
return assignment
return None
def _assign_round_robin(self, unassigned: Iterable[int], active: bool) -> None:
# We do round robin assignment as follows:
# - Sort the candidate clients by the number of partitions already
#   assigned, to improve the overall balance of the assignment
# - For actives, we first try to assign to an existing standby
# - For standbys, we offset the round robin start to evenly
#   distribute standbys for colocated actives
# - We do round robin over the sorted clients
# - If no assignment is found, it must be a standby and the only
#   unfilled client(s) must be actives/standbys for the partition
# - In that case, we unassign an arbitrary partition from a filled
#   assignment such that this partition can be assigned to it
# - This guarantees eventual assignment of all partitions
client_limit = self._get_client_limit(active)
if active:
candidates_list = sorted(
self._client_assignments.values(), key=lambda x: len(x.actives)
)
else:
candidates_list = sorted(
self._client_assignments.values(), key=lambda x: len(x.standbys)
)
candidates = cycle(iter(candidates_list))
unassigned = list(unassigned)
while unassigned:
partition = unassigned.pop(0)
assign_to = None
if active:
# For actives we first try to find a standby to assign to
assign_to = self._find_promotable_standby(partition, candidates)
if assign_to is not None:
# Unassign standby which will be promoted
assign_to.unassign_partition(partition, active=False)
else:
# For standbys we offset the round robin start to shuffle the
# assignment of standbys across clients
for _ in range(partition):
next(candidates)
assert assign_to is None or active
assign_to = assign_to or self._find_round_robin_assignable(
partition, candidates, active
)
# If round robin assignment didn't work then we must be
# assigning a standby and the only un-exhausted clients
# are actives for the partition
assert assign_to is not None or (
not active
and all(
assgn.partition_assigned(partition, active=True)
or assgn.partition_assigned(partition, active=False)
or self._client_exhausted(assgn, active, client_limit)
for assgn in self._client_assignments.values()
)
)
# If round robin didn't work, we free up the first exhausted (full)
# assignment to which the partition can be assigned.
if assign_to is None:
assign_to = next(
assignment
for assignment in self._client_assignments.values()
if (
self._client_exhausted(assignment, active)
and assignment.can_assign(partition, active)
)
) # By the assertion above, this should never raise StopIteration
unassigned_partition = assign_to.pop_partition(active)
unassigned.append(unassigned_partition)
# Assign partition
assign_to.assign_partition(partition, active)
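# A minimal usage sketch (hedged: assumes CopartitionedAssignment from
# .client_assignment can be constructed with just a ``topics`` keyword
# set; client ids and numbers below are illustrative only):
#
#     topics = {"orders", "orders-changelog"}
#     cluster = {
#         client_id: CopartitionedAssignment(topics=topics)
#         for client_id in ("client-1", "client-2", "client-3")
#     }
#     assignor = CopartitionedAssignor(
#         topics=topics,
#         cluster_asgn=cluster,
#         num_partitions=8,
#         replicas=1,
#     )
#     assignment = assignor.get_assignment()
#     # Each client now holds at most ceil(8 / 3) == 3 actives, and the
#     # 8 standby copies are spread over the remaining capacity.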