Source code for faust.agents.actor

"""Actor - Individual Agent instances."""

from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Optional, Set, cast

from mode import Service
from mode.utils.tracebacks import format_agen_stack, format_coro_stack

from faust.types import TP, ChannelT, StreamT
from faust.types.agents import _T, ActorT, AgentT, AsyncIterableActorT, AwaitableActorT

__all__ = ["Actor", "AsyncIterableActor", "AwaitableActor"]


[docs]class Actor(ActorT, Service): """An actor is a specific agent instance.""" mundane_level = "debug" # Agent will start n * concurrency actors. def __init__( self, agent: AgentT, stream: StreamT, it: _T, index: Optional[int] = None, active_partitions: Optional[Set[TP]] = None, **kwargs: Any, ) -> None: self.agent = agent self.stream = stream self.it = it self.index = index self.active_partitions = active_partitions self.actor_task = None Service.__init__(self, **kwargs)
[docs] async def on_start(self) -> None: """Call when actor is starting.""" assert self.actor_task self.add_future(self.actor_task)
[docs] async def on_stop(self) -> None: """Call when actor is being stopped.""" self.cancel()
[docs] async def on_isolated_partition_revoked(self, tp: TP) -> None: """Call when an isolated partition is being revoked.""" self.log.debug("Cancelling current task in actor for partition %r", tp) self.cancel() self.log.info("Stopping actor for revoked partition %r...", tp) await self.stop() self.log.debug("Actor for revoked partition %r stopped")
[docs] async def on_isolated_partition_assigned(self, tp: TP) -> None: """Call when an isolated partition is being assigned.""" self.log.dev("Actor was assigned to %r", tp)
[docs] def cancel(self) -> None: """Tell actor to stop reading from the stream.""" cast(ChannelT, self.stream.channel)._throw(StopAsyncIteration())
def __repr__(self) -> str: return f"<{self.shortlabel}>" @property def label(self) -> str: """Return human readable description of actor.""" s = self.agent._agent_label(name_suffix="*") if self.stream.active_partitions: partitions = {tp.partition for tp in self.stream.active_partitions} s += f" isolated={partitions}" return s
[docs]class AsyncIterableActor(AsyncIterableActorT, Actor): """Used for agent function that yields.""" def __aiter__(self) -> AsyncIterator: return self.it.__aiter__()
[docs] def traceback(self) -> str: return format_agen_stack(cast(AsyncGenerator, self.it))
[docs]class AwaitableActor(AwaitableActorT, Actor): """Used for actor function that do not yield.""" def __await__(self) -> Any: return self.it.__await__()
[docs] def traceback(self) -> str: return format_coro_stack(cast(Coroutine, self.it))