Examples¶
This page provides examples of using airflow-common across its main areas: convenience operators, topology helpers, library management, infrastructure tasks, and integration with airflow-config.
Common Operators¶
Skip, Fail, and Pass Operators¶
Use these convenience operators for common workflow patterns:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
from airflow_common import Skip, Fail, Pass
def check_condition(**kwargs):
# Your condition logic
return True
with DAG(
dag_id="conditional-workflow",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
check = PythonOperator(
task_id="check-condition",
python_callable=check_condition,
)
# Success path
success_path = Pass(task_id="success-path", trigger_rule="none_failed")
# Failure path
failure_path = Fail(task_id="failure-path", trigger_rule="one_failed")
# Skip path (for optional tasks)
skip_path = Skip(task_id="skip-path")
check >> [success_path, failure_path, skip_path]
Topology Helpers¶
All Success / Any Failure Pattern¶
Aggregate results from multiple parallel tasks:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow_common import all_success_any_failure
with DAG(
dag_id="parallel-aggregation",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Create multiple parallel tasks
parallel_tasks = [
BashOperator(task_id=f"process-{i}", bash_command=f"echo 'Processing {i}'")
for i in range(5)
]
# Aggregate success/failure
any_failure, all_success = all_success_any_failure(
task_id="aggregate",
tasks=parallel_tasks,
dag=dag,
)
# Continue workflow based on aggregation
next_step = BashOperator(task_id="next-step", bash_command="echo 'All done!'")
all_success >> next_step
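The any_failure handle returned alongside all_success can gate a failure branch in the same way; a short sketch, adding a hypothetical alert task inside the same with DAG block:
# Hypothetical alert step that fires when any of the parallel tasks fails
alert = BashOperator(task_id="alert-on-failure", bash_command="echo 'A parallel task failed!'")
any_failure >> alert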
Conditional Execution Based on Host Availability¶
Execute tasks only if a host is reachable:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow_common import if_booted_do
with DAG(
dag_id="host-conditional",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Only run if host is available
deploy_task = BashOperator(
task_id="deploy",
bash_command="deploy.sh",
)
if_booted_do(
task_id="deploy-to-server",
host="server.example.com",
task=deploy_task,
dag=dag,
)
Library Management¶
Installing Pip Libraries¶
Define and install pip packages:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import PipLibrary, InstallLibraryOperator
with DAG(
dag_id="install-pip-libraries",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Define libraries
libraries = [
PipLibrary(name="pandas", version_constraint=">=2.0"),
PipLibrary(name="numpy"),
PipLibrary(name="scikit-learn", reinstall=True),
]
install_task = InstallLibraryOperator(
task_id="install-libs",
pip=libraries,
conda=[],
git=[],
)
Installing from Git Repositories¶
Clone and install from git repositories:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import GitRepo, InstallLibraryOperator
with DAG(
dag_id="install-git-repos",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Define git repositories
repos = [
GitRepo(
name="my-package",
repo="https://github.com/my-org/my-package.git",
branch="main",
install=True,
editable=True,
),
GitRepo(
name="another-package",
repo="https://github.com/my-org/another-package.git",
branch="develop",
clean=True, # Clean before checkout
install=True,
install_deps=True,
),
]
install_task = InstallLibraryOperator(
task_id="install-repos",
pip=[],
conda=[],
git=repos,
)
Installing Conda Libraries¶
Define and install conda packages:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import CondaLibrary, InstallLibraryOperator
with DAG(
dag_id="install-conda-libraries",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Define conda libraries
libraries = [
CondaLibrary(
name="tensorflow",
env="ml-env", # Install in specific environment
),
CondaLibrary(
name="pytorch",
tool="mamba", # Use mamba for faster installs
),
]
install_task = InstallLibraryOperator(
task_id="install-conda-libs",
pip=[],
conda=libraries,
git=[],
)
Combined Library List¶
Install multiple types of libraries together:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import (
PipLibrary,
CondaLibrary,
GitRepo,
LibraryList,
InstallLibraryOperator,
)
with DAG(
dag_id="install-all-libraries",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Define all libraries
library_list = LibraryList(
pip=[
PipLibrary(name="requests"),
PipLibrary(name="pydantic", version_constraint=">=2"),
],
conda=[
CondaLibrary(name="numpy"),
],
git=[
GitRepo(
name="my-lib",
repo="https://github.com/my-org/my-lib.git",
branch="main",
install=True,
),
],
parallel=True, # Install in parallel when possible
)
install_task = InstallLibraryOperator(
task_id="install-all",
pip=library_list.pip,
conda=library_list.conda,
git=library_list.git,
parallel=library_list.parallel,
)
Remote Library Installation via SSH¶
Install libraries on remote machines:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import PipLibrary, GitRepo, InstallLibrarySSHOperator
with DAG(
dag_id="install-remote-libraries",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
install_task = InstallLibrarySSHOperator(
task_id="install-on-remote",
pip=[
PipLibrary(name="pandas"),
PipLibrary(name="numpy"),
],
conda=[],
git=[
GitRepo(
name="my-app",
repo="https://github.com/my-org/my-app.git",
branch="main",
install=True,
),
],
ssh_conn_id="my-remote-server",
command_prefix="source /opt/venv/bin/activate",
)
Infrastructure Tasks¶
Journalctl Cleanup¶
Clean systemd journal logs to free disk space:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import JournalctlClean, JournalctlCleanSSH
with DAG(
dag_id="infrastructure-cleanup",
schedule=timedelta(days=7), # Weekly cleanup
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Local cleanup
local_cleanup = JournalctlClean(
task_id="local-journal-cleanup",
sudo=True,
days=7, # Keep 7 days of logs
)
# Remote cleanup via SSH
remote_cleanup = JournalctlCleanSSH(
task_id="remote-journal-cleanup",
ssh_conn_id="production-server",
sudo=True,
days=7,
)
local_cleanup >> remote_cleanup
Systemctl Service Management¶
Manage systemd services (start, stop, enable, disable, restart):
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Systemctl, SystemctlSSH
with DAG(
dag_id="service-management",
schedule=None, # Manual trigger
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Stop service before maintenance
stop_worker = Systemctl(
task_id="stop-worker",
service="airflow-celery-worker",
action="stop",
sudo=True,
)
# Restart service after maintenance
restart_worker = Systemctl(
task_id="restart-worker",
service="airflow-celery-worker",
action="restart",
sudo=True,
)
# Enable service on remote server
enable_remote = SystemctlSSH(
task_id="enable-remote-service",
service="airflow-scheduler",
action="enable",
ssh_conn_id="production-server",
sudo=True,
)
stop_worker >> restart_worker >> enable_remote
Coordinated Service Restart¶
Stop multiple services, perform maintenance, then restart:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Systemctl, JournalctlClean
with DAG(
dag_id="coordinated-restart",
schedule=timedelta(days=7),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
services = ["airflow-webserver", "airflow-scheduler", "airflow-celery-worker"]
# Stop all services
stop_tasks = [
Systemctl(task_id=f"stop-{svc}", service=svc, action="stop")
for svc in services
]
# Maintenance task
cleanup = JournalctlClean(task_id="cleanup-journals", days=7)
# Restart all services
start_tasks = [
Systemctl(task_id=f"start-{svc}", service=svc, action="start")
for svc in services
]
# Chain: stop all -> cleanup -> start all
for stop_task in stop_tasks:
stop_task >> cleanup
for start_task in start_tasks:
cleanup >> start_task
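Because Airflow accepts lists on either side of >>, the two loops above can also be collapsed into a single expression with the same wiring:
# Equivalent to the two loops above
stop_tasks >> cleanup >> start_tasks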
Reboot with Delayed Execution¶
Schedule reboots using the at command for delayed execution:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Systemctl, Reboot
with DAG(
dag_id="graceful-reboot",
schedule=None, # Manual trigger
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Stop services first
stop_worker = Systemctl(
task_id="stop-worker",
service="airflow-celery-worker",
action="stop",
)
stop_scheduler = Systemctl(
task_id="stop-scheduler",
service="airflow-scheduler",
action="stop",
)
# Schedule reboot in 2 minutes (to allow graceful shutdown)
reboot = Reboot(
task_id="scheduled-reboot",
sudo=True,
delay_minutes=2,
)
[stop_worker, stop_scheduler] >> reboot
Remote Server Reboot¶
Reboot remote servers via SSH:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import SystemctlSSH, RebootSSH
with DAG(
dag_id="remote-reboot",
schedule=None,
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Stop services on remote
stop_services = SystemctlSSH(
task_id="stop-remote-services",
service="airflow-celery-worker",
action="stop",
ssh_conn_id="worker-server",
)
# Reboot remote server with delay
reboot_remote = RebootSSH(
task_id="reboot-remote",
ssh_conn_id="worker-server",
sudo=True,
delay_minutes=1,
)
stop_services >> reboot_remote
Lunchy for macOS (launchctl wrapper)¶
Manage macOS services using Lunchy (a friendly launchctl wrapper):
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Lunchy, LunchySSH
with DAG(
dag_id="macos-service-management",
schedule=None,
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Stop local macOS service
stop_worker = Lunchy(
task_id="stop-worker",
service="timkpaine.airflow-celery-worker",
action="stop",
)
# Start local macOS service
start_worker = Lunchy(
task_id="start-worker",
service="timkpaine.airflow-celery-worker",
action="start",
)
# Restart on remote Mac via SSH
restart_remote = LunchySSH(
task_id="restart-remote-worker",
service="timkpaine.airflow-celery-worker",
action="restart",
ssh_conn_id="mac-worker",
)
stop_worker >> start_worker >> restart_remote
macOS Graceful Reboot¶
Stop Lunchy-managed services before rebooting a Mac:
from airflow import DAG
from datetime import datetime, timedelta
from airflow_common import Lunchy, Reboot
with DAG(
dag_id="macos-graceful-reboot",
schedule=None,
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
services = [
"timkpaine.airflow-celery-worker",
"timkpaine.airflow-scheduler",
"timkpaine.airflow-webserver",
]
# Stop all services
stop_tasks = [
Lunchy(task_id=f"stop-{svc.split('.')[-1]}", service=svc, action="stop")
for svc in services
]
# Schedule reboot after services stop
reboot = Reboot(
task_id="reboot-mac",
sudo=True,
delay_minutes=1,
)
for stop_task in stop_tasks:
stop_task >> reboot
Integration with airflow-config¶
YAML-Driven Library Installation¶
Define libraries in YAML configuration:
# config/deploy/libraries.yaml
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultArgs
retries: 2
retry_delay: 60
all_dags:
_target_: airflow_config.DagArgs
start_date: "2024-01-01"
catchup: false
schedule: "0 2 * * *"
extensions:
libraries:
_target_: airflow_common.LibraryList
pip:
- _target_: airflow_common.PipLibrary
name: pandas
version_constraint: ">=2.0"
- _target_: airflow_common.PipLibrary
name: numpy
- _target_: airflow_common.PipLibrary
name: scikit-learn
git:
- _target_: airflow_common.GitRepo
name: my-ml-library
repo: "https://github.com/my-org/my-ml-library.git"
branch: main
install: true
editable: true
parallel: true
dags:
deploy-libraries:
tasks:
install:
_target_: airflow_common.LibraryListTask
task_id: install-libraries
pip: ${extensions.libraries.pip}
conda: []
git: ${extensions.libraries.git}
parallel: ${extensions.libraries.parallel}
# dags/deploy_libraries.py
from airflow_config import load_config, DAG
config = load_config("config/deploy", "libraries")
with DAG(dag_id="deploy-libraries", config=config) as dag:
pass # Tasks are created from config
YAML-Driven Infrastructure Tasks¶
Define recurring maintenance tasks, such as journal cleanup, directly in YAML:
# config/infra/maintenance.yaml
_target_: airflow_config.Configuration
all_dags:
_target_: airflow_config.DagArgs
start_date: "2024-01-01"
catchup: false
schedule: "0 3 * * 0" # Weekly on Sunday at 3 AM
dags:
weekly-maintenance:
tasks:
cleanup-journals:
_target_: airflow_common.JournalctlCleanTask
task_id: cleanup-journals
sudo: true
days: 14
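As with the library example, a small DAG file loads this configuration; a minimal sketch mirroring the deploy-libraries pattern above (the file name is illustrative):
# dags/weekly_maintenance.py
from airflow_config import load_config, DAG

config = load_config("config/infra", "maintenance")

with DAG(dag_id="weekly-maintenance", config=config) as dag:
    pass  # Tasks (e.g. cleanup-journals) are created from config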
Using the Git Clone Utility¶
The clone_repo function generates the bash commands needed to clone a repository and, optionally, install it:
from airflow_common import clone_repo
# Generate clone/install commands
commands = clone_repo(
name="my-repo",
repo="https://github.com/my-org/my-repo.git",
branch="feature/new-feature",
clean=True,
install=True,
editable=True,
tool="uv", # Use uv for faster installs
)
# Use in a BashOperator
from airflow.operators.bash import BashOperator
clone_task = BashOperator(
task_id="clone-and-install",
bash_command=commands,
)
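The generated commands are plain bash, so they can also be run on a remote machine, for example via the SSH provider's SSHOperator; a sketch, with an illustrative connection id:
from airflow.providers.ssh.operators.ssh import SSHOperator

# Run the generated clone/install commands over SSH
remote_clone = SSHOperator(
    task_id="clone-and-install-remote",
    ssh_conn_id="my-remote-server",  # illustrative connection id
    command=commands,
)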
Integration Notes¶
airflow-pydantic¶
airflow-common is built on airflow-pydantic, which provides:
Base Pydantic models for Airflow constructs
Operator wrappers for serialization
Core utilities like fail, pass_, and skip
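If you need the underlying callables rather than the wrapped operators, a minimal sketch, assuming fail, pass_, and skip are exported at the airflow_pydantic top level:
from airflow.operators.python import PythonOperator
from airflow_pydantic import skip  # assumption: exported at the package top level

# Use the low-level callable directly with a plain PythonOperator
mark_skipped = PythonOperator(task_id="mark-skipped", python_callable=skip)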
airflow-config¶
For YAML-driven DAG definitions, use airflow-config:
Hydra-based configuration management
Per-environment overrides
Seamless integration with all airflow-common models
All models in airflow-common support Hydra’s _target_ syntax for instantiation from YAML configuration files.
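For example, a single model can be built programmatically from a _target_-style mapping with Hydra's instantiate (a sketch reusing values from the YAML above):
from hydra.utils import instantiate

# Build an airflow_common.PipLibrary from a _target_ mapping
pandas_lib = instantiate({
    "_target_": "airflow_common.PipLibrary",
    "name": "pandas",
    "version_constraint": ">=2.0",
})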