Skip to content

mode.utils.imports

Importing utilities.

FactoryMapping

Bases: FastUserDict, Generic[_T]

Class plugin system.

This is an utility to maintain a mapping from name to fully qualified Python attribute path, and also supporting the use of these in URLs.

>>> # Specifying the type enables mypy to know that
>>> # this factory returns Driver subclasses.
>>> drivers: FactoryMapping[Type[Driver]]
>>> drivers = FactoryMapping({
...    'rabbitmq': 'my.drivers.rabbitmq:Driver',
...    'kafka': 'my.drivers.kafka:Driver',
...    'redis': 'my.drivers.redis:Driver',
... })

>>> drivers.by_url('rabbitmq://localhost:9090')
<class 'my.drivers.rabbitmq.Driver'>

>>> drivers.by_name('redis')
<class 'my.drivers.redis.Driver'>
Source code in mode/utils/imports.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class FactoryMapping(FastUserDict, Generic[_T]):
    """Class plugin system.

    This is an utility to maintain a mapping from name to fully
    qualified Python attribute path, and also supporting the use
    of these in URLs.

    ```sh
    >>> # Specifying the type enables mypy to know that
    >>> # this factory returns Driver subclasses.
    >>> drivers: FactoryMapping[Type[Driver]]
    >>> drivers = FactoryMapping({
    ...    'rabbitmq': 'my.drivers.rabbitmq:Driver',
    ...    'kafka': 'my.drivers.kafka:Driver',
    ...    'redis': 'my.drivers.redis:Driver',
    ... })

    >>> drivers.by_url('rabbitmq://localhost:9090')
    <class 'my.drivers.rabbitmq.Driver'>

    >>> drivers.by_name('redis')
    <class 'my.drivers.redis.Driver'>
    ```
    """

    aliases: MutableMapping[str, str]
    namespaces: Set
    _finalized: bool = False

    def __init__(self, *args: Mapping, **kwargs: str) -> None:
        self.aliases = dict(*args, **kwargs)
        self.namespaces = set()

    def iterate(self) -> Iterator[_T]:
        self._maybe_finalize()
        for name in self.aliases:
            yield self.by_name(name)

    def by_url(self, url: Union[str, URL]) -> _T:
        """Get class associated with URL (scheme is used as alias key)."""
        # we remove anything after ; so urlparse can recognize the url.
        return self.by_name(URL(url).scheme)

    def by_name(self, name: SymbolArg[_T]) -> _T:
        self._maybe_finalize()
        try:
            return symbol_by_name(name, aliases=self.aliases)
        except ModuleNotFoundError as exc:
            name_ = cast(str, name)
            if "." in name_:
                raise
            alt = didyoumean(
                self.aliases,
                name_,
                fmt_none=f'Available choices: {", ".join(self.aliases)}',
            )
            raise ModuleNotFoundError(
                f"{name!r} is not a valid name. {alt}"
            ) from exc

    def get_alias(self, name: str) -> str:
        self._maybe_finalize()
        return self.aliases[name]

    def include_setuptools_namespace(self, namespace: str) -> None:
        self.namespaces.add(namespace)

    def _maybe_finalize(self) -> None:
        if not self._finalized:
            self._finalized = True
            self._finalize()

    def _finalize(self) -> None:
        for namespace in self.namespaces:
            self.aliases.update(dict(load_extension_class_names(namespace)))

    @cached_property
    def data(self) -> MutableMapping:  # type: ignore
        return self.aliases

by_url(url)

Get class associated with URL (scheme is used as alias key).

Source code in mode/utils/imports.py
100
101
102
103
def by_url(self, url: Union[str, URL]) -> _T:
    """Get class associated with URL (scheme is used as alias key)."""
    # we remove anything after ; so urlparse can recognize the url.
    return self.by_name(URL(url).scheme)

ParsedSymbol

Bases: NamedTuple

Tuple returned by parse_symbol.

Source code in mode/utils/imports.py
151
152
153
154
155
class ParsedSymbol(NamedTuple):
    """Tuple returned by `parse_symbol`."""

    module_name: Optional[str]
    attribute_name: Optional[str]

cwd_in_path()

Context adding the current working directory to sys.path.

Source code in mode/utils/imports.py
381
382
383
384
385
386
387
388
389
390
391
392
393
@contextmanager
def cwd_in_path() -> Generator:
    """Context adding the current working directory to sys.path."""
    cwd = os.getcwd()
    if cwd in sys.path:
        yield
    else:
        sys.path.insert(0, cwd)
        try:
            yield cwd
        finally:
            with suppress(ValueError):
                sys.path.remove(cwd)

import_from_cwd(module, *, imp=None, package=None)

Import module, temporarily including modules in the current directory.

Modules located in the current directory has precedence over modules located in sys.path.

