Source code for airflow_pydantic.extras.balancer.host
from typing import TYPE_CHECKING, List, Optional, Union
from pydantic import Field
from ...core import BaseModel
from ...utils import Pool, Variable
if TYPE_CHECKING:
from airflow_pydantic.airflow import SSHHook
__all__ = ("Host",)
[docs]
class Host(BaseModel):
name: str
username: Optional[str] = None
# Password
password: Optional[Union[str, Variable]] = None
# Or get key file
key_file: Optional[str] = None
os: Optional[str] = None
# Airflow / balance
pool: Optional[Pool] = None
size: Optional[int] = None
queues: List[str] = Field(default_factory=list)
tags: List[str] = Field(default_factory=list)
[docs]
def override(self, **kwargs) -> "Host":
return Host(**{**self.model_dump(), **kwargs})
[docs]
def hook(self, username: str = None, use_local: bool = True, **hook_kwargs) -> "SSHHook":
from airflow_pydantic.airflow import SSHHook
if use_local and not self.name.count(".") > 0:
name = f"{self.name}.local"
else:
name = self.name
username = username or self.username
if username and self.password:
if isinstance(self.password, str):
return SSHHook(remote_host=name, username=username, password=self.password, **hook_kwargs)
password = self.password.get()
if isinstance(password, dict):
# TODO
# Assume "password"
password = password["password"]
return SSHHook(remote_host=name, username=username, password=password, **hook_kwargs)
elif username and self.key_file:
return SSHHook(remote_host=name, username=username, key_file=self.key_file, **hook_kwargs)
elif username:
return SSHHook(remote_host=name, username=username, **hook_kwargs)
else:
return SSHHook(remote_host=name, **hook_kwargs)
def __lt__(self, other):
return self.name < other.name
def __le__(self, other):
return self.name <= other.name
def __hash__(self):
return hash(self.name)