"""Parsing Type Expressions.
This module contains tools for parsing type expressions such as
``List[Mapping[str, Tuple[int, Tuple[str, str]]]]``,
then converting that to a generator expression that can be used
to deserialize such a structure.
"""
import abc
import os
import random
import string
import sys
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from enum import Enum
from itertools import count
from types import FrameType
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Counter,
Dict,
Iterator,
List,
NamedTuple,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from mode.utils.objects import (
DICT_TYPES,
LIST_TYPES,
SET_TYPES,
TUPLE_TYPES,
cached_property,
is_optional,
is_union,
qualname,
)
from faust.types.models import CoercionHandler, CoercionMapping, IsInstanceArgT, ModelT
from faust.utils import codegen
from faust.utils.functional import translate
from faust.utils.iso8601 import parse as parse_iso8601
from faust.utils.json import str_to_decimal
if TYPE_CHECKING: # pragma: no cover
from typing_extensions import Final
else: # pragma: no cover
try:
from typing import Final
except ImportError:
Final = object
# Public names exported by this module.
__all__ = ["NodeType", "TypeExpression"]
# Generic type variable used in the signature of generated functions.
T = TypeVar("T")
#: Used to denote an argument that is not present.
#: (``Variable.clone`` uses it as a default so that ``None`` can still be
#: passed explicitly as a value.)
MISSING: Final = object()
#: Used to generate unique variable names.
#: (Consumed by ``NamedTupleNode.next_namedtuple_name``.)
TUPLE_NAME_COUNTER = count(0)
#: Tuple of types that are native to JSON.
#: ``UserNode._maybe_coerce`` only calls the user handler for values
#: that are instances of one of these types.
JSON_TYPES: IsInstanceArgT = ( # XXX FIXME
    str,
    list,
    dict,
    int,
    float,
    Decimal,
)
#: Tuple of built-in scalar types.
LITERAL_TYPES: IsInstanceArgT = (str, bytes, float, int)
#: Debug switch read once at import time from ``TYPEXPR_DEBUG``.
#: NOTE: any non-empty value of the variable (including "0") enables it,
#: since the raw string is passed straight to ``bool()``.
DEBUG = bool(os.environ.get("TYPEXPR_DEBUG", False))
#: Mapping of characters that are illegal in variable names
#: to a suitable replacement.
QUALNAME_TRANSLATION_TABLE = {
    ".": "__",
    "@": "__",
    ">": "",
    "<": "",
}
def qualname_to_identifier(s: str) -> str:
    """Translate `qualname(s)` to suitable variable name.

    The string is passed through ``translate`` together with
    ``QUALNAME_TRANSLATION_TABLE``, which maps characters that are
    illegal in identifiers (``.``, ``@``, ``<``, ``>``) to replacements.
    """
    identifier = translate(QUALNAME_TRANSLATION_TABLE, s)
    return identifier
# we don't want linters/Python to complain that we are using this.
# ``sys._getframe`` is therefore looked up via ``getattr`` instead of being
# accessed directly; ``TypeExpression.as_function`` uses it to find the
# frame whose globals/locals the generated function is built with.
_getframe: Callable[[int], FrameType] = getattr(sys, "_getframe") # noqa
class NodeType(Enum):
    """Kinds of node that can appear in a parsed type expression.

    Every member's value is the same string as its name.
    """

    # The root of an expression (the ``RootNode`` itself).
    ROOT = "ROOT"
    # ``Union[...]`` of several types.
    UNION = "UNION"
    # ``typing.Any``.
    ANY = "ANY"
    # Built-in scalar types (see ``LITERAL_TYPES``).
    LITERAL = "LITERAL"
    DATETIME = "DATETIME"
    DECIMAL = "DECIMAL"
    # Containers.
    NAMEDTUPLE = "NAMEDTUPLE"
    TUPLE = "TUPLE"
    SET = "SET"
    DICT = "DICT"
    LIST = "LIST"
    # Model classes and types handled by a user-supplied coercion handler.
    MODEL = "MODEL"
    USER = "USER"
