"""Source code for ``airflow_supervisor.config.supervisor``."""
from datetime import time, timedelta
from typing import Literal, Optional, Union
from airflow_pydantic import Pool
from pydantic import Field
from supervisor_pydantic import SupervisorConvenienceConfiguration
__all__ = (
"SupervisorAirflowConfiguration",
"load_airflow_config",
)
# PredefinedTemplates = Literal[
# "always_on",
# "half_day",
# # TODO add more
# ]
# [docs] — Sphinx cross-reference marker left over from HTML extraction.
class SupervisorAirflowConfiguration(SupervisorConvenienceConfiguration):
    """Supervisor settings that MUST be set when running in Airflow.

    Extends ``SupervisorConvenienceConfiguration`` with the options the
    Airflow integration reads: sensor polling cadence and timeout, runtime
    limits for the HighAvailabilityOperator, pool assignment, and lifecycle
    behavior (stop on exit, cleanup, restart on trigger/retrigger).
    """

    # Passthrough to PythonSensor in airflow-ha
    check_interval: timedelta = Field(
        default=timedelta(seconds=5), description="Interval between supervisor program status checks"
    )
    check_timeout: timedelta = Field(
        default=timedelta(hours=8), description="Timeout to wait for supervisor program status checks"
    )

    # HighAvailabilityOperator custom args
    runtime: Optional[timedelta] = Field(default=None, description="Max runtime of Supervisor job")
    endtime: Optional[time] = Field(default=None, description="End time of Supervisor job")
    maxretrigger: Optional[int] = Field(
        default=None,
        description="Max number of retriggers of Supervisor job (e.g. max number of checks separated by `check_interval`)",
    )
    reference_date: Literal["start_date", "logical_date", "data_interval_end"] = Field(
        default="data_interval_end",
        description="Reference date for the job. NOTE: Airflow schedules after end of date interval, so `data_interval_end` is the default",
    )

    # Airflow-specific settings
    pool: Optional[Union[str, Pool]] = Field(
        default=None,
        description="Airflow pool to use for the job. If not set, the job will use the default pool, or the pool from a balancer host.",
    )

    # Other Airflow configuration
    # Should the programs be stopped when the DAG finishes?
    stop_on_exit: Optional[bool] = Field(default=True, description="Stop supervisor on dag completion")
    # Should the supervisor folder be removed on dag completion?
    cleanup: Optional[bool] = Field(
        default=True, description="Cleanup supervisor folder on dag completion. Note: stop_on_exit must be True"
    )
    # For Jobs that do not shutdown, e.g. stop_on_exit=False, one might want to configure them to
    # restart when the DAG is rescheduled by airflow or retriggered by airflow-ha
    restart_on_initial: Optional[bool] = Field(
        default=False,
        description="Restart the job when the DAG is run directly via airflow (NOT retriggered). This is useful for jobs that do not shutdown",
    )
    restart_on_retrigger: Optional[bool] = Field(
        default=False,
        description="Restart the job when the DAG is retriggered. This is useful for jobs that do not shutdown",
    )

    # template: Optional[PredefinedTemplates] = Field(
    #     default=None,
    #     description="A template of settings to use. This is a convenience for common settings. Individual settings will override the template.",
    # )
    # @model_validator(mode="after")
    # def _set_fields_from_template(self):
    #     if self.template == "always_on":
    #         self.stop_on_exit = False
    #         self.cleanup = False
    #         self.restart_on_initial = True
    #         self.restart_on_retrigger = True
    #     elif self.template == "half_day":
    #         self.runtime = self.runtime or timedelta(hours=12)
    #         self.stop_on_exit = True
    #         self.cleanup = True
    #         self.restart_on_initial = False
    #         self.restart_on_retrigger = False
    #     return self
# Convenience alias exposed in ``__all__``: load a SupervisorAirflowConfiguration
# from configuration sources.
# NOTE(review): assumes ``.load`` is a classmethod inherited from
# supervisor_pydantic's SupervisorConvenienceConfiguration — confirm upstream.
load_airflow_config = SupervisorAirflowConfiguration.load