Examples

This page collects examples of using airflow-supervisor for common use cases.

Basic Local Supervisor

The simplest example runs a supervisor instance on the same machine as the Airflow worker:

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

# Define the programs to supervise
cfg = SupervisorAirflowConfiguration(
    port=9001,
    working_dir="/data/supervisor/my-service",
    program={
        "my-app": ProgramConfiguration(
            command="python /app/server.py",
            stdout_logfile="/var/log/my-app/output.log",
            stderr_logfile="/var/log/my-app/error.log",
        ),
    },
)

with DAG(
    dag_id="basic-supervisor-dag",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

Multiple Programs

Supervise multiple programs within a single supervisor instance:

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

cfg = SupervisorAirflowConfiguration(
    port=9002,
    working_dir="/data/supervisor/multi-service",
    program={
        "web-server": ProgramConfiguration(
            command="gunicorn app:app --bind 0.0.0.0:8000",
            directory="/app/web",
        ),
        "celery-worker": ProgramConfiguration(
            command="celery -A tasks worker --loglevel=info",
            directory="/app/tasks",
        ),
        "redis": ProgramConfiguration(
            command="redis-server /etc/redis.conf",
        ),
    },
)

with DAG(
    dag_id="multi-program-supervisor",
    schedule=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

Remote Supervisor via SSH

Manage a supervisor instance on a remote machine using SSH:

from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow_pydantic import SSHOperatorArgs
from airflow_supervisor import (
    SupervisorSSHAirflowConfiguration,
    SupervisorSSH,
    ProgramConfiguration,
)

cfg = SupervisorSSHAirflowConfiguration(
    port=9001,
    working_dir="/opt/services/my-app",
    program={
        "my-app": ProgramConfiguration(
            command="/opt/services/my-app/run.sh",
        ),
    },
    ssh_operator_args=SSHOperatorArgs(
        ssh_hook=SSHHook(
            remote_host="app-server.example.com",
            username="deploy",
            key_file="/home/airflow/.ssh/id_rsa",
        ),
        cmd_timeout=300,
    ),
    command_prefix="source /etc/profile &&",
)

with DAG(
    dag_id="remote-supervisor-dag",
    schedule=timedelta(hours=8),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = SupervisorSSH(dag=dag, cfg=cfg)

Always-On Service

Configure a service that stays running and doesn’t stop when the DAG completes:

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

cfg = SupervisorAirflowConfiguration(
    port=9003,
    working_dir="/data/supervisor/always-on",
    program={
        "persistent-service": ProgramConfiguration(
            command="python /app/daemon.py",
        ),
    },
    # Don't stop the service when DAG completes
    stop_on_exit=False,
    cleanup=False,
    # Restart when DAG is re-triggered
    restart_on_initial=True,
    restart_on_retrigger=True,
)

with DAG(
    dag_id="always-on-supervisor",
    schedule=timedelta(hours=1),  # Check every hour
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

Time-Limited Job with Runtime

Run a job for a maximum duration:

from airflow import DAG
from datetime import timedelta, datetime, time
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

cfg = SupervisorAirflowConfiguration(
    port=9004,
    working_dir="/data/supervisor/batch-job",
    program={
        "batch-process": ProgramConfiguration(
            command="python /app/batch_processor.py",
        ),
    },
    # Run for a maximum of 4 hours
    runtime=timedelta(hours=4),
    # Or end at a specific wall-clock time
    endtime=time(18, 0),  # 6 PM
)

with DAG(
    dag_id="time-limited-supervisor",
    schedule="0 9 * * *",  # Start at 9 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

Using Airflow Pools

Control concurrency with Airflow pools:

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

cfg = SupervisorAirflowConfiguration(
    port=9005,
    working_dir="/data/supervisor/pooled-job",
    program={
        "resource-intensive": ProgramConfiguration(
            command="python /app/heavy_compute.py",
        ),
    },
    # Use specific Airflow pool
    pool="gpu-workers",
)

with DAG(
    dag_id="pooled-supervisor",
    schedule=timedelta(hours=2),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)

Configuration-Driven with airflow-config

Basic YAML Configuration

# config/supervisor/web-service.yaml
_target_: airflow_config.Configuration

default_args:
  _target_: airflow_config.DefaultArgs
  retries: 0
  depends_on_past: false

all_dags:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
  tags:
    - supervisor
    - production

extensions:
  supervisor:
    _target_: airflow_supervisor.SupervisorAirflowConfiguration
    port: 9091
    working_dir: "/data/supervisor/web-service"
    check_interval: 10  # seconds
    check_timeout: 28800  # 8 hours
    program:
      web-server:
        _target_: airflow_supervisor.ProgramConfiguration
        command: "gunicorn app:app --bind 0.0.0.0:8080"
        directory: "/app"
        stdout_logfile: "/var/log/web-server/output.log"
        stderr_logfile: "/var/log/web-server/error.log"

# dags/web_service_supervisor.py
from datetime import timedelta
from airflow_config import load_config, DAG
from airflow_supervisor import Supervisor

config = load_config("config/supervisor", "web-service")

with DAG(
    dag_id="web-service-supervisor",
    schedule=timedelta(hours=12),
    config=config,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])

SSH with Host from airflow-balancer

When using airflow-balancer for dynamic host selection:

# config/supervisor/distributed-service.yaml
_target_: airflow_config.Configuration

extensions:
  balancer:
    _target_: airflow_balancer.BalancerConfiguration
    hosts:
      - _target_: airflow_pydantic.Host
        name: "server-1.example.com"
        pool: "compute-pool"
      - _target_: airflow_pydantic.Host
        name: "server-2.example.com"
        pool: "compute-pool"

  supervisor:
    _target_: airflow_supervisor.SupervisorSSHAirflowConfiguration
    port: 9001
    working_dir: "/opt/services/distributed"
    program:
      worker:
        _target_: airflow_supervisor.ProgramConfiguration
        command: "python /opt/services/worker.py"
    ssh_operator_args:
      _target_: airflow_pydantic.SSHOperatorArgs
      cmd_timeout: 120

dags:
  distributed-supervisor:
    schedule: "0 */4 * * *"
    tasks:
      run-service:
        _target_: airflow_supervisor.SupervisorSSHTask
        task_id: run-service
        cfg: ${extensions.supervisor}
        host:
          _target_: airflow_pydantic.HostQuery
          balancer: ${extensions.balancer}
          kind: select

Chaining with Other Tasks

Supervisor DAGs can be chained with other Airflow tasks:

from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.python import PythonOperator
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

def setup_environment():
    # Setup code here
    pass

def cleanup_environment():
    # Cleanup code here
    pass

cfg = SupervisorAirflowConfiguration(
    port=9006,
    working_dir="/data/supervisor/chained",
    program={
        "my-service": ProgramConfiguration(command="python service.py"),
    },
)

with DAG(
    dag_id="chained-supervisor",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Pre-supervisor setup
    setup = PythonOperator(
        task_id="setup",
        python_callable=setup_environment,
    )

    # Supervisor job
    supervisor = Supervisor(dag=dag, cfg=cfg)

    # Post-supervisor cleanup
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=cleanup_environment,
    )

    # Chain the tasks
    setup >> supervisor >> cleanup

Custom Check Intervals

Configure monitoring frequency for different use cases:

from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

# High-frequency monitoring for critical services
critical_cfg = SupervisorAirflowConfiguration(
    port=9007,
    working_dir="/data/supervisor/critical",
    program={
        "critical-service": ProgramConfiguration(command="python critical.py"),
    },
    check_interval=timedelta(seconds=2),  # Check every 2 seconds
    check_timeout=timedelta(hours=24),    # 24-hour timeout
)

# Low-frequency monitoring for batch jobs
batch_cfg = SupervisorAirflowConfiguration(
    port=9008,
    working_dir="/data/supervisor/batch",
    program={
        "batch-job": ProgramConfiguration(command="python batch.py"),
    },
    check_interval=timedelta(minutes=1),  # Check every minute
    check_timeout=timedelta(hours=8),     # 8-hour timeout
)

with DAG(
    dag_id="monitoring-examples",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    critical = Supervisor(dag=dag, cfg=critical_cfg)
    # Note: batch_cfg is shown for comparison only; typically each service
    # gets its own DAG rather than sharing one

Integration Notes

supervisor-pydantic

airflow-supervisor builds on supervisor-pydantic, which provides:

  • Pydantic models for all supervisor configuration sections

  • XML-RPC client for supervisor communication

  • Convenience utilities for supervisor management
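
The XML-RPC client referred to above communicates with supervisord over its standard XML-RPC interface. As a minimal sketch of what that interface looks like, the snippet below queries a running supervisord directly with Python's standard-library xmlrpc.client, assuming supervisord's HTTP server is exposed on the configured port (9001 in the examples above) and that no authentication is required; the method names are supervisord's documented XML-RPC API, not airflow-supervisor's.

from xmlrpc.client import ServerProxy

# Connect to supervisord's XML-RPC endpoint (port matches the
# SupervisorAirflowConfiguration "port" setting)
server = ServerProxy("http://localhost:9001/RPC2")

# Overall supervisord state, e.g. {'statecode': 1, 'statename': 'RUNNING'}
print(server.supervisor.getState())

# Per-program status for every supervised program
for proc in server.supervisor.getAllProcessInfo():
    print(proc["name"], proc["statename"])

# Stop and start an individual program by name
server.supervisor.stopProcess("my-app")
server.supervisor.startProcess("my-app")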

airflow-config

For YAML-driven DAG definitions, use airflow-config:

  • Hydra-based configuration management

  • Per-environment overrides

  • Integration with airflow-pydantic models

airflow-ha

High availability features come from airflow-ha:

  • Automatic retrigger on failure

  • Runtime and endtime constraints

  • Reference date handling

airflow-balancer

For dynamic host selection, integrate with airflow-balancer:

  • Host pool management

  • Dynamic host and port selection

  • Load balancing across machines
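
The YAML example above ("SSH with Host from airflow-balancer") instantiates these objects via _target_ entries. Below is a rough Python equivalent, using only the class and field names that appear in that YAML; anything beyond those names is an assumption rather than documented airflow-balancer API.

from airflow_balancer import BalancerConfiguration
from airflow_pydantic import Host, HostQuery

# Two hosts in a shared pool, as declared in the YAML example
balancer = BalancerConfiguration(
    hosts=[
        Host(name="server-1.example.com", pool="compute-pool"),
        Host(name="server-2.example.com", pool="compute-pool"),
    ],
)

# A query that selects a host from the balancer (kind="select" as in the YAML);
# the selected host is what the SupervisorSSHTask connects to
host = HostQuery(balancer=balancer, kind="select")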