import os
from getpass import getuser
from logging import getLogger
from pathlib import Path
from shutil import rmtree
from signal import SIGKILL, SIGTERM
from subprocess import Popen
from tempfile import gettempdir
from typing import Dict, Optional
from hydra import compose, initialize_config_dir
from hydra.utils import instantiate
from pydantic import BaseModel, Field, PrivateAttr, model_validator
from ..exceptions import ConfigNotFoundError
from ..utils import _get_calling_file
from .convenience import ConvenienceConfiguration
from .eventlistener import EventListenerConfiguration
from .fcgiprogram import FcgiProgramConfiguration
from .group import GroupConfiguration
from .include import IncludeConfiguration
from .inet_http_server import InetHttpServerConfiguration
from .program import ProgramConfiguration
from .rpcinterface import RpcInterfaceConfiguration
from .supervisorctl import SupervisorctlConfiguration
from .supervisord import SupervisordConfiguration
from .unix_http_server import UnixHttpServerConfiguration
__all__ = (
"SupervisorConfiguration",
"SupervisorConvenienceConfiguration",
"load_config",
"load_convenience_config",
)
_log = getLogger(__name__)
[docs]
class SupervisorConfiguration(BaseModel):
[docs]
def to_cfg(self) -> str:
ret = ""
if self.unix_http_server:
ret += self.unix_http_server.to_cfg() + "\n"
if self.inet_http_server:
ret += self.inet_http_server.to_cfg() + "\n"
if self.supervisord:
ret += self.supervisord.to_cfg() + "\n"
if self.supervisorctl:
ret += self.supervisorctl.to_cfg() + "\n"
if self.include:
ret += self.include.to_cfg() + "\n"
for k, v in self.program.items():
ret += v.to_cfg(key=k) + "\n"
for k, v in (self.group or {}).items():
ret += v.to_cfg(key=k) + "\n"
for k, v in (self.fcgiprogram or {}).items():
ret += v.to_cfg(key=k) + "\n"
for k, v in (self.eventlistener or {}).items():
ret += v.to_cfg(key=k) + "\n"
for k, v in (self.rpcinterface or {}).items():
ret += v.to_cfg(key=k) + "\n"
return ret
# supervisor setup
unix_http_server: Optional[UnixHttpServerConfiguration] = Field(default=None)
inet_http_server: Optional[InetHttpServerConfiguration] = Field(default=None)
supervisord: SupervisordConfiguration = Field(default=SupervisordConfiguration())
supervisorctl: SupervisorctlConfiguration = Field(default=SupervisorctlConfiguration())
include: Optional[IncludeConfiguration] = Field(default=None)
program: Dict[str, ProgramConfiguration]
group: Optional[Dict[str, GroupConfiguration]] = Field(default=None)
fcgiprogram: Optional[Dict[str, FcgiProgramConfiguration]] = Field(default=None)
eventlistener: Optional[Dict[str, EventListenerConfiguration]] = Field(default=None)
rpcinterface: Optional[Dict[str, RpcInterfaceConfiguration]] = Field(default=None)
# other configuration
config_path: Optional[Path] = Field(default="supervisor.cfg", description="Path to supervisor configuration file, relative to `working_dir`")
working_dir: Optional[Path] = Field(default="", description="Path to supervisor working directory")
@model_validator(mode="after")
def _setup_config_and_working_dir(self):
if self.working_dir is None or self.working_dir == "":
if self.supervisord.directory:
# use this as the dir
self.working_dir = self.supervisord.directory
else:
self.working_dir = Path(gettempdir()).resolve() / f"supervisor-{getuser()}-{'-'.join(list(self.program.keys()))}"
self.supervisord.directory = self.working_dir
if not Path(self.config_path).exists() or str(self.working_dir) not in str(self.config_path):
self.config_path = (self.working_dir / self.config_path).resolve()
# force pidfile to be in working dir if not otherwise set
if not self.supervisord.pidfile:
self.supervisord.pidfile = self.working_dir / "supervisord.pid"
# force logfile to be in working dir if not otherwise set
if not self.supervisord.logfile:
self.supervisord.logfile = self.working_dir / "supervisord.log"
for name, program_config in self.program.items():
if program_config.directory is None:
program_config.directory = self.working_dir / name
return self
@classmethod
def _find_parent_config_folder(cls, config_dir: str = "config", config_name: str = "", *, basepath: str = "", _offset: int = 2):
if basepath:
if basepath.endswith((".py", ".cfg", ".yml", ".yaml")):
calling_dag = Path(basepath)
else:
calling_dag = Path(basepath) / "dummy.py"
else:
calling_dag = Path(_get_calling_file(offset=_offset))
folder = calling_dag.parent.resolve()
exists = (
(folder / config_dir).exists()
if not config_name
else ((folder / config_dir / f"{config_name}.yml").exists() or (folder / config_dir / f"{config_name}.yaml").exists())
)
while not exists:
folder = folder.parent
if str(folder) == os.path.abspath(os.sep):
raise ConfigNotFoundError(config_dir=config_dir, dagfile=calling_dag)
exists = (
(folder / config_dir).exists()
if not config_name
else ((folder / config_dir / f"{config_name}.yml").exists() or (folder / config_dir / f"{config_name}.yaml").exists())
)
config_dir = (folder / config_dir).resolve()
if not config_name:
return folder.resolve(), config_dir, ""
elif (folder / config_dir / f"{config_name}.yml").exists():
return folder.resolve(), config_dir, (folder / config_dir / f"{config_name}.yml").resolve()
return folder.resolve(), config_dir, (folder / config_dir / f"{config_name}.yaml").resolve()
[docs]
@classmethod
def load(
cls: "SupervisorConfiguration",
config_dir: str = "config",
config_name: str = "",
overrides: Optional[list[str]] = None,
*,
basepath: str = "",
_offset: int = 3,
) -> "SupervisorConfiguration":
overrides = overrides or []
with initialize_config_dir(config_dir=str(Path(__file__).resolve().parent / "hydra"), version_base=None):
if config_dir:
hydra_folder, config_dir, _ = cls._find_parent_config_folder(
config_dir=config_dir, config_name=config_name, basepath=basepath, _offset=_offset
)
cfg = compose(config_name="base", overrides=[], return_hydra_config=True)
searchpaths = cfg["hydra"]["searchpath"]
searchpaths.extend([hydra_folder, config_dir])
if config_name:
overrides = [
f"+config={config_name}",
*overrides.copy(),
f"hydra.searchpath=[{','.join(searchpaths)}]",
]
else:
overrides = [*overrides.copy(), f"hydra.searchpath=[{','.join(searchpaths)}]"]
cfg = compose(config_name="base", overrides=overrides)
config = instantiate(cfg)
if not isinstance(config, cls):
if issubclass(cls, type(config)):
config = config.model_dump()
config = cls(**config)
return config
[docs]
def write(self):
_log.info(f"Making working dir: {self.working_dir}")
self.working_dir.mkdir(parents=True, exist_ok=True)
for program_name, program_config in self.program.items():
if program_config.directory:
_log.info(f"Making program dir ({program_name}): {program_config.directory}")
program_config.directory.mkdir(exist_ok=True)
_log.info(f"Writing config file: {self.config_path}")
self.config_path.write_text(self.to_cfg())
[docs]
def rmdir(self):
if not self.running():
_log.info(f"Removing working dir: {self.working_dir}")
rmtree(self.working_dir)
[docs]
def start(self, daemon: bool = False):
_log.info(f"Starting supervisord: {self.config_path}")
if not self.running():
if daemon is False:
Popen(f"supervisord -n -c {str(self.config_path)}", shell=True)
return
Popen(f"supervisord -c {str(self.config_path)}", close_fds=True, shell=True)
[docs]
def running(self):
# grab the pidfile, find the process with the pid, and kill
_log.info(f"Checking supervisord: {self.supervisord.pidfile}")
if not self.supervisord.pidfile.exists():
return False
try:
os.kill(int(self.supervisord.pidfile.read_text()), 0)
except OSError:
return False
return True
[docs]
def stop(self):
if self.running():
# grab the pidfile, find the process with the pid, and kill with SIGTERM
_log.info(f"Stopping supervisord: {self.supervisord.pidfile}")
os.kill(int(self.supervisord.pidfile.read_text()), SIGTERM)
[docs]
def kill(self):
if self.running():
# grab the pidfile, find the process with the pid, and kill with SIGKILL
_log.info(f"Killing supervisord: {self.supervisord.pidfile}")
os.kill(int(self.supervisord.pidfile.read_text()), SIGKILL)
[docs]
class SupervisorConvenienceConfiguration(SupervisorConfiguration):
convenience: ConvenienceConfiguration = Field(
default_factory=ConvenienceConfiguration, description="Required configurations for convenience integration"
)
_pydantic_path: Path = PrivateAttr(default="pydantic.json")
def _write_self(self):
# TODO make config driven
self.write()
_log.info(f"Writing model json: {self._pydantic_path}")
Path(self._pydantic_path).write_text(self.model_dump_json())
@model_validator(mode="after")
def _setup_convenience_defaults(self):
"""Method to overload configuration with values needed for the setup
of convenience tasks that we construct"""
# inet_http_server
if not self.inet_http_server:
self.inet_http_server = InetHttpServerConfiguration()
self.inet_http_server.port = self.convenience.port
self.inet_http_server.username = self.convenience.username
self.inet_http_server.password = self.convenience.password
self.supervisorctl.serverurl = f"{self.convenience.protocol}://{self.convenience.host}:{self.convenience.port.split(':')[-1]}/"
# rpcinterface
if not self.rpcinterface:
self.rpcinterface = {"supervisor": RpcInterfaceConfiguration()}
self.rpcinterface["supervisor"].rpcinterface_factory = self.convenience.rpcinterface_factory
# supervisord
self.supervisord.nodaemon = False
self.supervisord.identifier = "supervisor"
# programs
for config in self.program.values():
config.autostart = False
config.autorestart = False
config.startsecs = self.convenience.startsecs
config.startretries = self.convenience.startretries
config.exitcodes = self.convenience.exitcodes
config.stopsignal = self.convenience.stopsignal
config.stopwaitsecs = self.convenience.stopwaitsecs
# other
if str(self.working_dir) not in str(self._pydantic_path):
self._pydantic_path = self.working_dir / "pydantic.json"
return self
load_config = SupervisorConfiguration.load
load_convenience_config = SupervisorConvenienceConfiguration.load