Examples¶
This page provides comprehensive examples of using airflow-ha for various use cases.
Always-On Service Monitoring¶
Monitor a service continuously and handle failures:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action
import requests


def check_api_health(**kwargs):
    """Check if the API is responding."""
    try:
        response = requests.get("http://my-api.example.com/health", timeout=10)
        if response.status_code == 200:
            return (Result.PASS, Action.CONTINUE)
        else:
            return (Result.FAIL, Action.RETRIGGER)
    except requests.RequestException:
        return (Result.FAIL, Action.RETRIGGER)


with DAG(
    dag_id="api-health-monitor",
    description="Monitor API health continuously",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    pre = PythonOperator(
        task_id="setup",
        python_callable=lambda: print("Starting health monitoring"),
    )

    ha = HighAvailabilityOperator(
        task_id="health-check",
        timeout=3600,  # 1 hour timeout per check cycle
        poke_interval=30,  # Check every 30 seconds
        python_callable=check_api_health,
    )

    # Handle successful stop
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=lambda: print("Monitoring stopped"),
    )

    pre >> ha
    ha.stop_pass >> cleanup
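In this example a failed health check returns Action.RETRIGGER, so the corresponding branch can also be wired to send an alert before the DAG retriggers itself. A minimal sketch to append inside the same with DAG block (the alert-on-failure task and its print-based notification are illustrative placeholders):

    # Notify before retriggering after a failed health check
    alert = PythonOperator(
        task_id="alert-on-failure",
        python_callable=lambda: print("API unhealthy, retriggering monitor"),
    )
    ha.retrigger_fail >> alert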
AWS MWAA Compatible DAG¶
AWS Managed Workflows for Apache Airflow (MWAA) enforces a 12-hour maximum DAG runtime. Use the runtime limiter to retrigger automatically before hitting this limit:
from datetime import datetime, timedelta

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def long_running_check(**kwargs):
    """Check for a long-running process."""
    # Your check logic here
    process_complete = False  # Replace with actual check
    if process_complete:
        return (Result.PASS, Action.STOP)
    return (Result.PASS, Action.CONTINUE)


with DAG(
    dag_id="mwaa-compatible-dag",
    description="Long-running DAG compatible with MWAA 12-hour limit",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="long-running-task",
        timeout=3600,
        poke_interval=60,
        python_callable=long_running_check,
        # Retrigger before hitting MWAA's 12-hour limit
        runtime=timedelta(hours=11),  # End this run and retrigger after 11 hours
    )
Time-Based Scheduling¶
Run a process during business hours only:
from datetime import datetime, timedelta, time

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def business_hours_process(**kwargs):
    """Process that should only run during business hours."""
    # Your processing logic
    return (Result.PASS, Action.CONTINUE)


with DAG(
    dag_id="business-hours-dag",
    description="Run only during business hours",
    schedule="0 9 * * 1-5",  # Start at 9 AM on weekdays
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="business-process",
        timeout=3600,
        poke_interval=300,  # Check every 5 minutes
        python_callable=business_hours_process,
        # Stop at 6 PM
        endtime=time(18, 0),
    )
Recursive/Countdown DAG¶
Build DAGs that retrigger themselves and pass state between runs:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action

with DAG(
    dag_id="countdown-dag",
    description="Recursive countdown DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    def _get_count(**kwargs):
        """Get current count from DAG config, default to 5."""
        return kwargs["dag_run"].conf.get("counter", 5) - 1

    get_count = PythonOperator(
        task_id="get-count",
        python_callable=_get_count,
    )

    def _keep_counting(**kwargs):
        """Decide whether to continue counting."""
        count = kwargs["task_instance"].xcom_pull(
            key="return_value",
            task_ids="get-count",
        )
        if count > 0:
            return (Result.PASS, Action.RETRIGGER)
        elif count == 0:
            return (Result.PASS, Action.STOP)
        else:
            return (Result.FAIL, Action.STOP)

    keep_counting = HighAvailabilityOperator(
        task_id="countdown",
        timeout=30,
        poke_interval=5,
        python_callable=_keep_counting,
        # Pass the new counter value to the retriggered DAG
        pass_trigger_kwargs={
            "conf": '''{"counter": {{ ti.xcom_pull(key="return_value", task_ids="get-count") }}}'''
        },
    )

    get_count >> keep_counting
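The countdown starts at 5 by default; to start from another value, trigger the DAG with a conf payload. A minimal sketch using Airflow's TriggerDagRunOperator from a separate seeder DAG (the seed-countdown task id is an illustrative placeholder):

# In a separate DAG: start the countdown at 10 instead of the default 5
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

seed_countdown = TriggerDagRunOperator(
    task_id="seed-countdown",
    trigger_dag_id="countdown-dag",
    conf={"counter": 10},
)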
Max Retrigger Limit¶
Limit the number of times a DAG can retrigger:
from datetime import datetime, timedelta

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def check_with_retries(**kwargs):
    """A check that might need multiple retries."""
    # Your logic here
    success = False  # Replace with actual check
    if success:
        return (Result.PASS, Action.STOP)
    return (Result.FAIL, Action.RETRIGGER)


with DAG(
    dag_id="limited-retries-dag",
    description="DAG with max retrigger limit",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="retry-limited-task",
        timeout=300,
        poke_interval=30,
        python_callable=check_with_retries,
        # Give up after 5 retriggers
        maxretrigger=5,
    )
Full Branching Example¶
Handle all possible outcomes with custom tasks:
from datetime import datetime, timedelta
from random import choice

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action, fail


def random_outcome(**kwargs):
    """Randomly choose an outcome for demonstration."""
    return choice([
        (Result.PASS, Action.CONTINUE),
        (Result.PASS, Action.RETRIGGER),
        (Result.PASS, Action.STOP),
        (Result.FAIL, Action.CONTINUE),
        (Result.FAIL, Action.RETRIGGER),
        (Result.FAIL, Action.STOP),
    ])


with DAG(
    dag_id="full-branching-dag",
    description="Demonstrate all HA branches",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Pre-task
    pre = PythonOperator(
        task_id="pre",
        python_callable=lambda: print("Starting"),
    )

    # HA Operator
    ha = HighAvailabilityOperator(
        task_id="ha",
        timeout=30,
        poke_interval=5,
        python_callable=random_outcome,
    )

    pre >> ha

    # Handle retrigger after pass
    retrigger_pass = PythonOperator(
        task_id="retrigger-pass",
        python_callable=lambda: print("Retriggering after success"),
    )
    ha.retrigger_pass >> retrigger_pass

    # Handle retrigger after fail
    retrigger_fail = PythonOperator(
        task_id="retrigger-fail",
        python_callable=lambda: print("Retriggering after failure"),
    )
    ha.retrigger_fail >> retrigger_fail

    # Handle stop after pass
    stop_pass = PythonOperator(
        task_id="stop-pass",
        python_callable=lambda: print("Stopping after success"),
    )
    ha.stop_pass >> stop_pass

    # Handle stop after fail
    stop_fail = PythonOperator(
        task_id="stop-fail",
        python_callable=fail,
        trigger_rule="all_failed",
    )
    ha.stop_fail >> stop_fail
Integration with airflow-supervisor¶
airflow-ha is used by airflow-supervisor to monitor long-running supervisor processes:
from datetime import datetime, timedelta

from airflow import DAG
from airflow_supervisor import (
    SupervisorAirflowConfiguration,
    Supervisor,
    ProgramConfiguration,
)

# The Supervisor internally uses HighAvailabilityOperator
# to monitor and restart supervised programs
cfg = SupervisorAirflowConfiguration(
    port=9001,
    working_dir="/data/supervisor",
    # These are passed to HighAvailabilityOperator
    check_interval=timedelta(seconds=30),
    check_timeout=timedelta(hours=8),
    runtime=timedelta(hours=11),  # MWAA compatible
    program={
        "my-service": ProgramConfiguration(
            command="python my_service.py",
        ),
    },
)

with DAG(
    dag_id="supervisor-with-ha",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    supervisor = Supervisor(dag=dag, cfg=cfg)
Passing State Between Retriggers¶
Pass configuration data to retriggered DAG runs:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action


def process_batch(**kwargs):
    """Process a batch and track progress."""
    conf = kwargs["dag_run"].conf
    current_batch = conf.get("batch_id", 0)
    total_batches = conf.get("total_batches", 10)

    # Process the batch
    print(f"Processing batch {current_batch + 1} of {total_batches}")

    if current_batch + 1 >= total_batches:
        return (Result.PASS, Action.STOP)
    return (Result.PASS, Action.RETRIGGER)


with DAG(
    dag_id="batch-processing-dag",
    description="Process batches across retriggers",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    def _get_next_batch(**kwargs):
        conf = kwargs["dag_run"].conf
        return conf.get("batch_id", 0) + 1

    get_batch = PythonOperator(
        task_id="get-next-batch",
        python_callable=_get_next_batch,
    )

    ha = HighAvailabilityOperator(
        task_id="process-batch",
        timeout=300,
        poke_interval=10,
        python_callable=process_batch,
        pass_trigger_kwargs={
            "conf": '''{"batch_id": {{ ti.xcom_pull(key="return_value", task_ids="get-next-batch") }}, "total_batches": 10}'''
        },
    )

    get_batch >> ha
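Once the final batch is processed, process_batch returns (Result.PASS, Action.STOP), so a wrap-up task can be attached to the stop_pass branch. A minimal sketch to append inside the same with DAG block (the report-completion task is an illustrative placeholder):

    # Report completion once the last batch has been processed
    report = PythonOperator(
        task_id="report-completion",
        python_callable=lambda: print("All batches processed"),
    )
    ha.stop_pass >> report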
Using with airflow-config (YAML-Driven)¶
Define HA DAGs declaratively with airflow-config:
# config/ha/monitoring.yaml
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultArgs
  retries: 0
  depends_on_past: false
all_dags:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
  schedule: "0 0 * * *"
dags:
  monitoring-dag:
    tasks:
      health-check:
        _target_: airflow_ha.HighAvailabilityTask
        task_id: health-check
        timeout: 3600
        poke_interval: 60
        runtime: 43200  # 12 hours in seconds
        python_callable: my_module.check_health

# dags/monitoring.py
from airflow_config import load_config, DAG

config = load_config("config/ha", "monitoring")

with DAG(dag_id="monitoring-dag", config=config) as dag:
    # Tasks are automatically created from config
    pass
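The YAML above points python_callable at my_module.check_health. A minimal sketch of what that module could contain, following the same (Result, Action) convention used throughout this page (the health-check logic itself is an illustrative placeholder):

# my_module.py
from airflow_ha import Result, Action


def check_health(**kwargs):
    """Illustrative placeholder: replace with a real health check."""
    healthy = True
    if healthy:
        return (Result.PASS, Action.CONTINUE)
    return (Result.FAIL, Action.RETRIGGER)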
Combining Multiple Limiters¶
Use multiple limiters together for fine-grained control:
from datetime import datetime, timedelta, time

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def my_check(**kwargs):
    return (Result.PASS, Action.CONTINUE)


with DAG(
    dag_id="multi-limiter-dag",
    description="DAG with multiple limiters",
    schedule="0 6 * * *",  # Start at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="multi-limited",
        timeout=3600,
        poke_interval=60,
        python_callable=my_check,
        # Multiple limiters - the first one reached wins
        runtime=timedelta(hours=11),  # Max 11 hours
        endtime=time(20, 0),  # Stop by 8 PM
        maxretrigger=24,  # Max 24 retriggers
    )
Integration Notes¶
airflow-supervisor¶
airflow-ha is a core dependency of airflow-supervisor, which uses it to:
Monitor supervisor process health
Automatically restart failed processes
Handle graceful shutdown and cleanup
AWS MWAA¶
For AWS Managed Workflows for Apache Airflow:
Use runtime=timedelta(hours=11) to stay under the 12-hour limit
DAGs will automatically retrigger before timing out
State is preserved across retriggers via pass_trigger_kwargs
airflow-pydantic¶
airflow-ha is built on airflow-pydantic, which provides:
Pydantic models for Airflow constructs
Operator wrappers for serialization
Integration with airflow-config for YAML-driven DAGs