import abc
import asyncio
import typing
from datetime import tzinfo
from typing import (
Any,
AsyncIterable,
Awaitable,
Callable,
ClassVar,
ContextManager,
Iterable,
Mapping,
MutableSequence,
NoReturn,
Optional,
Pattern,
Set,
Tuple,
Type,
TypeVar,
Union,
no_type_check,
)
import opentracing
from mode import Seconds, ServiceT, Signal, SupervisorStrategyT, SyncSignal
from mode.utils.futures import stampede
from mode.utils.objects import cached_property
from mode.utils.queues import FlowControlEvent, ThrowableQueue
from mode.utils.types.trees import NodeT
from .agents import AgentFun, AgentManagerT, AgentT, SinkT
from .assignor import PartitionAssignorT
from .codecs import CodecArg
from .core import HeadersArg, K, V
from .fixups import FixupT
from .router import RouterT
from .sensors import SensorDelegateT
from .serializers import RegistryT
from .streams import StreamT
from .tables import CollectionT, TableManagerT, TableT
from .topics import ChannelT, TopicT
from .transports import ConductorT, ConsumerT, ProducerT, TransportT
from .tuples import TP, Message, MessageSentCallback, RecordMetadata
from .web import (
CacheBackendT,
HttpClientT,
PageArg,
ResourceOptions,
View,
ViewDecorator,
Web,
)
from .windows import WindowT
if typing.TYPE_CHECKING:
    # Real types, imported only while a static type checker analyses the
    # module; none of these imports execute at runtime.
    from faust.cli.base import AppCommand as _AppCommand
    from faust.livecheck.app import LiveCheck as _LiveCheck
    from faust.sensors.monitor import Monitor as _Monitor
    from faust.worker import Worker as _Worker

    from .events import EventT as _EventT
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
    from .settings import Settings as _Settings
else:
    # Runtime stand-ins: empty classes that keep the names used in the
    # annotations below resolvable without importing the real modules
    # (presumably to avoid import cycles -- TODO confirm).
    class _AppCommand: ...  # noqa

    class _LiveCheck: ...  # noqa

    class _Monitor: ...  # noqa

    class _Worker: ...  # noqa

    class _EventT: ...  # noqa

    class _ModelArg: ...  # noqa

    class _SchemaT: ...  # noqa

    class _Settings: ...  # noqa
# Public names exported by ``from <this module> import *``.
__all__ = ["TaskArg", "AppT"]

# Callable accepted by ``AppT.task``: it either receives the app instance or
# takes no arguments, and in both cases returns an awaitable.
TaskArg = Union[
    Callable[["AppT"], Awaitable],
    Callable[[], Awaitable],
]

# Generic type variable shared by the generic signatures in this module
# (for example ``AppT.agent``).
_T = TypeVar("_T")
class TracerT(abc.ABC):
    """Abstract interface for the object stored in ``AppT.tracer``.

    Every member is abstract, so a concrete subclass has to provide all
    three before it can be instantiated.
    """

    @property
    @abc.abstractmethod
    def default_tracer(self) -> opentracing.Tracer:
        """Return an :class:`opentracing.Tracer` (read-only property)."""
        ...

    @abc.abstractmethod
    def trace(
        self,
        name: str,
        sample_rate: Optional[float] = None,
        **extra_context: Any,
    ) -> ContextManager:
        """Return a context manager for the trace called *name*.

        Arguments:
            name: Name given to the trace.
            sample_rate: Optional float; its interpretation is left to
                the implementation.
            **extra_context: Arbitrary additional keyword arguments.
        """
        ...

    @abc.abstractmethod
    def get_tracer(self, service_name: str) -> opentracing.Tracer:
        """Return an :class:`opentracing.Tracer` for *service_name*."""
        ...
