# Source code for airflow_supervisor.airflow.local

from logging import getLogger
from typing import TYPE_CHECKING, Dict

from airflow_pydantic import Pool, fail, skip
from supervisor_pydantic.client import SupervisorRemoteXMLRPCClient
from supervisor_pydantic.convenience import (
    SupervisorTaskStep,
    check_programs,
    kill_supervisor,
    remove_supervisor_config,
    restart_programs,
    start_programs,
    start_supervisor,
    stop_programs,
    stop_supervisor,
    write_supervisor_config,
)

from airflow_supervisor.config import SupervisorAirflowConfiguration

if TYPE_CHECKING:
    from airflow.models.dag import DAG
    from airflow.models.operator import Operator
    from airflow_ha import CheckResult, HighAvailabilityOperator

# Names exported by ``from <this module> import *``.
__all__ = ("Supervisor",)

# Module-level logger, named after this module.
_log = getLogger(name=__name__)


class Supervisor(object):
    """Build the Airflow tasks that manage a supervisor instance and its programs on a DAG.

    Tasks created (each ``task_id`` is ``"<dag_id>-<step>"``):

    * ``configure-supervisor >> start-supervisor >> start-programs >> check-programs``
    * ``check_programs.retrigger_fail >> restart-programs``
    * ``check_programs.stop_pass >> stop-programs >> stop-supervisor >> unconfigure-supervisor``
    * ``force-kill-dag >> force-kill``, where ``force-kill-dag`` runs ``skip`` so that
      ``force-kill`` does not run by default
    * ``check-config-failed`` (``trigger_rule="one_failed"``, runs ``fail``) downstream of the
      configure, start, stop-programs and unconfigure tasks

    The object can be composed with other tasks via ``>>`` / ``<<``: incoming edges attach to
    ``configure_supervisor`` and outgoing edges attach to ``unconfigure_supervisor``.
    """

    _cfg: "SupervisorAirflowConfiguration"
    _dag: "DAG"
    # NOTE(review): declared but never assigned in this class.
    _kill_dag: "DAG"
    # NOTE(review): holds ``cfg.pool.pool`` when ``cfg.pool`` is a ``Pool`` model, otherwise ``cfg.pool``
    # unchanged -- presumably a pool *name* rather than a ``Pool`` object; confirm and tighten the annotation.
    _pool: "Pool"
    _xmlrpc_client: "SupervisorRemoteXMLRPCClient"
    _force_kill: "Operator"

    def __init__(self, dag: "DAG", cfg: "SupervisorAirflowConfiguration", **kwargs):
        """Create all supervisor tasks on ``dag`` and wire their dependencies.

        :param dag: DAG the tasks are attached to; its scheduling settings are overridden by
            :meth:`setup_dag`.
        :param cfg: supervisor configuration, or a plain ``dict`` that is validated into a
            ``SupervisorAirflowConfiguration`` (the dict form is used by airflow-pydantic rendering).
        :param kwargs: optional ``xmlrpc_client`` to use instead of building a
            ``SupervisorRemoteXMLRPCClient`` from ``cfg``. Any other keys are ignored.
        """
        from airflow.operators.python import PythonOperator

        if isinstance(cfg, dict):
            # NOTE: used in airflow-pydantic rendering
            cfg = SupervisorAirflowConfiguration.model_validate(cfg)
        self._cfg = cfg

        # Unwrap a pydantic ``Pool`` model; anything else is passed through as-is.
        self._pool = cfg.pool.pool if isinstance(cfg.pool, Pool) else cfg.pool

        # Only build a default client when none was injected, so callers that supply
        # their own client do not pay for constructing an unused one.
        xmlrpc_client = kwargs.pop("xmlrpc_client", None)
        if xmlrpc_client is None:
            xmlrpc_client = SupervisorRemoteXMLRPCClient(self._cfg)
        self._xmlrpc_client = xmlrpc_client

        self._dag = dag
        self.setup_dag()

        self.initialize_tasks()
        self.configure_supervisor >> self.start_supervisor >> self.start_programs >> self.check_programs
        # fail, restart
        self.check_programs.retrigger_fail >> self.restart_programs
        # pass, finish
        self.check_programs.stop_pass >> self.stop_programs >> self.stop_supervisor >> self.unconfigure_supervisor

        # TODO make helper dag
        self._force_kill = self.get_step_operator("force-kill")
        # Default non running: the upstream task runs ``skip``, so force-kill is not executed
        # unless triggered manually.
        self._skip_operator("force-kill-dag") >> self._force_kill

        # Deal with any configuration or cleanup problems: fail visibly if any of these fail.
        any_config_fail = PythonOperator(
            task_id=f"{self._dag.dag_id}-check-config-failed",
            python_callable=fail,
            trigger_rule="one_failed",
            **self.get_base_operator_kwargs(),
        )
        for task in (
            self.configure_supervisor,
            self.start_supervisor,
            self.start_programs,
            self.stop_programs,
            self.unconfigure_supervisor,
        ):
            task >> any_config_fail

    def setup_dag(self):
        """Override DAG settings that don't make sense for a long-running supervisor.

        Disables catchup and limits the DAG to one active run and one active task.
        """
        self._dag.catchup = False
        # NOTE(review): ``concurrency`` is the Airflow 2 deprecated alias of ``max_active_tasks``;
        # both are set here for compatibility.
        self._dag.concurrency = 1
        self._dag.max_active_tasks = 1
        self._dag.max_active_runs = 1

    def _skip_operator(self, name: str) -> "Operator":
        """Build a placeholder task ``"<dag_id>-<name>"`` that runs ``skip``, on this DAG and pool."""
        from airflow.operators.python import PythonOperator

        return PythonOperator(
            task_id=f"{self._dag.dag_id}-{name}",
            python_callable=skip,
            **self.get_base_operator_kwargs(),
        )

    def initialize_tasks(self):
        """Create every lifecycle operator and store it on the instance.

        When ``cfg.stop_on_exit`` is false, ``stop-programs`` and ``unconfigure-supervisor`` are
        ``skip`` placeholders. When it is true but ``cfg.cleanup`` is false, only
        ``unconfigure-supervisor`` is a placeholder.
        """
        # NOTE: initialize this first as it is relied upon by startup steps
        self._check_programs = self.get_step_operator("check-programs")

        self._configure_supervisor = self.get_step_operator(step="configure-supervisor")
        self._start_supervisor = self.get_step_operator(step="start-supervisor")
        self._start_programs = self.get_step_operator("start-programs")

        if self._cfg.stop_on_exit:
            _log.info("Stopping programs on exit")
            self._stop_programs = self.get_step_operator("stop-programs")
            if self._cfg.cleanup:
                _log.info("Cleaning up supervisor config on exit")
                self._unconfigure_supervisor = self.get_step_operator("unconfigure-supervisor")
            else:
                _log.info("Skipping cleanup of supervisor config on exit")
                self._unconfigure_supervisor = self._skip_operator("unconfigure-supervisor")
        else:
            _log.info("Not stopping programs on exit")
            _log.info("Skipping cleanup of supervisor config on exit")
            self._stop_programs = self._skip_operator("stop-programs")
            self._unconfigure_supervisor = self._skip_operator("unconfigure-supervisor")

        self._restart_programs = self.get_step_operator("restart-programs")
        self._stop_supervisor = self.get_step_operator("stop-supervisor")

    @property
    def configure_supervisor(self) -> "Operator":
        """Task that writes the supervisor config (first task of the lifecycle)."""
        return self._configure_supervisor

    @property
    def start_supervisor(self) -> "Operator":
        """Task that starts the supervisor daemon."""
        return self._start_supervisor

    @property
    def start_programs(self) -> "Operator":
        """Task that starts the supervised programs."""
        return self._start_programs

    @property
    def check_programs(self) -> "HighAvailabilityOperator":
        """High-availability sensor that checks program state and decides pass/continue/retrigger."""
        return self._check_programs

    @property
    def stop_programs(self) -> "Operator":
        """Task that stops the programs (or a ``skip`` placeholder when ``stop_on_exit`` is false)."""
        return self._stop_programs

    @property
    def restart_programs(self) -> "Operator":
        """Task that restarts the programs after a failed check."""
        return self._restart_programs

    @property
    def stop_supervisor(self) -> "Operator":
        """Task that stops the supervisor daemon."""
        return self._stop_supervisor

    @property
    def unconfigure_supervisor(self) -> "Operator":
        """Task that removes the supervisor config (or a ``skip`` placeholder); last task of the lifecycle."""
        return self._unconfigure_supervisor

    @property
    def supervisor_client(self) -> "SupervisorRemoteXMLRPCClient":
        """The XML-RPC client for this supervisor: the injected ``xmlrpc_client`` or the one built from ``cfg``."""
        return self._xmlrpc_client

    def get_base_operator_kwargs(self) -> Dict:
        """Return the kwargs shared by every task: the owning ``dag`` and the ``pool``."""
        return dict(dag=self._dag, pool=self._pool)

    def get_step_kwargs(self, step: "SupervisorTaskStep") -> Dict:
        """Return the operator kwargs (``python_callable`` and ``do_xcom_push=True``) for ``step``.

        The startup steps (configure/start supervisor, start programs) first call
        ``check_programs.check_end_conditions``. They perform their action only when it returns
        ``None``; otherwise they return ``False``.

        :raises NotImplementedError: if ``step`` is not a known step name.
        """

        def when_not_ended(action):
            # Run ``action(**context)`` only while the HA operator reports no end condition.
            def _callable(**kwargs):
                return self.check_programs.check_end_conditions(**kwargs) is None and action(**kwargs)

            return _callable

        if step == "configure-supervisor":
            python_callable = when_not_ended(lambda **_: write_supervisor_config(self._cfg, _exit=False))
        elif step == "start-supervisor":
            python_callable = when_not_ended(lambda **_: start_supervisor(self._cfg._pydantic_path, _exit=False))
        elif step == "start-programs":
            if self._cfg.restart_on_retrigger:
                _log.info("Restarting programs on retrigger")
                # Always restart programs
                python_callable = when_not_ended(lambda **_: start_programs(self._cfg, restart=True, _exit=False))
            elif self._cfg.restart_on_initial:
                _log.info("Restarting programs on initial run")
                # Restart programs only if this is the initial run
                python_callable = when_not_ended(
                    lambda **kwargs: start_programs(
                        self._cfg,
                        restart=self.check_programs.is_initial_run(**kwargs),
                        _exit=False,
                    )
                )
            else:
                _log.info("Starting programs as normal on initial run")
                # Don't restart programs
                python_callable = when_not_ended(lambda **_: start_programs(self._cfg, _exit=False))
        elif step == "stop-programs":
            python_callable = lambda: stop_programs(self._cfg, _exit=False)  # noqa: E731
        elif step == "check-programs":

            def _check_programs(supervisor_cfg=self._cfg, **kwargs) -> "CheckResult":
                from airflow_ha import Action, Result

                # TODO formalize
                if check_programs(supervisor_cfg, check_done=True, _exit=False):
                    # finish
                    return Result.PASS, Action.STOP
                if check_programs(supervisor_cfg, check_running=True, _exit=False) or check_programs(
                    supervisor_cfg, _exit=False
                ):
                    return Result.PASS, Action.CONTINUE
                return Result.FAIL, Action.RETRIGGER

            python_callable = _check_programs
        elif step == "restart-programs":
            python_callable = lambda: restart_programs(self._cfg, _exit=False)  # noqa: E731
        elif step == "stop-supervisor":
            python_callable = lambda: stop_supervisor(self._cfg, _exit=False)  # noqa: E731
        elif step == "unconfigure-supervisor":
            python_callable = lambda: remove_supervisor_config(self._cfg, _exit=False)  # noqa: E731
        elif step == "force-kill":
            python_callable = lambda: kill_supervisor(self._cfg, _exit=False)  # noqa: E731
        else:
            raise NotImplementedError(f"Unknown step: {step}")
        return dict(python_callable=python_callable, do_xcom_push=True)

    def get_step_operator(self, step: "SupervisorTaskStep") -> "Operator":
        """Create the operator for ``step`` with ``task_id`` ``"<dag_id>-<step>"``.

        ``check-programs`` becomes a ``HighAvailabilityOperator`` in ``poke`` mode, configured from
        the check interval/timeout and runtime/endtime/maxretrigger/reference_date settings of ``cfg``.
        Every other step becomes a ``PythonOperator``.
        """
        from airflow.operators.python import PythonOperator
        from airflow_ha import HighAvailabilityOperator

        if step == "check-programs":
            ha_operator_args = {
                # Sensor Args
                "task_id": f"{self._dag.dag_id}-{step}",
                "poke_interval": self._cfg.check_interval.total_seconds(),
                "timeout": self._cfg.check_timeout.total_seconds(),
                "mode": "poke",
                # HighAvailabilityOperator Args
                "runtime": self._cfg.runtime,
                "endtime": self._cfg.endtime,
                "maxretrigger": self._cfg.maxretrigger,
                "reference_date": self._cfg.reference_date,
                # Pass through
                **self.get_base_operator_kwargs(),
                **self.get_step_kwargs(step),
            }
            _log.info("Creating HighAvailabilityOperator for %s with args: %s", step, ha_operator_args)
            return HighAvailabilityOperator(**ha_operator_args)
        return PythonOperator(
            **{
                "task_id": f"{self._dag.dag_id}-{step}",
                **self.get_base_operator_kwargs(),
                **self.get_step_kwargs(step),
            }
        )

    def __lshift__(self, other: "Operator") -> "Operator":
        """e.g. Supervisor() << b -- make ``other`` upstream of ``configure_supervisor``.

        NOTE(review): unlike ``__rshift__`` (and Airflow's ``BaseOperator.__lshift__``, which
        returns ``other``), this returns ``unconfigure_supervisor``; kept for compatibility.
        """
        self.configure_supervisor << other
        return self.unconfigure_supervisor

    def __rshift__(self, other: "Operator") -> "Operator":
        """e.g. Supervisor() >> b -- make ``other`` downstream of ``unconfigure_supervisor``; returns ``other``."""
        self.unconfigure_supervisor >> other
        return other

    def __rrshift__(self, other):
        """e.g. [a, b] >> Supervisor() -- lists have no ``__rshift__``, so Python calls this.

        Makes ``other`` upstream of ``configure_supervisor`` and returns ``self``.
        """
        self.configure_supervisor << other
        return self

    def __rlshift__(self, other):
        """e.g. [a, b] << Supervisor() -- makes ``other`` downstream of ``unconfigure_supervisor``; returns ``self``."""
        self.unconfigure_supervisor >> other
        return self

    def set_upstream(self, other: "Operator"):
        """Make ``other`` upstream of the first task (``configure_supervisor``)."""
        self.configure_supervisor.set_upstream(other)

    def set_downstream(self, other: "Operator"):
        """Make ``other`` downstream of the last task (``unconfigure_supervisor``)."""
        self.unconfigure_supervisor.set_downstream(other)

    def update_relative(self, other, upstream: bool = True, edge_modifier=None):
        """Forward relative updates to ``configure_supervisor`` (upstream) or ``unconfigure_supervisor`` (downstream).

        Called by Airflow when this object is on the right-hand side of ``>>`` / ``<<``.
        """
        if upstream:
            self.configure_supervisor.update_relative(other, upstream=True, edge_modifier=edge_modifier)
        else:
            self.unconfigure_supervisor.update_relative(other, upstream=False, edge_modifier=edge_modifier)
        return self

    @property
    def leaves(self):
        """Return the leaves of the DAG."""
        return self.unconfigure_supervisor.leaves

    @property
    def roots(self):
        """Return the roots of the DAG."""
        return self.configure_supervisor.roots