Source code for airflow_pydantic.operators

from typing import Dict, List, Optional

from pydantic import ConfigDict, Field, field_serializer, field_validator

from .task import _TaskSpecificArgs
from .utils import CallablePath

__all__ = (
    "PythonOperatorArgs",
    "BashOperatorArgs",
    "SSHOperatorArgs",
)


[docs] class PythonOperatorArgs(_TaskSpecificArgs): # python operator args # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/operators/python/index.html#airflow.providers.standard.operators.python.PythonOperator python_callable: Optional[CallablePath] = Field(default=None, description="python_callable") op_args: Optional[List[object]] = Field( default=None, description="a list of positional arguments that will get unpacked when calling your callable" ) op_kwargs: Optional[Dict[str, object]] = Field( default=None, description="a dictionary of keyword arguments that will get unpacked in your function" ) templates_dict: Optional[Dict[str, object]] = Field( default=None, description="a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place and are made available in your callable’s context after the template has been applied. (templated)", ) templates_exts: Optional[List[str]] = Field( default=None, description="a list of file extensions to resolve while processing templated fields, for examples ['.sql', '.hql']" ) show_return_value_in_logs: Optional[bool] = Field( default=None, description="a bool value whether to show return_value logs. Defaults to True, which allows return value log output. It can be set to False", ) # generic extras model_config = ConfigDict(extra="allow")
[docs] class BashOperatorArgs(_TaskSpecificArgs): # bash operator args # https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/_api/airflow/providers/standard/operators/bash/index.html bash_command: Optional[str] = Field(default=None, description="bash_command") env: Optional[Dict[str, str]] = Field(default=None) append_env: Optional[bool] = Field(default=False) output_encoding: Optional[str] = Field(default="utf-8") skip_exit_code: Optional[bool] = Field(default=None) skip_on_exit_code: Optional[int] = Field(default=99) cwd: Optional[str] = Field(default=None) output_processor: Optional[CallablePath] = None # generic extras model_config = ConfigDict(extra="allow")
[docs] class SSHOperatorArgs(_TaskSpecificArgs): # ssh operator args # https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/_api/airflow/providers/ssh/operators/ssh/index.html ssh_hook: Optional[CallablePath] = Field( default=None, description="predefined ssh_hook to use for remote execution. Either ssh_hook or ssh_conn_id needs to be provided." ) ssh_conn_id: Optional[str] = Field( default=None, description="ssh connection id from airflow Connections. ssh_conn_id will be ignored if ssh_hook is provided." ) remote_host: Optional[str] = Field( default=None, description="remote host to connect (templated) Nullable. If provided, it will replace the remote_host which was defined in ssh_hook or predefined in the connection of ssh_conn_id.", ) command: Optional[str] = Field(default=None, description="command to execute on remote host. (templated)") conn_timeout: Optional[int] = Field( default=None, description="timeout (in seconds) for maintaining the connection. The default is 10 seconds. Nullable. If provided, it will replace the conn_timeout which was predefined in the connection of ssh_conn_id.", ) cmd_timeout: Optional[int] = Field( default=None, description="timeout (in seconds) for executing the command. The default is 10 seconds. Nullable, None means no timeout. If provided, it will replace the cmd_timeout which was predefined in the connection of ssh_conn_id.", ) environment: Optional[Dict[str, str]] = Field( default=None, description="a dict of shell environment variables. Note that the server will reject them silently if AcceptEnv is not set in SSH config. (templated)", ) get_pty: Optional[bool] = Field( default=None, description="request a pseudo-terminal from the server. Set to True to have the remote process killed upon task timeout. The default is False but note that get_pty is forced to True when the command starts with sudo.", ) banner_timeout: Optional[int] = Field(default=None, description="timeout to wait for banner from the server in seconds") skip_on_exit_code: Optional[int] = Field( default=None, description="If command exits with this exit code, leave the task in skipped state (default: None). If set to None, any non-zero exit code will be treated as a failure.", ) @field_validator("ssh_hook") @classmethod def _validate_ssh_hook(cls, v): if v: from airflow.providers.ssh.hooks.ssh import SSHHook assert isinstance(v, SSHHook) return v @field_serializer("ssh_hook") def _serialize_hook(self, ssh_hook: object): if ssh_hook is not None: return f"SSHHook(hostname={ssh_hook.hostname}, ssh_conn_id={ssh_hook.ssh_conn_id})" return None # generic extras model_config = ConfigDict(extra="allow")