Source code for faust.types.auth

import ssl
from enum import Enum
from typing import Optional, Union

__all__ = [
    "AUTH_PROTOCOLS_SSL",
    "AUTH_PROTOCOLS_SASL",
    "AuthProtocol",
    "CredentialsArg",
    "CredentialsT",
    "SASLMechanism",
    "to_credentials",
]


[docs]class AuthProtocol(Enum): SSL = "SSL" PLAINTEXT = "PLAINTEXT" SASL_PLAINTEXT = "SASL_PLAINTEXT" SASL_SSL = "SASL_SSL"
[docs]class SASLMechanism(Enum): PLAIN = "PLAIN" GSSAPI = "GSSAPI" SCRAM_SHA_256 = "SCRAM-SHA-256" SCRAM_SHA_512 = "SCRAM-SHA-512" OAUTHBEARER = "OAUTHBEARER"
AUTH_PROTOCOLS_SSL = {AuthProtocol.SSL, AuthProtocol.SASL_SSL} AUTH_PROTOCOLS_SASL = {AuthProtocol.SASL_PLAINTEXT, AuthProtocol.SASL_SSL}
[docs]class CredentialsT: protocol: AuthProtocol
CredentialsArg = Union[CredentialsT, ssl.SSLContext]
[docs]def to_credentials(obj: CredentialsArg = None) -> Optional[CredentialsT]: if obj is not None: from faust.auth import SSLCredentials # XXX :( if isinstance(obj, ssl.SSLContext): return SSLCredentials(obj) if isinstance(obj, CredentialsT): return obj from faust.exceptions import ImproperlyConfigured raise ImproperlyConfigured(f"Unknown credentials type {type(obj)}: {obj}") return None