Getting Started

Overview

airflow-ha provides a High Availability (HA) DAG utility for Apache Airflow that enables long-running or “always-on” tasks. It’s designed to work around Airflow’s traditional task execution model, which assumes tasks complete within a reasonable timeframe.

Key Benefits:

  • Always-On DAGs: Build DAGs that continuously run and monitor themselves

  • Self-Retriggering: Automatically retrigger DAGs when tasks complete or fail

  • Graceful Shutdown: Configure runtime limits, end times, and max retrigger counts

  • Flexible Outcomes: Control DAG behavior based on check results (PASS/FAIL, CONTINUE/RETRIGGER/STOP)

  • AWS MWAA Compatible: Perfect for AWS Managed Workflows for Apache Airflow (MWAA) where DAGs have a maximum runtime of 12 hours

Note

This library is used by airflow-supervisor to build DAGs that manage supervisor processes with fault tolerance and automatic recovery.

Important

AWS MWAA Users: AWS Managed Workflows for Apache Airflow imposes a 12-hour maximum DAG runtime limit. airflow-ha provides a clean solution by automatically retriggering DAGs before they hit this limit, allowing you to run continuous workloads on MWAA.

Installation

Install airflow-ha from PyPI:

pip install airflow-ha

For use with Apache Airflow 2.x:

pip install airflow-ha[airflow]

For use with Apache Airflow 3.x:

pip install airflow-ha[airflow3]

Basic Concepts

The HighAvailabilityOperator

The core component is HighAvailabilityOperator, which inherits from PythonSensor. It runs a user-provided python_callable and takes action based on the return value:

Return             Result                                       Current DAGrun End State
-----------------  -------------------------------------------  ------------------------
(PASS, RETRIGGER)  Retrigger the same DAG to run again          pass
(PASS, STOP)       Finish the DAG until its next scheduled run  pass
(FAIL, RETRIGGER)  Retrigger the same DAG to run again          fail
(FAIL, STOP)       Finish the DAG until its next scheduled run  fail
(*, CONTINUE)      Continue to run the Sensor                   N/A

Note

If the sensor times out, the behavior matches (Result.PASS, Action.RETRIGGER).
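The table reads as a pure function of the callable's return value. The toy model below (plain Python strings rather than the library's enums — illustrative only, not airflow-ha internals) just restates it:

```python
# Restatement of the outcome table above -- not airflow-ha internals.
OUTCOMES = {
    ("PASS", "RETRIGGER"): ("retrigger", "pass"),
    ("PASS", "STOP"): ("finish", "pass"),
    ("FAIL", "RETRIGGER"): ("retrigger", "fail"),
    ("FAIL", "STOP"): ("finish", "fail"),
}

def resolve(result: str, action: str):
    """Map a (Result, Action) pair to (what happens, DAG run end state)."""
    if action == "CONTINUE":
        return ("keep poking", None)  # sensor keeps running; no end state yet
    return OUTCOMES[(result, action)]

print(resolve("PASS", "STOP"))
```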

Result and Action Enums

from airflow_ha import Result, Action

# Result indicates success or failure
Result.PASS  # Task succeeded
Result.FAIL  # Task failed

# Action controls what happens next
Action.CONTINUE   # Keep running the sensor
Action.RETRIGGER  # Trigger a new DAG run
Action.STOP       # Stop and wait for next scheduled run

Basic Usage

Simple Always-On DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action

def check_service_health(**kwargs):
    """Check if the service is healthy."""
    # Your health check logic here
    is_healthy = True  # Replace with actual check
    
    if is_healthy:
        return (Result.PASS, Action.CONTINUE)
    else:
        return (Result.FAIL, Action.RETRIGGER)

with DAG(
    dag_id="always-on-service",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="health-check",
        timeout=3600,  # 1 hour timeout
        poke_interval=60,  # Check every minute
        python_callable=check_service_health,
    )

Using Limiters

Configure automatic shutdown with limiters:

from datetime import datetime, timedelta, time
from airflow import DAG
from airflow_ha import HighAvailabilityOperator, Result, Action

def my_check(**kwargs):
    return (Result.PASS, Action.CONTINUE)

with DAG(
    dag_id="limited-ha-dag",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="ha-task",
        python_callable=my_check,
        poke_interval=60,
        # Limiters
        runtime=timedelta(hours=8),  # Max 8 hours runtime
        endtime=time(18, 0),  # Stop at 6 PM
        maxretrigger=10,  # Max 10 retriggers
    )

Connecting to Downstream Tasks

The HighAvailabilityOperator exposes branches for different outcomes:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_ha import HighAvailabilityOperator, Result, Action

with DAG(
    dag_id="ha-with-branches",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ha = HighAvailabilityOperator(
        task_id="ha",
        timeout=300,
        poke_interval=30,
        python_callable=lambda **kwargs: (Result.PASS, Action.STOP),
    )
    
    # Connect to different outcome branches
    on_pass_retrigger = PythonOperator(
        task_id="on-pass-retrigger",
        python_callable=lambda: print("Retriggering after success"),
    )
    ha.retrigger_pass >> on_pass_retrigger
    
    on_fail_retrigger = PythonOperator(
        task_id="on-fail-retrigger", 
        python_callable=lambda: print("Retriggering after failure"),
    )
    ha.retrigger_fail >> on_fail_retrigger
    
    on_pass_stop = PythonOperator(
        task_id="on-pass-stop",
        python_callable=lambda: print("Stopping after success"),
    )
    ha.stop_pass >> on_pass_stop
    
    on_fail_stop = PythonOperator(
        task_id="on-fail-stop",
        python_callable=lambda: print("Stopping after failure"),
        trigger_rule="all_failed",
    )
    ha.stop_fail >> on_fail_stop

Configuration Options

HighAvailabilityOperator Parameters

Parameter            Type           Description
-------------------  -------------  ---------------------------------------------------------------
python_callable      Callable       Function that returns a (Result, Action) tuple
timeout              int            Sensor timeout in seconds
poke_interval        int            Seconds between check calls
runtime              timedelta/int  Maximum runtime before auto-stop
endtime              time/str       Time of day to stop
maxretrigger         int            Maximum number of retriggers
reference_date       str            Date reference: start_date, logical_date, or data_interval_end
pass_trigger_kwargs  dict           Kwargs passed to the retrigger on PASS
fail_trigger_kwargs  dict           Kwargs passed to the retrigger on FAIL

DAG Params (Runtime Overrides)

The operator automatically adds DAG params for runtime overrides:

  • {task_id}-force-run: Ignore all limiters

  • {task_id}-force-runtime: Override runtime limit

  • {task_id}-force-endtime: Override end time

  • {task_id}-force-maxretrigger: Override max retrigger count
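Since these are ordinary DAG params, they can be supplied as run configuration when triggering manually. A sketch (assuming the "ha-task" sensor from the limiter example above; the exact values are made up):

```python
import json

# Param names follow the {task_id}-force-* pattern documented above.
task_id = "ha-task"
conf = {
    f"{task_id}-force-run": True,          # ignore all limiters for this run
    f"{task_id}-force-maxretrigger": 20,   # raise the retrigger cap
}

# Pass this dict as the run configuration, e.g. via the UI's
# "Trigger DAG w/ config" form or the CLI's --conf flag:
#   airflow dags trigger limited-ha-dag --conf '<json of conf>'
print(json.dumps(conf))
```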

Next Steps