Source code for faust.auth
"""Authentication Credentials."""
import ssl
from typing import Any, Optional, Union
from aiokafka.conn import AbstractTokenProvider
from faust.types.auth import AuthProtocol, CredentialsT, SASLMechanism
__all__ = [
"Credentials",
"SASLCredentials",
"OAuthCredentials",
"GSSAPICredentials",
"SSLCredentials",
]
[docs]class Credentials(CredentialsT):
"""Base class for authentication credentials."""
[docs]class SASLCredentials(Credentials):
"""Describe SASL credentials."""
protocol = AuthProtocol.SASL_PLAINTEXT
mechanism: SASLMechanism = SASLMechanism.PLAIN
username: Optional[str]
password: Optional[str]
ssl_context: Optional[ssl.SSLContext]
def __init__(
self,
*,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_context: ssl.SSLContext = None,
mechanism: Union[str, SASLMechanism] = None,
) -> None:
self.username = username
self.password = password
self.ssl_context = ssl_context
if ssl_context is not None:
self.protocol = AuthProtocol.SASL_SSL
if mechanism is not None:
self.mechanism = SASLMechanism(mechanism)
def __repr__(self) -> str:
return f"<{type(self).__name__}: username={self.username}>"
[docs]class OAuthCredentials(Credentials):
"""Describe OAuth Bearer credentials over SASL"""
protocol = AuthProtocol.SASL_PLAINTEXT
mechanism: SASLMechanism = SASLMechanism.OAUTHBEARER
ssl_context: Optional[ssl.SSLContext]
def __init__(
self,
*,
oauth_cb: AbstractTokenProvider,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.oauth_cb = oauth_cb
self.ssl_context = ssl_context
if ssl_context is not None:
self.protocol = AuthProtocol.SASL_SSL
def __repr__(self) -> str:
return "<{0}: oauth credentials {1} SSL support".format(
type(self).__name__,
"with" if self.protocol == AuthProtocol.SASL_SSL else "without",
)
[docs]class GSSAPICredentials(Credentials):
"""Describe GSSAPI credentials over SASL."""
protocol = AuthProtocol.SASL_PLAINTEXT
mechanism: SASLMechanism = SASLMechanism.GSSAPI
ssl_context: Optional[ssl.SSLContext]
def __init__(
self,
*,
kerberos_service_name: str = "kafka",
kerberos_domain_name: Optional[str] = None,
ssl_context: ssl.SSLContext = None,
mechanism: Union[str, SASLMechanism] = None,
) -> None:
self.kerberos_service_name = kerberos_service_name
self.kerberos_domain_name = kerberos_domain_name
self.ssl_context = ssl_context
if ssl_context is not None:
self.protocol = AuthProtocol.SASL_SSL
if mechanism is not None:
self.mechanism = SASLMechanism(mechanism)
def __repr__(self) -> str:
return "<{0}: kerberos service={1!r} domain={2!r}".format(
type(self).__name__,
self.kerberos_service_name,
self.kerberos_domain_name,
)
[docs]class SSLCredentials(Credentials):
"""Describe SSL credentials/settings."""
protocol = AuthProtocol.SSL
context: ssl.SSLContext
def __init__(
self,
context: ssl.SSLContext = None,
*,
purpose: Any = None,
cafile: Optional[str] = None,
capath: Optional[str] = None,
cadata: Optional[str] = None,
) -> None:
if context is None:
context = ssl.create_default_context(
purpose=purpose,
cafile=cafile,
capath=capath,
cadata=cadata,
)
self.context = context
def __repr__(self) -> str:
return f"<{type(self).__name__}: context={self.context}>"