class BootStrategyT:
    """Interface of the boot strategy object held in ``AppT.boot_strategy``.

    NOTE(review): the methods are marked with ``abc.abstractmethod`` but the
    class does not derive from ``abc.ABC``, so the markers are not enforced
    when the class is instantiated.  Kept as-is to preserve behavior.
    """

    app: "AppT"

    enable_kafka: bool = True
    # We want these to take default from `enable_kafka`
    # attribute, but still want to allow subclasses to define
    # them like this:
    #   class MyBoot(BootStrategy):
    #       enable_kafka_consumer = False
    enable_kafka_consumer: Optional[bool] = None
    enable_kafka_producer: Optional[bool] = None

    enable_web: Optional[bool] = None
    enable_sensors: bool = True

    @abc.abstractmethod
    def __init__(
        self,
        app: "AppT",
        *,
        enable_web: Optional[bool] = None,
        enable_kafka: bool = True,
        enable_kafka_producer: Optional[bool] = None,
        enable_kafka_consumer: Optional[bool] = None,
        enable_sensors: bool = True,
    ) -> None:
        """Declare the constructor signature; the keyword defaults mirror
        the class-level attribute defaults above."""
        ...

    @abc.abstractmethod
    def server(self) -> Iterable[ServiceT]:
        """Return an iterable of services (abstract)."""
        ...

    @abc.abstractmethod
    def client_only(self) -> Iterable[ServiceT]:
        """Return an iterable of services (abstract)."""
        ...

    @abc.abstractmethod
    def producer_only(self) -> Iterable[ServiceT]:
        """Return an iterable of services (abstract)."""
        ...