#: Set of user node types.
#: Intersected with the node types seen so far by
#: ``TypeExpression.has_custom_types``.
USER_TYPES = frozenset(
    {
        NodeType.DATETIME,
        NodeType.DECIMAL,
        NodeType.USER,
        NodeType.MODEL,
    }
)
#: Set of generic node types (lists/dicts/etc.).
#: Intersected with the node types seen so far by
#: ``TypeExpression.has_generic_types``.
GENERIC_TYPES = frozenset(
    {
        NodeType.TUPLE,
        NodeType.SET,
        NodeType.DICT,
        NodeType.LIST,
        NodeType.NAMEDTUPLE,
    }
)
#: Set of types that don't have a field descriptor class.
#: Intersected with the node types seen so far by
#: ``TypeExpression.has_nonfield_types``.
NONFIELD_TYPES = frozenset(
    {
        NodeType.NAMEDTUPLE,
        NodeType.MODEL,
        NodeType.USER,
    }
)
class TypeInfo(NamedTuple):
    """Result of inspecting a single type annotation.

    Instances are produced by ``_TypeInfo_from_type`` / ``Node.inspect_type``.
    """

    # The type itself (with ``Optional`` unwrapped when it had one member).
    type: Type
    # The type's ``__args__`` as a tuple; empty when it has none.
    args: Tuple
    # True when the original annotation allowed ``None``.
    is_optional: bool
def _TypeInfo_from_type(typ: Type, *, optional: bool = False) -> TypeInfo:
    """Build a :class:`TypeInfo` describing ``typ``.

    Arguments:
        typ: The type to describe.
        optional: Value stored as ``is_optional`` on the result.

    Returns:
        A ``TypeInfo`` whose ``args`` is ``typ.__args__`` converted to a
        tuple, or the empty tuple when that attribute is missing or falsy.
    """
    # Python 3.6.0 does not support classmethod in NamedTuple,
    # which is why this factory lives at module level.
    type_args = getattr(typ, "__args__", None)
    return TypeInfo(
        type=typ,
        args=tuple(type_args or ()),
        is_optional=optional,
    )
class Variable:
    """A variable reference used while generating source code.

    A variable has a ``name`` and optionally a ``getitem`` subscript;
    ``str(variable)`` renders it as ``name`` or ``name[getitem]``.
    """

    def __init__(self, name: str, *, getitem: Any = None) -> None:
        self.name = name
        self.getitem = getitem

    def __str__(self) -> str:
        if self.getitem is not None:
            return f"{self.name}[{self.getitem}]"
        return self.name

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self}>"

    def __getitem__(self, name: Any) -> "Variable":
        """Return a copy of this variable subscripted with ``name``."""
        return self.clone(getitem=name)

    def clone(
        self, *, name: Optional[str] = None, getitem: Any = MISSING
    ) -> "Variable":
        """Return a copy, optionally replacing ``name`` and/or ``getitem``.

        ``getitem`` defaults to the ``MISSING`` sentinel so that callers
        can pass ``None`` explicitly to drop the subscript.
        """
        return type(self)(
            name=name if name is not None else self.name,
            getitem=getitem if getitem is not MISSING else self.getitem,
        )

    def next_identifier(self) -> "Variable":
        """Return a new, unsubscripted variable with the "next" name.

        The last character of the name is advanced by one code point
        (``a`` -> ``b``).  When that would go past ``z`` the name is
        extended with an ``a`` instead (``z`` -> ``za``), so the result
        stays a valid identifier and differs from every name produced
        earlier in the same chain.
        """
        name = self.name
        next_ord = ord(name[-1]) + 1
        if next_ord > 122:
            # Past "z": previously this produced e.g. "z{" because the
            # appended "a" was immediately overwritten by chr(next_ord).
            next_name = name + "a"
        else:
            next_name = name[:-1] + chr(next_ord)
        return self.clone(name=next_name, getitem=None)
