# Source code for airflow_config.dag
from functools import singledispatch
from inspect import currentframe
from typing import Any, Optional
from airflow.models.dag import DAG as BaseDag
from .configuration.base import Configuration
from .utils import generate_dag_id
# Public API of this module. ``generate_dag_id`` is imported from ``.utils``
# and re-exported here alongside the DAG class and the factory functions.
__all__ = (
    "generate_dag_id",
    "create_dag",
    "create_dags",
    "DAG",
)
[docs]
class DAG(BaseDag):
    """Airflow DAG subclass that lets a ``Configuration`` hook into construction.

    When a truthy ``config`` is supplied, its ``pre_apply`` hook runs before the
    base-class constructor and its ``apply`` hook runs after it. Without a
    config the class simply forwards ``**kwargs`` to the base constructor.
    """

    def __init__(self, config: Optional[Configuration] = None, **kwargs) -> None:
        """Initialise the DAG, giving ``config`` a chance to act before and after.

        Args:
            config: Optional configuration object. Skipped entirely when falsy.
            **kwargs: Keyword arguments forwarded to the base DAG constructor.
        """
        if config:
            # The hook receives the very dict that is unpacked into the base
            # constructor below, so any in-place change it makes is picked up.
            config.pre_apply(self, kwargs)
        super().__init__(**kwargs)
        if config:
            # Runs once the base DAG is fully initialised.
            config.apply(self, kwargs)
[docs]
@singledispatch
def create_dag(arg, **kwargs: Any) -> DAG:
    """Create a single DAG; dispatches on the type of the first argument.

    This is the fallback used when no implementation is registered for
    ``type(arg)``. The implementations registered in this module accept a
    ``Configuration`` instance or a ``str`` configuration directory.

    Args:
        arg: The value whose type selects the implementation.
        **kwargs: Passed through to the selected implementation.

    Raises:
        NotImplementedError: Always, naming the unsupported argument type.
    """
    raise NotImplementedError(
        f"create_dag() has no implementation registered for argument type {type(arg).__name__!r}"
    )
[docs]
@singledispatch
def create_dags(arg, **kwargs: Any) -> list[DAG]:
    """Create several DAGs; dispatches on the type of the first argument.

    This is the fallback used when no implementation is registered for
    ``type(arg)``. The implementations registered in this module accept a
    ``list`` of configurations or a ``str`` configuration directory, and both
    return a list of DAGs.

    Args:
        arg: The value whose type selects the implementation.
        **kwargs: Passed through to the selected implementation.

    Raises:
        NotImplementedError: Always, naming the unsupported argument type.
    """
    raise NotImplementedError(
        f"create_dags() has no implementation registered for argument type {type(arg).__name__!r}"
    )
@create_dag.register
def _create_dag_config(config: Configuration, dag_id: str = "", _offset: int = 3, **kwargs: Any) -> DAG:
    """Build a DAG from a ``Configuration`` and publish it in a caller's globals.

    The DAG is stored under ``dag_id`` in the ``f_globals`` of the frame found
    ``_offset - 1`` levels above this function (or the outermost frame if the
    stack is shallower). NOTE(review): presumably this is so Airflow's DAG
    discovery, which inspects module globals, finds the DAG - confirm.

    Args:
        config: Configuration handed to the ``DAG`` constructor.
        dag_id: Identifier for the DAG. When empty, one is obtained from
            ``generate_dag_id(offset=_offset + 1)``.
        _offset: Stack distance to the module that should own the DAG. The
            default of 3 presumably accounts for the ``singledispatch`` wrapper
            frame between this function and its caller; the other factories in
            this module pass 5.
        **kwargs: Extra keyword arguments forwarded to the ``DAG`` constructor.
            The sibling factories already forward their ``**kwargs`` here.

    Returns:
        The newly created DAG.
    """
    # generate_dag_id must be called directly from this frame: its offset is
    # expressed relative to this function's position in the stack.
    dag_id = dag_id or generate_dag_id(offset=_offset + 1)
    dag = DAG(config=config, dag_id=dag_id, **kwargs)

    frame = currentframe()
    if frame is None:
        # currentframe() returns None on interpreters without stack-frame
        # support; there is no module to publish into, so just return the DAG.
        return dag
    try:
        for _ in range(_offset - 1):
            if frame.f_back is None:
                # Stack is shallower than requested; use the outermost frame.
                break
            frame = frame.f_back
        frame.f_globals[dag_id] = dag
    finally:
        # Drop the frame reference promptly to avoid reference cycles, as the
        # inspect documentation recommends.
        del frame
    return dag
