class Host(BaseModel):
    """Connection and scheduling settings for a single host.

    Instances order by ``name`` and hash by ``name``.
    """

    name: str
    username: Optional[str] = None

    # Password
    password: Optional[str] = None
    # If password is stored in a variable
    password_variable: Optional[str] = None
    # If stored in a structured container, access by key
    password_variable_key: Optional[str] = None
    # Or get key file
    key_file: Optional[str] = None

    os: Optional[str] = None

    # Airflow / balance
    pool: Optional[str] = None
    size: Optional[int] = None
    queues: List[str] = Field(default_factory=list)
    tags: List[str] = Field(default_factory=list)

    @property
    def hook(self, use_local: bool = True) -> SSHHook:
        """Build an SSHHook for this host from whichever credentials are set.

        Credentials are tried in this order: username + password,
        username + key file, username only, then no credentials at all.

        NOTE(review): this is a property, so callers cannot pass
        ``use_local``; it always takes its default of ``True``.

        Returns:
            An SSHHook whose ``remote_host`` is the host name, with
            ``.local`` appended when the name contains no dot.

        Raises:
            NotImplementedError: If ``username`` and ``password_variable``
                are set but ``password`` is not.
        """
        # Always bind the target name: names that already contain a dot are
        # used unchanged instead of leaving the local variable undefined.
        if use_local and "." not in self.name:
            name = f"{self.name}.local"
        else:
            name = self.name

        if self.username and self.password:
            return SSHHook(remote_host=name, username=self.username, password=self.password)
        elif self.username and self.password_variable:
            # Resolving a password from a variable is not supported yet.
            raise NotImplementedError()
        elif self.username and self.key_file:
            return SSHHook(remote_host=name, username=self.username, key_file=self.key_file)
        elif self.username:
            return SSHHook(remote_host=name, username=self.username)
        else:
            return SSHHook(remote_host=name)

    def __lt__(self, other):
        """Order hosts by name."""
        return self.name < other.name

    def __hash__(self):
        """Hash a host by its name."""
        return hash(self.name)
class BalancerConfiguration(BaseModel):
    """Balancer settings: the known hosts plus defaults for hosts that omit a value."""

    hosts: List[Host] = Field(default_factory=list)

    default_username: str = "airflow"
    # Password
    default_password: Optional[str] = None
    # If password is stored in a variable
    default_password_variable: Optional[str] = None
    # If stored in a structured container, access by key
    default_password_variable_key: Optional[str] = None
    # Or get key file
    default_key_file: Optional[str] = None

    # The queue that might include the host running airflow itself
    primary_queue: str = "default"
    # The queue that does not include the host running airflow itself
    secondary_queue: str = "default"
    # The default worker queue
    default_queue: str = "default"
    # The default pool size
    default_size: int = Field(default=10)
    # Rewrite pool size from config if it differs from the airflow variable stored value
    override_pool_size: bool = False
    # Create connection object in airflow for host
    create_connection: bool = False

    @property
    def all_hosts(self):
        """Return the distinct configured hosts as a sorted list."""
        return sorted(set(self.hosts))

    @model_validator(mode="after")
    def _sync_limits(self) -> Self:
        """Fill in per-host defaults and create a pool when the lookup reports none.

        For every host, an unset pool falls back to the host name and an unset
        size to ``default_size``. Unset credentials (username, password,
        password variable, password variable key, key file) are copied from
        the matching ``default_*`` field when that default is set.

        Returns:
            This instance, as the declared ``Self`` return type requires.
        """
        for host in self.hosts:
            if not host.pool:
                host.pool = host.name
            if not host.size:
                host.size = self.default_size

            # Check airflow first
            # NOTE(review): confirm that Pool.get_pool raises PoolNotFound for a
            # missing pool; if it returns None instead, the pool is never created.
            try:
                Pool.get_pool(host.pool)
            except PoolNotFound:
                # Else set to default
                Pool.create_or_update_pool(
                    name=host.pool,
                    slots=host.size,
                    description=f"Balancer pool for host {host.name} / {host.pool}",
                )

            # Inherit any credential the host leaves unset from the defaults.
            if not host.username and self.default_username:
                host.username = self.default_username
            if not host.password and self.default_password:
                host.password = self.default_password
            if not host.password_variable and self.default_password_variable:
                host.password_variable = self.default_password_variable
            if not host.password_variable_key and self.default_password_variable_key:
                host.password_variable_key = self.default_password_variable_key
            if not host.key_file and self.default_key_file:
                host.key_file = self.default_key_file

        # Hand back the validated instance so the declared return type holds.
        return self