class Node(abc.ABC):
    """Abstract base class for the nodes of a type expression.

    Subclasses are registered in ``BUILTIN_TYPES`` (keyed by their
    ``type``) when they are defined, and must implement :meth:`build`,
    which returns the source code handling one variable.
    """

    # Registry of node classes, filled in by ``_register``.
    BUILTIN_TYPES: ClassVar[Dict[NodeType, Type["Node"]]] = {}
    type: ClassVar[NodeType]
    # When true, compatibility is checked against ``typ.__origin__``.
    use_origin: ClassVar[bool] = False
    compatible_types: IsInstanceArgT
    expr: Type
    root: "RootNode"

    def __init_subclass__(cls) -> None:
        cls._register()

    @classmethod
    def _register(cls) -> None:
        # NOTE The order in which we define node classes
        # matter.
        # For example LiteralNode must always be declared first
        # as issubclass(str, Sequence) is True and ListNode will eat
        # it up. The same is true for tuples: NamedTupleNode must
        # be defined before TupleNode.
        cls.BUILTIN_TYPES[cls.type] = cls

    @classmethod
    def create_if_compatible(cls, typ: Type, *, root: "RootNode") -> Optional["Node"]:
        """Return a node for ``typ`` if this class handles it, else None."""
        if not cls.compatible_types:
            return None
        candidate: Type = typ
        if cls.use_origin:
            candidate = getattr(typ, "__origin__", None) or typ
        if not cls._issubclass(candidate, cls.compatible_types):
            return None
        return cls(typ, root=root)

    @classmethod
    def _issubclass(cls, typ: Type, types: IsInstanceArgT) -> bool:
        """``issubclass`` that answers False instead of raising."""
        try:
            result = issubclass(typ, types)
        except (AttributeError, TypeError):
            return False
        return result

    @classmethod
    def inspect_type(cls, typ: Type) -> TypeInfo:
        """Describe ``typ``, unwrapping an optional with one real member."""
        if not is_optional(typ):
            return _TypeInfo_from_type(typ, optional=False)
        members = getattr(typ, "__args__", ())
        concrete: List[Type] = [
            member for member in members if not _is_NoneType(member)
        ]
        if len(concrete) == 1:
            # exactly one non-None member: there must have been a None too.
            assert any(_is_NoneType(member) for member in members)
            return _TypeInfo_from_type(concrete[0], optional=True)
        return _TypeInfo_from_type(typ, optional=True)

    def __init__(self, expr: Type, root: "RootNode" = None) -> None:
        assert root is not None
        assert root.type is NodeType.ROOT
        self.expr: Type = expr
        self.root = root
        # Count this node kind on the root; there is exactly one root.
        stats = self.root.type_stats
        stats[self.type] += 1
        assert stats[NodeType.ROOT] == 1
        self.__post_init__()
        if DEBUG:
            print(f"NODE {self!r}")

    def __post_init__(self) -> None:  # noqa: B027
        """Hook called at the end of ``__init__``; no-op by default."""
        ...

    def random_identifier(self, n: int = 8) -> str:
        """Return ``n`` random ASCII letters."""
        letters = (
            random.choice(string.ascii_letters) for _ in range(n)  # nosec B311
        )
        return "".join(letters)

    @abc.abstractmethod
    def build(self, var: Variable, *args: Type) -> str:
        """Return the source code that handles variable ``var``."""
        ...

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self.expr!r}>"
class AnyNode(Node):
    """Node for ``typing.Any``: the value is passed through unchanged."""

    type = NodeType.ANY
    compatible_types = ()

    @classmethod
    def create_if_compatible(cls, typ: Type, *, root: "RootNode") -> Optional["Node"]:
        """Match only when ``typ`` is exactly ``Any``."""
        if typ is not Any:
            return None
        return cls(typ, root=root)

    def build(self, var: Variable, *args: Type) -> str:
        """Return the variable reference itself."""
        return f"{var}"
class UnionNode(Node):
    """Node for ``Union[...]`` types.

    Unions cannot be built directly; ``_maybe_unroll_union`` reduces the
    ones whose members are all models or all literal types to one type.
    """

    type = NodeType.UNION
    compatible_types = ()
    use_origin = True

    @classmethod
    def _maybe_unroll_union(cls, info: TypeInfo) -> TypeInfo:
        """Collapse a homogeneous union into a single type, if possible."""
        if not is_union(info.type):
            return info
        assert len(info.args) > 1
        if cls._all_types_match(ModelT, info.args):
            # every member is a model: treat it as the abstract model type.
            return _TypeInfo_from_type(ModelT, optional=info.is_optional)
        if cls._all_types_match(LITERAL_TYPES, info.args):
            # every member is a literal: the first member stands in for all.
            return _TypeInfo_from_type(info.args[0], optional=info.is_optional)
        return info

    @classmethod
    def create_if_compatible(cls, typ: Type, *, root: "RootNode") -> Optional["Node"]:
        """Match any type for which ``is_union`` is true."""
        if not is_union(typ):
            return None
        return cls(typ, root=root)

    @classmethod
    def _all_types_match(cls, typ: IsInstanceArgT, union_args: Tuple) -> bool:
        """True if every non-None member is a subclass of ``typ``."""
        for member in cls._filter_NoneType(union_args):
            if not cls._issubclass(member, typ):
                return False
        return True

    @classmethod
    def _filter_NoneType(cls, union_args: Tuple) -> Iterator:
        """Iterate over ``union_args`` skipping None/NoneType members."""
        return (member for member in union_args if not _is_NoneType(member))

    def build(self, var: Variable, *args: Type) -> str:
        """Always raises: unions that were not unrolled are unsupported."""
        raise NotImplementedError(f"Union of types {args!r} not supported")
