Examples¶
Load defaults from config¶
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test
from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
Load more defaults from config¶
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:10"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
from datetime import datetime

from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
assert conf.default_args.email == ["myemail@myemail.com"]
assert conf.default_args.email_on_failure is False
assert conf.default_args.email_on_retry is False
assert conf.default_args.retries == 0
assert conf.default_args.depends_on_past is False
assert conf.default_dag_args.start_date == datetime(2024, 1, 1)
assert conf.default_dag_args.catchup is False
assert conf.default_dag_args.tags == ["utility", "test"]
Specialize individual DAGs¶
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
dags:
  example_dag:
    default_args:
      owner: "custom_owner"
    description: "this is an example dag"
    schedule: "0 3 * * *"
  example_dag2:
    default_args:
      owner: "custom_owner2"
    schedule: "0 4 * * *"
from datetime import timedelta

from airflow.timetables.interval import DeltaDataIntervalTimetable

from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert d.default_args["owner"] == "test"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.default_args["email_on_failure"] is False
assert d.default_args["email_on_retry"] is False
assert d.default_args["retries"] == 0
assert d.default_args["depends_on_past"] is False
assert d.schedule_interval == timedelta(seconds=3600)
assert isinstance(d.timetable, DeltaDataIntervalTimetable)
assert isinstance(d.timetable._delta, timedelta)
assert d.start_date.year == 2024
assert d.start_date.month == 1
assert d.start_date.day == 1
assert d.catchup is False
assert d.tags == ["utility", "test"]
# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag", config=conf)
assert d.default_args["owner"] == "custom_owner"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 3 * * *"
# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag2", config=conf)
assert d.default_args["owner"] == "custom_owner2"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 4 * * *"
Declarative Dags - “DAG Factory”¶
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
dags:
  example_dag:
    default_args:
      owner: "custom_owner"
    description: "this is an example dag"
    schedule: "0 3 * * *"
    tasks:
      task_1:
        _target_: airflow_config.SSHOperator
        ssh_conn_id: test
        ssh_hook: airflow_pydantic.tests.conftest.hook
        command: test
        do_xcom_push: true
        cmd_timeout: 10
        get_pty: true
        environment: {"test": "test"}
      task_2:
        _target_: airflow_config.ShortCircuitOperator
        python_callable: airflow_config.tests.conftest.should_short_circuit
        dependencies: [task_1]
      task_3:
        _target_: airflow_config.BashOperator
        bash_command: "echo '1'"
        dependencies: [task_2]
      task_4:
        _target_: airflow_config.BashOperator
        bash_command: "echo `pwd`"
        dependencies: [task_3]
      task_5:
        _target_: airflow_config.PythonOperator
        python_callable: airflow_config.tests.setups.utils.print_hello.print_hello
        op_args: []
        op_kwargs: {}
        templates_dict: {}
        templates_exts: null
        show_return_value_in_logs: true
        dependencies: [task_4]
from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())
Generated DAG
# Generated by airflow-config
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow_config.tests.conftest import should_short_circuit
from airflow_config.tests.setups.utils.print_hello import print_hello
with DAG('example_dag') as dag:
    task_1 = SSHOperator(do_xcom_push=True, ssh_hook=SSHHook(remote_host='test', username='test', port=22, cmd_timeout=10, keepalive_interval=30, banner_timeout=30.0), ssh_conn_id='test', command='test', cmd_timeout=10, environment={'test': 'test'}, get_pty=True, task_id='task_1', dag=dag)
    task_2 = ShortCircuitOperator(python_callable=should_short_circuit, task_id='task_2', dag=dag)
    task_3 = BashOperator(bash_command="echo '1'", append_env=False, output_encoding='utf-8', skip_on_exit_code=99, task_id='task_3', dag=dag)
    task_4 = BashOperator(bash_command='echo `pwd`', append_env=False, output_encoding='utf-8', skip_on_exit_code=99, task_id='task_4', dag=dag)
    task_5 = PythonOperator(python_callable=print_hello, op_args=[], op_kwargs={}, templates_dict={}, show_return_value_in_logs=True, task_id='task_5', dag=dag)
    task_1 >> task_2
    task_2 >> task_3
    task_3 >> task_4
    task_4 >> task_5