Source code for airflow_balancer.config.host

from typing import List, Optional

from airflow.models.variable import Variable
from airflow.providers.ssh.hooks.ssh import SSHHook
from pydantic import BaseModel, Field

__all__ = ("Host",)


class Host(BaseModel):
    """A named machine plus the details needed to open an SSH hook to it.

    Credentials can be supplied directly (``password``), indirectly through
    an Airflow ``Variable`` (``password_variable``, optionally narrowed with
    ``password_variable_key``), or as a ``key_file``.  The remaining fields
    (``os``, ``pool``, ``size``, ``queues``, ``tags``) are plain metadata
    that this class stores but does not interpret.

    Ordering (``<``) and hashing use ``name`` only.
    """

    name: str
    username: Optional[str] = None

    # Password
    password: Optional[str] = None
    # If password is stored in a variable
    password_variable: Optional[str] = None
    # if stored in structured container, access by key
    password_variable_key: Optional[str] = None

    # Or get key file
    key_file: Optional[str] = None

    os: Optional[str] = None

    # Airflow / balance
    pool: Optional[str] = None
    size: Optional[int] = None
    queues: List[str] = Field(default_factory=list)
    tags: List[str] = Field(default_factory=list)

    def override(self, **kwargs) -> "Host":
        """Return a new ``Host`` copied from this one with ``kwargs`` applied.

        The current field values are dumped to a dict, ``kwargs`` are layered
        on top (so they win on conflict), and a fresh ``Host`` is validated
        from the result.  ``self`` is not modified.
        """
        return Host(**{**self.model_dump(), **kwargs})

    def hook(self, username: Optional[str] = None, use_local: bool = True) -> SSHHook:
        """Build an ``SSHHook`` for this host.

        Args:
            username: Login name to use; falls back to ``self.username`` when
                empty or ``None``.
            use_local: When true and ``self.name`` contains no ``"."``, the
                remote host is ``"<name>.local"``; otherwise ``self.name`` is
                used unchanged.

        Returns:
            An ``SSHHook`` configured from the first credential source that
            is set, checked in this order: ``password``,
            ``password_variable``, ``key_file``, then username only.  If no
            username is available, the hook is created with the remote host
            alone and any configured credentials are not passed.

        Raises:
            Whatever ``Variable.get`` raises when the variable cannot be
            read, or the lookup error from indexing the deserialized value
            with ``password_variable_key``.
        """
        # Names without a dot get the ".local" suffix unless the caller opts out.
        if use_local and "." not in self.name:
            name = f"{self.name}.local"
        else:
            name = self.name

        username = username or self.username
        if not username:
            # Every credential branch below requires a username, so without
            # one the hook is created from the host name only.
            return SSHHook(remote_host=name)

        if self.password:
            return SSHHook(remote_host=name, username=username, password=self.password)

        if self.password_variable:
            if self.password_variable_key:
                # The variable holds a JSON container; pick the password by key.
                credentials = Variable.get(self.password_variable, deserialize_json=True)
                password = credentials[self.password_variable_key]
            else:
                # The variable's value is the password itself.
                password = Variable.get(self.password_variable)
            return SSHHook(remote_host=name, username=username, password=password)

        if self.key_file:
            return SSHHook(remote_host=name, username=username, key_file=self.key_file)

        return SSHHook(remote_host=name, username=username)

    def __lt__(self, other):
        """Order hosts by ``name``; defer to the other operand if it is not a ``Host``."""
        if not isinstance(other, Host):
            # Lets Python try the reflected comparison or raise TypeError,
            # rather than failing with AttributeError on ``other.name``.
            return NotImplemented
        return self.name < other.name

    def __hash__(self):
        """Hash by ``name`` so hosts can be used in sets and as dict keys."""
        return hash(self.name)