class LiteralNode(Node):
    """Node for built-in scalar types; the value is used as-is."""

    type = NodeType.LITERAL
    compatible_types = LITERAL_TYPES

    def build(self, var: Variable, *args: Type) -> str:
        """Return the variable reference itself."""
        source = f"{var}"
        return source
class DecimalNode(Node):
    """Node for :class:`~decimal.Decimal` values."""

    type = NodeType.DECIMAL
    compatible_types = (Decimal,)

    def __post_init__(self) -> None:
        # record the concrete type on the root node.
        self.root.found_types[self.type].add(self.expr)

    def build(self, var: Variable, *args: Type) -> str:
        """Register the coercion closure and return a call to it."""
        self.root.add_closure("_Decimal_", "__Decimal__", self._maybe_coerce)
        return f"_Decimal_({var})"

    @staticmethod
    def _maybe_coerce(value: Union[str, Decimal] = None) -> Optional[Decimal]:
        """Convert ``value`` to Decimal unless it is None or already one."""
        if value is None:
            return None
        if isinstance(value, Decimal):
            return value
        return str_to_decimal(value)
class DatetimeNode(Node):
    """Node for :class:`~datetime.datetime` values."""

    type = NodeType.DATETIME
    compatible_types = (datetime,)

    def __post_init__(self) -> None:
        # record the concrete type on the root node.
        self.root.found_types[self.type].add(self.expr)

    def build(self, var: Variable, *args: Type) -> str:
        """Register the parsing closure and return a call to it."""
        self.root.add_closure(
            "_iso8601_parse_", "__iso8601_parse__", self._maybe_coerce
        )
        return f"_iso8601_parse_({var})"

    def _maybe_coerce(self, value: Union[str, datetime] = None) -> Optional[datetime]:
        """Parse strings with the root's date parser; pass others through."""
        if value is None:
            return None
        if not isinstance(value, str):
            return value
        return self.root.date_parser(value)
class NamedTupleNode(Node):
    """Node for named tuple classes."""

    type = NodeType.NAMEDTUPLE
    compatible_types = TUPLE_TYPES

    @classmethod
    def create_if_compatible(cls, typ: Type, *, root: "RootNode") -> Optional["Node"]:
        """Match tuple subclasses defining ``_asdict``/``_make``/``_fields``."""
        if not cls._issubclass(typ, cls.compatible_types):
            return None
        required = ("_asdict", "_make", "_fields")
        if all(attr in typ.__dict__ for attr in required):
            return cls(typ, root=root)
        return None

    def build(self, var: Variable, *args: Type) -> str:
        """Return source calling the tuple class with one keyword per field."""
        self.root.add_closure(self.local_name, self.global_name, self.expr)
        annotations = self.expr.__annotations__
        # field ``i`` of the tuple is read from ``var[i]``.
        assignments = [
            "{0}={1}".format(field, self.root.build(var[index], field_type))
            for index, (field, field_type) in enumerate(annotations.items())
        ]
        fields = ", ".join(assignments)
        return f"{self.local_name}({fields})"

    def next_namedtuple_name(self, typ: Type[Tuple]) -> str:
        """Return a new unique name based on ``TUPLE_NAME_COUNTER``."""
        num = next(TUPLE_NAME_COUNTER)
        return f"namedtuple_{num}_{typ.__name__}"

    @cached_property
    def local_name(self) -> str:
        """Name of the tuple class inside the generated function."""
        return self.next_namedtuple_name(self.expr)

    @cached_property
    def global_name(self) -> str:
        """Name of the tuple class in the generated function's globals."""
        return "_" + self.local_name + "_"
