Examples

This page provides examples of using airflow-pydantic across common use cases.

Programmatic DAG Examples

For cases where programmatic DAG creation is preferred, the examples below construct DAGs and tasks directly in Python.

DAG Examples

Simple DAG with Python and Bash Tasks

from datetime import datetime, timedelta
from airflow_pydantic import (
    Dag,
    DagArgs,
    TaskArgs,
    PythonTask,
    PythonTaskArgs,
    BashTask,
    BashTaskArgs,
)


def process_data(**kwargs):
    print("Processing data...")
    return {"status": "success"}


dag_args = DagArgs(
    description="A simple ETL DAG",
    schedule="0 6 * * *",  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "example"],
)

task_args = TaskArgs(
    owner="data-team",
    retries=3,
    retry_delay=timedelta(minutes=5),
    email_on_failure=True,
    email=["alerts@example.com"],
)

dag = Dag(
    dag_id="simple-etl-dag",
    **dag_args.model_dump(exclude_unset=True),
    default_args=task_args,
    tasks={
        "extract": BashTask(
            task_id="extract",
            bash_command="echo 'Extracting data...'",
        ),
        "transform": PythonTask(
            task_id="transform",
            python_callable=process_data,
            dependencies=["extract"],
        ),
        "load": BashTask(
            task_id="load",
            bash_command="echo 'Loading data...'",
            dependencies=["transform"],
        ),
    },
)

DAG with Sensors

from datetime import datetime, time, timedelta
from airflow_pydantic import (
    Dag,
    PythonTask,
    TimeSensor,
    TimeSensorArgs,
    WaitSensor,
    WaitSensorArgs,
    DateTimeSensor,
    DateTimeSensorArgs,
)


def run_after_wait(**kwargs):
    print("Running after wait!")


dag = Dag(
    dag_id="sensor-example-dag",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    tasks={
        "wait_for_time": TimeSensor(
            task_id="wait_for_time",
            target_time=time(9, 0),  # Wait until 9 AM
        ),
        "wait_for_duration": WaitSensor(
            task_id="wait_for_duration",
            time_to_wait=timedelta(minutes=10),
            dependencies=["wait_for_time"],
        ),
        "run_task": PythonTask(
            task_id="run_task",
            python_callable=run_after_wait,
            dependencies=["wait_for_duration"],
        ),
    },
)
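
The imports above also include DateTimeSensor, which waits for an absolute timestamp rather than a time of day or a duration. A minimal sketch, assuming its target_time mirrors Airflow's DateTimeSensor and accepts an absolute (optionally templated) datetime:

# Hypothetical illustration: block until a fixed point in time.
# Assumes DateTimeSensor's target_time behaves like
# airflow.sensors.date_time.DateTimeSensor.
wait_until_cutoff = DateTimeSensor(
    task_id="wait_until_cutoff",
    target_time=datetime(2024, 1, 1, 12, 0),
)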

DAG with Custom Schedule (Timetables)

from datetime import datetime
from airflow_pydantic import Dag, BashTask
from airflow_pydantic.utils import CronTriggerTimetable, MultipleCronTriggerTimetable

# Single cron with specific timezone
dag_single = Dag(
    dag_id="cron-trigger-dag",
    schedule=CronTriggerTimetable(
        cron="0 9 * * MON-FRI",  # 9 AM on weekdays
        timezone="America/New_York",
    ),
    start_date=datetime(2024, 1, 1),
    tasks={
        "weekday_task": BashTask(
            task_id="weekday_task",
            bash_command="echo 'Running on weekdays!'",
        ),
    },
)

# Multiple cron expressions
dag_multi = Dag(
    dag_id="multi-cron-dag",
    schedule=MultipleCronTriggerTimetable(
        cron_defs=["0 9 * * MON-FRI", "0 12 * * SAT,SUN"],
        timezone="UTC",
    ),
    start_date=datetime(2024, 1, 1),
    tasks={
        "flexible_task": BashTask(
            task_id="flexible_task",
            bash_command="echo 'Running!'",
        ),
    },
)

Task Examples

Python Task with Arguments

from airflow_pydantic import PythonTask, PythonTaskArgs


def greet(name, greeting="Hello", **kwargs):
    message = f"{greeting}, {name}!"
    print(message)
    return message


python_args = PythonTaskArgs(
    python_callable=greet,
    op_args=["World"],
    op_kwargs={"greeting": "Hi"},
    show_return_value_in_logs=True,
)

task = PythonTask(
    task_id="greet-task",
    **python_args.model_dump(exclude_unset=True),
)

Bash Task with Environment Variables

from airflow_pydantic import BashTask, BashTaskArgs

bash_args = BashTaskArgs(
    bash_command="echo $MY_VAR && python script.py",
    env={"MY_VAR": "hello", "DEBUG": "true"},
    append_env=True,
    cwd="/opt/scripts",
    output_encoding="utf-8",
)

task = BashTask(
    task_id="bash-with-env",
    **bash_args.model_dump(exclude_unset=True),
)

SSH Task

from airflow_pydantic import SSHTask, SSHTaskArgs
from airflow_pydantic.utils import BashCommands

ssh_args = SSHTaskArgs(
    ssh_conn_id="my_ssh_connection",
    command=BashCommands(
        commands=[
            "cd /opt/app",
            "git pull",
            "pip install -r requirements.txt",
            "python run.py",
        ],
        login=True,
    ),
    environment={"PYTHONPATH": "/opt/app"},
    cmd_timeout=300,
)

task = SSHTask(
    task_id="remote-deploy",
    **ssh_args.model_dump(exclude_unset=True),
)

Trigger DAG Run Task

from airflow_pydantic import TriggerDagRunTask, TriggerDagRunTaskArgs

trigger_args = TriggerDagRunTaskArgs(
    trigger_dag_id="downstream-dag",
    wait_for_completion=True,
    poke_interval=30,
    conf={"triggered_by": "upstream-dag"},
)

task = TriggerDagRunTask(
    task_id="trigger-downstream",
    **trigger_args.model_dump(exclude_unset=True),
)

Utility Examples

Using Pools

from airflow_pydantic import Pool, BashTask, BashTaskArgs

# Define a pool
pool = Pool(
    pool="resource-limited-pool",
    slots=3,
    description="Pool for resource-intensive tasks",
)

# Use pool in task
task = BashTask(
    task_id="pooled-task",
    bash_command="heavy_computation.sh",
    pool=pool,
    pool_slots=1,
)

Using Variables

from airflow_pydantic import Variable

# Define variables
api_key_var = Variable(
    key="API_KEY",
    description="API key for external service",
    is_encrypted=True,
)

config_var = Variable(
    key="APP_CONFIG",
    description="Application configuration",
    deserialize_json=True,
)

# Get values at runtime
api_key = api_key_var.get()
config = config_var.get()  # Returns parsed JSON
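
Variables declared this way are typically consumed from tasks through Airflow's standard var template macros (var.value for plain values, var.json for JSON-deserialized ones). A small sketch; the keys match the definitions above, while the BashTask wiring is an assumption about how you might use them:

from airflow_pydantic import BashTask

task = BashTask(
    task_id="use-variables",
    # Standard Airflow templating, resolved at runtime
    bash_command='echo "key={{ var.value.API_KEY }} cfg={{ var.json.APP_CONFIG }}"',
)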

Using BashCommands Helper

from airflow_pydantic.utils import BashCommands, in_bash, in_conda, in_virtualenv

# Using BashCommands model
commands = BashCommands(
    commands=[
        "echo 'Starting...'",
        "python process.py",
        "echo 'Done!'",
    ],
    login=True,
    cwd="/opt/app",
    env={"DEBUG": "1"},
)

# Convert to bash command string
bash_string = commands.model_dump()

# Using helper functions
conda_cmd = in_conda("myenv", "python script.py")
# Result: "micromamba activate myenv && python script.py"

venv_cmd = in_virtualenv("/opt/venv", "python script.py")
# Result: "source /opt/venv/bin/activate && python script.py"

bash_cmd = in_bash("echo 'hello'", login=True, cwd="/tmp")
# Result: bash -lc 'cd /tmp; echo 'hello''

Using DAG Parameters

from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from airflow_pydantic import Dag, BashTask


class MyDagParams(BaseModel):
    environment: str = "production"
    debug: bool = False
    batch_size: Optional[int] = None


dag = Dag(
    dag_id="parameterized-dag",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    params=MyDagParams,
    tasks={
        "run_with_params": BashTask(
            task_id="run_with_params",
            bash_command='echo "Running in {{ params.environment }}"',
        ),
    },
)
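
Params are also reachable from Python callables via the task context (standard Airflow behavior); a small sketch, assuming the rendered DAG hands the params to Airflow as a plain mapping:

from airflow_pydantic import PythonTask


def report_environment(**kwargs):
    # Airflow injects the resolved params into the task context
    params = kwargs["params"]
    print(f"environment={params['environment']} debug={params['debug']}")


report_task = PythonTask(
    task_id="report_environment",
    python_callable=report_environment,
)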

Serialization Examples

Save and Load DAG Configuration

from datetime import datetime
from airflow_pydantic import Dag, BashTask

# Create a DAG
dag = Dag(
    dag_id="serializable-dag",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    description="A DAG that can be serialized",
    tasks={
        "task1": BashTask(task_id="task1", bash_command="echo 'hello'"),
    },
)

# Serialize to JSON
dag_json = dag.model_dump_json(exclude_unset=True, indent=2)
print(dag_json)

# Save to file
with open("dag_config.json", "w") as f:
    f.write(dag_json)

# Load from file
with open("dag_config.json", "r") as f:
    loaded_dag = Dag.model_validate_json(f.read())
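
The same round trip works with YAML if you prefer it for configuration files; a sketch assuming PyYAML is installed (it is not a dependency of airflow-pydantic):

import yaml  # assumption: PyYAML is available in your environment

# Dump to JSON-compatible primitives, then write YAML
with open("dag_config.yaml", "w") as f:
    yaml.safe_dump(dag.model_dump(mode="json", exclude_unset=True), f)

# Load the YAML back into the model
with open("dag_config.yaml", "r") as f:
    loaded_from_yaml = Dag.model_validate(yaml.safe_load(f))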

Generate DAG Python Code

from datetime import datetime
from airflow_pydantic import Dag, BashTask, PythonTask


def my_func(**kwargs):
    pass


dag = Dag(
    dag_id="generated-dag",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    tasks={
        "bash_task": BashTask(
            task_id="bash_task",
            bash_command="echo 'Hello'",
        ),
        "python_task": PythonTask(
            task_id="python_task",
            python_callable="my_module.my_func",
            dependencies=["bash_task"],
        ),
    },
)

# Render to Python code
python_code = dag.render()
print(python_code)

# Save to a .py file for use in Airflow
with open("dags/generated_dag.py", "w") as f:
    f.write(python_code)

Code Generation with render()

The render() method generates valid Python code from Pydantic models, enabling code-generation workflows, debugging, and migration paths.

Rendering Individual Tasks

You can render individual operators and sensors:

from airflow_pydantic import BashTask, PythonTask, SSHTask
from datetime import timedelta

# Render a BashTask
bash_task = BashTask(
    task_id="extract_data",
    bash_command="python extract.py --date {{ ds }}",
    retries=3,
    retry_delay=timedelta(minutes=5),
)
print(bash_task.render())
# Output:
# BashOperator(
#     task_id="extract_data",
#     bash_command="python extract.py --date {{ ds }}",
#     retries=3,
#     retry_delay=timedelta(minutes=5),
# )

# Render an SSHTask
ssh_task = SSHTask(
    task_id="remote_job",
    ssh_conn_id="my_server",
    command="./run_job.sh",
    cmd_timeout=300,
)
print(ssh_task.render())

Rendering Complete DAGs

Render an entire DAG with all its tasks and dependencies:

from datetime import datetime, timedelta
from airflow_pydantic import Dag, DagArgs, TaskArgs, BashTask, PythonTask

dag = Dag(
    dag_id="etl-pipeline",
    description="Daily ETL pipeline",
    schedule="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=TaskArgs(
        owner="data-team",
        retries=3,
        retry_delay=timedelta(minutes=5),
    ),
    tasks={
        "extract": BashTask(
            task_id="extract",
            bash_command="python extract.py",
        ),
        "transform": PythonTask(
            task_id="transform",
            python_callable="etl.transform.run",
            dependencies=["extract"],
        ),
        "load": BashTask(
            task_id="load",
            bash_command="python load.py",
            dependencies=["transform"],
        ),
    },
)

# Generate the complete DAG file
dag_code = dag.render()
print(dag_code)

CI/CD Code Generation

Use render() in CI/CD pipelines to generate DAG files from configuration:

# scripts/generate_dags.py
import json
from pathlib import Path
from airflow_pydantic import Dag

def generate_dags(config_dir: Path, output_dir: Path):
    """Generate DAG files from JSON configuration."""
    for config_file in config_dir.glob("*.json"):
        # Load configuration
        with open(config_file) as f:
            config = json.load(f)

        # Create and validate DAG model
        dag = Dag.model_validate(config)

        # Render to Python code
        python_code = dag.render()

        # Write DAG file
        output_file = output_dir / f"{dag.dag_id}.py"
        output_file.write_text(python_code)
        print(f"Generated: {output_file}")

if __name__ == "__main__":
    generate_dags(
        config_dir=Path("config/dags"),
        output_dir=Path("dags"),
    )
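
The same validation can run as a CI test step before any files are generated; a minimal sketch (a hypothetical tests/test_dag_configs.py, assuming pytest and the config/dags layout used above):

import json
from pathlib import Path

import pytest
from airflow_pydantic import Dag

CONFIG_DIR = Path("config/dags")


@pytest.mark.parametrize("config_file", sorted(CONFIG_DIR.glob("*.json")), ids=str)
def test_config_is_valid(config_file):
    # Invalid configurations surface as pydantic.ValidationError and fail the test
    dag = Dag.model_validate(json.loads(config_file.read_text()))
    assert dag.dag_id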

Rendering Timetables

Custom timetables can also be rendered:

from datetime import datetime
from airflow_pydantic import Dag, BashTask
from airflow_pydantic.utils import CronTriggerTimetable, MultipleCronTriggerTimetable

dag = Dag(
    dag_id="multi-schedule-dag",
    schedule=MultipleCronTriggerTimetable(
        cron_defs=["0 9 * * MON-FRI", "0 12 * * SAT,SUN"],
        timezone="America/New_York",
    ),
    start_date=datetime(2024, 1, 1),
    tasks={
        "task": BashTask(
            task_id="task",
            bash_command="echo 'Running!'",
        ),
    },
)

print(dag.render())

Additional airflow-config Patterns

For more advanced airflow-config usage patterns, see the declarative DAG examples at the top of this page. The snippets below show how pools and variables can be declared in YAML.

Using Pools in YAML

# config/pooled_dag.yaml
pools:
  resource_pool:
    _target_: airflow_pydantic.Pool
    pool: resource-limited-pool
    slots: 5
    description: "Pool for resource-intensive tasks"

tasks:
  heavy_task:
    _target_: airflow_pydantic.BashTask
    task_id: heavy_task
    bash_command: "python heavy_computation.py"
    pool: resource-limited-pool
    pool_slots: 1

Using Variables in YAML

# config/var_dag.yaml
variables:
  api_config:
    _target_: airflow_pydantic.Variable
    key: API_CONFIG
    description: "API configuration"
    deserialize_json: true

Template Pattern

Use the template pattern to create reusable task configurations:

from datetime import timedelta
from airflow_pydantic import SSHTask, SSHTaskArgs

# Create a template task
template_task = SSHTask(
    task_id="template",
    ssh_conn_id="default_ssh",
    command="echo 'default'",
    retries=3,
    retry_delay=timedelta(minutes=5),
)

# Create new tasks based on template
task1 = SSHTask(
    task_id="task1",
    template=template_task,
    command="echo 'task 1'",  # Override specific fields
)

task2 = SSHTask(
    task_id="task2",
    template=template_task,
    command="echo 'task 2'",
)