Skip to content

mode.timers

AsyncIO Timers.

Timer

Timer state.

Source code in mode/timers.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class Timer:
    """Drift-aware bookkeeping for a repeating timer.

    Each call to :meth:`tick` returns how long the caller should sleep
    before the next run.  The returned value is the configured interval
    nudged by the drift measured on the previous sleep, but never by more
    than ``max_drift_correction`` seconds in either direction.

    Iterating the instance with ``async for`` performs the tick/sleep
    cycle itself and yields the sleep time that was used.

    Counters kept on the instance:

    - ``iteration``: number of ticks completed after the first one.
    - ``drifting``: ticks whose drift reached ``max_drift``
      (only counted for intervals of at least one second).
    - ``drifting_early`` / ``drifting_late``: the same, split by direction.
    - ``overlaps``: ticks where the time spent outside of sleeping
      exceeded the interval.
    """

    interval: Seconds
    interval_s: float

    max_drift: float
    min_interval_s: float
    max_interval_s: float

    last_wakeup_at: float
    last_yield_at: float
    iteration: int

    def __init__(
        self,
        interval: Seconds,
        *,
        max_drift_correction: float = 0.1,
        name: str = "",
        clock: ClockArg = perf_counter,
        sleep: SleepArg = asyncio.sleep,
    ) -> None:
        """Set up the timer state.

        Arguments:
            interval: Desired time between runs; converted to seconds
                with ``want_seconds``.
            max_drift_correction: Largest adjustment (seconds) applied to
                a single sleep in order to compensate for drift.
            name: Label used in log messages.
            clock: Zero-argument callable returning the current time.
            sleep: Awaitable sleep function used by ``__aiter__``.
        """
        self.interval = interval
        self.max_drift_correction = max_drift_correction
        self.name = name
        self.clock: ClockArg = clock
        self.sleep: SleepArg = sleep

        seconds = want_seconds(interval)
        self.interval_s = seconds

        # Drift at or above this value is reported by tick().
        self.max_drift = min(seconds * MAX_DRIFT_PERCENT, MAX_DRIFT_CEILING)

        # Only allow the sleep time to vary when the interval is longer
        # than the correction itself; otherwise the interval is fixed.
        adjustable = seconds > self.max_drift_correction
        self.min_interval_s = (
            seconds - self.max_drift_correction if adjustable else seconds
        )
        self.max_interval_s = (
            seconds + self.max_drift_correction if adjustable else seconds
        )

        # A loop that simply calls asyncio.sleep(interval) always wakes
        # up a little late and can eventually drift.  With the drift
        # correction done in tick() we will usually wake up slightly
        # early instead (usually around 0.001s), and are often able to
        # correct the drift entirely, around 20% of the time.
        # It also adds entropy to timers.

        # Callers usually sleep before the first run, so every timestamp
        # starts out at the same clock reading (the epoch):
        #   last_wakeup_at -- clock reading taken by the latest tick().
        #   last_yield_at  -- clock reading taken by on_before_yield().
        started_at = self.clock()
        self.epoch = self.last_wakeup_at = self.last_yield_at = started_at

        self.iteration = 0
        self.drifting = self.drifting_early = self.drifting_late = 0
        self.overlaps = 0

    async def __aiter__(self) -> AsyncIterator[float]:
        """Tick, sleep and yield the sleep time used; never finishes."""
        while True:
            delay = self.tick()
            await self.sleep(delay)
            self.on_before_yield()
            yield delay

    def adjust_interval(self, drift: float) -> float:
        """Return the interval shifted by ``drift``, kept within bounds.

        The result never exceeds ``max_interval_s`` and never goes
        below ``min_interval_s``.
        """
        base = self.interval_s
        shifted = base + drift
        if shifted > base:
            return min(shifted, self.max_interval_s)
        if shifted < base:
            return max(shifted, self.min_interval_s)
        return base

    def tick(self) -> float:
        """Record a wake-up and return the next sleep time in seconds.

        The first call (before ``on_before_yield`` has moved
        ``last_yield_at`` away from the epoch) returns the plain interval.
        Later calls measure the previous cycle, log drift and overlap,
        and return the drift-adjusted interval.
        """
        base = self.interval_s
        now = self.clock()

        # Nothing has been measured yet: just note the wake-up time.
        if self.last_yield_at == self.epoch:
            self.last_wakeup_at = now
            return base

        since_epoch = now - self.epoch
        slept = self.last_yield_at - self.last_wakeup_at
        # What remains of the cycle once the sleep is subtracted.
        busy = now - self.last_wakeup_at - slept
        drift = base - slept

        next_sleep = self.adjust_interval(drift)

        if base >= 1.0 and abs(drift) >= self.max_drift:
            self.drifting += 1
            # Negative drift means the sleep lasted longer than requested.
            if drift < 0:
                self.drifting_late += 1
                template = (
                    "Timer %s woke up too late, with a drift of +%r "
                    "runtime=%r sleeptime=%r"
                )
            else:
                self.drifting_early += 1
                template = (
                    "Timer %s woke up too early, with a drift of -%r "
                    "runtime=%r sleeptime=%r"
                )
            logger.info(template, self.name, abs(drift), busy, slept)
        else:
            logger.debug(
                "Timer %s woke up - iteration=%r "
                "time_spent_sleeping=%r drift=%r "
                "new_interval=%r since_epoch=%r",
                self.name,
                self.iteration,
                slept,
                drift,
                next_sleep,
                since_epoch,
            )

        # The work between sleeps took longer than one full interval.
        if busy > base:
            self.overlaps += 1
            logger.warning(
                "Timer %s is overlapping (interval=%r runtime=%r)",
                self.name,
                self.interval,
                busy,
            )

        self.iteration += 1
        self.last_wakeup_at = now

        return next_sleep

    def on_before_yield(self) -> None:
        """Store the current clock reading as ``last_yield_at``."""
        self.last_yield_at = self.clock()

timer_intervals(interval, max_drift_correction=0.1, name='', clock=perf_counter)

Generate timer sleep times.

Note

This function is deprecated; please use itertimer instead (itertimer also sleeps and calculates the sleep time correctly).

Source code in mode/timers.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def timer_intervals(  # XXX deprecated
    interval: Seconds,
    max_drift_correction: float = 0.1,
    name: str = "",
    clock: ClockArg = perf_counter,
) -> Iterator[float]:
    """Yield an endless stream of timer sleep times.

    A `Timer` is created from the arguments and ticked once per item;
    the generator itself never sleeps, it only reports how long the
    caller should sleep.

    !!! note
        This function is deprecated; please use `itertimer` instead
        (`itertimer` also sleeps and calculates sleep time correctly).
    """
    timer = Timer(
        interval,
        max_drift_correction=max_drift_correction,
        name=name,
        clock=clock,
    )
    while True:
        delay = timer.tick()
        # The yield point is recorded right after the tick, before the
        # consumer has slept, so the timing includes callback time.
        timer.on_before_yield()
        yield delay