airflow-supervisor
Apache Airflow utilities for running long-running or always-on jobs with supervisord
Overview
This library provides a configuration-driven way of generating supervisor configurations and airflow operators/sensors for long-running or always-on jobs. Configuration is managed by Pydantic, Hydra, and OmegaConf via the supervisor-pydantic library.
How To: Use in Airflow
airflow-supervisor can be installed in your airflow server environment and imported in your DAG files. It provides two convenient top-level DAG subclasses:
- Supervisor: creates a DAG representing a local supervisor instance running on the airflow worker node (the underlying tasks use PythonOperator and BashOperator to communicate between airflow and supervisor)
- SupervisorSSH: creates a DAG representing a remote supervisor instance running on another machine (the underlying tasks use SSHOperator to communicate between airflow and supervisor)
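The practical difference between the two classes is where the supervisorctl commands run. The snippet below is a minimal sketch, not the library's code: build_supervisorctl_command is a hypothetical helper, and it assumes the remote variant simply prepends a shell command_prefix (a field of SupervisorSSHAirflowConfiguration, listed under Configuration) before the command string is sent over SSH.

```python
import shlex


def build_supervisorctl_command(
    config_path: str,
    action: str,
    program: str = "all",
    command_prefix: str = "",
) -> str:
    """Hypothetical helper: build a supervisorctl command line (illustration only)."""
    # -c tells supervisorctl which supervisord configuration file to use
    command = (
        f"supervisorctl -c {shlex.quote(config_path)} "
        f"{shlex.quote(action)} {shlex.quote(program)}"
    )
    if command_prefix:
        # Assumed remote behaviour: run the prefix first, then supervisorctl only if it succeeded
        return f"{command_prefix} && {command}"
    return command


# Local usage, e.g. the kind of string a BashOperator could run
print(build_supervisorctl_command("/data/airflow/supervisor/supervisor.conf", "start"))
# supervisorctl -c /data/airflow/supervisor/supervisor.conf start all
```

Locally, such a string could be handed to a BashOperator; for SupervisorSSH it would be the command an SSHOperator runs on the remote host.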
The generated DAG is composed of a variety of tasks and sensors, organized as a discrete pipeline of steps:
1. Set up the supervisord configuration
2. Start the supervisord daemon
3. Start the supervised programs with supervisorctl
4. Start sensors to query the programs’ state via supervisor’s XML-RPC API
5. Evaluate and take action according to the programs’ state changes
6. Restart programs if necessary
7. Tear down the sensors from (4)
8. Stop the supervised programs from (3)
9. Stop the supervisord daemon from (2)
10. Remove the configuration from (1)
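Steps (4) to (6) boil down to polling supervisor’s XML-RPC interface. The following is a rough sketch using only Python’s standard library, not the library’s actual sensor code: the helper names are hypothetical, the URL assumes supervisord’s HTTP server listens on port 9091 (the port used in the airflow-config example) without authentication, and it treats EXITED and FATAL as the states that warrant a restart. supervisor.getAllProcessInfo and supervisor.startProcess are part of supervisor’s documented XML-RPC API.

```python
import xmlrpc.client
from typing import Any, Dict, List

# Process states treated as "needs a restart" in this sketch. STOPPED is left out
# on purpose, since programs are stopped intentionally during teardown (step 8).
RESTART_STATES = {"EXITED", "FATAL"}


def programs_needing_restart(process_info: List[Dict[str, Any]]) -> List[str]:
    """Step 5: pick programs whose reported state warrants action."""
    # supervisor addresses processes as "group:name"
    return [
        f"{p['group']}:{p['name']}"
        for p in process_info
        if p["statename"] in RESTART_STATES
    ]


def check_and_restart(rpc_url: str = "http://localhost:9091/RPC2") -> List[str]:
    """Steps 4 to 6 against a running supervisord (hypothetical helper)."""
    server = xmlrpc.client.ServerProxy(rpc_url)
    # Step 4: query every program's state via the XML-RPC API
    info = server.supervisor.getAllProcessInfo()
    # Step 5: evaluate the reported states
    to_restart = programs_needing_restart(info)
    # Step 6: restart; the second argument asks supervisord to wait until the process has started
    for name in to_restart:
        server.supervisor.startProcess(name, True)
    return to_restart
```

Keeping the evaluation (programs_needing_restart) separate from the RPC calls makes the decision logic testable without a running supervisord.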
This setup provides maximal configurability with minimal requirements on the machine (for example, it does not require an existing supervisord daemon managed by e.g. systemd). It also lets you hook your own tasks into any step of the process. For example, if a process is detected to have died in step (5), you could configure your own task to take some custom action before, or instead of, the default restart in step (6).
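The idea of hooking a custom action in front of the default restart can be modeled in plain Python. This is only an illustration of the control flow with a hypothetical hook signature (a callable that receives the dead program names and returns whether the default restart should still run); in the real DAG the equivalent wiring is done with Airflow tasks.

```python
from typing import Callable, Iterable, List, Optional

# Hypothetical hook interface: receives the names of dead programs and returns
# True to let the default restart (step 6) proceed, or False to skip it.
Hook = Callable[[List[str]], bool]


def handle_dead_programs(
    dead: Iterable[str],
    restart: Callable[[str], None],
    hook: Optional[Hook] = None,
) -> List[str]:
    """Run an optional custom action before (or instead of) the default restart."""
    names = list(dead)
    if not names:
        return []
    # The hook runs first and decides whether the default restart still happens
    if hook is not None and not hook(names):
        return []
    for name in names:
        restart(name)
    return names


# Usage: record an alert first, then let the default restart happen
alerts: List[List[str]] = []
restarted: List[str] = []


def alert_then_restart(names: List[str]) -> bool:
    alerts.append(names)  # e.g. send a notification here
    return True


handle_dead_programs(["test"], restart=restarted.append, hook=alert_then_restart)
print(alerts, restarted)  # [['test']] ['test']
```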
Here is an overview of the DAG, with annotations for code paths and the actions taken by Supervisor:
[Figure: annotated overview of the Supervisor DAG]
More docs and code examples coming soon!
Example DAG:
```python
from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import SupervisorAirflowConfiguration, Supervisor, ProgramConfiguration

# Create supervisor configuration
cfg = SupervisorAirflowConfiguration(
    working_dir="/data/airflow/supervisor",
    config_path="/data/airflow/supervisor/supervisor.conf",
    program={
        "test": ProgramConfiguration(
            command="bash -c 'sleep 14400; exit 1'",
        )
    },
)

# Create DAG as normal
with DAG(
    dag_id="test-supervisor",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Link supervisor config to dag
    supervisor = Supervisor(dag=dag, cfg=cfg)
```
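The configuration above is eventually rendered into a supervisord INI file at config_path. For intuition, here is a minimal standard-library sketch of that rendering step; render_supervisor_conf is a hypothetical helper, and the real file generated via supervisor-pydantic contains additional sections and options that are omitted here. Note that the test program sleeps for four hours and then exits with status 1, which makes it a convenient way to exercise the restart path.

```python
import configparser
import io
from typing import Dict


def render_supervisor_conf(working_dir: str, programs: Dict[str, str]) -> str:
    """Hypothetical helper: render a minimal supervisord-style INI file."""
    # interpolation=None so '%' in commands is written verbatim
    parser = configparser.ConfigParser(interpolation=None)
    # supervisord runs from the configured working directory
    parser["supervisord"] = {"directory": working_dir}
    for name, command in programs.items():
        # each supervised program gets its own [program:<name>] section
        parser[f"program:{name}"] = {"command": command}
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


print(render_supervisor_conf(
    "/data/airflow/supervisor",
    {"test": "bash -c 'sleep 14400; exit 1'"},
))
```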
Example DAG: airflow-config
```yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultArgs
  retries: 0
  depends_on_past: false
all_dags:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
extensions:
  supervisor:
    _target_: airflow_supervisor.SupervisorAirflowConfiguration
    airflow:
      _target_: airflow_supervisor.ConvenienceConfiguration
      port: "*:9091"
    working_dir: "/data/airflow/supervisor"
    config_path: "/data/airflow/supervisor/supervisor.conf"
    program:
      test:
        _target_: airflow_supervisor.ProgramConfiguration
        command: "bash -c 'sleep 14400; exit 1'"
```
```python
from datetime import timedelta
from airflow_config import load_config, DAG
from airflow_supervisor import Supervisor

config = load_config(config_name="airflow")

with DAG(
    dag_id="test-supervisor",
    schedule=timedelta(days=1),
    config=config,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])
```
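In the YAML file, each _target_ key names the class a mapping should be instantiated as (the Hydra convention), which is why config.extensions["supervisor"] can be passed directly as cfg. The sketch below shows the core idea with importlib; it is a simplified illustration, not Hydra’s hydra.utils.instantiate, and it ignores features such as _args_, _partial_ and interpolation.

```python
import importlib
from typing import Any


def instantiate(node: Any) -> Any:
    """Recursively build objects from dicts carrying a dotted-path ``_target_`` key."""
    if isinstance(node, dict):
        # Instantiate children first so nested configs become objects
        kwargs = {k: instantiate(v) for k, v in node.items() if k != "_target_"}
        target = node.get("_target_")
        if target is None:
            return kwargs
        # "package.module.Class" -> import "package.module", then fetch "Class"
        module_name, _, attr = target.rpartition(".")
        cls = getattr(importlib.import_module(module_name), attr)
        return cls(**kwargs)
    if isinstance(node, list):
        return [instantiate(v) for v in node]
    return node


# Usage with a standard-library target instead of airflow_supervisor classes
print(instantiate({"_target_": "datetime.timedelta", "hours": 4}))  # 4:00:00
```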
Configuration
See supervisor-pydantic for reference.
- SupervisorAirflowConfiguration: Wrapper around supervisor_pydantic.SupervisorConvenienceConfiguration, with added airflow-specific configuration
- SupervisorSSHAirflowConfiguration: Wrapper around SupervisorAirflowConfiguration, with added parameters for airflow’s SSHOperator
- AirflowConfiguration: Airflow-specific configuration for how the DAG and operators should behave, including airflow_ha.HighAvailabilityOperator and PythonSensor
```mermaid
classDiagram
    SupervisorConvenienceConfiguration <|-- SupervisorAirflowConfiguration
    SupervisorAirflowConfiguration <|-- SupervisorSSHAirflowConfiguration

    class SupervisorConvenienceConfiguration {
        supervisor_pydantic.SupervisorConvenienceConfiguration
    }

    SupervisorAirflowConfiguration *-- AirflowConfiguration

    class SupervisorAirflowConfiguration {
        airflow: AirflowConfiguration
        stop_on_exit: bool
        cleanup: bool
        restart_on_initial: bool
        restart_on_retrigger: bool
    }

    class SupervisorSSHAirflowConfiguration {
        command_prefix: str
        # Airflow SSHOperator Arguments
        ssh_hook: object
        ssh_conn_id: str
        remote_host: str
        conn_timeout: int
        cmd_timeout: int
        environment: dict
        get_pty: bool
        banner_timeout: float
        skip_on_exit_code: List~int~
    }

    class AirflowConfiguration {
        # PythonSensor arguments
        check_interval: timedelta
        check_timeout: timedelta
        # HighAvailabilityOperator custom args
        runtime: timedelta
        endtime: time
        maxretrigger: int
        reference_date: str
    }
```
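AirflowConfiguration groups the PythonSensor arguments check_interval and check_timeout. Assuming they play the usual sensor roles (how often to check, and how long to keep checking before giving up), the polling behavior looks roughly like the standard-library sketch below; wait_for is a hypothetical helper, not Airflow’s PythonSensor.

```python
import time
from datetime import timedelta
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    check_interval: timedelta,
    check_timeout: timedelta,
) -> bool:
    """Poll ``condition`` every ``check_interval`` until true or ``check_timeout`` elapses."""
    deadline = time.monotonic() + check_timeout.total_seconds()
    while True:
        if condition():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline
        time.sleep(min(check_interval.total_seconds(), remaining))


# Example: a condition that becomes true on the third check
attempts = iter([False, False, True])
print(wait_for(lambda: next(attempts), timedelta(milliseconds=10), timedelta(seconds=1)))  # True
```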
Note
This library is built on supervisor-pydantic, which provides configuration elements for all supervisor structures, as well as self-contained tools for interacting with supervisor instances.
Note
This library was generated using copier from the Base Python Project Template repository.