Source code for faust.utils.cron

"""Crontab Utilities."""

import time
from datetime import datetime, tzinfo
from typing import Optional

from croniter.croniter import croniter


def secs_for_next(cron_format: str, tz: Optional[tzinfo] = None) -> float:
    """Return seconds until next execution given Crontab style format.

    Args:
        cron_format: Crontab-style schedule expression, passed unchanged
            to ``croniter``.
        tz: Optional timezone.  When given, the schedule is evaluated
            starting from the current time in that timezone; when omitted,
            it is evaluated starting from the current POSIX timestamp.

    Returns:
        The difference between the next scheduled run (as returned by
        ``croniter.get_next(float)``) and the current POSIX timestamp.
    """
    now_ts = time.time()
    # With a tz we hand croniter a timezone-aware datetime; without one
    # we hand it the plain (timezone-unaware) timestamp.  An explicit
    # conditional is used instead of ``tz and ... or ...`` so the choice
    # depends only on whether a tz was supplied, never on truthiness.
    now = datetime.now(tz) if tz is not None else now_ts
    cron_it = croniter(cron_format, start_time=now)
    # The subtraction is always done against the raw timestamp, whichever
    # representation of "now" was used to seed the iterator.
    return cron_it.get_next(float) - now_ts