Source code in mode/utils/imports.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def import_from_cwd(
    module: str,
    *,
    imp: Optional[Callable] = None,
    package: Optional[str] = None,
) -> ModuleType:
    """Import module, temporarily including modules in the current directory.

    Modules located in the current directory has
    precedence over modules located in `sys.path`.
    """
    if imp is None:
        imp = importlib.import_module
    with cwd_in_path():
        return imp(module, package=package)

load_extension_class_names(namespace)

Get setuptools entrypoint extension class names.

If the entrypoint is defined in setup.py as:

entry_points={
    'faust.codecs': [
        'msgpack = faust_msgpack:msgpack',
    ],

Iterating over the 'faust.codecs' namespace will yield the name:

>>> list(load_extension_class_names('faust.codecs'))
[('msgpack', 'faust_msgpack:msgpack')]
Source code in mode/utils/imports.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def load_extension_class_names(
    namespace: str,
) -> Iterable[RawEntrypointExtension]:
    """Get setuptools entrypoint extension class names.

    If the entrypoint is defined in `setup.py` as:

    ```python
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    ```

    Iterating over the 'faust.codecs' namespace will yield the name:

    ```sh
    >>> list(load_extension_class_names('faust.codecs'))
    [('msgpack', 'faust_msgpack:msgpack')]
    ```
    """
    try:
        from pkg_resources import iter_entry_points
    except ImportError:
        return

    for ep in iter_entry_points(namespace):
        yield RawEntrypointExtension(
            ep.name, ":".join([ep.module_name, ep.attrs[0]])
        )

load_extension_classes(namespace)

Yield extension classes for setuptools entrypoint namespace.

If an entrypoint is defined in setup.py:

entry_points={
    'faust.codecs': [
        'msgpack = faust_msgpack:msgpack',
    ],

Iterating over the 'faust.codecs' namespace will yield the actual attributes specified in the path (faust_msgpack:msgpack):

from faust_msgpack import msgpack
attrs = list(load_extension_classes('faust.codecs'))
assert msgpack in attrs
Source code in mode/utils/imports.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def load_extension_classes(namespace: str) -> Iterable[EntrypointExtension]:
    """Yield extension classes for setuptools entrypoint namespace.

    If an entrypoint is defined in `setup.py`:

    ```python
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    ```

    Iterating over the 'faust.codecs' namespace will yield
    the actual attributes specified in the path (`faust_msgpack:msgpack`):

    ```python
    from faust_msgpack import msgpack
    attrs = list(load_extension_classes('faust.codecs'))
    assert msgpack in attrs
    ```
    """
    for name, cls_name in load_extension_class_names(namespace):
        try:
            cls: Type = symbol_by_name(cls_name)
        except (ImportError, SyntaxError) as exc:
            warnings.warn(
                f"Cannot load {namespace} extension {cls_name!r}: {exc!r}",
                stacklevel=1,
            )
        else:
            yield EntrypointExtension(name, cls)

parse_symbol(s, *, package=None, strict_separator=':', relative_separator='.')

Parse symbol_by_name argument into components.

Returns:

Name Type Description
ParsedSymbol ParsedSymbol

Tuple of (module_name, attribute_name)

Raises:

Type Description
ValueError

if relative import (arg starts with '.') and no package argument is specified.

Examples:

>>> parse_symbol('mode.services')
ParsedSymbol(module_name='mode.services', attribute_name=None)

>>> parse_symbol('.services', package='mode')
ParsedSymbol(module_name='.services', attribute_name=None)

>>> parse_symbol('mode.services.Service')
ParsedSymbol(module_name='mode.services', attribute_name='Service')

>>> parse_symbol('mode.services:Service')
ParsedSymbol(module_name='mode.services', attribute_name='Service')
Source code in mode/utils/imports.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def parse_symbol(
    s: str,
    *,
    package: Optional[str] = None,
    strict_separator: str = ":",
    relative_separator: str = ".",
) -> ParsedSymbol:
    """Parse `symbol_by_name` argument into components.

    Returns:
        ParsedSymbol: Tuple of ``(module_name, attribute_name)``

    Raises:
        ValueError: if relative import (arg starts with '.') and
            no ``package`` argument is specified.

    Examples:

    ```sh
    >>> parse_symbol('mode.services')
    ParsedSymbol(module_name='mode.services', attribute_name=None)

    >>> parse_symbol('.services', package='mode')
    ParsedSymbol(module_name='.services', attribute_name=None)

    >>> parse_symbol('mode.services.Service')
    ParsedSymbol(module_name='mode.services', attribute_name='Service')

    >>> parse_symbol('mode.services:Service')
    ParsedSymbol(module_name='mode.services', attribute_name='Service')
    ```
    """
    module_name: Optional[str]
    attribute_name: Optional[str]
    partition_by = (
        strict_separator if strict_separator in s else relative_separator
    )

    module_name, used_separator, attribute_name = s.rpartition(partition_by)
    if not module_name:
        # Module name is missing must be either ".foo" or ":foo",
        # and is a relative import.
        if used_separator == ":":
            # ":foo" is illegal and will result in ValueError below.
            raise ValueError(f'Missing module name with ":" separator: {s!r}')
        elif used_separator == ".":
            # ".foo" is legal but requires a ``package`` argument.
            if not package:
                raise ValueError(
                    f"Relative import {s!r} but package=None (required)"
                )
            module_name, attribute_name = s, None
        else:
            attribute_name, module_name = (
                None,
                package if package else attribute_name,
            )

    if attribute_name:
        _ensure_identifier(attribute_name, full=s)
    if module_name:  # pragma: no cover
        _ensure_identifier(module_name.strip(relative_separator), full=s)

    return ParsedSymbol(module_name, attribute_name)

smart_import(path, imp=None)

Import module if module, otherwise same as symbol_by_name.

Source code in mode/utils/imports.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def smart_import(path: str, imp: Any = None) -> Any:
    """Import module if module, otherwise same as `symbol_by_name`."""
    imp = importlib.import_module if imp is None else imp
    if ":" in path:
        # Path includes attribute so can just jump
        # here (e.g., ``os.path:abspath``).
        return symbol_by_name(path, imp=imp)

    # Not sure if path is just a module name or if it includes an
    # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).
    try:
        return imp(path)
    except ImportError:
        # Not a module name, so try module + attribute.
        return symbol_by_name(path, imp=imp)

symbol_by_name(name, aliases=None, imp=None, package=None, sep='.', default=None, **kwargs)

Get symbol by qualified name.

The name should be the full dot-separated path to the class:

modulename.ClassName

Example:

mazecache.backends.redis.RedisBackend
                        ^- class name

or using ':' to separate module and symbol:

mazecache.backends.redis:RedisBackend

If aliases is provided, a dict containing short name/long name mappings, the name is looked up in the aliases first.

Examples:

>>> symbol_by_name('mazecache.backends.redis:RedisBackend')
<class 'mazecache.backends.redis.RedisBackend'>

>>> symbol_by_name('default', {
...     'default': 'mazecache.backends.redis:RedisBackend'})
<class 'mazecache.backends.redis.RedisBackend'>

# Does not try to look up non-string names.
>>> from mazecache.backends.redis import RedisBackend
>>> symbol_by_name(RedisBackend) is RedisBackend
True
Source code in mode/utils/imports.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def symbol_by_name(
    name: SymbolArg[_T],
    aliases: Optional[Mapping[str, str]] = None,
    imp: Any = None,
    package: Optional[str] = None,
    sep: str = ".",
    default: _T = None,
    **kwargs: Any,
) -> _T:
    """Get symbol by qualified name.

    The name should be the full dot-separated path to the class:

        modulename.ClassName

    Example:

    ```python
    mazecache.backends.redis.RedisBackend
                            ^- class name
    ```

    or using ':' to separate module and symbol:

    ```python
    mazecache.backends.redis:RedisBackend
    ```

    If `aliases` is provided, a dict containing short name/long name
    mappings, the name is looked up in the aliases first.

    Examples:

    ```sh
    >>> symbol_by_name('mazecache.backends.redis:RedisBackend')
    <class 'mazecache.backends.redis.RedisBackend'>

    >>> symbol_by_name('default', {
    ...     'default': 'mazecache.backends.redis:RedisBackend'})
    <class 'mazecache.backends.redis.RedisBackend'>

    # Does not try to look up non-string names.
    >>> from mazecache.backends.redis import RedisBackend
    >>> symbol_by_name(RedisBackend) is RedisBackend
    True
    ```
    """
    # This code was copied from kombu.utils.symbol_by_name
    imp = importlib.import_module if imp is None else imp

    if not isinstance(name, str):
        return name  # already a class
    name = (aliases or {}).get(name) or name

    module_name, attribute_name = parse_symbol(name, package=package)

    try:
        try:
            module = imp(  # type: ignore
                module_name or "",
                package=package,
                # kwargs can be used to extend symbol_by_name when a custom
                # `imp` function is used.
                # importib does not support additional arguments
                # beyond (name, package=None), so we have to silence
                # mypy error here.
                **kwargs,
            )
        except ValueError as exc:
            raise ValueError(f"Cannot import {name!r}: {exc}").with_traceback(
                sys.exc_info()[2]
            ) from exc
        if attribute_name:
            return cast(_T, getattr(module, attribute_name))
        else:
            return cast(_T, module)
    except (ImportError, AttributeError):
        if default is None:
            raise
    return default