Getting Started

Overview

airflow-common provides a collection of common operators, tasks, and utilities for Apache Airflow that simplify everyday workflow development. It includes reusable components for topology management, library installation, infrastructure maintenance, and common task patterns.

Key Features:

  • Common Operators: Pre-built Skip, Fail, and Pass operators for workflow control

  • Topology Helpers: Functions like all_success_any_failure and if_booted_do for complex DAG topologies

  • Library Management: Models and operators for installing pip, conda, and git libraries

  • Infrastructure Tasks: Common infrastructure tasks like journal cleanup

  • Pydantic Integration: All models are Pydantic-based for validation and serialization

Note

This library is built on airflow-pydantic and integrates seamlessly with airflow-config for YAML-driven DAG definitions.

Installation

Install airflow-common from PyPI:

pip install airflow-common

Or via conda:

conda install airflow-common -c conda-forge

For use with Apache Airflow 2.x:

pip install airflow-common[airflow]

For use with Apache Airflow 3.x:

pip install airflow-common[airflow3]

Basic Usage

Common Operators

Use the convenience operators for common workflow patterns:

from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Skip, Fail, Pass

with DAG(
    dag_id="common-operators-example",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create a skip task (useful for conditional branches)
    skip_task = Skip(task_id="skip-this")

    # Create a fail task (useful for error handling branches)
    fail_task = Fail(task_id="fail-on-error", trigger_rule="one_failed")

    # Create a pass task (useful for success branches)
    pass_task = Pass(task_id="success-path", trigger_rule="none_failed")

Topology Helpers

Build complex DAG topologies with helper functions:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
from airflow_common import all_success_any_failure

with DAG(
    dag_id="topology-example",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create multiple tasks
    tasks = [
        PythonOperator(task_id=f"task-{i}", python_callable=lambda: None)
        for i in range(3)
    ]

    # Create success/failure aggregation
    any_failure, all_success = all_success_any_failure(
        task_id="aggregation",
        tasks=tasks,
        dag=dag,
    )

Library Models

Define libraries to install with Pydantic models:

from airflow_common import PipLibrary, GitRepo, LibraryList

# Define pip libraries
pandas = PipLibrary(name="pandas", version_constraint=">=2.0")
numpy = PipLibrary(name="numpy")

# Define a git repository to clone and install
my_lib = GitRepo(
    name="my-library",
    repo="https://github.com/my-org/my-library.git",
    branch="main",
    install=True,
    editable=True,
)

# Group them in a library list
libraries = LibraryList(
    pip=[pandas, numpy],
    git=[my_lib],
)

Module Structure

airflow Module

Common Airflow operators and topology helpers:

  • Skip, Fail, Pass - Convenience operators

  • all_success_any_failure - Aggregate success/failure from multiple tasks

  • if_booted_do - Conditional execution based on host availability

library Module

Library management models and operators:

  • PipLibrary - Pip package installation model

  • CondaLibrary - Conda package installation model

  • GitRepo - Git repository clone and install model

  • LibraryList - Aggregate library list

  • InstallLibraryOperator - Operator to install libraries locally

  • InstallLibrarySSHOperator - Operator to install libraries via SSH

infra Module

Infrastructure maintenance tasks:

  • JournalctlClean / JournalctlCleanSSH - Clean systemd journal logs

  • Systemctl / SystemctlSSH - Manage systemd services (start, stop, enable, disable, restart)

  • Reboot / RebootSSH - Reboot systems (with optional delay via at command)

  • Lunchy / LunchySSH - Manage macOS launchd services via Lunchy

Integration with airflow-config

Define libraries and tasks in YAML with airflow-config:

# config/libraries.yaml
_target_: airflow_config.Configuration

extensions:
  libraries:
    _target_: airflow_common.LibraryList
    pip:
      - _target_: airflow_common.PipLibrary
        name: pandas
        version_constraint: ">=2.0"
      - _target_: airflow_common.PipLibrary
        name: numpy
    git:
      - _target_: airflow_common.GitRepo
        name: my-library
        repo: "https://github.com/my-org/my-library.git"
        branch: main
        install: true
        editable: true

Next Steps