# Source code for faust.utils.venusian
"""Venusian (see :pypi:`venusian`).

We define our own :func:`attach` interface so that callers do not
have to specify the callback argument.
"""
from typing import Any, Callable, Optional

import venusian
from venusian import Scanner, attach as _attach
# Public API: re-export venusian's ``Scanner`` next to this module's own
# ``attach`` wrapper, so callers can import both from here.
__all__ = ["Scanner", "attach"]
def attach(
    fun: Callable,
    category: str,
    *,
    callback: Optional[Callable[[Scanner, str, Any], None]] = None,
    **kwargs: Any,
) -> None:
    """Shortcut for :func:`venusian.attach`.

    This shortcut makes the callback argument optional.

    Arguments:
        fun: Object the callback is attached to; passed as the first
            positional argument to :func:`venusian.attach`.
        category: Forwarded to :func:`venusian.attach` as the
            ``category`` keyword argument.
        callback: Callback to attach.  When :const:`None` (the default),
            the module-level no-op ``_on_found`` is used instead.
        **kwargs: Extra keyword arguments forwarded unchanged to
            :func:`venusian.attach`.

    Returns:
        The result of :func:`venusian.attach`, passed through unchanged.
    """
    # NOTE(review): the signature is annotated ``-> None`` but the result of
    # venusian.attach is returned as-is -- confirm what callers rely on.
    # NOTE(review): venusian.attach locates the calling scope through its
    # ``depth`` argument; verify whether this extra wrapper frame needs
    # ``depth`` to be adjusted by callers.
    if callback is None:
        # No callback given: fall back to the do-nothing default.
        callback = _on_found
    return _attach(fun, callback, category=category, **kwargs)
def _on_found(
    scanner: venusian.Scanner,
    name: str,
    obj: Any,
) -> None:
    """Do nothing: default callback used by ``attach``.

    Accepts the ``(scanner, name, obj)`` callback arguments and
    ignores all of them.
    """