Getting Started¶
Overview¶
airflow-supervisor provides Apache Airflow operators and configuration for running long-running or always-on jobs with supervisord. It builds on supervisor-pydantic for configuration management and is designed to work seamlessly with airflow-config for YAML-driven DAG definitions.
Key Benefits:
- **Long-Running Jobs**: Run services that need to stay up beyond Airflow’s task execution model
- **Always-On Services**: Keep daemon processes running with automatic monitoring
- **Configuration-Driven**: Define supervisor configurations in YAML using airflow-config
- **High Availability**: Built on airflow-ha for fault tolerance and retrigger capabilities
- **Remote Execution**: Support for both local and SSH-based supervisor management
Note
This library builds on supervisor-pydantic, which provides Pydantic models for all supervisord configuration structures, and is designed to work with airflow-config for YAML-driven DAG definitions.
Installation¶
Install airflow-supervisor from PyPI:

```bash
pip install airflow-supervisor
```

For use with Apache Airflow 2.x:

```bash
pip install airflow-supervisor[airflow]
```

For use with Apache Airflow 3.x:

```bash
pip install airflow-supervisor[airflow3]
```
Basic Usage¶
Local Supervisor DAG¶
The simplest use case is running a supervisor instance on the Airflow worker node:
```python
from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import SupervisorAirflowConfiguration, Supervisor, ProgramConfiguration

# Create supervisor configuration
cfg = SupervisorAirflowConfiguration(
    working_dir="/data/airflow/supervisor",
    config_path="/data/airflow/supervisor/supervisor.conf",
    program={
        "my-service": ProgramConfiguration(
            command="python my_service.py",
        )
    },
)

# Create DAG
with DAG(
    dag_id="my-supervisor-dag",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)
```
Remote Supervisor via SSH¶
For managing supervisor on remote machines:
```python
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow_pydantic import SSHOperatorArgs
from airflow_supervisor import (
    SupervisorSSHAirflowConfiguration,
    SupervisorSSH,
    ProgramConfiguration,
)

# Create SSH-enabled supervisor configuration
cfg = SupervisorSSHAirflowConfiguration(
    port=9001,
    working_dir="/data/supervisor",
    program={
        "my-service": ProgramConfiguration(
            command="python my_service.py",
        )
    },
    ssh_operator_args=SSHOperatorArgs(
        ssh_hook=SSHHook(
            remote_host="my-server.example.com",
            username="airflow",
            key_file="/path/to/key",
        ),
    ),
)

with DAG(
    dag_id="remote-supervisor-dag",
    schedule=timedelta(hours=8),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = SupervisorSSH(dag=dag, cfg=cfg)
```
Configuration-Driven DAGs with airflow-config¶
The recommended approach is using airflow-config for fully declarative DAG definitions:
```yaml
# config/supervisor/my-service.yaml
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultArgs
  retries: 0
  depends_on_past: false
all_dags:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
extensions:
  supervisor:
    _target_: airflow_supervisor.SupervisorAirflowConfiguration
    port: 9091
    working_dir: "/data/airflow/supervisor"
    program:
      my-service:
        _target_: airflow_supervisor.ProgramConfiguration
        command: "python my_service.py"
```

```python
# dags/my_supervisor_dag.py
from datetime import timedelta
from airflow_config import load_config, DAG
from airflow_supervisor import Supervisor

config = load_config("config/supervisor", "my-service")

with DAG(
    dag_id="my-supervisor-dag",
    schedule=timedelta(days=1),
    config=config,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])
```
DAG Lifecycle¶
When you create a Supervisor DAG, it orchestrates the following lifecycle:
1. **Configure** - Write the supervisord configuration file
2. **Start Supervisor** - Launch the supervisord daemon
3. **Start Programs** - Start supervised programs via supervisorctl
4. **Monitor** - Continuously check program status via the XML-RPC API
5. **Handle Failures** - Restart programs on failure (via airflow-ha)
6. **Stop Programs** - Gracefully stop supervised programs
7. **Stop Supervisor** - Shut down the supervisord daemon
8. **Cleanup** - Remove configuration files
The DAG structure provides hooks at each step for custom actions.
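For example, a custom task can be wired against the tasks the Supervisor instance creates. The snippet below is only a sketch: the attribute name `start_programs` is an assumption about how the generated tasks are exposed, so check the API Reference for the actual hook points.

```python
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from airflow_supervisor import SupervisorAirflowConfiguration, Supervisor, ProgramConfiguration

cfg = SupervisorAirflowConfiguration(
    working_dir="/data/airflow/supervisor",
    config_path="/data/airflow/supervisor/supervisor.conf",
    program={"my-service": ProgramConfiguration(command="python my_service.py")},
)

with DAG(
    dag_id="my-supervisor-dag",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

    # Hypothetical hook: run a notification task once the supervised programs
    # have started. `supervisor.start_programs` is an assumed attribute name,
    # not a documented API.
    notify = PythonOperator(
        task_id="notify_programs_started",
        python_callable=lambda: print("supervised programs are up"),
    )
    supervisor.start_programs >> notify
```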
Configuration Options¶
SupervisorAirflowConfiguration¶
Key configuration options for SupervisorAirflowConfiguration:
| Option | Type | Description |
|---|---|---|
| `check_interval` | timedelta | Interval between program status checks (default: 5s) |
| `check_timeout` | timedelta | Timeout for status checks (default: 8 hours) |
| `runtime` | timedelta | Maximum runtime of the supervisor job |
| `endtime` | time | End time of the job |
| `stop_on_exit` | bool | Stop programs when the DAG finishes (default: True) |
| `cleanup` | bool | Remove config files on completion (default: True) |
| `restart_on_retrigger` | bool | Restart programs on HA retrigger (default: False) |
| `pool` | str | Airflow pool for task scheduling |
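For illustration, a few of these options could be overridden when constructing the configuration. This is only a sketch: the keyword names below are taken from the table above and should be verified against the API Reference.

```python
from datetime import timedelta

from airflow_supervisor import SupervisorAirflowConfiguration, ProgramConfiguration

# Sketch only: keyword names follow the option table above; verify them
# against the API Reference before relying on this snippet.
cfg = SupervisorAirflowConfiguration(
    working_dir="/data/airflow/supervisor",
    check_interval=timedelta(seconds=10),  # poll program status every 10 seconds
    check_timeout=timedelta(hours=4),      # stop monitoring after 4 hours
    stop_on_exit=True,                     # stop programs when the DAG finishes
    cleanup=True,                          # remove generated config files afterwards
    program={
        "my-service": ProgramConfiguration(command="python my_service.py"),
    },
)
```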
ProgramConfiguration¶
Supervisor program configuration (from supervisor-pydantic):
| Option | Type | Description |
|---|---|---|
| `command` | str | The command to run |
| `autostart` | bool | Start automatically (default: False for Airflow) |
| `autorestart` | bool | Restart on exit (default: False) |
| `startsecs` | int | Seconds the process must stay up to be considered started (default: 1) |
| `exitcodes` | list | Expected exit codes (default: [0]) |
| `stopsignal` | str | Signal used to stop the process (default: TERM) |
| `stdout_logfile` | Path | Standard output log file |
| `stderr_logfile` | Path | Standard error log file |
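As a sketch, a program definition using several of these options might look like the following; the option names mirror supervisord's program settings as reflected in the table above, so verify the exact field names against supervisor-pydantic.

```python
from pathlib import Path

from airflow_supervisor import ProgramConfiguration

# Sketch only: option names mirror supervisord's program settings as listed
# in the table above; check supervisor-pydantic for the exact fields.
program = ProgramConfiguration(
    command="python my_service.py",
    startsecs=5,        # must stay up for 5 seconds to count as started
    exitcodes=[0],      # only exit code 0 is treated as expected
    stopsignal="TERM",  # signal sent when stopping the process
    stdout_logfile=Path("/data/airflow/supervisor/my_service.out.log"),
    stderr_logfile=Path("/data/airflow/supervisor/my_service.err.log"),
)
```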
Next Steps¶
- See Examples for more detailed use cases
- Consult the API Reference for complete API documentation
- Visit supervisor-pydantic for supervisor configuration details