Source code for airflow_supervisor.config.supervisor

from typing import List, Optional, Union

from pydantic import Field, field_serializer, field_validator
from supervisor_pydantic import SupervisorConvenienceConfiguration

from .airflow import AirflowConfiguration

__all__ = (
    "SupervisorAirflowConfiguration",
    "SupervisorSSHAirflowConfiguration",
    "load_airflow_config",
    "load_airflow_ssh_config",
)


[docs] class SupervisorAirflowConfiguration(SupervisorConvenienceConfiguration): airflow: AirflowConfiguration = Field( default_factory=AirflowConfiguration, description="Required options for airflow integration" ) stop_on_exit: bool = Field(default=True, description="Stop supervisor on dag completion") cleanup: bool = Field( default=True, description="Cleanup supervisor folder on dag completion. Note: stop_on_exit must be True" )
[docs] class SupervisorSSHAirflowConfiguration(SupervisorAirflowConfiguration): command_prefix: Optional[str] = Field(default="") # SSH Kwargs ssh_hook: Optional[object] = Field(default=None) ssh_conn_id: Optional[str] = Field(default=None) remote_host: Optional[str] = Field(default=None) conn_timeout: Optional[int] = Field(default=None) cmd_timeout: Optional[int] = Field(default=3600) environment: Optional[dict] = Field(default=None) get_pty: Optional[bool] = Field(default=None) banner_timeout: Optional[float] = Field(default=None) skip_on_exit_code: Optional[Union[int, List[int]]] = Field(default=None) @field_validator("ssh_hook") @classmethod def _validate_ssh_hook(cls, v): if v: from airflow.providers.ssh.hooks.ssh import SSHHook assert isinstance(v, SSHHook) return v @field_serializer("ssh_hook") def _serialize_hook(self, ssh_hook: object): if ssh_hook is not None: return f"SSHHook(hostname={ssh_hook.hostname}, ssh_conn_id={ssh_hook.ssh_conn_id})" return None
load_airflow_config = SupervisorAirflowConfiguration.load load_airflow_ssh_config = SupervisorSSHAirflowConfiguration.load