Source code for airflow_ha.common
from datetime import time, timedelta
from enum import Enum
from typing import Any, Callable, Dict, Literal, Tuple, Union
__all__ = (
"Result",
"Action",
"CheckResult",
"PythonCallable",
"PassTriggerKwargs",
"FailTriggerKwargs",
"Runtime",
"Endtime",
"MaxRetrigger",
"ReferenceDate",
)
[docs]
class Result(str, Enum):
PASS = "pass"
FAIL = "fail"
[docs]
class Action(str, Enum):
CONTINUE = "continue"
RETRIGGER = "retrigger"
STOP = "stop"
CheckResult = Tuple[Result, Action]
PythonCallable = Callable[..., CheckResult]
PassTriggerKwargs = Dict[str, Any]
FailTriggerKwargs = Dict[str, Any]
Runtime = Union[int, timedelta]
Endtime = Union[str, time]
MaxRetrigger = int
ReferenceDate = Literal["start_date", "logical_date", "data_interval_end"]