Source code for airflow_pydantic.sensors.bash

from logging import getLogger
from typing import Any, Dict, List, Optional, Type, Union

from pydantic import Field, field_validator

from ..core import Task, TaskArgs
from ..utils import BashCommands, ImportPath

__all__ = (
    "BashSensorArgs",
    "BashSensor",
)

_log = getLogger(__name__)


class BashSensorArgs(TaskArgs):
    # bash sensor args
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/bash/index.html#airflow.providers.standard.sensors.bash.BashSensor
    bash_command: Union[str, List[str], BashCommands] = Field(default=None, description="bash command string, list of strings, or model")
    env: Optional[Dict[str, str]] = Field(default=None)
    output_encoding: Optional[str] = Field(default=None, description="Output encoding for the command, default is 'utf-8'")
    retry_exit_code: Optional[bool] = Field(default=None)

    @field_validator("bash_command")
    @classmethod
    def validate_bash_command(cls, v: Any) -> Any:
        if isinstance(v, str):
            return v
        elif isinstance(v, list) and all(isinstance(item, str) for item in v):
            return BashCommands(commands=v)
        elif isinstance(v, BashCommands):
            return v
        else:
            raise ValueError("bash_command must be a string, list of strings, or a BashCommands model")


[docs] class BashSensor(Task, BashSensorArgs): operator: ImportPath = Field(default="airflow_pydantic.airflow.BashSensor", description="airflow sensor path", validate_default=True)
[docs] @field_validator("operator") @classmethod def validate_operator(cls, v: Type) -> Type: from airflow_pydantic.airflow import BashSensor, _AirflowPydanticMarker if not isinstance(v, Type): raise ValueError(f"operator must be 'airflow.providers.standard.sensors.bash.BashSensor', got: {v}") if issubclass(v, _AirflowPydanticMarker): _log.info("BashOperator is a marker class, returning as is") return v if not issubclass(v, BashSensor): raise ValueError(f"operator must be 'airflow.providers.standard.sensors.bash.BashSensor', got: {v}") return v