class TupleNode(Node):
    """Node for ``Tuple``, ``Tuple[X, Y]`` and ``Tuple[X, ...]``."""

    type = NodeType.TUPLE
    compatible_types = TUPLE_TYPES
    use_origin = True

    def build(self, var: Variable, *args: Type) -> str:
        """Return source converting ``var`` to a tuple.

        Arguments:
            var: Variable holding the value to convert.
            *args: Member types of the tuple; empty for an untyped tuple,
                ``(X, Ellipsis)`` for a variable-length tuple.
        """
        if not args:
            return self._build_untyped_tuple(var)
        for position, arg in enumerate(args):
            if arg is Ellipsis:
                # ``Tuple[X, ...]``: Ellipsis is only valid in 2nd position.
                assert position == 1
                return self._build_vararg_tuple(var, args[0])
        return self._build_tuple_literal(var, *args)

    def _build_tuple_literal(self, var: Variable, *member_args: Type) -> str:
        """Return a tuple display with one converted item per member type.

        Member ``i`` is read from ``var[i]``.
        """
        members = [
            self.root.build(var[i], arg) for i, arg in enumerate(member_args)
        ]
        if len(members) == 1:
            # A one-element tuple needs a trailing comma.  Decide by the
            # member count rather than by searching the source for ",":
            # the member's own source may contain commas (e.g. a dict
            # comprehension), which would leave a parenthesized expression
            # instead of a tuple.
            return f"({members[0]},)"
        return "(" + ", ".join(members) + ")"

    def _build_untyped_tuple(self, var: Variable) -> str:
        """Return source for a tuple without member types."""
        return f"tuple({var})"

    def _build_vararg_tuple(self, var: Variable, member_type: Type) -> str:
        """Return source for a tuple whose items all have ``member_type``."""
        item_var = var.next_identifier()
        handler = self.root.build(item_var, member_type)
        return f"tuple({handler} for {item_var} in {var})"
class SetNode(Node):
    """Node for set types."""

    type = NodeType.SET
    compatible_types = SET_TYPES
    use_origin = True

    def build(self, var: Variable, *args: Type) -> str:
        """Return source converting ``var`` to a set."""
        if args:
            return self._build_set_expression(var, *args)
        return f"set({var})"

    def _build_set_expression(self, var: Variable, member_type: Type) -> str:
        """Return a set comprehension converting every member."""
        element = var.next_identifier()
        converted = self.root.build(element, member_type)
        return f"{{{converted} for {element} in {var}}}"
class DictNode(Node):
    """Node for mapping types."""

    type = NodeType.DICT
    compatible_types = DICT_TYPES
    use_origin = True

    def build(self, var: Variable, *args: Type) -> str:
        """Return source converting ``var`` to a dict."""
        if args:
            return self._build_dict_expression(var, *args)
        return f"dict({var})"

    def _build_dict_expression(
        self, var: Variable, key_type: Type, value_type: Type
    ) -> str:
        """Return a dict comprehension converting keys and values."""
        # two fresh loop variables: one for the key, the next for the value.
        k = var.next_identifier()
        v = k.next_identifier()
        converted_key = self.root.build(k, key_type)
        converted_value = self.root.build(v, value_type)
        return (
            f"{{{converted_key}: {converted_value} "
            f"for {k}, {v} in {var}.items()}}"
        )
class ListNode(Node):
    """Node for list types."""

    type = NodeType.LIST
    compatible_types = LIST_TYPES
    use_origin = True

    def build(self, var: Variable, *args: Type) -> str:
        """Return source converting ``var`` to a list."""
        if args:
            return self._build_list_expression(var, *args)
        return f"list({var})"

    def _build_list_expression(self, var: Variable, item_type: Type) -> str:
        """Return a list comprehension converting every item."""
        element = var.next_identifier()
        converted = self.root.build(element, item_type)
        return f"[{converted} for {element} in {var}]"
