Getting Started¶
Overview¶
airflow-ha provides a High Availability (HA) DAG utility for Apache Airflow that enables long-running or “always-on” tasks. It’s designed to work around Airflow’s traditional task execution model, which assumes tasks complete within a reasonable timeframe.
Key Benefits:
- **Always-On DAGs**: Build DAGs that continuously run and monitor themselves
- **Self-Retriggering**: Automatically retrigger DAGs when tasks complete or fail
- **Graceful Shutdown**: Configure runtime limits, end times, and max retrigger counts
- **Flexible Outcomes**: Control DAG behavior based on check results (`PASS`/`FAIL`, `CONTINUE`/`RETRIGGER`/`STOP`)
- **AWS MWAA Compatible**: Perfect for AWS Managed Workflows for Apache Airflow (MWAA), where DAGs have a maximum runtime of 12 hours
Note
This library is used by airflow-supervisor to build DAGs that manage supervisor processes with fault tolerance and automatic recovery.
Important
AWS MWAA Users: AWS Managed Workflows for Apache Airflow imposes a 12-hour maximum DAG runtime limit. airflow-ha provides a clean solution by automatically retriggering DAGs before they hit this limit, allowing you to run continuous workloads on MWAA.
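Since a sensor timeout behaves like `(Result.PASS, Action.RETRIGGER)` (see the note in Basic Concepts below), one way to stay under the MWAA ceiling is to set the sensor timeout comfortably below 12 hours. A minimal sketch; the `dag_id`, `task_id`, and 11-hour margin are illustrative choices, not library defaults:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action

with DAG(
    dag_id="mwaa-always-on",  # illustrative name
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="keep-alive",
        poke_interval=60,
        # On sensor timeout the behavior matches (Result.PASS, Action.RETRIGGER),
        # so the DAG retriggers itself before MWAA's 12-hour hard limit.
        timeout=11 * 60 * 60,  # 11 hours: an illustrative safety margin
        python_callable=lambda **kwargs: (Result.PASS, Action.CONTINUE),
    )
```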
Installation¶
Install airflow-ha from PyPI:

```bash
pip install airflow-ha
```

For use with Apache Airflow 2.x:

```bash
pip install airflow-ha[airflow]
```

For use with Apache Airflow 3.x:

```bash
pip install airflow-ha[airflow3]
```
Basic Concepts¶
The HighAvailabilityOperator¶
The core component is HighAvailabilityOperator, which inherits from PythonSensor. It runs a user-provided python_callable and takes action based on the return value:
| Return | Result | Current DAGrun End State |
|---|---|---|
| `(Result.PASS, Action.RETRIGGER)` | Retrigger the same DAG to run again | `success` |
| `(Result.PASS, Action.STOP)` | Finish the DAG, until its next scheduled run | `success` |
| `(Result.FAIL, Action.RETRIGGER)` | Retrigger the same DAG to run again | `fail` |
| `(Result.FAIL, Action.STOP)` | Finish the DAG, until its next scheduled run | `fail` |
| `(*, Action.CONTINUE)` | Continue to run the Sensor | N/A |
Note
If the sensor times out, the behavior matches (Result.PASS, Action.RETRIGGER).
Result and Action Enums¶
```python
from airflow_ha import Result, Action

# Result indicates success or failure
Result.PASS  # Task succeeded
Result.FAIL  # Task failed

# Action controls what happens next
Action.CONTINUE   # Keep running the sensor
Action.RETRIGGER  # Trigger a new DAG run
Action.STOP       # Stop and wait for next scheduled run
```
Basic Usage¶
Simple Always-On DAG¶
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def check_service_health(**kwargs):
    """Check if the service is healthy."""
    # Your health check logic here
    is_healthy = True  # Replace with actual check
    if is_healthy:
        return (Result.PASS, Action.CONTINUE)
    else:
        return (Result.FAIL, Action.RETRIGGER)


with DAG(
    dag_id="always-on-service",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="health-check",
        timeout=3600,      # 1 hour timeout
        poke_interval=60,  # Check every minute
        python_callable=check_service_health,
    )
```
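With the values above, and assuming each check returns promptly, the callable runs at most `timeout // poke_interval` times per DAG run before the sensor times out and the behavior maps to `(Result.PASS, Action.RETRIGGER)`:

```python
# Back-of-envelope bound on how often the health check runs per DAG run,
# using the timeout and poke_interval from the example above.
timeout = 3600       # sensor timeout in seconds (1 hour)
poke_interval = 60   # seconds between checks

max_checks = timeout // poke_interval
print(max_checks)  # → 60
```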
Using Limiters¶
Configure automatic shutdown with limiters:
```python
from datetime import datetime, time, timedelta

from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action


def my_check(**kwargs):
    return (Result.PASS, Action.CONTINUE)


with DAG(
    dag_id="limited-ha-dag",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="ha-task",
        python_callable=my_check,
        poke_interval=60,
        # Limiters
        runtime=timedelta(hours=8),  # Max 8 hours runtime
        endtime=time(18, 0),         # Stop at 6 PM
        maxretrigger=10,             # Max 10 retriggers
    )
```
Connecting to Downstream Tasks¶
The HighAvailabilityOperator exposes branches for different outcomes:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action

with DAG(
    dag_id="ha-with-branches",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="ha",
        timeout=300,
        poke_interval=30,
        python_callable=lambda **kwargs: (Result.PASS, Action.STOP),
    )

    # Connect to different outcome branches
    on_pass_retrigger = PythonOperator(
        task_id="on-pass-retrigger",
        python_callable=lambda: print("Retriggering after success"),
    )
    ha.retrigger_pass >> on_pass_retrigger

    on_fail_retrigger = PythonOperator(
        task_id="on-fail-retrigger",
        python_callable=lambda: print("Retriggering after failure"),
    )
    ha.retrigger_fail >> on_fail_retrigger

    on_pass_stop = PythonOperator(
        task_id="on-pass-stop",
        python_callable=lambda: print("Stopping after success"),
    )
    ha.stop_pass >> on_pass_stop

    on_fail_stop = PythonOperator(
        task_id="on-fail-stop",
        python_callable=lambda: print("Stopping after failure"),
        trigger_rule="all_failed",
    )
    ha.stop_fail >> on_fail_stop
```
Configuration Options¶
HighAvailabilityOperator Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `python_callable` | Callable | Function that returns a `(Result, Action)` tuple |
| `timeout` | int | Sensor timeout in seconds |
| `poke_interval` | int | Seconds between check calls |
| `runtime` | timedelta / int | Maximum runtime before auto-stop |
| `endtime` | time / str | Time of day to stop |
| `maxretrigger` | int | Maximum number of retriggers |
| `reference_date` | str | Date reference |
| `pass_trigger_kwargs` | dict | Kwargs passed to retrigger on PASS |
| `fail_trigger_kwargs` | dict | Kwargs passed to retrigger on FAIL |
DAG Params (Runtime Overrides)¶
The operator automatically adds DAG params for runtime overrides:
- `{task_id}-force-run`: Ignore all limiters
- `{task_id}-force-runtime`: Override runtime limit
- `{task_id}-force-endtime`: Override end time
- `{task_id}-force-maxretrigger`: Override max retrigger count
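These params can be supplied per run in the trigger configuration (for example via "Trigger DAG w/ config" in the Airflow UI or `airflow dags trigger --conf`). A minimal sketch of building such an override dict, assuming the `ha-task` task_id from the limiters example above:

```python
task_id = "ha-task"  # must match the HighAvailabilityOperator task_id

# Per-run overrides; keys follow the {task_id}-force-* naming above
override_conf = {
    f"{task_id}-force-run": True,         # ignore all limiters for this run
    f"{task_id}-force-maxretrigger": 20,  # raise the retrigger cap for this run
}
print(sorted(override_conf))
```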
Next Steps¶
- See Examples for more detailed use cases
- Consult the API Reference for complete API documentation
- Check out airflow-supervisor, which uses this library for supervisor process management