"""Window Types."""
import os
import typing
from math import floor
from typing import List, Optional, Type, cast
from mode import Seconds, want_seconds
from .types.windows import WindowRange, WindowRange_from_start, WindowT
# Public names exported by ``from <this module> import *``.
# The pure-Python fallbacks (``_PyHoppingWindow``, ``_PySlidingWindow``)
# are intentionally not listed.
__all__ = [
"Window",
"HoppingWindow",
"TumblingWindow",
"SlidingWindow",
]
# Opt-out switch for the compiled window implementations.  The flag is
# true whenever the ``NO_CYTHON`` environment variable is set to any
# non-empty string (so ``NO_CYTHON=0`` also counts as "set").
NO_CYTHON = bool(os.environ.get("NO_CYTHON"))
class Window(WindowT):
    """Base class for window types.

    The window implementations in this module derive from this class;
    optional compiled implementations are attached to it through
    ``Window.register``.
    """
class _PyHoppingWindow(Window):
    """Hopping window type.

    Fixed-size, overlapping windows.

    Pure-Python implementation: each window is ``size`` seconds long and a
    new window starts every ``step`` seconds.
    """

    size: float
    step: float

    def __init__(
        self, size: Seconds, step: Seconds, expires: Optional[Seconds] = None
    ) -> None:
        """Store window length, hop interval and optional expiry as seconds.

        A falsy ``expires`` (``None``, ``0``) is stored as ``None``.
        """
        self.size = want_seconds(size)
        self.step = want_seconds(step)
        if expires:
            self.expires = want_seconds(expires)
        else:
            self.expires = None

    def ranges(self, timestamp: float) -> List[WindowRange]:
        """Return the window ranges generated for ``timestamp``.

        Window starts are enumerated with ``range()``, so the initial
        start, the timestamp and the step are all truncated to integers.
        """
        first = int(self._start_initial_range(timestamp))
        last = int(timestamp)
        stride = int(self.step)
        windows = []
        for begin in range(first, last + 1, stride):
            windows.append(WindowRange_from_start(float(begin), self.size))
        return windows

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """Return whether ``timestamp`` has expired relative to the latest one.

        Always ``False`` when no expiry is configured.
        """
        if not self.expires:
            return False
        return timestamp <= self._stale_before(latest_timestamp, self.expires)

    def current(self, timestamp: float) -> WindowRange:
        """Get the latest window range for a given timestamp."""
        origin = self._start_initial_range(timestamp)
        # Number of whole steps between the earliest window and timestamp.
        hops = floor((timestamp - origin) / self.step)
        return WindowRange_from_start(origin + (self.step * hops), self.size)

    def delta(self, timestamp: float, d: Seconds) -> WindowRange:
        """Return the current window as of ``d`` seconds before ``timestamp``."""
        shifted = timestamp - want_seconds(d)
        return self.current(shifted)

    def earliest(self, timestamp: float) -> WindowRange:
        """Return the first window range computed for ``timestamp``."""
        return WindowRange_from_start(
            float(self._start_initial_range(timestamp)), self.size
        )

    def _start_initial_range(self, timestamp: float) -> float:
        """Return the start of the earliest window computed for ``timestamp``."""
        # Round the timestamp down to a multiple of the step.
        aligned = (timestamp // self.step) * self.step
        return aligned - self.size + self.step

    def _stale_before(self, latest_timestamp: float, expires: float) -> float:
        """Return the start of the window current ``expires`` seconds ago."""
        cutoff = self.current(latest_timestamp - expires)
        return cutoff[0]
# Select the HoppingWindow implementation.  Type checkers always see the
# pure-Python class; at runtime the compiled version is used when it can
# be imported and NO_CYTHON is not set.
if typing.TYPE_CHECKING:
    HoppingWindow = _PyHoppingWindow
elif NO_CYTHON:  # pragma: no cover
    HoppingWindow = _PyHoppingWindow
else:  # pragma: no cover
    try:
        from ._cython.windows import HoppingWindow
    except ImportError:
        # Compiled extension unavailable: fall back to pure Python.
        HoppingWindow = _PyHoppingWindow
    else:
        # Register the compiled class so that checks against
        # Window (and WindowT) succeed for it.
        Window.register(HoppingWindow)
class TumblingWindow(HoppingWindow):
    """Tumbling window type.

    Fixed-size, non-overlapping, gap-less windows.

    Implemented as a hopping window whose step equals its size.
    """

    def __init__(self, size: Seconds, expires: Optional[Seconds] = None) -> None:
        """Create a window of ``size`` seconds with an optional expiry."""
        # step == size: each window starts where the previous one ends.
        super().__init__(size, size, expires)
class _PySlidingWindow(Window):
    """Sliding window type.

    Notes:
        Fixed-size, overlapping windows that work on differences between
        record timestamps
    """

    before: float
    after: float

    def __init__(self, before: Seconds, after: Seconds, expires: Seconds) -> None:
        """Store the look-behind, look-ahead and expiry durations as seconds."""
        self.before = want_seconds(before)
        self.after = want_seconds(after)
        self.expires = want_seconds(expires)

    def ranges(self, timestamp: float) -> List[WindowRange]:
        """Return list of windows from timestamp.

        Notes:
            .. sourcecode:: sql

                SELECT *
                FROM s1, s2
                WHERE
                s1.key = s2.key
                AND
                s1.ts - before <= s2.ts AND s2.ts <= s1.ts + after
        """
        lower = timestamp - self.before
        upper = timestamp + self.after
        return [(lower, upper)]

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """Return whether ``timestamp`` has expired relative to the latest one.

        Always ``False`` when the configured expiry is falsy.
        """
        if not self.expires:
            return False
        return timestamp <= self._stale_before(self.expires, latest_timestamp)

    def _stale_before(self, expires: float, latest_timestamp: float) -> float:
        """Return the cutoff: ``expires`` seconds before ``latest_timestamp``."""
        cutoff = latest_timestamp - expires
        return cutoff

    def current(self, timestamp: float) -> WindowRange:
        """Get the latest window range for a given timestamp."""
        lower = timestamp - self.before
        upper = timestamp + self.after
        return lower, upper

    def delta(self, timestamp: float, d: Seconds) -> WindowRange:
        """Return the current window as of ``d`` seconds before ``timestamp``."""
        shifted = timestamp - want_seconds(d)
        return self.current(shifted)

    def earliest(self, timestamp: float) -> WindowRange:
        """Return the earliest window for ``timestamp`` (same as ``current``)."""
        return self.current(timestamp)
# Select the SlidingWindow implementation: the compiled version when it
# can be imported and NO_CYTHON is not set, otherwise pure Python.
if NO_CYTHON:  # pragma: no cover
    SlidingWindow = _PySlidingWindow
else:  # pragma: no cover
    try:
        from ._cython.windows import SlidingWindow
    except ImportError:
        # Compiled extension unavailable: fall back to pure Python.
        SlidingWindow = _PySlidingWindow
    else:
        # Present the compiled class as a Window type and register it
        # with Window.
        SlidingWindow = cast(Type[Window], SlidingWindow)
        Window.register(SlidingWindow)