Skip to content

mode.types.services

Type classes for mode.services.

DiagT

Bases: ABC

Diag keeps track of a services diagnostic flags.

Source code in mode/types/services.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class DiagT(abc.ABC):
    """Diag keeps track of a services diagnostic flags."""

    flags: Set[str]
    last_transition: MutableMapping[str, float]

    @abc.abstractmethod
    def __init__(self, service: "ServiceT") -> None: ...

    @abc.abstractmethod
    def set_flag(self, flag: str) -> None: ...

    @abc.abstractmethod
    def unset_flag(self, flag: str) -> None: ...

ServiceT

Bases: AsyncContextManager

Abstract type for an asynchronous service that can be started/stopped.

See Also

mode.Service.

Source code in mode/types/services.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class ServiceT(AsyncContextManager):
    """Abstract type for an asynchronous service that can be started/stopped.

    See Also:
        `mode.Service`.
    """

    Diag: Type[DiagT]
    diag: DiagT
    async_exit_stack: AsyncExitStack
    exit_stack: ExitStack

    shutdown_timeout: float
    wait_for_shutdown = False
    _loop: Optional[asyncio.AbstractEventLoop]
    restart_count: int = 0
    supervisor: Optional[SupervisorStrategyT] = None

    @abc.abstractmethod
    def __init__(
        self,
        *,
        beacon: NodeT = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None: ...

    @abc.abstractmethod
    def add_dependency(self, service: "ServiceT") -> "ServiceT": ...

    @abc.abstractmethod
    async def add_runtime_dependency(
        self, service: "ServiceT"
    ) -> "ServiceT": ...

    @abc.abstractmethod
    async def add_async_context(self, context: AsyncContextManager) -> Any: ...

    @abc.abstractmethod
    def add_context(self, context: ContextManager) -> Any: ...

    @abc.abstractmethod
    async def start(self) -> None: ...

    @abc.abstractmethod
    async def maybe_start(self) -> bool: ...

    @abc.abstractmethod
    async def crash(self, reason: BaseException) -> None: ...

    @abc.abstractmethod
    def _crash(self, reason: BaseException) -> None: ...

    @abc.abstractmethod
    async def stop(self) -> None: ...

    @abc.abstractmethod
    def service_reset(self) -> None: ...

    @abc.abstractmethod
    async def restart(self) -> None: ...

    @abc.abstractmethod
    async def wait_until_stopped(self) -> None: ...

    @abc.abstractmethod
    def set_shutdown(self) -> None: ...

    @abc.abstractmethod
    def _repr_info(self) -> str: ...

    @property
    @abc.abstractmethod
    def started(self) -> bool: ...

    @property
    @abc.abstractmethod
    def crashed(self) -> bool: ...

    @property
    @abc.abstractmethod
    def should_stop(self) -> bool: ...

    @property
    @abc.abstractmethod
    def state(self) -> str: ...

    @property
    @abc.abstractmethod
    def label(self) -> str: ...

    @property
    @abc.abstractmethod
    def shortlabel(self) -> str: ...

    @property
    def beacon(self) -> NodeT: ...

    @beacon.setter
    def beacon(self, beacon: NodeT) -> None: ...

    @property
    @abc.abstractmethod
    def loop(self) -> asyncio.AbstractEventLoop: ...

    @loop.setter
    def loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...

    @property
    @abc.abstractmethod
    def crash_reason(self) -> Optional[BaseException]: ...

    @crash_reason.setter
    def crash_reason(self, reason: Optional[BaseException]) -> None: ...