Source code for airflow_pydantic.sensors.bash
from logging import getLogger
from typing import Any, Dict, List, Optional, Type, Union
from pydantic import Field, field_validator
from ..core import Task, TaskArgs
from ..utils import BashCommands, ImportPath
# Names exported by ``from <this module> import *``: the argument model and
# the task model defined below.
__all__ = (
"BashSensorArgs",
"BashSensor",
)
# Module-level logger, named after this module's import path.
_log = getLogger(__name__)
class BashSensorArgs(TaskArgs):
    """Declarative arguments for Airflow's ``BashSensor``.

    Every field defaults to ``None``. The fields mirror the keyword arguments
    of ``airflow.providers.standard.sensors.bash.BashSensor`` (see link below).
    """

    # bash sensor args
    # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/sensors/bash/index.html#airflow.providers.standard.sensors.bash.BashSensor

    # NOTE(review): the default is None although the annotation is not
    # Optional; an explicitly passed None is presumably rejected by pydantic
    # while an omitted field stays None -- confirm this is intended.
    bash_command: Union[str, List[str], BashCommands] = Field(default=None, description="bash command string, list of strings, or model")
    env: Optional[Dict[str, str]] = Field(default=None)
    output_encoding: Optional[str] = Field(default=None, description="Output encoding for the command, default is 'utf-8'")
    # Airflow declares ``retry_exit_code`` as ``int | None``: the exit code
    # that means "not yet complete, poke again". It is an exit code, not a
    # flag, so it is typed as an int here.
    retry_exit_code: Optional[int] = Field(default=None)

    @field_validator("bash_command")
    @classmethod
    def validate_bash_command(cls, v: Any) -> Any:
        """Normalize ``bash_command`` to a string or a ``BashCommands`` model.

        Args:
            v: The value supplied for ``bash_command``.

        Returns:
            ``v`` unchanged when it is a ``str`` or a ``BashCommands``
            instance; a ``BashCommands`` built from ``v`` when it is a list
            whose items are all strings.

        Raises:
            ValueError: If ``v`` is none of the accepted shapes.
        """
        if isinstance(v, str):
            return v
        # A plain list of strings is wrapped in the BashCommands model.
        if isinstance(v, list) and all(isinstance(item, str) for item in v):
            return BashCommands(commands=v)
        if isinstance(v, BashCommands):
            return v
        raise ValueError("bash_command must be a string, list of strings, or a BashCommands model")
[docs]
class BashSensor(Task, BashSensorArgs):
    """Task model that pairs ``BashSensorArgs`` with the sensor class to run.

    ``operator`` defaults to the dotted path
    ``"airflow_pydantic.airflow.BashSensor"``. ``validate_default=True``
    makes the default go through validation as well, so ``validate_operator``
    also runs when the field is omitted.
    """

    # NOTE(review): ImportPath presumably resolves the dotted path to the
    # imported class before ``validate_operator`` sees it -- confirm in ..utils.
    operator: ImportPath = Field(default="airflow_pydantic.airflow.BashSensor", description="airflow sensor path", validate_default=True)

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: Type) -> Type:
        """Check that ``operator`` is a BashSensor class or a marker class.

        Args:
            v: The value of the ``operator`` field.

        Returns:
            ``v`` unchanged when it is a subclass of ``_AirflowPydanticMarker``
            or of ``airflow_pydantic.airflow.BashSensor``.

        Raises:
            ValueError: If ``v`` is not a class, or is a class that derives
                from neither of the two accepted bases.
        """
        # Imported inside the validator, as in the original; presumably to
        # keep module import cheap or avoid an import cycle -- TODO confirm.
        from airflow_pydantic.airflow import BashSensor, _AirflowPydanticMarker

        # issubclass() below requires a class, so reject anything else first.
        if not isinstance(v, type):
            raise ValueError(f"operator must be 'airflow.providers.standard.sensors.bash.BashSensor', got: {v}")
        if issubclass(v, _AirflowPydanticMarker):
            # Marker classes are accepted without the BashSensor subclass check.
            _log.info("BashSensor is a marker class, returning as is")
            return v
        if not issubclass(v, BashSensor):
            raise ValueError(f"operator must be 'airflow.providers.standard.sensors.bash.BashSensor', got: {v}")
        return v