class ModelNode(Node):
    """Node for model classes (subclasses of ``ModelT``)."""

    type = NodeType.MODEL
    compatible_types = ()

    def __post_init__(self) -> None:
        # record the concrete model type on the root node.
        self.root.found_types[self.type].add(self.expr)

    @classmethod
    def create_if_compatible(cls, typ: Type, *, root: "RootNode") -> Optional["Node"]:
        """Match when ``typ`` is a model class."""
        if not cls._is_model(typ):
            return None
        return cls(typ, root=root)

    @classmethod
    def _is_model(cls, typ: Type) -> bool:
        """True if ``typ`` is a subclass of ``ModelT``."""
        try:
            return bool(issubclass(typ, ModelT))
        except TypeError:
            # ``typ`` is not a class.
            return False

    def build(self, var: Variable, *args: Type) -> str:
        """Return source calling ``_from_data_field`` on the model."""
        model_name = self._ensure_model_name(self.expr)
        return f"{model_name}._from_data_field({var})"

    def _ensure_model_name(self, typ: Type) -> str:
        """Register the model as a closure and return its local name."""
        try:
            namespace = typ._options.namespace
        except AttributeError:
            # abstract model
            self.root.add_closure("_Model_", "__Model__", self.Model)
            return "_Model_"
        local_name = qualname_to_identifier(namespace)
        self.root.add_closure(local_name, "__" + local_name + "__", self.expr)
        return local_name

    @cached_property
    def Model(self) -> Type[ModelT]:
        """The ``Model`` base class, imported lazily from ``.base``."""
        from .base import Model

        return Model
class UserNode(Node):
    """Node for types handled by a user-supplied coercion handler."""

    type = NodeType.USER
    compatible_types = ()
    handler_name: str

    def __init__(
        self,
        expr: Type,
        root: "RootNode" = None,
        *,
        user_types: CoercionMapping = None,
        handler: CoercionHandler,
    ) -> None:
        super().__init__(expr, root)
        self.handler: CoercionHandler = handler
        # names under which the handler is exposed to generated code.
        local_name = qualname_to_identifier(qualname(self.handler))
        self.handler_name = local_name
        self.handler_global_name = "__" + local_name + "__"

    def __post_init__(self) -> None:
        # record the concrete type on the root node.
        self.root.found_types[self.type].add(self.expr)

    def _maybe_coerce(self, value: Any) -> Any:
        """Call the handler for JSON-native values; pass others through."""
        if value is not None and isinstance(value, JSON_TYPES):
            return self.handler(value)
        return value

    def build(self, var: Variable, *args: Type) -> str:
        """Register the coercion closure and return a call to it."""
        self.root.add_closure(
            self.handler_name, self.handler_global_name, self._maybe_coerce
        )
        return f"{self.handler_name}({var})"
class RootNode(Node):
    """Root of a type expression.

    Holds the state shared by all nodes of one expression: statistics
    about the node types seen, user coercion handlers, and the
    globals/closures needed by the generated source.
    """

    # Node class used when no other node matches (None: no default).
    DEFAULT_NODE: ClassVar[Optional[Type["Node"]]] = None
    type = NodeType.ROOT
    type_stats: Counter[NodeType]
    user_types: CoercionMapping
    globals: Dict[str, Any]
    closures: Dict[str, str]
    found_types: Dict[NodeType, Set[Type]]

    @classmethod
    def _register(cls) -> None:
        # we do not register root nodes.
        ...

    def add_closure(self, local_name: str, global_name: str, obj: Any) -> None:
        """Expose ``obj`` as ``global_name``, aliased to ``local_name``."""
        self.globals[global_name] = obj
        self.closures[local_name] = global_name

    def __init__(
        self,
        expr: Type,
        root: "RootNode" = None,
        *,
        user_types: CoercionMapping = None,
        date_parser: Callable[[Any], datetime] = None,
    ) -> None:
        assert self.type == NodeType.ROOT
        # State must exist before Node.__init__ runs, since that updates
        # ``type_stats`` on the root (which is this very object).
        self.type_stats = Counter()
        self.user_types = user_types or {}
        self.globals = {}
        self.closures = {}
        self.date_parser: Callable[[Any], datetime] = (
            parse_iso8601 if date_parser is None else date_parser
        )
        self.found_types = defaultdict(set)
        super().__init__(expr, root=self)

    def find_compatible_node_or_default(self, info: TypeInfo) -> "Node":
        """Like ``find_compatible_node`` but fall back to the default node."""
        found = self.find_compatible_node(info)
        if found is not None:
            return found
        return self.new_default_node(info.type)

    def find_compatible_node(self, info: TypeInfo) -> Optional["Node"]:
        """Return the first node able to handle ``info``, or None.

        User types are consulted first, then the registered node classes
        in the order they were registered.
        """
        for user_type, coerce in self.user_types.items():
            if self._issubclass(info.type, user_type):
                return UserNode(info.type, root=self.root, handler=coerce)
        unrolled = UnionNode._maybe_unroll_union(info)
        for candidate_cls in self.BUILTIN_TYPES.values():
            candidate = candidate_cls.create_if_compatible(
                unrolled.type, root=self.root
            )
            if candidate is not None:
                return candidate
        return None

    def new_default_node(self, typ: Type) -> "Node":
        """Create the default node for ``typ``.

        Raises:
            NotImplementedError: If the class defines no ``DEFAULT_NODE``.
        """
        default_cls = self.DEFAULT_NODE
        if default_cls is None:
            raise NotImplementedError(
                f"Node of type {type(self).__name__} has no default node type"
            )
        return default_cls(typ, root=self.root)
