Source code for airflow_ha.common
from collections.abc import Callable
from datetime import time, timedelta
from enum import Enum
from typing import Any, Literal
__all__ = (
"Action",
"CheckResult",
"Endtime",
"FailTriggerKwargs",
"MaxRetrigger",
"PassTriggerKwargs",
"PythonCallable",
"ReferenceDate",
"Result",
"Runtime",
)
[docs]
class Result(str, Enum):
PASS = "pass"
FAIL = "fail"
[docs]
class Action(str, Enum):
CONTINUE = "continue"
RETRIGGER = "retrigger"
STOP = "stop"
CheckResult = tuple[Result, Action]
PythonCallable = Callable[..., CheckResult]
PassTriggerKwargs = dict[str, Any]
FailTriggerKwargs = dict[str, Any]
Runtime = int | timedelta
Endtime = str | time
MaxRetrigger = int
ReferenceDate = Literal["start_date", "logical_date", "data_interval_end"]