"""Add domain servers to nginx.conf executable script.""" import argparse from dataclasses import dataclass, field import json import sys REDIRECT_TEMPLATE = "\n".join( ( "\tlocation / {{", "\t\tresolver 127.0.0.11 valid=30s;", "\t\tset $host_{i} {redirection_host};", "\t\tproxy_pass $host_{i}/;", "\t\tinclude proxy_common.conf;", "\t}}", ) ) SERVER_HTTPS_TEMPLATE = "\n".join( ( "server {{", "\tinclude server_common.conf;", "\tlisten 443 ssl;", "\t", "\tserver_name {server_name};", "\tssl_certificate {certificates_path}/{certificate}/fullchain.pem;", "\tssl_certificate_key {certificates_path}/{certificate}/privkey.pem;", "\t", "{options}", "{redirection}", "}}", ) ) SERVER_HTTP_TEMPLATE = "\n".join( ( "server {{", "\tinclude server_common.conf;", "\t", "\tserver_name {server_name};", "{options}", "{redirection}", "}}", ) ) @dataclass class Server: """Server entry for nginx.conf file.""" server_name: str redirect: str | None = None certificate: str | None = None options: list[str] = field(default_factory=list) certificates_path: str = "/etc/letsencrypt/live" def __post_init__(self) -> None: if self.options is None: self.options = [] def format(self, i: int, indent: str = " ", base_indent: int = 1) -> str: """Format server to place inside nginx.conf""" res = (indent * base_indent) + ( SERVER_HTTPS_TEMPLATE if self.certificate is not None else SERVER_HTTP_TEMPLATE ).replace("\n", "\n" + indent * base_indent).replace("\t", indent).format( server_name=self.server_name, certificate=self.certificate, redirection=( REDIRECT_TEMPLATE.replace("\n", "\n" + indent * base_indent) .replace("\t", indent) .format(i=i, redirection_host=self.redirect) if self.redirect is not None else "" ), options=( ("\n" + indent * (base_indent + 1)) + ("\n" + indent * (base_indent + 1)).join( ("\n" + indent * (base_indent + 1)).join(option.split("\n")) for option in self.options ) ), certificates_path=self.certificates_path, ) res = "\n".join(line for line in res.split("\n") if line.strip() != "") return res @dataclass class CLIParams: """add_servers CLI parameters""" nginx: str domains_list_txt: str servers_config_json: str certificates_path: str http_only: bool dry_run: bool def main() -> None: """Parse arguments and add domains to nginx.conf""" parser = argparse.ArgumentParser("add-servers", description="Add domain servers to a given nginx.conf file") parser.add_argument("--nginx", "-f", required=True, help="Path to nginx.conf file to edit inplace") parser.add_argument( "--domains_list_txt", "-d", required=True, help="Path to domains list with ssl certificates ", ) parser.add_argument( "--servers_config_json", "-s", required=False, default=None, help='Path to domains json {"domain": {"redirect": "redirection_path", "options": []}};', ) parser.add_argument( "--certificates_path", "-c", required=False, default="/etc/letsencrypt/live", help="Path to a directory containing certificates", ) parser.add_argument( "--http-only", action="store_true", help="Remove certificates usage from servers section", ) parser.add_argument( "--dry-run", action="store_true", help="Print servers section and quit withot making changes", ) args: CLIParams = parser.parse_args() replacement_substring = "# " with open(args.nginx, "r", encoding="utf-8") as file: nginx_config = file.read() if nginx_config.count(replacement_substring) != 1: print( f"Error. File must contain exactly one '{replacement_substring}' substring to replace with servers. Exiting" ) sys.exit(1) if args.domains_list_txt is not None: with open(args.domains_list_txt, "r", encoding="utf-8") as file: domains_certs_list = [ domain.strip() for domain in file.readlines() if domain.strip() != "" and not domain.startswith("#") ] else: domains_certs_list = [] nginx_servers: list[Server] = [] if args.servers_config_json is not None: with open(args.servers_config_json, "r", encoding="utf-8") as file: for server_name, params in json.load(file).items(): server_name: str params: dict[str, str] nginx_servers.append( Server( (server_name if "*" not in server_name else f"{server_name} {server_name.replace('*.', '')}"), params.get("redirect"), ( None if args.http_only else server_name.replace("*.", "") if server_name in domains_certs_list else server_name[server_name.find(".") + 1 :] if "." in server_name and f"*.{server_name[server_name.find('.') + 1:]}" in domains_certs_list else None ), params.get("options"), args.certificates_path, ) ) for domain in domains_certs_list: if not any( ( domain == server.server_name or ( domain == f"*.{server.server_name[server.server_name.find('.') + 1:]}" if "." in server.server_name else False ) ) for server in nginx_servers ): nginx_servers.append( Server(domain, None, domain if not args.http_only else None, certificates_path=args.certificates_path) ) nginx_servers_part = "\n\n".join(server.format(i) for i, server in enumerate(nginx_servers)) print(f"Using following servers part for nginx.conf:\n\n{nginx_servers_part}") if not args.dry_run: config_backup_filename = f"{args.nginx}.bak" print(f"Backing up old nginx config {args.nginx} as {config_backup_filename}") with open(config_backup_filename, "w", encoding="utf-8") as file: file.write(nginx_config) with open(args.nginx, "w", encoding="utf-8") as file: file.write(nginx_config.replace(replacement_substring, nginx_servers_part.lstrip())) if __name__ == "__main__": main()