class TypeExpression(RootNode):
    """A type expression that can be compiled into a conversion function.

    Types that no registered node handles fall back to ``LiteralNode``.
    """

    DEFAULT_NODE = LiteralNode
    type = NodeType.ROOT
    compatible_types = ()

    def as_function(
        self,
        *,
        name: str = "expr",
        argument_name: str = "a",
        stacklevel: int = 1,
        locals: Dict[str, Any] = None,
        globals: Dict[str, Any] = None,
    ) -> Callable[[T], T]:
        """Compile the expression into a callable.

        Arguments:
            name: Name of the generated function.
            argument_name: Name of the generated function's argument.
            stacklevel: Depth of the calling frame whose globals/locals
                are used when they are not passed explicitly.  A falsy
                value disables frame inspection.
            locals: Local namespace for the generated code.
            globals: Global namespace for the generated code; the
                closures collected while building are added to a copy.

        Returns:
            The function built by ``codegen.build_closure``.
        """
        sourcecode = self.as_string(name=name, argument_name=argument_name)
        # Parenthesized on purpose: without the parentheses ``and`` binds
        # tighter than ``or``, so a missing ``locals`` triggered frame
        # inspection even when ``stacklevel`` was falsy.
        if (locals is None or globals is None) and stacklevel:
            # NOTE: _getframe must be called directly from this method,
            # otherwise ``stacklevel`` would be off by one.
            frame = _getframe(stacklevel)
            globals = frame.f_globals if globals is None else globals
            locals = frame.f_locals if locals is None else locals
        # Copy, so that the caller's globals are not modified.
        new_globals = dict(globals or {})
        new_globals.update(self.globals)
        if DEBUG:
            print(f"SOURCE FOR {self!r} ->\n{sourcecode}")
        return codegen.build_closure(
            "__outer__",
            sourcecode,
            locals={} if locals is None else locals,
            globals=new_globals,
        )

    def as_string(self, *, name: str = "expr", argument_name: str = "a") -> str:
        """Return the source code of the conversion function."""
        expression = self.as_comprehension(argument_name)
        return codegen.build_closure_source(
            name,
            args=[argument_name],
            body=[f"return {expression}"],
            closures=self.closures,
        )

    def as_comprehension(self, argument_name: str = "a") -> str:
        """Return the expression converting the variable ``argument_name``."""
        return self.build(Variable(argument_name), self.expr)

    def build(self, var: Variable, *args: Type) -> str:
        """Return source converting ``var``; ``args`` is the single type."""
        return self._build_expression(var, *args)

    def _build_expression(self, var: Variable, typ: Type) -> str:
        """Build the expression for ``typ``, guarding optionals with None."""
        type_info = self.inspect_type(typ)
        node = self.find_compatible_node_or_default(type_info)
        res = node.build(var, *type_info.args)
        if type_info.is_optional:
            return f"({res} if {var} is not None else None)"
        return res

    @property
    def has_models(self) -> bool:
        """True if a model node was created for this expression."""
        return bool(self.type_stats[NodeType.MODEL])

    @property
    def has_custom_types(self) -> bool:
        """True if any node type in ``USER_TYPES`` was seen."""
        return bool(self.type_stats.keys() & USER_TYPES)

    @property
    def has_generic_types(self) -> bool:
        """True if any node type in ``GENERIC_TYPES`` was seen."""
        return bool(self.type_stats.keys() & GENERIC_TYPES)

    @property
    def has_nonfield_types(self) -> bool:
        """True if any node type in ``NONFIELD_TYPES`` was seen."""
        return bool(self.type_stats.keys() & NONFIELD_TYPES)
def _is_NoneType(t: Any) -> bool:
return t is type(None) or t is None # noqa: E721