Examples
Load defaults from config
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultTaskArgs
owner: test
from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
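The object returned by create_dag (or built directly with DAG(..., config=conf)) is a regular Airflow DAG, so tasks attach to it as usual. A minimal sketch, using the standard-provider BashOperator import that appears in the generated examples below; the hello task is illustrative and not part of the config above:

from airflow.providers.standard.operators.bash import BashOperator
from airflow_config import DAG, load_config

conf = load_config("config", "test")

with DAG(dag_id="test-dag", config=conf) as dag:
    # owner="test" flows in from config/test.yaml via default_args
    hello = BashOperator(task_id="hello", bash_command="echo hello")

assert hello.owner == "test"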
Load more defaults from config
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultTaskArgs
owner: test
email: [myemail@myemail.com]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
default_dag_args:
_target_: airflow_config.DagArgs
schedule: "01:10"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
from datetime import datetime
from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
assert conf.default_args.email == ["myemail@myemail.com"]
assert conf.default_args.email_on_failure is False
assert conf.default_args.email_on_retry is False
assert conf.default_args.retries == 0
assert conf.default_args.depends_on_past is False
assert conf.default_dag_args.start_date == datetime(2024, 1, 1)
assert conf.default_dag_args.catchup is False
assert conf.default_dag_args.tags == ["utility", "test"]
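Schedules written as "HH:MM" strings are interpreted as interval lengths rather than cron expressions, as the schedule_interval assertions in the next example show. A hedged check of the parsed value (assuming the config exposes it as a timedelta):

from datetime import timedelta
from airflow_config import load_config

conf = load_config("config", "test")

# Assumption: "01:10" parses to a 1h10m interval, mirroring the
# timedelta(seconds=3600) assertion for "01:00" in the next example
assert conf.default_dag_args.schedule == timedelta(hours=1, minutes=10)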
Specialize individual DAGs
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.TaskArgs
owner: test
email: [myemail@myemail.com]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
default_dag_args:
_target_: airflow_config.DagArgs
schedule: "01:00"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
dags:
example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
example_dag2:
default_args:
owner: "custom_owner2"
schedule: "0 4 * * *"
from datetime import timedelta
from airflow.timetables.interval import DeltaDataIntervalTimetable
from airflow_config import load_config, DAG, create_dag
conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert d.default_args["owner"] == "test"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.default_args["email_on_failure"] is False
assert d.default_args["email_on_retry"] is False
assert d.default_args["retries"] == 0
assert d.default_args["depends_on_past"] is False
assert d.schedule_interval == timedelta(seconds=3600)
assert isinstance(d.timetable, DeltaDataIntervalTimetable)
assert isinstance(d.timetable._delta, timedelta)
assert d.start_date.year == 2024
assert d.start_date.month == 1
assert d.start_date.day == 1
assert d.catchup is False
assert set(d.tags) == {"utility", "test"}
# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag", config=conf)
assert d.default_args["owner"] == "custom_owner"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 3 * * *"
# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag2", config=conf)
assert d.default_args["owner"] == "custom_owner2"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 4 * * *"
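Because every specialization lives in the shared config file, one module can instantiate all of the configured DAGs. A sketch, assuming conf.dags is a mapping keyed by dag id as the YAML layout suggests:

from airflow_config import DAG, load_config

conf = load_config("config", "test")

# Hypothetical iteration over the `dags:` block ("example_dag", "example_dag2");
# assigning into globals() lets the Airflow DAG parser discover each one
for dag_id in conf.dags:
    globals()[dag_id] = DAG(dag_id=dag_id, config=conf)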
Declarative DAGs - “DAG Factory”
Example 1 - Operators and Dependencies
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.TaskArgs
owner: test
email: [myemail@myemail.com]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
default_dag_args:
_target_: airflow_config.DagArgs
schedule: "01:00"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
dags:
example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
tasks:
task_1:
_target_: airflow_config.SSHOperator
ssh_conn_id: test
ssh_hook: airflow_pydantic.tests.conftest.hook
command: test
do_xcom_push: true
cmd_timeout: 10
get_pty: true
environment: {"test": "test"}
task_2:
_target_: airflow_config.ShortCircuitOperator
python_callable: airflow_config.tests.conftest.should_short_circuit
dependencies: [task_1]
task_3:
_target_: airflow_config.BashOperator
bash_command: "echo '1'"
dependencies: [task_2]
task_4:
_target_: airflow_config.BashOperator
bash_command: "echo `pwd`"
dependencies: [task_3]
task_5:
_target_: airflow_config.PythonOperator
python_callable: airflow_config.tests.setups.utils.print_hello.print_hello
op_args: []
op_kwargs: {}
templates_dict: {}
templates_exts: null
show_return_value_in_logs: true
dependencies: [task_4]
from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())
Generated DAG
# Generated by airflow-config
from datetime import datetime
from airflow.models import DAG
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator
from airflow_config.tests.conftest import should_short_circuit
from airflow_config.tests.setups.utils.print_hello import print_hello
with DAG(
description="this is an example dag",
schedule="0 3 * * *",
start_date=datetime.fromisoformat("2024-01-01T00:00:00"),
catchup=False,
tags=["utility", "test"],
dag_id="example_dag",
default_args={
"owner": "custom_owner",
"email": ["myemail@myemail.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"depends_on_past": False,
},
) as dag:
task_1 = SSHOperator(
do_xcom_push=True,
ssh_hook=SSHHook(remote_host="test", username="test"),
ssh_conn_id="test",
command="test",
cmd_timeout=10,
environment={"test": "test"},
get_pty=True,
task_id="task_1",
dag=dag,
)
task_2 = ShortCircuitOperator(python_callable=should_short_circuit, task_id="task_2", dag=dag)
task_3 = BashOperator(bash_command="echo '1'", task_id="task_3", dag=dag)
task_4 = BashOperator(bash_command="echo `pwd`", task_id="task_4", dag=dag)
task_5 = PythonOperator(
python_callable=print_hello,
op_args=[],
op_kwargs={},
templates_dict={},
templates_exts=None,
show_return_value_in_logs=True,
task_id="task_5",
dag=dag,
)
task_1 >> task_2
task_2 >> task_3
task_3 >> task_4
task_4 >> task_5
Example 2 - Template Tasks
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
templates:
dag:
dag_template:
_target_: airflow_config.DagArgs
default_args:
_target_: airflow_config.TaskArgs
owner: test
email: [myemail@myemail.com]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
schedule: "01:00"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
tasks:
task_template:
_target_: airflow_config.PythonTaskArgs
python_callable: airflow_config.tests.setups.utils.print_hello.print_hello
op_args: []
op_kwargs: {}
templates_dict: {}
templates_exts: null
show_return_value_in_logs: true
ssh_task_template:
_target_: airflow_config.SSHTaskArgs
ssh_conn_id: test
ssh_hook: airflow_pydantic.tests.conftest.hook
command: test
do_xcom_push: true
cmd_timeout: 10
get_pty: true
environment: {"test": "test"}
dags:
example_dag:
template: ${templates.dag.dag_template}
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
tasks:
task_1:
_target_: airflow_config.PythonOperator
template: ${templates.tasks.task_template}
task_2:
_target_: airflow_config.SSHOperator
template: ${templates.tasks.ssh_task_template}
dependencies: [task_1]
environment: {"blerg": "blerg"}
from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())
Generated DAG
# Generated by airflow-config
from datetime import datetime
from airflow.models import DAG
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow_config.tests.setups.utils.print_hello import print_hello
with DAG(
description="this is an example dag",
schedule="0 3 * * *",
start_date=datetime.fromisoformat("2024-01-01T00:00:00"),
catchup=False,
tags=["utility", "test"],
dag_id="example_dag",
default_args={
"owner": "custom_owner",
"email": ["myemail@myemail.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"depends_on_past": False,
},
) as dag:
task_1 = PythonOperator(
python_callable=print_hello,
op_args=[],
op_kwargs={},
templates_dict={},
templates_exts=None,
show_return_value_in_logs=True,
task_id="task_1",
dag=dag,
)
task_2 = SSHOperator(
do_xcom_push=True,
ssh_hook=SSHHook(remote_host="test", username="test"),
ssh_conn_id="test",
command="test",
cmd_timeout=10,
environment={"blerg": "blerg", "test": "test"},
get_pty=True,
task_id="task_2",
dag=dag,
)
task_1 >> task_2
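Note that task_2's environment override is merged with the template's value to produce {"blerg": "blerg", "test": "test"} rather than replacing it. The ${templates....} references are OmegaConf-style interpolations; a minimal standalone sketch of that mechanism, using omegaconf directly and assuming nothing about airflow-config internals:

from omegaconf import OmegaConf

cfg = OmegaConf.create(
    """
templates:
  env:
    test: test
task:
  environment: ${templates.env}
"""
)

# The ${templates.env} reference resolves to the shared mapping
assert OmegaConf.to_container(cfg, resolve=True)["task"]["environment"] == {"test": "test"}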
Example 3 - Attribute Access / HighAvailabilityTask
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_task_args:
_target_: airflow_config.TaskArgs
owner: laminar
email: [dev@paine.nyc]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
default_dag_args:
_target_: airflow_config.DagArgs
start_date: ["2025-01-01", "America/New_York"]
catchup: false
max_active_runs: 1
dags:
test-high-availability:
description: "Test HA operator"
tags: ["test", "utility"]
schedule: "0 12 * * *"
tasks:
pre:
_target_: airflow_pydantic.PythonTask
python_callable: validation_dags.test_ha_foo._pre
ha:
_target_: airflow_ha.HighAvailabilityTask
python_callable: validation_dags.test_ha_foo._choose
runtime: 120
maxretrigger: 2
        endtime: "23:00:00"
timeout: 1000
poke_interval: 10
dependencies: [pre]
retrigger-fail:
_target_: airflow_pydantic.PythonTask
python_callable: validation_dags.test_ha_foo._pre
dependencies: [ha]
# validation_dags/test_ha_foo.py
from random import choice
from airflow_ha import Action, Result
def _pre(**kwargs):
    # Placeholder setup task; PythonOperator pushes the return value to XCom
    return "test"

def _choose(**kwargs):
    # Return a (Result, Action) pair; picking randomly exercises both the
    # CONTINUE and RETRIGGER paths of the high-availability loop
    return choice(
(
(Result.PASS, Action.CONTINUE),
(Result.PASS, Action.RETRIGGER),
# (Result.PASS, Action.STOP),
# (Result.FAIL, Action.CONTINUE),
# (Result.FAIL, Action.RETRIGGER),
# (Result.FAIL, Action.STOP),
)
)
from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())
Generated DAG
# Generated by airflow-config
from datetime import datetime, time
from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow_ha.operator import HighAvailabilitySensor
from validation_dags.test_ha_foo import _choose, _pre
with DAG(
description="Test HA operator",
schedule="0 12 * * *",
start_date=datetime.fromisoformat("2025-01-01T00:00:00-04:56"),
max_active_runs=1,
catchup=False,
tags=["test", "utility"],
dag_id="test-high-availability",
default_args={
"owner": "laminar",
"email": ["dev@paine.nyc"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"depends_on_past": False,
},
) as dag:
pre = PythonOperator(python_callable=_pre, task_id="pre", dag=dag)
ha = HighAvailabilitySensor(
poke_interval=10.0,
timeout=1000.0,
python_callable=_choose,
runtime=120,
endtime=time(23, 0, 0, 0),
maxretrigger=2,
task_id="ha",
dag=dag,
)
retrigger_fail = PythonOperator(
python_callable=_pre, task_id="retrigger-fail", dag=dag
)
pre >> ha
ha >> retrigger_fail
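The sensor's python_callable drives the high-availability loop by returning a (Result, Action) pair; the random choice in _choose merely exercises both paths. A deterministic sketch using only the airflow_ha names already shown above (the helper name is illustrative):

from airflow_ha import Action, Result

def _always_retrigger(**kwargs):
    # Report success and ask the sensor to re-trigger the DAG run
    return (Result.PASS, Action.RETRIGGER)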
Example 4 - Supervisor / Balancer
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_task_args:
_target_: airflow_config.TaskArgs
owner: laminar
email: [dev@paine.nyc]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false
default_dag_args:
_target_: airflow_config.DagArgs
start_date: ["2025-01-01", "America/New_York"]
catchup: false
max_active_runs: 1
dags:
test-supervisor:
description: "Test supervisor"
tags: ["test", "utility"]
schedule: "0 0 * * *"
tasks:
supervisor:
_target_: airflow_supervisor.SupervisorSSHTask
cfg:
_target_: airflow_supervisor.SupervisorSSHAirflowConfiguration
check_interval: 10
check_timeout: 60
cleanup: true
          endtime: "23:00:00"
maxretrigger: 5
restart_on_initial: true
restart_on_retrigger: false
runtime: 360
ssh_operator_args:
cmd_timeout: 63
stop_on_exit: true
working_dir: "/tmp/supervisor"
program:
test:
_target_: airflow_supervisor.ProgramConfiguration
command: bash -c "sleep 3600; exit 0"
host:
_target_: airflow_balancer.HostQuery
balancer: ${extensions.balancer}
queue: primary
port:
_target_: airflow_balancer.PortQuery
balancer: ${extensions.balancer}
name: test-supervisor-port
extensions:
balancer:
_target_: airflow_balancer.BalancerConfiguration
default_key_file: /home/airflow/.ssh/id_rsa
hosts:
- name: myhost
os: ubuntu
size: 32
queues: [primary]
tags: []
ports:
- name: test-supervisor-port
        host_name: myhost
port: 9091
tags: [supervisor]
from pathlib import Path
from airflow_config import load_config
conf = load_config("config", "test")
conf.generate(Path(__file__).parent.resolve())
Generated DAG
# Generated by airflow-config
from datetime import datetime, time, timedelta
from pathlib import Path
from airflow.models import DAG
from airflow.models.pool import Pool
from airflow_pydantic import Host, Port
from airflow_supervisor.airflow.ssh import SupervisorSSH
with DAG(
description="Test supervisor",
schedule="0 0 * * *",
start_date=datetime.fromisoformat("2025-01-01T00:00:00-04:56"),
max_active_runs=1,
catchup=False,
tags=["test", "utility"],
dag_id="test-supervisor",
default_args={
"owner": "laminar",
"email": ["dev@paine.nyc"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"depends_on_past": False,
},
) as dag:
supervisor = SupervisorSSH(
cfg={
"inet_http_server": {"port": "*:9001", "username": None, "password": None},
"program": {
"test": {
"command": 'bash -c "sleep 3600; exit 0"',
"autostart": False,
"startsecs": 1,
"startretries": None,
"autorestart": False,
"exitcodes": [0],
"stopsignal": "TERM",
"stopwaitsecs": 30,
"stopasgroup": True,
"killasgroup": True,
"stdout_logfile": Path("/tmp/supervisor/test/output.log"),
"stderr_logfile": Path("/tmp/supervisor/test/error.log"),
"directory": Path("/tmp/supervisor/test"),
}
},
"rpcinterface": {
"supervisor": {
"rpcinterface_factory": "supervisor.rpcinterface:make_main_rpcinterface"
}
},
"config_path": Path("/tmp/supervisor/supervisord.conf"),
"working_dir": Path("/tmp/supervisor"),
"check_interval": timedelta(10.0),
"check_timeout": timedelta(60.0),
"runtime": timedelta(360.0),
"endtime": time(23, 0, 0, 0),
"maxretrigger": 5,
"stop_on_exit": True,
"cleanup": True,
"restart_on_initial": True,
"restart_on_retrigger": False,
"ssh_operator_args": {"cmd_timeout": 63},
},
host=Host(
name="myhost",
key_file="/home/airflow/.ssh/id_rsa",
os="ubuntu",
pool=Pool.create_or_update_pool(
name="myhost",
slots=32,
description="Balancer pool for host(myhost)",
include_deferred=False,
).pool,
size=32,
queues=["primary"],
tags=[],
),
port=Port(
name="test-supervisor-port",
host=Host(
name="myhost",
key_file="/home/airflow/.ssh/id_rsa",
os="ubuntu",
pool=Pool.create_or_update_pool(
name="myhost",
slots=32,
description="Balancer pool for host(myhost)",
include_deferred=False,
).pool,
size=32,
queues=["primary"],
tags=[],
),
host_name="myhost",
port=9091,
tags=["supervisor"],
),
task_id="supervisor",
dag=dag,
)
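The HostQuery and PortQuery blocks select entries from the shared extensions.balancer configuration at generation time; the rendered Host and Port objects above are their resolved results. A rough standalone sketch of the lookup semantics (plain dataclasses, not the airflow_balancer API):

from dataclasses import dataclass, field

@dataclass
class HostEntry:
    name: str
    queues: list = field(default_factory=list)

hosts = [HostEntry("myhost", ["primary"])]

# Mirrors HostQuery(queue="primary"): pick a host serving the requested queue
selected = next(h for h in hosts if "primary" in h.queues)
assert selected.name == "myhost"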