Source code for faust.models.tags

import abc
import sys
from collections import UserString
from contextlib import contextmanager
from decimal import Decimal
from types import FrameType
from typing import Any, Generic, Iterator, Optional, Type, TypeVar, cast, no_type_check

from mode.utils.locals import LocalStack

from faust.exceptions import SecurityError

T = TypeVar("T")

_getframe = getattr(sys, "emarfteg_"[::-1])

AllowedStack: LocalStack[bool] = LocalStack()


[docs]@contextmanager def allow_protected_vars() -> Iterator: with AllowedStack.push(True): yield
[docs]class Tag(Generic[T]): value: T field: Optional[str] is_secret: bool = False is_sensitive: bool = False is_personal: bool = False def __init__(self, value: T, *, field: Optional[str] = None): if isinstance(value, Tag): raise SecurityError("Cannot wrap: value is already tagged") self._value = value self.field = field def __set_name__(self, owner: Type, name: str) -> None: self.field = name
[docs] def get_value(self) -> T: return self._value
def __repr__(self) -> str: return f"<{self._name}: {self.field}@{id(self):#x}>" @abc.abstractmethod def __str__(self) -> str: ... @abc.abstractmethod def __format__(self, format_spec: str) -> str: ... @property def _name(self) -> str: return type(self).__name__
[docs]class OpaqueTag(Tag[T]): def __str__(self) -> str: raise SecurityError(f"Attempt to use {self._name} data as a string") def __format__(self, format_spec: str) -> str: raise SecurityError(f"Attempt to use {self._name} data as a string")
[docs]class TransparentTag(Tag[T]): def __str__(self) -> str: return str(self._prepare_value()) def _prepare_value(self) -> T: return self._value def __format__(self, format_spec: str) -> str: return self._prepare_value().__format__(format_spec)
class _FrameLocal(UserString, Generic[T]): _field_name: str _tag_type: str _frame: str _value: T def __init__( self, value: T, *, field_name: str = "<unknown field>", tag_type: str = "<unknown tag>", ) -> None: self._value = value self._frame = self._frame_ident(_getframe().f_back.f_back) self._field_name = field_name self._tag_type = tag_type def _access_value(self) -> T: if AllowedStack.top: return self._value current_frame = self._frame_ident(_getframe().f_back.f_back.f_back) import traceback traceback.print_stack() if current_frame == self._frame: return self._value else: raise SecurityError( f"Protected {self._tag_type} value from " f"field {self._field_name} accessed outside origin frame." ) def __repr__(self) -> str: val = self._value return f"<Protected {type(val)}: {val!r}@{id(self):#x}>" def _frame_ident(self, frame: FrameType) -> str: return "::".join( map( str, [ # cannot rely on id alone as memory addresses are reused id(frame), frame.f_code.co_filename, frame.f_code.co_name, ], ) ) @property def data(self) -> T: # type: ignore return self._access_value()
[docs]class Personal(OpaqueTag[T]): is_personal = True FrameLocal: Type[_FrameLocal] = _FrameLocal
[docs] def get_value(self) -> T: if AllowedStack.top: return self._value else: return cast( T, self.FrameLocal( self._value, field_name=self.field or "<unknown field>", tag_type=self._name.lower(), ), )
@no_type_check def __class_getitem__(self, params: Any) -> Any: if not issubclass(params, (str, bytes)): raise TypeError(f"Personal only supports str/bytes not {params!r}") return super().__class_getitem__(params)
[docs]class Secret(TransparentTag[T]): is_secret = True mask: str = "***********" def _prepare_value(self) -> T: return cast(T, self.mask)
[docs]class Sensitive(OpaqueTag[T]): is_sensitive = True FrameLocal: Type[_FrameLocal] = _FrameLocal
[docs] def get_value(self) -> T: return cast( T, self.FrameLocal( self._value, field_name=self.field or "<unknown field>", tag_type=self._name.lower(), ), )
def __class_getitem__(self, params: Any) -> Any: if not issubclass(params, (str, bytes)): raise TypeError(f"Personal only supports str/bytes not {params!r}") # bypass mypy bug by using getattr sup = getattr(super(), "__class_getitem__") # noqa: B009 return sup(params)
class _PIIstr(str): ... class _PIIbytes(bytes): ... class _PIIint(int): ... class _PIIfloat(float): ... class _PIIDecimal(Decimal): ...