@create_dag.register
def _create_dag_dir(
    config_dir: str = "config",
    config_name: str = "",
    overrides: Optional[list[str]] = None,
    dag_id: str = "",
    **kwargs: Any,
) -> DAG:
    """Load a configuration from ``config_dir`` and build a DAG from it.

    Registered for ``str`` first arguments, so ``create_dag("some/dir", ...)``
    lands here.

    Args:
        config_dir: Directory passed to ``Configuration.load``.
        config_name: Configuration name passed to ``Configuration.load``.
        overrides: Optional overrides passed to ``Configuration.load``.
        dag_id: Identifier for the DAG. When empty, one is obtained from
            ``generate_dag_id(offset=4)``.
        **kwargs: Forwarded to ``create_dag`` together with the loaded config.

    Returns:
        The DAG produced by ``create_dag`` for the loaded configuration.
    """
    # The literal offsets below are tied to this function's depth in the call
    # stack, so every call is made directly from this frame (no helpers).
    if not dag_id:
        dag_id = generate_dag_id(offset=4)
    loaded = Configuration.load(
        config_dir=config_dir,
        config_name=config_name,
        overrides=overrides,
        _offset=5,
    )
    return create_dag(loaded, dag_id=dag_id, _offset=5, **kwargs)
@create_dags.register
def _create_dags_config(configs: list, dag_ids: list[str] = None, **kwargs: Any) -> list[DAG]:
    """Build one DAG per entry of ``configs``.

    Registered for ``list`` first arguments.

    Args:
        configs: Items handed one by one to ``create_dag``.
        dag_ids: Optional identifiers matched to ``configs`` by position.
            Configs without a matching entry are created with an empty
            ``dag_id``, leaving the choice of identifier to ``create_dag``.
        **kwargs: Forwarded to every ``create_dag`` call.

    Returns:
        The created DAGs, in the order of ``configs``.
    """
    explicit_ids = dag_ids or []
    dags = []
    # Kept as a plain loop on purpose: create_dag is called with a fixed
    # ``_offset=5``, and a comprehension would add a stack frame on Python
    # versions before 3.12.
    for index, config in enumerate(configs):
        dag_id = explicit_ids[index] if index < len(explicit_ids) else ""
        dags.append(create_dag(config, dag_id=dag_id, _offset=5, **kwargs))
    return dags
@create_dags.register
def _create_dags_dirs(
    config_dir: str = "config",
    config_names: Optional[list[str]] = None,
    overrides: Optional[list[str]] = None,
    dag_id_base: str = "",
    **kwargs: Any,
) -> list[DAG]:
    """Load several configurations from one directory and build a DAG for each.

    Registered for ``str`` first arguments, so ``create_dags("some/dir", ...)``
    lands here.

    Args:
        config_dir: Directory passed to ``Configuration.load``.
        config_names: Configuration names to load, one DAG per name. ``None``
            is treated as "no configurations" and yields an empty list.
        overrides: Optional overrides passed to every ``Configuration.load``.
        dag_id_base: Prefix for the DAG identifiers. When empty, it is obtained
            from ``generate_dag_id(offset=4)``. Each DAG is named
            ``"<dag_id_base>-<config_name>"``.
        **kwargs: Forwarded to every ``create_dag`` call.

    Returns:
        The created DAGs, in the order of ``config_names``.
    """
    dag_id_base = dag_id_base or generate_dag_id(offset=4)
    names = [] if config_names is None else config_names
    dags = []
    # Kept as a plain loop on purpose: the fixed ``_offset=5`` values depend on
    # these calls being made directly from this frame, and a comprehension
    # would add a stack frame on Python versions before 3.12.
    for config_name in names:
        config = Configuration.load(config_dir=config_dir, config_name=config_name, overrides=overrides, _offset=5)
        dags.append(create_dag(config, dag_id=f"{dag_id_base}-{config_name}", _offset=5, **kwargs))
    return dags