Skip to content

mode.utils.queues

Queue utilities - variations of asyncio.Queue.

FlowControlEvent

Manage flow control FlowControlQueue instances.

The FlowControlEvent manages flow in one or many queue instances at the same time.

To flow control queues, first create the shared event:

>>> flow_control = FlowControlEvent()

Then pass that shared event to the queues that should be managed by it:

>>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control)
>>> q2 = FlowControlQueue(flow_control=flow_control)

If you want the contents of the queue to be cleared when flow is resumed, then specify that by using the clear_on_resume flag:

>>> q3 = FlowControlQueue(clear_on_resume=True,
...                       flow_control=flow_control)

To suspend production into queues, use flow_control.suspend:

>>> flow_control.suspend()

While the queues are suspend, any producer attempting to send something to the queue will hang until flow is resumed.

To resume production into queues, use flow_control.resume:

>>> flow_control.resume()
Notes

In Faust queues are managed by the app.flow_control event.

Source code in mode/utils/queues.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class FlowControlEvent:
    """Manage flow control `FlowControlQueue` instances.

    The FlowControlEvent manages flow in one or many queue instances
    at the same time.

    To flow control queues, first create the shared event:

    ```sh
    >>> flow_control = FlowControlEvent()
    ```

    Then pass that shared event to the queues that should be managed by it:

    ```sh
    >>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control)
    >>> q2 = FlowControlQueue(flow_control=flow_control)
    ```

    If you want the contents of the queue to be cleared when flow is resumed,
    then specify that by using the `clear_on_resume` flag:

    ```sh
    >>> q3 = FlowControlQueue(clear_on_resume=True,
    ...                       flow_control=flow_control)
    ```

    To suspend production into queues, use `flow_control.suspend`:

    ```sh
    >>> flow_control.suspend()
    ```

    While the queues are suspend, any producer attempting to send something
    to the queue will hang until flow is resumed.

    To resume production into queues, use `flow_control.resume`:

    ```sh
    >>> flow_control.resume()
    ```

    Notes:
        In Faust queues are managed by the ``app.flow_control`` event.
    """

    if typing.TYPE_CHECKING:
        _queues: WeakSet["FlowControlQueue"]
    _queues = None

    def __init__(
        self,
        *,
        initially_suspended: bool = True,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self.loop = loop
        self._resume = Event(loop=self.loop)
        self._suspend = Event(loop=self.loop)
        if initially_suspended:
            self._suspend.set()
        self._queues = WeakSet()

    def manage_queue(self, queue: "FlowControlQueue") -> None:
        """Add `FlowControlQueue` to be cleared on resume."""
        self._queues.add(queue)

    def suspend(self) -> None:
        """Suspend production into queues managed by this event."""
        self._resume.clear()
        self._suspend.set()

    def resume(self) -> None:
        """Resume production into queues managed by this event."""
        self._suspend.clear()
        self._resume.set()
        self.clear()

    def is_active(self) -> bool:
        return not self._suspend.is_set()

    def clear(self) -> None:
        for queue in self._queues:
            queue.clear()

    async def acquire(self) -> None:
        """Wait until flow control is resumed."""
        if self._suspend.is_set():
            await self._resume.wait()

acquire() async

Wait until flow control is resumed.

Source code in mode/utils/queues.py
111
112
113
114
async def acquire(self) -> None:
    """Wait until flow control is resumed."""
    if self._suspend.is_set():
        await self._resume.wait()

manage_queue(queue)

Add FlowControlQueue to be cleared on resume.

Source code in mode/utils/queues.py
89
90
91
def manage_queue(self, queue: "FlowControlQueue") -> None:
    """Add `FlowControlQueue` to be cleared on resume."""
    self._queues.add(queue)

resume()

Resume production into queues managed by this event.

Source code in mode/utils/queues.py
 98
 99
100
101
102
def resume(self) -> None:
    """Resume production into queues managed by this event."""
    self._suspend.clear()
    self._resume.set()
    self.clear()

suspend()

Suspend production into queues managed by this event.

Source code in mode/utils/queues.py
93
94
95
96
def suspend(self) -> None:
    """Suspend production into queues managed by this event."""
    self._resume.clear()
    self._suspend.set()

FlowControlQueue

Bases: Queue

asyncio.Queue managed by FlowControlEvent.

See Also

FlowControlEvent.

Source code in mode/utils/queues.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class FlowControlQueue(asyncio.Queue):
    """`asyncio.Queue` managed by `FlowControlEvent`.

    See Also:
        `FlowControlEvent`.
    """

    pressure_high_ratio = 1.25  # divided by
    pressure_drop_ratio = 0.40  # multiplied by

    _pending_pressure_drop_callbacks: Set[Callable]

    def __init__(
        self,
        maxsize: int = 0,
        *,
        flow_control: FlowControlEvent,
        clear_on_resume: bool = False,
        **kwargs: Any,
    ) -> None:
        self._flow_control: FlowControlEvent = flow_control
        self._clear_on_resume: bool = clear_on_resume
        if self._clear_on_resume:
            self._flow_control.manage_queue(self)
        self._pending_pressure_drop_callbacks = set()
        super().__init__(maxsize, **kwargs)

    def clear(self) -> None:
        self._queue.clear()  # type: ignore

    def put_nowait_enhanced(
        self,
        value: _T,
        *,
        on_pressure_high: Callable,
        on_pressure_drop: Callable,
    ) -> bool:
        in_pressure_high_state = self.in_pressure_high_state(on_pressure_drop)
        if in_pressure_high_state:
            on_pressure_high()
        self.force_put_nowait(value)
        return in_pressure_high_state

    def in_pressure_high_state(self, callback: Callable) -> bool:
        pending = self._pending_pressure_drop_callbacks
        if callback not in pending:
            qsize = self.qsize()
            pressure_high_size = self.pressure_high_size
            if qsize >= pressure_high_size:
                pending.add(callback)
                self.on_pressure_high()
                return True
        return False

    def on_pressure_high(self) -> None: ...

    def on_pressure_drop(self) -> None: ...

    def maybe_endorse_pressure_drop(self) -> None:
        pending = self._pending_pressure_drop_callbacks
        if pending:
            size = self.qsize()
            if size:
                pressure_drop_size = self.pressure_drop_size
                if size <= pressure_drop_size:
                    # still have items left in the queue that
                    # will eventually call the rest of the callbacks.
                    pressure_drop_callback = pending.pop()
                    pressure_drop_callback()
                    self.on_pressure_drop()
            else:
                # if the queue is empty we have to process all of
                # the remaining sentinels.
                for pressure_drop_callback in pending:
                    pressure_drop_callback()
                pending.clear()
                self.on_pressure_drop()

    async def get(self) -> _T:
        if self.empty():
            self.maybe_endorse_pressure_drop()
        return cast(_T, await super().get())

    def get_nowait(self) -> _T:
        self.maybe_endorse_pressure_drop()
        return cast(_T, super().get_nowait())

    async def put(self, value: _T) -> None:
        await self._flow_control.acquire()
        await super().put(value)

    @no_type_check
    def force_put_nowait(self, item: _T) -> None:
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    @cached_property
    def pressure_high_size(self) -> int:
        return math.floor(self.maxsize / self.pressure_high_ratio)

    @cached_property
    def pressure_drop_size(self) -> int:
        return math.floor(self.maxsize * self.pressure_drop_ratio)

ThrowableQueue

Bases: FlowControlQueue

Queue that can be notified of errors.

Source code in mode/utils/queues.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class ThrowableQueue(FlowControlQueue):
    """Queue that can be notified of errors."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self._putters: List[asyncio.Future]
        super().__init__(*args, **kwargs)
        self._errors: Deque[BaseException] = deque()

    @typing.no_type_check
    async def get(self) -> _T:
        if self._errors:
            raise self._errors.popleft()
        return await super().get()

    def empty(self) -> bool:
        return super().empty() and not self._errors

    def clear(self) -> None:
        self._queue.clear()  # type: ignore
        self._errors.clear()
        for putter in self._putters:
            putter.cancel()
        self._putters.clear()

    def get_nowait(self) -> _T:
        if self._errors:
            raise self._errors.popleft()
        return cast(_T, super().get_nowait())

    async def throw(self, exc: BaseException) -> None:
        self._throw(exc)

    def _throw(self, exc: BaseException) -> None:
        waiters = self._getters  # type: ignore
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_exception(exc)
                break
        else:
            self._errors.append(exc)