"""OpenTracing utilities."""
import asyncio
import sys
import typing
from contextvars import ContextVar
from functools import wraps
from typing import Any, Callable, Optional, Tuple
import opentracing
from mode import shortlabel
__all__ = [
"current_span",
"noop_span",
"set_current_span",
"finish_span",
"operation_name_from_fun",
"traced_from_parent_span",
"call_with_trace",
]
if typing.TYPE_CHECKING:
_current_span: ContextVar[opentracing.Span]
_current_span = ContextVar("current_span")
[docs]def current_span() -> Optional[opentracing.Span]:
"""Get the current span for this context (if any)."""
return _current_span.get(None)
[docs]def set_current_span(span: opentracing.Span) -> None:
"""Set the current span for the current context."""
_current_span.set(span)
[docs]def noop_span() -> opentracing.Span:
"""Return a span that does nothing when traced."""
return opentracing.Tracer()._noop_span
[docs]def finish_span(
span: Optional[opentracing.Span], *, error: BaseException = None
) -> None:
"""Finish span, and optionally set error tag."""
if span is not None:
if error:
span.__exit__(type(error), error, error.__traceback__)
else:
span.finish()
[docs]def operation_name_from_fun(fun: Any) -> str:
"""Generate opentracing name from function."""
obj = getattr(fun, "__self__", None)
if obj is not None:
objlabel = shortlabel(obj)
funlabel = shortlabel(fun)
if funlabel.startswith(objlabel):
# remove obj name from function label
funlabel = funlabel[len(objlabel) :]
return f"{objlabel}-{funlabel}"
else:
return f"{shortlabel(fun)}"
[docs]def traced_from_parent_span(
parent_span: opentracing.Span = None,
callback: Optional[Callable] = None,
**extra_context: Any,
) -> Callable:
"""Decorate function to be traced from parent span."""
def _wrapper(fun: Callable, **more_context: Any) -> Callable:
operation_name = operation_name_from_fun(fun)
@wraps(fun)
def _inner(*args: Any, **kwargs: Any) -> Any:
parent = parent_span
if parent is None:
parent = current_span()
if parent is not None:
child = parent.tracer.start_span(
operation_name=operation_name,
child_of=parent,
tags={**extra_context, **more_context},
)
if callback is not None:
callback(child)
on_exit = (_restore_span, (parent, child))
set_current_span(child)
return call_with_trace(child, fun, on_exit, *args, **kwargs)
return fun(*args, **kwargs)
return _inner
return _wrapper
def _restore_span(
span: opentracing.Span, expected_current_span: opentracing.Span
) -> None:
current = current_span()
assert current is expected_current_span
set_current_span(span)
[docs]def call_with_trace(
span: opentracing.Span,
fun: Callable,
callback: Optional[Tuple[Callable, Tuple[Any, ...]]],
*args: Any,
**kwargs: Any,
) -> Any:
"""Call function and trace it from parent span."""
cb: Optional[Callable] = None
cb_args: Tuple = ()
if callback:
cb, cb_args = callback
span.__enter__()
try:
ret = fun(*args, **kwargs)
except BaseException:
span.__exit__(*sys.exc_info())
raise
else:
if asyncio.iscoroutine(ret):
# if async def method, we attach our span to
# when it completes.
async def corowrapped() -> Any:
await_ret = None
try:
await_ret = await ret
except BaseException:
span.__exit__(*sys.exc_info())
if cb:
cb(*cb_args)
raise
else:
span.__exit__(None, None, None)
if cb:
cb(*cb_args)
return await_ret
return corowrapped()
else:
# for non async def method, we just exit the span.
span.__exit__(None, None, None)
if cb:
cb(*cb_args)
return ret