# Source code for airflow_pydantic.sensors.datetime
from logging import getLogger
from typing import Any, Dict, Optional, Type
from pydantic import Field, field_validator
from ..core import Task
from ..utils import DatetimeArg, ImportPath
from .base import BaseSensorArgs
# Public names of this module: the two argument models and the two task
# models defined below. This tuple controls what `import *` exposes.
__all__ = (
"DateTimeSensorArgs",
"DateTimeSensor",
"DateTimeSensorAsyncArgs",
"DateTimeSensorAsync",
)
# Module-level logger named after this module; the `validate_operator`
# validators below use it to report when a marker class is accepted.
_log = getLogger(__name__)
class DateTimeSensorArgs(BaseSensorArgs):
    # Argument model for the datetime sensor: everything inherited from
    # BaseSensorArgs plus the single required `target_time` field.
    # Upstream operator reference:
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/date_time/index.html#airflow.providers.standard.sensors.date_time.DateTimeSensor

    # No default is supplied, so a value must always be provided.
    target_time: DatetimeArg = Field(
        description="The target date and time to wait for",
    )
# [docs]
class DateTimeSensor(Task, DateTimeSensorArgs):
    # Task model for the datetime sensor: combines the Task base with the
    # DateTimeSensorArgs fields and adds the `operator` field.

    # `validate_default=True` makes pydantic run validation (including
    # `validate_operator` below) on the default value too.
    operator: ImportPath = Field(
        default="airflow_pydantic.airflow.DateTimeSensor",
        description="airflow sensor path",
        validate_default=True,
    )

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: Type) -> Type:
        """Check that ``v`` is an acceptable operator class and return it unchanged.

        Args:
            v: Candidate value for the ``operator`` field.

        Returns:
            ``v`` itself, when it is a class deriving from either
            ``_AirflowPydanticMarker`` or ``airflow_pydantic.airflow.DateTimeSensor``.

        Raises:
            ValueError: If ``v`` is not a class at all, or is a class that
                derives from neither of the two accepted bases.
        """
        # Imported inside the validator rather than at module level
        # (presumably to defer the import until validation — confirm).
        # Note that this local `DateTimeSensor` is the one from
        # airflow_pydantic.airflow, not the model class being defined here.
        from airflow_pydantic.airflow import DateTimeSensor, _AirflowPydanticMarker

        # issubclass() is only attempted once we know `v` is a class.
        is_class = isinstance(v, Type)

        if is_class and issubclass(v, _AirflowPydanticMarker):
            _log.info("DateTimeSensor is a marker class, returning as is")
            return v

        if is_class and issubclass(v, DateTimeSensor):
            return v

        # Both rejection cases (not a class / wrong class) share one message.
        raise ValueError(f"operator must be 'airflow.providers.standard.sensors.date_time.DateTimeSensor', got: {v}")
class DateTimeSensorAsyncArgs(BaseSensorArgs):
    # Argument model for the async datetime sensor: the BaseSensorArgs fields,
    # a required `target_time`, and three optional trigger-related settings
    # that all default to None.
    # Upstream operator reference:
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/date_time/index.html#airflow.providers.standard.sensors.date_time.DateTimeSensorAsync

    # No default is supplied, so a value must always be provided.
    target_time: DatetimeArg = Field(
        description="The target date and time to wait for",
    )

    start_from_trigger: Optional[bool] = Field(
        default=None,
        description="If True, the sensor will start from the trigger state when used in deferrable mode",
    )

    trigger_kwargs: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional keyword arguments to pass to the trigger when in deferrable mode",
    )

    end_from_trigger: Optional[bool] = Field(
        default=None,
        description="If True, the sensor will end from the trigger state when used in deferrable mode",
    )
# [docs]
class DateTimeSensorAsync(Task, DateTimeSensorAsyncArgs):
    # Task model for the async datetime sensor: combines the Task base with
    # the DateTimeSensorAsyncArgs fields and adds the `operator` field.

    # `validate_default=True` makes pydantic run validation (including
    # `validate_operator` below) on the default value too.
    operator: ImportPath = Field(
        default="airflow_pydantic.airflow.DateTimeSensorAsync",
        description="airflow sensor path",
        validate_default=True,
    )

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: Type) -> Type:
        """Check that ``v`` is an acceptable operator class and return it unchanged.

        Args:
            v: Candidate value for the ``operator`` field.

        Returns:
            ``v`` itself, when it is a class deriving from either
            ``_AirflowPydanticMarker`` or ``airflow_pydantic.airflow.DateTimeSensorAsync``.

        Raises:
            ValueError: If ``v`` is not a class at all, or is a class that
                derives from neither of the two accepted bases.
        """
        # Imported inside the validator rather than at module level
        # (presumably to defer the import until validation — confirm).
        # Note that this local `DateTimeSensorAsync` is the one from
        # airflow_pydantic.airflow, not the model class being defined here.
        from airflow_pydantic.airflow import DateTimeSensorAsync, _AirflowPydanticMarker

        # issubclass() is only attempted once we know `v` is a class.
        is_class = isinstance(v, Type)

        if is_class and issubclass(v, _AirflowPydanticMarker):
            _log.info("DateTimeSensorAsync is a marker class, returning as is")
            return v

        if is_class and issubclass(v, DateTimeSensorAsync):
            return v

        # Both rejection cases (not a class / wrong class) share one message.
        raise ValueError(f"operator must be 'airflow.providers.standard.sensors.date_time.DateTimeSensorAsync', got: {v}")