class AppT(ServiceT):
    """Abstract type for the Faust application.

    Members decorated with ``abc.abstractmethod`` only declare the
    interface (names, parameters and return annotations); the behavior
    is supplied by the concrete implementation.

    See Also:
        :class:`faust.App`.
    """

    Settings: ClassVar[Type[_Settings]]
    BootStrategy: ClassVar[Type[BootStrategyT]]
    boot_strategy: BootStrategyT

    #: Set to true when the app is finalized (can read configuration).
    finalized: bool = False

    #: Set to true when the app has read configuration.
    configured: bool = False

    #: Set to true if the worker is currently rebalancing.
    rebalancing: bool = False
    rebalancing_count: int = 0

    #: Set to true when the worker is in recovery
    in_recovery: bool = False

    consumer_generation_id: int = 0

    #: Set to true if the assignment is empty
    # This flag is set by App._on_partitions_assigned
    unassigned: bool = False

    #: Set to true when app is executing within a worker instance.
    # This flag is set in faust/worker.py
    in_worker: bool = False

    # Signals.  These are class-level instances created once when the
    # class body executes.
    on_configured: SyncSignal[_Settings] = SyncSignal()
    on_before_configured: SyncSignal = SyncSignal()
    on_after_configured: SyncSignal = SyncSignal()
    on_partitions_assigned: Signal[Set[TP]] = Signal()
    on_partitions_revoked: Signal[Set[TP]] = Signal()
    on_rebalance_complete: Signal = Signal()
    on_before_shutdown: Signal = Signal()
    on_worker_init: SyncSignal = SyncSignal()
    on_produce_message: SyncSignal = SyncSignal()

    client_only: bool
    producer_only: bool

    agents: AgentManagerT
    sensors: SensorDelegateT

    fixups: MutableSequence[FixupT]

    tracer: Optional[TracerT] = None

    #: Original id argument + kwargs passed to App.__init__
    _default_options: Tuple[str, Mapping[str, Any]]

    @abc.abstractmethod
    def __init__(
        self, id: str, *, monitor: _Monitor, config_source: Any = None, **options: Any
    ) -> None:
        # The only state set by the abstract constructor: an optional
        # callback slot that starts out unset.
        self.on_startup_finished: Optional[Callable] = None

    # -- Configuration and worker lifecycle ---------------------------------

    @abc.abstractmethod
    def config_from_object(
        self, obj: Any, *, silent: bool = False, force: bool = False
    ) -> None: ...

    @abc.abstractmethod
    def finalize(self) -> None: ...

    @abc.abstractmethod
    def main(self) -> NoReturn: ...

    @abc.abstractmethod
    def worker_init(self) -> None: ...

    @abc.abstractmethod
    def worker_init_post_autodiscover(self) -> None: ...

    @abc.abstractmethod
    def discover(
        self,
        *extra_modules: str,
        categories: Iterable[str] = ("a", "b", "c"),
        ignore: Iterable[Any] = ("foo", "bar"),
    ) -> None: ...

    # -- Topics, channels, agents and scheduled callables ------------------
    #
    # Parameters that default to ``None`` are annotated with an explicit
    # ``Optional[...]``.  Types that are aliases defined in other modules
    # (``_ModelArg``, ``CodecArg``, ``K``, ``V``, ``HeadersArg``) are left
    # exactly as they were, since their definitions are not visible here.

    @abc.abstractmethod
    def topic(
        self,
        *topics: str,
        pattern: Optional[Union[str, Pattern]] = None,
        schema: Optional[_SchemaT] = None,
        key_type: _ModelArg = None,
        value_type: _ModelArg = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        partitions: Optional[int] = None,
        retention: Optional[Seconds] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        replicas: Optional[int] = None,
        acks: bool = True,
        internal: bool = False,
        config: Optional[Mapping[str, Any]] = None,
        maxsize: Optional[int] = None,
        allow_empty: bool = False,
        has_prefix: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> TopicT: ...

    @abc.abstractmethod
    def channel(
        self,
        *,
        schema: Optional[_SchemaT] = None,
        key_type: _ModelArg = None,
        value_type: _ModelArg = None,
        maxsize: Optional[int] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> ChannelT: ...

    @abc.abstractmethod
    def agent(
        self,
        channel: Optional[Union[str, ChannelT[_T]]] = None,
        *,
        name: Optional[str] = None,
        concurrency: int = 1,
        supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None,
        sink: Optional[Iterable[SinkT]] = None,
        isolated_partitions: bool = False,
        use_reply_headers: bool = True,
        **kwargs: Any,
    ) -> Callable[[AgentFun[_T]], AgentT[_T]]: ...

    @abc.abstractmethod
    @no_type_check
    def task(
        self, fun: TaskArg, *, on_leader: bool = False, traced: bool = True
    ) -> Callable: ...

    @abc.abstractmethod
    def timer(
        self,
        interval: Seconds,
        on_leader: bool = False,
        traced: bool = True,
        name: Optional[str] = None,
        max_drift_correction: float = 0.1,
    ) -> Callable: ...

    @abc.abstractmethod
    def crontab(
        self,
        cron_format: str,
        *,
        timezone: Optional[tzinfo] = None,
        on_leader: bool = False,
        traced: bool = True,
    ) -> Callable: ...

    @abc.abstractmethod
    def service(self, cls: Type[ServiceT]) -> Type[ServiceT]: ...

    @abc.abstractmethod
    def stream(
        self, channel: AsyncIterable, beacon: Optional[NodeT] = None, **kwargs: Any
    ) -> StreamT: ...

    # -- Tables -------------------------------------------------------------

    @abc.abstractmethod
    def Table(
        self,
        name: str,
        *,
        default: Optional[Callable[[], Any]] = None,
        window: Optional[WindowT] = None,
        partitions: Optional[int] = None,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> TableT: ...

    @abc.abstractmethod
    def GlobalTable(
        self,
        name: str,
        *,
        default: Optional[Callable[[], Any]] = None,
        window: Optional[WindowT] = None,
        partitions: Optional[int] = None,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> TableT: ...

    @abc.abstractmethod
    def SetTable(
        self,
        name: str,
        *,
        window: Optional[WindowT] = None,
        partitions: Optional[int] = None,
        start_manager: bool = False,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> TableT: ...

    @abc.abstractmethod
    def SetGlobalTable(
        self,
        name: str,
        *,
        window: Optional[WindowT] = None,
        partitions: Optional[int] = None,
        start_manager: bool = False,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> TableT: ...

    # -- Web views and CLI commands ----------------------------------------

    @abc.abstractmethod
    def page(
        self,
        path: str,
        *,
        base: Type[View] = View,
        cors_options: Optional[Mapping[str, ResourceOptions]] = None,
        name: Optional[str] = None,
    ) -> Callable[[PageArg], Type[View]]: ...

    @abc.abstractmethod
    def table_route(
        self,
        table: CollectionT,
        shard_param: Optional[str] = None,
        *,
        query_param: Optional[str] = None,
        match_info: Optional[str] = None,
        exact_key: Optional[str] = None,
    ) -> ViewDecorator: ...

    @abc.abstractmethod
    def command(
        self, *options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any
    ) -> Callable[[Callable], Type[_AppCommand]]: ...

    # -- Events, client mode, tracing and sending --------------------------

    @abc.abstractmethod
    def create_event(
        self, key: K, value: V, headers: HeadersArg, message: Message
    ) -> _EventT: ...

    @abc.abstractmethod
    async def start_client(self) -> None: ...

    @abc.abstractmethod
    async def maybe_start_client(self) -> None: ...

    @abc.abstractmethod
    def trace(
        self, name: str, trace_enabled: bool = True, **extra_context: Any
    ) -> ContextManager: ...

    # NOTE(review): declared ``async`` *and* annotated as returning an
    # ``Awaitable``; read literally, a caller awaits the call and then
    # awaits the result to obtain the ``RecordMetadata`` -- confirm that
    # this is intended.
    @abc.abstractmethod
    async def send(
        self,
        channel: Union[ChannelT, str],
        key: K = None,
        value: V = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: HeadersArg = None,
        schema: Optional[_SchemaT] = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: Optional[MessageSentCallback] = None,
    ) -> Awaitable[RecordMetadata]: ...

    @abc.abstractmethod
    def LiveCheck(self, **kwargs: Any) -> _LiveCheck: ...

    # ``stampede`` comes from ``mode.utils.futures``; the decorator order
    # (stampede outermost, abstractmethod innermost) is unchanged.
    @stampede
    @abc.abstractmethod
    async def maybe_start_producer(self) -> ProducerT: ...

    @abc.abstractmethod
    def is_leader(self) -> bool: ...

    @abc.abstractmethod
    def FlowControlQueue(
        self,
        maxsize: Optional[int] = None,
        *,
        clear_on_resume: bool = False,
    ) -> ThrowableQueue: ...

    @abc.abstractmethod
    def Worker(self, **kwargs: Any) -> _Worker: ...

    # -- Callbacks ----------------------------------------------------------

    @abc.abstractmethod
    def on_webserver_init(self, web: Web) -> None: ...

    @abc.abstractmethod
    def on_rebalance_start(self) -> None: ...

    @abc.abstractmethod
    def on_rebalance_return(self) -> None: ...

    @abc.abstractmethod
    def on_rebalance_end(self) -> None: ...

    # -- Properties ---------------------------------------------------------

    # ``conf`` is the one property pair here that is not marked abstract.
    @property
    def conf(self) -> _Settings: ...

    @conf.setter
    def conf(self, settings: _Settings) -> None: ...

    @property
    @abc.abstractmethod
    def transport(self) -> TransportT: ...

    @transport.setter
    def transport(self, transport: TransportT) -> None: ...

    @property
    @abc.abstractmethod
    def producer_transport(self) -> TransportT: ...

    @producer_transport.setter
    def producer_transport(self, transport: TransportT) -> None: ...

    @property
    @abc.abstractmethod
    def cache(self) -> CacheBackendT: ...

    @cache.setter
    def cache(self, cache: CacheBackendT) -> None: ...

    @property
    @abc.abstractmethod
    def producer(self) -> ProducerT: ...

    @property
    @abc.abstractmethod
    def consumer(self) -> ConsumerT: ...

    @cached_property
    @abc.abstractmethod
    def tables(self) -> TableManagerT: ...

    @cached_property
    @abc.abstractmethod
    def topics(self) -> ConductorT: ...

    @property
    @abc.abstractmethod
    def monitor(self) -> _Monitor: ...

    @monitor.setter
    def monitor(self, value: _Monitor) -> None: ...

    # Unlike the other stubs this one carries a body: it builds a
    # ``FlowControlEvent`` bound to ``self.loop``.
    @cached_property
    @abc.abstractmethod
    def flow_control(self) -> FlowControlEvent:
        return FlowControlEvent(loop=self.loop)

    @property
    @abc.abstractmethod
    def http_client(self) -> HttpClientT: ...

    @http_client.setter
    def http_client(self, client: HttpClientT) -> None: ...

    @cached_property
    @abc.abstractmethod
    def assignor(self) -> PartitionAssignorT: ...

    @cached_property
    @abc.abstractmethod
    def router(self) -> RouterT: ...

    @cached_property
    @abc.abstractmethod
    def serializers(self) -> RegistryT: ...

    @cached_property
    @abc.abstractmethod
    def web(self) -> Web: ...

    @cached_property
    @abc.abstractmethod
    def in_transaction(self) -> bool: ...

    # -- Private tracing helpers -------------------------------------------

    @abc.abstractmethod
    def _span_add_default_tags(self, span: opentracing.Span) -> None: ...

    @abc.abstractmethod
    def _start_span_from_rebalancing(self, name: str) -> opentracing.Span: ...