from configparser import ConfigParser
from inspect import currentframe
from os import environ, getcwd, path
from typing import cast
from uuid import uuid4

__all__ = ("generate_dag_id",)


def _get_calling_dag(offset: int = 2) -> str:
    """Return the ``__file__`` of the module ``offset`` frames up the call stack.

    Frame 0 is this function itself, so the default ``offset=2`` resolves to
    the caller of the function that called ``_get_calling_dag``.

    Args:
        offset: Number of stack frames to walk up from this function. The walk
            stops early if the top of the stack is reached first.

    Returns:
        The ``__file__`` global of the resolved frame. If that frame has no
        ``__file__`` global, ``<cwd>/ipython.py`` is returned instead.

    Raises:
        RuntimeError: If ``inspect.currentframe()`` returns ``None``, i.e. the
            interpreter provides no stack frame support.
    """
    frame = currentframe()
    if frame is None:
        # Fail with a clear message instead of an AttributeError further down.
        raise RuntimeError("stack frame introspection is not available")

    for _ in range(offset):
        if frame.f_back is None:
            # Reached the outermost frame; use it as-is.
            break
        frame = frame.f_back

    if "__file__" not in frame.f_globals:  # pragma: no cover
        # Assumed to be running in ipython / an interactive interpreter
        return path.join(getcwd(), "ipython.py")  # pragma: no cover
    return frame.f_globals["__file__"]


def _get_dag_root() -> str:
    """Read the DAG folder from ``$AIRFLOW_HOME/airflow.cfg``.

    Returns:
        The ``dags_folder`` value of the ``[core]`` section, or ``""`` when
        ``AIRFLOW_HOME`` is unset/empty or the config file does not exist.

    Raises:
        KeyError: If the config file exists but has no ``[core]`` section or
            no ``dags_folder`` option in it.
    """
    home = environ.get("AIRFLOW_HOME", "")
    if not home:
        return ""

    cfg_path = path.join(home, "airflow.cfg")
    if not path.exists(cfg_path):
        return ""

    config = ConfigParser()
    config.read(cfg_path)
    return config["core"]["dags_folder"]


def _dashify(text: str) -> str:
    """Replace every ``/`` and ``_`` in ``text`` with ``-``."""
    return text.replace("/", "-").replace("_", "-")


def generate_dag_id(name: str = "", dag_root: str = "", offset: int = 2) -> str:
    """Return ``name``, or derive a DAG id from the calling file's path.

    When ``name`` is empty, the id is built from the path of the calling
    Python file: ``.py`` is removed, ``/`` and ``_`` become ``-``, the
    (equally dash-converted) DAG root folder is removed, and leading dashes
    are stripped. The file path is used rather than the module name because
    the same module name may exist in multiple folders.

    Args:
        name: Explicit id. Returned unchanged when non-empty.
        dag_root: Root folder to remove from the derived id. When empty, the
            value from ``_get_dag_root()`` is used.
        offset: Stack offset passed to ``_get_calling_dag``; the default of 2
            resolves to the direct caller of this function.

    Returns:
        The explicit or derived id. If deriving the id fails for any reason,
        a random UUID4 string is returned instead.
    """
    if name:
        return name

    try:
        # Must be called directly from this function: the frame offset is
        # counted relative to this call chain.
        caller = _get_calling_dag(offset=offset)
        # NOTE: ".py" and the root are removed wherever they occur in the
        # path, not only as suffix/prefix; kept so existing ids stay stable.
        derived = _dashify(caller.replace(".py", ""))
        dag_root = dag_root or _get_dag_root()
        derived = derived.replace(_dashify(dag_root), "")
        return derived.lstrip("-")
    except Exception:
        # Deliberate best-effort fallback. ``str()`` is required here:
        # ``typing.cast`` does not convert and would hand back a UUID object.
        return str(uuid4())