Source code for airflow_pydantic.sensors.timedelta

from datetime import timedelta
from logging import getLogger
from typing import Optional, Type, Union

from pydantic import Field, field_validator

from ..core import Task
from ..utils import ImportPath
from .base import BaseSensorArgs

__all__ = (
    "TimeDeltaSensorArgs",
    "TimeDeltaSensor",
    "WaitSensorArgs",
    "WaitSensor",
)

_log = getLogger(__name__)


class TimeDeltaSensorArgs(BaseSensorArgs):
    # timedelta sensor args
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/time_delta/index.html#airflow.providers.standard.sensors.time_delta.TimeDeltaSensor
    delta: timedelta = Field(description="Time to wait before succeeding.")
    deferrable: Optional[bool] = Field(default=None, description="If True, the sensor will operate in deferrable mode")


class TimeDeltaSensor(Task, TimeDeltaSensorArgs):
    operator: ImportPath = Field(default="airflow_pydantic.airflow.TimeDeltaSensor", description="airflow sensor path", validate_default=True)

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: Type) -> Type:
        from airflow_pydantic.airflow import TimeDeltaSensor, _AirflowPydanticMarker

        if not isinstance(v, Type):
            raise ValueError(f"operator must be 'airflow.providers.standard.sensors.time.TimeDeltaSensor', got: {v}")
        if issubclass(v, _AirflowPydanticMarker):
            _log.info("TimeSensor is a marker class, returning as is")
            return v
        if not issubclass(v, TimeDeltaSensor):
            raise ValueError(f"operator must be 'airflow.providers.standard.sensors.time.TimeDeltaSensor', got: {v}")
        return v


class WaitSensorArgs(BaseSensorArgs):
    # wait sensor args
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/time_delta/index.html#airflow.providers.standard.sensors.time_delta.WaitSensor
    time_to_wait: Union[timedelta, int] = Field(description="Time length to wait after the task starts before succeeding.")
    deferrable: Optional[bool] = Field(default=None, description="If True, the sensor will operate in deferrable mode")


[docs] class WaitSensor(Task, WaitSensorArgs): operator: ImportPath = Field(default="airflow_pydantic.airflow.WaitSensor", description="airflow sensor path", validate_default=True)
[docs] @field_validator("operator") @classmethod def validate_operator(cls, v: Type) -> Type: from airflow_pydantic.airflow import WaitSensor, _AirflowPydanticMarker if not isinstance(v, Type): raise ValueError(f"operator must be 'airflow.providers.standard.sensors.time.WaitSensor', got: {v}") if issubclass(v, _AirflowPydanticMarker): _log.info("WaitSensor is a marker class, returning as is") return v if not issubclass(v, WaitSensor): raise ValueError(f"operator must be 'airflow.providers.standard.sensors.time.WaitSensor', got: {v}") return v