22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class Timer:
    """Timer state.

    Keeps the timestamps and counters needed to run a periodic loop:
    :meth:`tick` returns how long to sleep next, :meth:`on_before_yield`
    records when the sleep finished, and iterating the instance with
    ``async for`` repeats tick -> sleep -> yield forever.
    """

    interval: Seconds
    interval_s: float

    max_drift: float
    min_interval_s: float
    max_interval_s: float

    last_wakeup_at: float
    last_yield_at: float
    iteration: int

    def __init__(
        self,
        interval: Seconds,
        *,
        max_drift_correction: float = 0.1,
        name: str = "",
        clock: ClockArg = perf_counter,
        sleep: SleepArg = asyncio.sleep,
    ) -> None:
        """Set up the timer.

        Arguments:
            interval: Desired period; converted to float seconds with
                ``want_seconds``.
            max_drift_correction: Largest amount (in seconds) by which a
                single sleep may be shortened or lengthened.
            name: Label used in log messages.
            clock: Callable returning the current time.
            sleep: Awaitable-returning callable used to sleep.
        """
        self.interval = interval
        self.max_drift_correction = max_drift_correction
        self.name = name
        self.clock: ClockArg = clock
        self.sleep: SleepArg = sleep

        seconds = want_seconds(interval)
        self.interval_s = seconds

        # Drift at or above this value is logged at INFO level by tick().
        self.max_drift = min(seconds * MAX_DRIFT_PERCENT, MAX_DRIFT_CEILING)

        # Correction is only applied when the interval is longer than the
        # correction itself; otherwise the interval is left fixed.
        correctable = seconds > self.max_drift_correction
        self.min_interval_s = (
            seconds - self.max_drift_correction if correctable else seconds
        )
        self.max_interval_s = (
            seconds + self.max_drift_correction if correctable else seconds
        )

        # Original author's note: if the loop calls asyncio.sleep(interval)
        # it will always wake up a little bit late, and can eventually
        # drift.  Using the algorithm below we will usually wake up
        # slightly early instead (usually around 0.001s), and often
        # actually end up being able to correct the drift entirely around
        # 20% of the time.  Also add entropy to timers.

        # Usually callers will sleep before starting the first timer.
        started_at = self.clock()
        self.epoch = started_at
        # Overwritten with the current clock reading on every tick().
        self.last_wakeup_at = started_at
        # Overwritten by on_before_yield() right after each sleep, so
        # (last_yield_at - last_wakeup_at) is the time spent sleeping and
        # excludes time spent running timer callbacks.
        self.last_yield_at = started_at

        self.iteration = 0
        self.drifting = 0
        self.drifting_early = 0
        self.drifting_late = 0
        self.overlaps = 0

    async def __aiter__(self) -> AsyncIterator[float]:
        """Yield the slept duration after every sleep; never terminates."""
        while True:
            delay = self.tick()
            await self.sleep(delay)
            self.on_before_yield()
            yield delay

    def adjust_interval(self, drift: float) -> float:
        """Return the interval shifted by ``drift``, kept within bounds.

        The result never exceeds ``max_interval_s`` when lengthening and
        never goes below ``min_interval_s`` when shortening.
        """
        base = self.interval_s
        shifted = base + drift
        if shifted > base:
            return min(shifted, self.max_interval_s)
        if shifted < base:
            return max(shifted, self.min_interval_s)
        return base

    def tick(self) -> float:
        """Update timer state and return the number of seconds to sleep."""
        interval_s = self.interval_s
        now = self.clock()

        # Nothing has been yielded yet: no measurements to correct with.
        if self.last_yield_at == self.epoch:
            self.last_wakeup_at = now
            return interval_s

        since_epoch = now - self.epoch
        time_spent_sleeping = self.last_yield_at - self.last_wakeup_at
        time_spent_yielding = now - self.last_wakeup_at - time_spent_sleeping
        drift = interval_s - time_spent_sleeping
        new_interval = self.adjust_interval(drift)

        noteworthy = interval_s >= 1.0 and abs(drift) >= self.max_drift
        if not noteworthy:
            logger.debug(
                "Timer %s woke up - iteration=%r "
                "time_spent_sleeping=%r drift=%r "
                "new_interval=%r since_epoch=%r",
                self.name,
                self.iteration,
                time_spent_sleeping,
                drift,
                new_interval,
                since_epoch,
            )
        elif drift < 0:
            # Slept longer than the interval.
            self.drifting += 1
            self.drifting_late += 1
            logger.info(
                "Timer %s woke up too late, with a drift of +%r "
                "runtime=%r sleeptime=%r",
                self.name,
                abs(drift),
                time_spent_yielding,
                time_spent_sleeping,
            )
        else:
            # Slept shorter than the interval.
            self.drifting += 1
            self.drifting_early += 1
            logger.info(
                "Timer %s woke up too early, with a drift of -%r "
                "runtime=%r sleeptime=%r",
                self.name,
                abs(drift),
                time_spent_yielding,
                time_spent_sleeping,
            )

        # Time between the last yield and this tick exceeded one interval.
        if time_spent_yielding > interval_s:
            self.overlaps += 1
            logger.warning(
                "Timer %s is overlapping (interval=%r runtime=%r)",
                self.name,
                self.interval,
                time_spent_yielding,
            )

        self.iteration += 1
        self.last_wakeup_at = now
        return new_interval

    def on_before_yield(self) -> None:
        """Record the current clock reading as the moment sleeping ended."""
        self.last_yield_at = self.clock()
|