Source code for airflow_pydantic.utils.timetables
from datetime import timedelta
from pydantic_extra_types.timezone_name import TimeZoneName
from ..airflow import (
CronDataIntervalTimetable as BaseCronDataIntervalTimetable,
CronTriggerTimetable as BaseCronTriggerTimetable,
DeltaDataIntervalTimetable as BaseDeltaDataIntervalTimetable,
DeltaTriggerTimetable as BaseDeltaTriggerTimetable,
EventsTimetable as BaseEventsTimetable,
MultipleCronTriggerTimetable as BaseMultipleCronTriggerTimetable,
)
from ..core import BaseModel
from .common import DatetimeArg
from .relativedelta import RelativeDelta
__all__ = (
"CronDataIntervalTimetable",
"CronTriggerTimetable",
"DeltaDataIntervalTimetable",
"DeltaTriggerTimetable",
"EventsTimetable",
"FixedTimezone",
"MultipleCronTriggerTimetable",
"Timezone",
)
Timezone = TimeZoneName
FixedTimezone = timedelta
[docs]
class CronTriggerTimetable(BaseModel):
cron: str
timezone: str | Timezone | FixedTimezone | None = None
interval: timedelta | RelativeDelta | None = None
run_immediately: bool | timedelta | None = None
[docs]
def instance(self) -> BaseCronTriggerTimetable:
return BaseCronTriggerTimetable(**self.model_dump(exclude_unset=True))
[docs]
class MultipleCronTriggerTimetable(BaseModel):
crons: list[str]
timezone: str | Timezone | FixedTimezone
interval: timedelta | RelativeDelta | None = None
run_immediately: bool | timedelta | None = None
[docs]
def instance(self) -> BaseMultipleCronTriggerTimetable:
return BaseMultipleCronTriggerTimetable(*self.crons, **self.model_dump(exclude_unset=True, exclude=["crons"]))
[docs]
class CronDataIntervalTimetable(BaseModel):
cron: str
timezone: str | Timezone | FixedTimezone | None = None
[docs]
def instance(self) -> BaseCronDataIntervalTimetable:
return BaseCronDataIntervalTimetable(**self.model_dump(exclude_unset=True))
[docs]
class DeltaDataIntervalTimetable(BaseModel):
delta: timedelta | RelativeDelta
[docs]
def instance(self) -> BaseDeltaDataIntervalTimetable:
return BaseDeltaDataIntervalTimetable(**self.model_dump(exclude_unset=True))