Examples

Load defaults from config

# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test

Then load the configuration from Python:

from airflow_config import load_config, DAG, create_dag

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)

assert conf.default_args.owner == "test"

Load more defaults from config

# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:10"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
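
Then load the configuration from Python (the assertions below additionally need `from datetime import datetime`):

from airflow_config import load_config, DAG, create_dag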

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
assert conf.default_args.email == ["myemail@myemail.com"]
assert conf.default_args.email_on_failure is False
assert conf.default_args.email_on_retry is False
assert conf.default_args.retries == 0
assert conf.default_args.depends_on_past is False
assert conf.default_dag_args.start_date == datetime(2024, 1, 1)
assert conf.default_dag_args.catchup is False
assert conf.default_dag_args.tags == ["utility", "test"]

Specialize individual DAGs

# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false

default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]

dags:
  example_dag:
    default_args:
      owner: "custom_owner"
    description: "this is an example dag"
    schedule: "0 3 * * *"

  example_dag2:
    default_args:
      owner: "custom_owner2"
    schedule: "0 4 * * *"
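
Then load the configuration from Python (the assertions below additionally need `timedelta` from `datetime` and Airflow's `DeltaDataIntervalTimetable` in scope):

from airflow_config import load_config, DAG, create_dag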

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert d.default_args["owner"] == "test"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.default_args["email_on_failure"] is False
assert d.default_args["email_on_retry"] is False
assert d.default_args["retries"] == 0
assert d.default_args["depends_on_past"] is False
assert d.schedule_interval == timedelta(seconds=3600)
assert isinstance(d.timetable, DeltaDataIntervalTimetable)
assert isinstance(d.timetable._delta, timedelta)
assert d.start_date.year == 2024
assert d.start_date.month == 1
assert d.start_date.day == 1
assert d.catchup is False
assert d.tags == ["utility", "test"]

# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag", config=conf)
assert d.default_args["owner"] == "custom_owner"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 3 * * *"

# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag2", config=conf)
assert d.default_args["owner"] == "custom_owner2"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 4 * * *"

Declarative DAGs - “DAG Factory”

# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration

default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false

default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]

dags:
  example_dag:
    default_args:
      owner: "custom_owner"
    description: "this is an example dag"
    schedule: "0 3 * * *"
    tasks:
      task_1:
        _target_: airflow_config.SSHOperator
        ssh_conn_id: test
        ssh_hook: airflow_pydantic.tests.conftest.hook
        command: test
        do_xcom_push: true
        cmd_timeout: 10
        get_pty: true
        environment: {"test": "test"}
      task_2:
        _target_: airflow_config.ShortCircuitOperator
        python_callable: airflow_config.tests.conftest.should_short_circuit
        dependencies: [task_1]
      task_3:
        _target_: airflow_config.BashOperator
        bash_command: "echo '1'"
        dependencies: [task_2]
      task_4:
        _target_: airflow_config.BashOperator
        bash_command: "echo `pwd`"
        dependencies: [task_3]
      task_5:
        _target_: airflow_config.PythonOperator
        python_callable: airflow_config.tests.setups.utils.print_hello.print_hello
        op_args: []
        op_kwargs: {}
        templates_dict: {}
        templates_exts: null
        show_return_value_in_logs: true
        dependencies: [task_4]

Then generate the DAG file from Python, next to the calling module:

from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())

Generated DAG

# Generated by airflow-config
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow_config.tests.conftest import should_short_circuit
from airflow_config.tests.setups.utils.print_hello import print_hello
with DAG('example_dag') as dag:
    task_1 = SSHOperator(do_xcom_push=True, ssh_hook=SSHHook(remote_host='test', username='test', port=22, cmd_timeout=10, keepalive_interval=30, banner_timeout=30.0), ssh_conn_id='test', command='test', cmd_timeout=10, environment={'test': 'test'}, get_pty=True, task_id='task_1', dag=dag)
    task_2 = ShortCircuitOperator(python_callable=should_short_circuit, task_id='task_2', dag=dag)
    task_3 = BashOperator(bash_command="echo '1'", append_env=False, output_encoding='utf-8', skip_on_exit_code=99, task_id='task_3', dag=dag)
    task_4 = BashOperator(bash_command='echo `pwd`', append_env=False, output_encoding='utf-8', skip_on_exit_code=99, task_id='task_4', dag=dag)
    task_5 = PythonOperator(python_callable=print_hello, op_args=[], op_kwargs={}, templates_dict={}, show_return_value_in_logs=True, task_id='task_5', dag=dag)
    task_1 >> task_2
    task_2 >> task_3
    task_3 >> task_4
    task_4 >> task_5