Source code for airflow_pydantic.extras.balancer.host

from typing import TYPE_CHECKING

from pydantic import Field

from ...core import BaseModel
from ...utils import Pool, Variable

if TYPE_CHECKING:
    from airflow_pydantic.airflow import SSHHook


__all__ = ("Host",)


[docs] class Host(BaseModel): name: str username: str | None = None # Password password: str | Variable | None = None # Or get key file key_file: str | None = None os: str | None = None # Airflow / balance pool: Pool | None = None size: int | None = None queues: list[str] = Field(default_factory=list) tags: list[str] = Field(default_factory=list)
[docs] def override(self, **kwargs) -> "Host": return Host(**{**self.model_dump(), **kwargs})
[docs] def hook(self, username: str | None = None, use_local: bool = True, **hook_kwargs) -> "SSHHook": from airflow_pydantic.airflow import SSHHook if use_local and not self.name.count(".") > 0: name = f"{self.name}.local" else: name = self.name username = username or self.username if username and self.password: if isinstance(self.password, str): return SSHHook(remote_host=name, username=username, password=self.password, **hook_kwargs) password = self.password.get() if isinstance(password, dict): # TODO # Assume "password" password = password["password"] return SSHHook(remote_host=name, username=username, password=password, **hook_kwargs) elif username and self.key_file: return SSHHook(remote_host=name, username=username, key_file=self.key_file, **hook_kwargs) elif username: return SSHHook(remote_host=name, username=username, **hook_kwargs) else: return SSHHook(remote_host=name, **hook_kwargs)
def __lt__(self, other): return self.name < other.name def __le__(self, other): return self.name <= other.name def __hash__(self): return hash(self.name)