Getting Started¶
Overview¶
airflow-common provides a collection of common operators, tasks, and utilities for Apache Airflow that simplify everyday workflow development. It includes reusable components for topology management, library installation, infrastructure maintenance, and common task patterns.
Key Features:
Common Operators: Pre-built
Skip,Fail, andPassoperators for workflow controlTopology Helpers: Functions like
all_success_any_failureandif_booted_dofor complex DAG topologiesLibrary Management: Models and operators for installing pip, conda, and git libraries
Infrastructure Tasks: Common infrastructure tasks like journal cleanup
Pydantic Integration: All models are Pydantic-based for validation and serialization
Note
This library is built on airflow-pydantic and integrates seamlessly with airflow-config for YAML-driven DAG definitions.
Installation¶
Install airflow-common from PyPI:
pip install airflow-common
Or via conda:
conda install airflow-common -c conda-forge
For use with Apache Airflow 2.x:
pip install airflow-common[airflow]
For use with Apache Airflow 3.x:
pip install airflow-common[airflow3]
Basic Usage¶
Common Operators¶
Use the convenience operators for common workflow patterns:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Skip, Fail, Pass
with DAG(
dag_id="common-operators-example",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Create a skip task (useful for conditional branches)
skip_task = Skip(task_id="skip-this")
# Create a fail task (useful for error handling branches)
fail_task = Fail(task_id="fail-on-error", trigger_rule="one_failed")
# Create a pass task (useful for success branches)
pass_task = Pass(task_id="success-path", trigger_rule="none_failed")
Topology Helpers¶
Build complex DAG topologies with helper functions:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
from airflow_common import all_success_any_failure
with DAG(
dag_id="topology-example",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Create multiple tasks
tasks = [
PythonOperator(task_id=f"task-{i}", python_callable=lambda: None)
for i in range(3)
]
# Create success/failure aggregation
any_failure, all_success = all_success_any_failure(
task_id="aggregation",
tasks=tasks,
dag=dag,
)
Library Models¶
Define libraries to install with Pydantic models:
from airflow_common import PipLibrary, GitRepo, LibraryList
# Define pip libraries
pandas = PipLibrary(name="pandas", version_constraint=">=2.0")
numpy = PipLibrary(name="numpy")
# Define a git repository to clone and install
my_lib = GitRepo(
name="my-library",
repo="https://github.com/my-org/my-library.git",
branch="main",
install=True,
editable=True,
)
# Group them in a library list
libraries = LibraryList(
pip=[pandas, numpy],
git=[my_lib],
)
Module Structure¶
airflow Module¶
Common Airflow operators and topology helpers:
Skip,Fail,Pass- Convenience operatorsall_success_any_failure- Aggregate success/failure from multiple tasksif_booted_do- Conditional execution based on host availability
library Module¶
Library management models and operators:
PipLibrary- Pip package installation modelCondaLibrary- Conda package installation modelGitRepo- Git repository clone and install modelLibraryList- Aggregate library listInstallLibraryOperator- Operator to install libraries locallyInstallLibrarySSHOperator- Operator to install libraries via SSH
infra Module¶
Infrastructure maintenance tasks:
JournalctlClean/JournalctlCleanSSH- Clean systemd journal logsSystemctl/SystemctlSSH- Manage systemd services (start, stop, enable, disable, restart)Reboot/RebootSSH- Reboot systems (with optional delay viaatcommand)Lunchy/LunchySSH- Manage macOS launchd services via Lunchy
Integration with airflow-config¶
Define libraries and tasks in YAML with airflow-config:
# config/libraries.yaml
_target_: airflow_config.Configuration
extensions:
libraries:
_target_: airflow_common.LibraryList
pip:
- _target_: airflow_common.PipLibrary
name: pandas
version_constraint: ">=2.0"
- _target_: airflow_common.PipLibrary
name: numpy
git:
- _target_: airflow_common.GitRepo
name: my-library
repo: "https://github.com/my-org/my-library.git"
branch: main
install: true
editable: true
Next Steps¶
See Examples for more detailed use cases
Consult the API Reference for complete API documentation
Check out airflow-pydantic for core Pydantic Airflow models