Source code for airflow_pydantic.sensors.filesystem

from logging import getLogger
from typing import Any, Dict, Optional, Type

from pydantic import Field, field_validator

from ..core import Task
from ..utils import ImportPath
from .base import BaseSensorArgs

__all__ = (
    "FileSensorArgs",
    "FileSensor",
)

_log = getLogger(__name__)


class FileSensorArgs(BaseSensorArgs):
    # Optional keyword arguments for a file sensor, layered on top of the
    # shared BaseSensorArgs fields. Every field here defaults to None.
    # Upstream operator reference:
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/filesystem/index.html#airflow.providers.standard.sensors.filesystem.FileSensor

    # Where to look: connection and path.
    fs_conn_id: Optional[str] = Field(
        description="The connection ID to use when connecting to the filesystem",
        default=None,
    )
    filepath: Optional[str] = Field(
        description="The file path to check for existence",
        default=None,
    )
    recursive: Optional[bool] = Field(
        description="Whether to check for the file recursively in subdirectories",
        default=None,
    )

    # Deferrable-mode settings.
    deferrable: Optional[bool] = Field(
        description="Set to True to enable deferrable mode for this operator",
        default=None,
    )
    start_from_trigger: Optional[bool] = Field(
        description="If True, the sensor will start from the trigger state when used in deferrable mode",
        default=None,
    )
    trigger_kwargs: Optional[Dict[str, Any]] = Field(
        description="Additional keyword arguments to pass to the trigger when in deferrable mode",
        default=None,
    )


class FileSensor(Task, FileSensorArgs):
    # Task model for a file sensor: combines the generic Task fields with the
    # FileSensorArgs fields and pins which operator class the task maps to.
    operator: ImportPath = Field(
        description="airflow sensor path",
        validate_default=True,  # run validate_operator on the default as well
        default="airflow_pydantic.airflow.FileSensor",
    )

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: Type) -> Type:
        """Check that ``v`` is an acceptable operator class and return it.

        ``v`` is accepted when it is a class that subclasses either
        ``_AirflowPydanticMarker`` or the ``FileSensor`` exported by
        ``airflow_pydantic.airflow``. The marker case is logged at INFO level.

        Raises:
            ValueError: if ``v`` is not a class, or is a class that subclasses
                neither of the two accepted bases.
        """
        # Imported at call time, as in the original, rather than at module level.
        from airflow_pydantic.airflow import FileSensor as _ExpectedSensor
        from airflow_pydantic.airflow import _AirflowPydanticMarker

        if isinstance(v, Type):
            if issubclass(v, _AirflowPydanticMarker):
                _log.info("FileSensor is a marker class, returning as is")
                return v
            if issubclass(v, _ExpectedSensor):
                return v

        # Reached for non-class values and for classes of the wrong lineage;
        # both cases report the same message.
        raise ValueError(f"operator must be 'airflow.providers.standard.sensors.filesystem.FileSensor', got: {v}")