# Source code for faust.types.auth
import ssl
from enum import Enum
from typing import Optional, Union
# Public names exported by this module (what a star-import exposes).
__all__ = [
"AUTH_PROTOCOLS_SSL",
"AUTH_PROTOCOLS_SASL",
"AuthProtocol",
"CredentialsArg",
"CredentialsT",
"SASLMechanism",
"to_credentials",
]
class AuthProtocol(Enum):
    """Enumeration of authentication protocol names.

    Every member's value is the same upper-case string as the member's
    name, so ``AuthProtocol("SASL_SSL")`` and ``AuthProtocol.SASL_SSL``
    refer to the same member.
    """

    SSL = "SSL"
    PLAINTEXT = "PLAINTEXT"
    SASL_PLAINTEXT = "SASL_PLAINTEXT"
    SASL_SSL = "SASL_SSL"
class SASLMechanism(Enum):
    """Enumeration of SASL mechanism names.

    Member values are the mechanism strings. Note that the SCRAM members
    use underscores in their Python names but hyphens in their values
    (``SCRAM_SHA_256`` -> ``"SCRAM-SHA-256"``).
    """

    PLAIN = "PLAIN"
    GSSAPI = "GSSAPI"
    SCRAM_SHA_256 = "SCRAM-SHA-256"
    SCRAM_SHA_512 = "SCRAM-SHA-512"
    OAUTHBEARER = "OAUTHBEARER"
# AuthProtocol members grouped as SSL-based (SSL and SASL_SSL).
# NOTE(review): presumably the protocols that need an SSL context - confirm
# against the code that consumes this set.
AUTH_PROTOCOLS_SSL = {AuthProtocol.SSL, AuthProtocol.SASL_SSL}
# AuthProtocol members grouped as SASL-based (SASL_PLAINTEXT and SASL_SSL).
# SASL_SSL appears in both sets.
AUTH_PROTOCOLS_SASL = {AuthProtocol.SASL_PLAINTEXT, AuthProtocol.SASL_SSL}
class CredentialsT:
    """Base type for credentials objects.

    Declares a ``protocol`` attribute annotated as ``AuthProtocol``.
    ``to_credentials`` uses ``isinstance(obj, CredentialsT)`` to recognise
    values that are already credentials.
    """

    # Annotation only: no value is assigned at class level.
    protocol: AuthProtocol
# Type alias for values accepted as a credentials argument: either a
# CredentialsT instance or a raw ssl.SSLContext.
CredentialsArg = Union[CredentialsT, ssl.SSLContext]
def to_credentials(obj: Optional[CredentialsArg] = None) -> Optional[CredentialsT]:
    """Convert a credentials argument into a ``CredentialsT`` instance.

    Arguments:
        obj: ``None``, an ``ssl.SSLContext``, or a ``CredentialsT``
            instance.

    Returns:
        ``None`` if *obj* is ``None``; ``SSLCredentials(obj)`` if *obj* is
        an ``ssl.SSLContext``; *obj* itself if it is already a
        ``CredentialsT``.

    Raises:
        ImproperlyConfigured: If *obj* is of any other type.
    """
    if obj is None:
        return None

    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import with faust.auth
    # (the original marks this "XXX :(") - confirm before hoisting.
    from faust.auth import SSLCredentials  # XXX :(

    # A bare SSL context is wrapped; the SSLContext check runs first, as in
    # the original.
    if isinstance(obj, ssl.SSLContext):
        return SSLCredentials(obj)
    if isinstance(obj, CredentialsT):
        return obj

    from faust.exceptions import ImproperlyConfigured

    raise ImproperlyConfigured(f"Unknown credentials type {type(obj)}: {obj}")