Skip to content

mode.utils.futures

Async I/O Future utilities.

stampede

Descriptor for cached async operations providing stampede protection.

See also thundering herd problem.

Use it as a decorator on an async method:

Examples:

Here's an example coroutine method connecting a network client:

class Client:

    @stampede
    async def maybe_connect(self):
        await self._connect()

    async def _connect(self):
        return Connection()

In the above example, if multiple coroutines call maybe_connect at the same time, then only one of them will actually perform the operation. The rest of the coroutines will wait for the result, and return it once the first caller returns.

Source code in mode/utils/futures.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class stampede:
    """Descriptor wrapping an async method to protect it from stampedes.

    This is also known as the thundering herd problem.

    Use it as a decorator on a coroutine method.

    Examples:
        A client that should only connect once, no matter how many
        coroutines ask for a connection:

        ```python
        class Client:

            @stampede
            async def maybe_connect(self):
                return await self._connect()

            async def _connect(self):
                return Connection()
        ```

        When several coroutines call ``maybe_connect`` concurrently, only
        one of them actually performs the operation. The others wait for
        that call to finish and then return its result.
    """

    def __init__(self, fget: Callable, *, doc: Optional[str] = None) -> None:
        # Mirror the decorated function's metadata so the descriptor
        # introspects like the function it wraps.
        self.__wrapped__ = fget
        self.__module__ = fget.__module__
        self.__name__ = fget.__name__
        # An explicit (non-empty) ``doc`` wins over the function's docstring.
        self.__doc__ = doc or fget.__doc__
        self.__get = fget

    def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
        # Never meant to be invoked directly: defined only so that
        # inspect.signature() treats the descriptor as a callable.
        raise NotImplementedError()

    def __get__(self, obj: Any, type: Optional[Type] = None) -> Any:
        """Return the wrapper bound to ``obj``, creating it on first access.

        Accessing the attribute on the class (``obj is None``) returns the
        descriptor itself.
        """
        if obj is None:
            return self
        # The wrapper is stored in the instance ``__dict__`` under the
        # decorated function's name, so each instance gets exactly one.
        instance_attrs = obj.__dict__
        key = self.__name__
        try:
            return instance_attrs[key]
        except KeyError:
            wrapper = StampedeWrapper(self.__get, obj)
            instance_attrs[key] = wrapper
            return wrapper

done_future(result=None, *, loop=None)

Return an asyncio.Future that is already done, with its result set to result.

Source code in mode/utils/futures.py
121
122
123
124
125
126
127
128
129
def done_future(
    result: Any = None, *, loop: Optional[asyncio.AbstractEventLoop] = None
) -> asyncio.Future:
    """Create an `asyncio.Future` whose result is already set.

    Args:
        result: Value to store as the future's result.
        loop: Event loop used to create the future.  When omitted, the
            loop is obtained from the current event loop policy.

    Returns:
        A completed future holding ``result``.
    """
    if not loop:
        loop = asyncio.get_event_loop_policy().get_event_loop()
    fut = loop.create_future()
    fut.set_result(result)
    return fut

maybe_async(res) async

Await the argument if it is awaitable; otherwise return it unchanged.

Examples:

>>> await maybe_async(regular_function(arg))
>>> await maybe_async(async_function(arg))
Source code in mode/utils/futures.py
132
133
134
135
136
137
138
139
140
141
async def maybe_async(res: Any) -> Any:
    """Resolve ``res``: await it when awaitable, else hand it back as-is.

    This lets callers treat the return value of a regular function and
    of a coroutine function uniformly.

    Examples:
        >>> await maybe_async(regular_function(arg))
        >>> await maybe_async(async_function(arg))
    """
    if not isawaitable(res):
        return res
    return await res

maybe_cancel(fut)

Cancel future if it is cancellable.

Source code in mode/utils/futures.py
144
145
146
147
148
def maybe_cancel(fut: Optional[asyncio.Future]) -> bool:
    """Cancel ``fut`` unless it is missing or already done.

    Returns:
        The value returned by ``fut.cancel()`` when cancellation was
        attempted, otherwise ``False``.
    """
    if fut is None or fut.done():
        return False
    return fut.cancel()

maybe_set_exception(fut, exc)

Set future exception if not already done.

Source code in mode/utils/futures.py
151
152
153
154
155
156
157
158
def maybe_set_exception(
    fut: Union[asyncio.Future, None], exc: BaseException
) -> bool:
    """Store ``exc`` on ``fut`` unless it is missing or already done.

    Returns:
        ``True`` when the exception was set, ``False`` otherwise.
    """
    if fut is None or fut.done():
        return False
    fut.set_exception(exc)
    return True

maybe_set_result(fut, result)

Set future result if not already done.

Source code in mode/utils/futures.py
161
162
163
164
165
166
def maybe_set_result(fut: Optional[asyncio.Future], result: Any) -> bool:
    """Store ``result`` on ``fut`` unless it is missing or already done.

    Returns:
        ``True`` when the result was set, ``False`` otherwise.
    """
    if fut is None or fut.done():
        return False
    fut.set_result(result)
    return True

notify(fut, result=None)

Set asyncio.Future result if future exists and is not done.

Source code in mode/utils/futures.py
169
170
171
172
173
174
def notify(fut: Optional[asyncio.Future], result: Any = None) -> None:
    """Resolve ``fut`` with ``result`` when it exists and is still pending.

    Does nothing if ``fut`` is ``None`` or already done.
    """
    # A Future used this way acts as a lockless, single-consumer condition;
    # with multiple consumers use asyncio.Condition instead.
    if fut is None or fut.done():
        return
    fut.set_result(result)