Getting Started¶
Why airflow-config?¶
airflow-config enables YAML-driven, declarative configuration for Apache Airflow DAGs. Instead of hardcoding settings in Python files, you define your DAG configurations in YAML files that can be:
Centrally managed: Change settings across multiple DAGs from a single file
Environment-specific: Maintain separate configurations for dev, staging, and production
Version controlled: Track configuration changes separately from code changes
Validated: Pydantic models ensure configuration correctness before deployment
Key Benefits:
Separation of Concerns: Keep business logic separate from DAG configuration
No Code Changes: Update schedules, retries, emails, etc. without touching Python
DAG Factory: Generate entire DAGs declaratively from YAML
Type Safety: Pydantic validation catches configuration errors early
Extensibility: Integrate with other airflow-laminar libraries seamlessly
The airflow-laminar Ecosystem¶
airflow-config is the configuration backbone of the airflow-laminar ecosystem. It integrates with several libraries to provide a comprehensive Airflow development experience:
Core Foundation: airflow-pydantic¶
airflow-pydantic is the underpinning library that makes airflow-config possible. It provides:
Pydantic Models: Every Airflow construct (DAGs, Operators, Sensors, Hooks) has a corresponding Pydantic model
Serialization: Full JSON/YAML serialization support for all Airflow constructs
Code Generation: Models can render themselves as Python code via the render() method
Instantiation: Models can create runtime Airflow objects via the instantiate() method
Type Validation: Automatic validation of all configuration values
# airflow-pydantic models power the _target_ declarations
default_args:
  _target_: airflow_pydantic.TaskArgs  # Pydantic model from airflow-pydantic
  owner: data-team
  retries: 3
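The YAML above is simply a declarative spelling of a Pydantic model, so the same arguments can be built directly in Python. A minimal sketch, assuming airflow_pydantic.TaskArgs is a standard Pydantic v2 model whose fields match the YAML keys shown:

# Hedged sketch: construct the same declaration in Python with airflow-pydantic
from airflow_pydantic import TaskArgs

args = TaskArgs(owner="data-team", retries=3)  # values are validated on construction
print(args.model_dump(exclude_none=True))      # serializes back to a plain dict for YAML/JSON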
Host & Port Management: airflow-balancer¶
airflow-balancer provides utilities for tracking hosts and ports across your infrastructure. When integrated with airflow-config, you can:
Define your host infrastructure in YAML
Automatically create Airflow pools for each host
Select hosts based on queues, OS, tags, or custom criteria
Track port usage to avoid conflicts
# config/config.yaml
extensions:
  balancer:
    _target_: airflow_balancer.BalancerConfiguration
    default_key_file: /home/airflow/.ssh/id_rsa
    hosts:
      - name: worker1
        os: ubuntu
        size: 16
        queues: [compute]
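Once loaded, the balancer settings travel with the rest of the configuration. A minimal sketch of reading them back, assuming the extensions block is exposed as a mapping on the loaded Configuration and that host entries mirror the YAML fields above (both are assumptions):

# Hedged sketch: read the balancer extension from a loaded configuration
from airflow_config import load_config

config = load_config("config", "config")  # loads config/config.yaml shown above
balancer = config.extensions["balancer"]  # assumed: extensions exposed as a mapping
for host in balancer.hosts:
    print(host.name, host.queues)         # e.g. worker1 ['compute']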
Long-Running Jobs: airflow-supervisor¶
airflow-supervisor enables running long-running or always-on jobs with supervisord. Integration with airflow-config allows you to:
Define supervisor configurations in YAML
Automatically generate supervisor DAGs
Combine with airflow-balancer for host selection
dags:
  long-running-job:
    tasks:
      supervisor:
        _target_: airflow_supervisor.SupervisorSSHTask
        cfg:
          _target_: airflow_supervisor.SupervisorSSHAirflowConfiguration
          runtime: 3600
          program:
            worker:
              _target_: airflow_supervisor.ProgramConfiguration
              command: python worker.py
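Because the supervisor task is declared under dags:, it can be materialized with the same DAG-factory entry points described later in this guide. A minimal sketch, assuming the YAML above is saved as dags/config/supervisor.yaml:

# dags/supervisor_dags.py
# Hedged sketch: build the declared supervisor DAG in memory at parse time
from pathlib import Path

from airflow_config import load_config

config = load_config("config", "supervisor")
config.generate_in_mem(dir=Path(__file__).parent)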
Common Operators: airflow-common¶
airflow-common provides common operators, topology helpers, and library management tasks. It integrates with airflow-config to provide:
Control Operators: Skip, Fail, Pass for workflow control
Topology Helpers: Functions for building complex DAG structures
Library Management: Operators for installing pip/conda packages
Installation¶
Install from PyPI:
pip install airflow-config
For use with Apache Airflow 2.x:
pip install airflow-config[airflow]
For use with Apache Airflow 3.x:
pip install airflow-config[airflow3]
Or via conda:
conda install airflow-config -c conda-forge
Basic Usage¶
Step 1: Create a Configuration File¶
Create a config directory in your Airflow DAGs folder and add a configuration file:
# dags/config/dev.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: data-team
  email: [alerts@example.com]
  email_on_failure: true
  email_on_retry: false
  retries: 3
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "@daily"
  start_date: "2024-01-01"
  catchup: false
  tags: ["production"]
Step 2: Load Configuration in Your DAG¶
# dags/my_dag.py
from airflow.providers.standard.operators.bash import BashOperator
from airflow_config import DAG, load_config
# Load the configuration
config = load_config(config_name="dev")
# Create the DAG with config
with DAG(
    dag_id="my-etl-pipeline",
    description="Daily ETL pipeline",
    config=config,
):
    extract = BashOperator(task_id="extract", bash_command="python extract.py")
    transform = BashOperator(task_id="transform", bash_command="python transform.py")
    load = BashOperator(task_id="load", bash_command="python load.py")

    extract >> transform >> load
Step 3: Environment-Specific Configurations¶
Create separate configs for different environments:
# dags/config/prod.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: data-team
  email: [production-alerts@example.com]
  email_on_failure: true
  retries: 5  # More retries in production
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "0 6 * * *"  # 6 AM daily in production
  start_date: "2024-01-01"
  catchup: false
  tags: ["production", "critical"]
# dags/my_dag.py
import os
from airflow_config import DAG, load_config
# Load environment-specific config
env = os.getenv("AIRFLOW_ENV", "dev")
config = load_config(config_name=env)
with DAG(dag_id="my-etl-pipeline", config=config):
    # ... tasks
DAG Factory: Fully Declarative DAGs¶
airflow-config can generate entire DAGs from YAML, similar to dag-factory but with additional benefits. There are two approaches to DAG generation:
| Approach | Method | Use Case |
|---|---|---|
| File Generation | config.generate() | Generate Python DAG files that can be version controlled and inspected |
| In-Memory Generation | config.generate_in_mem() | Create DAGs at runtime without writing files |
Defining DAGs in YAML¶
First, define your DAGs declaratively in YAML:
# dags/config/declarative.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: data-team
  retries: 2
default_dag_args:
  _target_: airflow_config.DagArgs
  start_date: "2024-01-01"
  catchup: false
dags:
  etl-pipeline:
    description: "ETL Pipeline"
    schedule: "0 6 * * *"
    tags: ["etl", "production"]
    tasks:
      extract:
        _target_: airflow_config.BashTask
        bash_command: "python extract.py"
      transform:
        _target_: airflow_config.PythonTask
        python_callable: my_module.transform
        dependencies: [extract]
      load:
        _target_: airflow_config.BashTask
        bash_command: "python load.py"
        dependencies: [transform]
Approach 1: Generate Python DAG Files¶
Use config.generate() to create actual Python files on disk. This approach is useful when you want to:
Version control the generated DAG code
Inspect and debug the generated Python code
Review changes before deployment
Use standard Airflow tooling that expects .py files
# dags/generate_dags.py
from pathlib import Path
from airflow_config import load_config
config = load_config("config", "declarative")
config.generate(Path(__file__).parent) # Generates Python DAG files
This creates a file like dags/etl-pipeline.py:
# Generated by airflow-config
from datetime import datetime
from airflow.models import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from my_module import transform
with DAG(
    description="ETL Pipeline",
    schedule="0 6 * * *",
    start_date=datetime.fromisoformat("2024-01-01T00:00:00"),
    catchup=False,
    tags=["etl", "production"],
    dag_id="etl-pipeline",
    default_args={
        "owner": "data-team",
        "retries": 2,
    },
) as dag:
    extract = BashOperator(bash_command="python extract.py", task_id="extract", dag=dag)
    transform_task = PythonOperator(python_callable=transform, task_id="transform", dag=dag)
    load = BashOperator(bash_command="python load.py", task_id="load", dag=dag)

    extract >> transform_task
    transform_task >> load
Approach 2: In-Memory DAG Generation¶
Use config.generate_in_mem() to create DAGs directly in memory at runtime. This approach is useful when you want to:
Avoid file management - no generated files to maintain
Dynamic DAGs - DAGs are created fresh on each scheduler parse
Simpler deployment - just deploy the YAML configs and one Python file
Cleaner DAG folder - fewer Python files to manage
# dags/generate_dags.py
from pathlib import Path
from airflow_config import load_config
config = load_config("config", "declarative")
config.generate_in_mem(dir=Path(__file__).parent) # Creates DAGs in memory
Note: When using generate_in_mem(), a placeholder DAG is created to ensure Airflow’s scheduler recognizes the file. The actual DAGs are injected into the module’s global namespace.
Comparison¶
| Feature | config.generate() | config.generate_in_mem() |
|---|---|---|
| Creates Python files | ✅ Yes | ❌ No |
| Version controllable output | ✅ Yes | ❌ No |
| Inspectable generated code | ✅ Yes | ⚠️ Via render() |
| Dynamic at runtime | ❌ No (must re-run) | ✅ Yes |
| File management overhead | Higher | Lower |
| Debugging ease | Easier | Harder |
Hybrid Approach¶
You can use file generation during development for easier debugging, then switch to in-memory generation in production:
# dags/generate_dags.py
import os
from pathlib import Path
from airflow_config import load_config
config = load_config("config", "declarative")
if os.getenv("AIRFLOW_ENV") == "development":
    # Generate files for easier debugging
    config.generate(Path(__file__).parent)
else:
    # Use in-memory for cleaner production deployment
    config.generate_in_mem(dir=Path(__file__).parent)
Visualization¶
airflow-config includes a built-in UI for viewing your configurations.
Airflow Plugin¶
The plugin adds a “Config” menu item to the Airflow toolbar, allowing you to browse and validate your configurations directly in Airflow.
Standalone Viewer¶
You can also run the viewer as a standalone application:
airflow-config-viewer
Next Steps¶
See the Examples page for comprehensive usage patterns
Check the API Reference for complete API documentation
Explore the Test Setups for real-world examples