from typing import Dict
from airflow.models.dag import DAG
from airflow.models.operator import Operator
from airflow.operators.python import PythonOperator
from airflow_common_operators import fail, skip
from airflow_ha import Action, CheckResult, HighAvailabilityOperator, Result
from supervisor_pydantic.client import SupervisorRemoteXMLRPCClient
from supervisor_pydantic.convenience import (
SupervisorTaskStep,
check_programs,
kill_supervisor,
remove_supervisor_config,
restart_programs,
start_programs,
start_supervisor,
stop_programs,
stop_supervisor,
write_supervisor_config,
)
from airflow_supervisor.config import SupervisorAirflowConfiguration
__all__ = ("Supervisor",)
[docs]
class Supervisor(object):
_dag: DAG
_cfg: SupervisorAirflowConfiguration
_kill_dag: DAG
_xmlrpc_client: SupervisorRemoteXMLRPCClient
[docs]
def __init__(self, dag: DAG, cfg: SupervisorAirflowConfiguration, **kwargs):
# store config
self._cfg = cfg
# store or create client
self._xmlrpc_client = kwargs.pop("xmlrpc_client", SupervisorRemoteXMLRPCClient(self._cfg))
# store dag
self._dag = dag
self.setup_dag()
# initialize tasks
self.initialize_tasks()
self.configure_supervisor >> self.start_supervisor >> self.start_programs >> self.check_programs
# fail, restart
self.check_programs.retrigger_fail >> self.restart_programs
# pass, finish
self.check_programs.stop_pass >> self.stop_programs >> self.stop_supervisor >> self.unconfigure_supervisor
# TODO make helper dag
self._force_kill = self.get_step_operator("force-kill")
# Default non running
PythonOperator(task_id=f"{self._dag.dag_id}-force-kill-dag", python_callable=skip) >> self._force_kill
# Deal with any configuration or cleanup problems
any_config_fail = PythonOperator(
task_id=f"{self._dag.dag_id}-check-config-failed", python_callable=fail, trigger_rule="one_failed"
)
self.configure_supervisor >> any_config_fail
self.start_supervisor >> any_config_fail
self.start_programs >> any_config_fail
self.stop_programs >> any_config_fail
self.unconfigure_supervisor >> any_config_fail
def setup_dag(self):
# override dag kwargs that dont make sense
self._dag.catchup = False
self._dag.concurrency = 1
self._dag.max_active_tasks = 1
self._dag.max_active_runs = 1
def initialize_tasks(self):
# NOTE: initialize this first as it is relied upon by startup steps
self._check_programs = self.get_step_operator("check-programs")
# tasks
self._configure_supervisor = self.get_step_operator(step="configure-supervisor")
self._start_supervisor = self.get_step_operator(step="start-supervisor")
self._start_programs = self.get_step_operator("start-programs")
if self._cfg.stop_on_exit:
self._stop_programs = self.get_step_operator("stop-programs")
if self._cfg.cleanup:
self._unconfigure_supervisor = self.get_step_operator("unconfigure-supervisor")
else:
self._unconfigure_supervisor = PythonOperator(
task_id=f"{self._dag.dag_id}-unconfigure-supervisor", python_callable=skip
)
else:
self._stop_programs = PythonOperator(task_id=f"{self._dag.dag_id}-stop-programs", python_callable=skip)
self._unconfigure_supervisor = PythonOperator(
task_id=f"{self._dag.dag_id}-unconfigure-supervisor", python_callable=skip
)
self._restart_programs = self.get_step_operator("restart-programs")
self._stop_supervisor = self.get_step_operator("stop-supervisor")
@property
def configure_supervisor(self) -> "Operator":
return self._configure_supervisor
@property
def start_supervisor(self) -> "Operator":
return self._start_supervisor
@property
def start_programs(self) -> "Operator":
return self._start_programs
@property
def check_programs(self) -> "HighAvailabilityOperator":
return self._check_programs
@property
def stop_programs(self) -> "Operator":
return self._stop_programs
@property
def restart_programs(self) -> "Operator":
return self._restart_programs
@property
def stop_supervisor(self) -> "Operator":
return self._stop_supervisor
@property
def unconfigure_supervisor(self) -> "Operator":
return self._unconfigure_supervisor
@property
def supervisor_client(self) -> SupervisorRemoteXMLRPCClient:
return SupervisorRemoteXMLRPCClient(self._cfg)
def get_base_operator_kwargs(self) -> Dict:
return dict(dag=self._dag)
def get_step_kwargs(self, step: SupervisorTaskStep) -> Dict:
if step == "configure-supervisor":
return dict(
python_callable=lambda **kwargs: (
self.check_programs.check_end_conditions(**kwargs) is None
and write_supervisor_config(self._cfg, _exit=False)
),
do_xcom_push=True,
)
elif step == "start-supervisor":
return dict(
python_callable=lambda **kwargs: (
self.check_programs.check_end_conditions(**kwargs) is None
and start_supervisor(self._cfg._pydantic_path, _exit=False)
),
do_xcom_push=True,
)
elif step == "start-programs":
return dict(
python_callable=lambda **kwargs: (
self.check_programs.check_end_conditions(**kwargs) is None
and start_programs(self._cfg, _exit=False)
),
do_xcom_push=True,
)
elif step == "stop-programs":
return dict(python_callable=lambda: stop_programs(self._cfg, _exit=False), do_xcom_push=True)
elif step == "check-programs":
def _check_programs(supervisor_cfg=self._cfg, **kwargs) -> CheckResult:
# TODO formalize
if check_programs(supervisor_cfg, check_done=True, _exit=False):
# finish
return Result.PASS, Action.STOP
if check_programs(supervisor_cfg, check_running=True, _exit=False):
return Result.PASS, Action.CONTINUE
if check_programs(supervisor_cfg, _exit=False):
return Result.PASS, Action.CONTINUE
return Result.FAIL, Action.RETRIGGER
return dict(python_callable=_check_programs, do_xcom_push=True)
elif step == "restart-programs":
return dict(python_callable=lambda: restart_programs(self._cfg, _exit=False), do_xcom_push=True)
elif step == "stop-supervisor":
return dict(python_callable=lambda: stop_supervisor(self._cfg, _exit=False), do_xcom_push=True)
elif step == "unconfigure-supervisor":
return dict(python_callable=lambda: remove_supervisor_config(self._cfg, _exit=False), do_xcom_push=True)
elif step == "force-kill":
return dict(python_callable=lambda: kill_supervisor(self._cfg, _exit=False), do_xcom_push=True)
raise NotImplementedError(f"Unknown step: {step}")
def get_step_operator(self, step: SupervisorTaskStep) -> "Operator":
if step == "check-programs":
return HighAvailabilityOperator(
**{
# Sensor Args
"task_id": f"{self._dag.dag_id}-{step}",
"poke_interval": self._cfg.airflow.check_interval.total_seconds(),
"timeout": self._cfg.airflow.check_timeout.total_seconds(),
"mode": "poke",
# HighAvailabilityOperator Args
"runtime": self._cfg.airflow.runtime,
"endtime": self._cfg.airflow.endtime,
"maxretrigger": self._cfg.airflow.maxretrigger,
"reference_date": self._cfg.airflow.reference_date,
# Pass through
**self.get_base_operator_kwargs(),
**self.get_step_kwargs(step),
}
)
return PythonOperator(
**{"task_id": f"{self._dag.dag_id}-{step}", **self.get_base_operator_kwargs(), **self.get_step_kwargs(step)}
)