update 2024-04-22
Changes: - add jinja2 template engine instead of string-replacing - fix certbot cron usage - replace json servers configuration with yaml
This commit is contained in:
@@ -1,121 +1,87 @@
|
||||
"""Add domain servers to nginx.conf executable script."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
import json
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
REDIRECT_TEMPLATE = "\n".join(
|
||||
(
|
||||
"\tlocation / {{",
|
||||
"\t\tresolver 127.0.0.11 valid=30s;",
|
||||
"\t\tset $host_{i} {redirection_host};",
|
||||
"\t\tproxy_pass $host_{i}/;",
|
||||
"\t\tinclude proxy_common.conf;",
|
||||
"\t}}",
|
||||
)
|
||||
)
|
||||
|
||||
SERVER_HTTPS_TEMPLATE = "\n".join(
|
||||
(
|
||||
"server {{",
|
||||
"\tinclude server_common.conf;",
|
||||
"\tlisten 443 ssl;",
|
||||
"\t",
|
||||
"\tserver_name {server_name};",
|
||||
"\tssl_certificate {certificates_path}/{certificate}/fullchain.pem;",
|
||||
"\tssl_certificate_key {certificates_path}/{certificate}/privkey.pem;",
|
||||
"\t",
|
||||
"{options}",
|
||||
"{redirection}",
|
||||
"}}",
|
||||
)
|
||||
)
|
||||
|
||||
SERVER_HTTP_TEMPLATE = "\n".join(
|
||||
(
|
||||
"server {{",
|
||||
"\tinclude server_common.conf;",
|
||||
"\t",
|
||||
"\tserver_name {server_name};",
|
||||
"{options}",
|
||||
"{redirection}",
|
||||
"}}",
|
||||
)
|
||||
)
|
||||
import jinja2
|
||||
import yaml
|
||||
|
||||
|
||||
@dataclass
|
||||
class Server:
|
||||
"""Server entry for nginx.conf file."""
|
||||
|
||||
server_name: str
|
||||
name: str
|
||||
all_names: str | None = None
|
||||
redirect: str | None = None
|
||||
certificate: str | None = None
|
||||
options: list[str] = field(default_factory=list)
|
||||
certificate_dir: str | None = None
|
||||
port: int = None
|
||||
ssl_port: int = None
|
||||
server_options: list[str] = field(default_factory=list)
|
||||
location_options: list[str] = field(default_factory=list)
|
||||
|
||||
certificates_path: str = "/etc/letsencrypt/live"
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.options is None:
|
||||
self.options = []
|
||||
if self.server_options is None:
|
||||
self.server_options = []
|
||||
if self.location_options is None:
|
||||
self.location_options = []
|
||||
if self.all_names is None:
|
||||
self.all_names = self.name
|
||||
if self.port is None:
|
||||
self.port = 80
|
||||
if self.ssl_port is None:
|
||||
self.ssl_port = 443
|
||||
|
||||
def format(self, i: int, indent: str = " ", base_indent: int = 1) -> str:
|
||||
"""Format server to place inside nginx.conf"""
|
||||
res = (indent * base_indent) + (
|
||||
SERVER_HTTPS_TEMPLATE if self.certificate is not None else SERVER_HTTP_TEMPLATE
|
||||
).replace("\n", "\n" + indent * base_indent).replace("\t", indent).format(
|
||||
server_name=self.server_name,
|
||||
certificate=self.certificate,
|
||||
redirection=(
|
||||
REDIRECT_TEMPLATE.replace("\n", "\n" + indent * base_indent)
|
||||
.replace("\t", indent)
|
||||
.format(i=i, redirection_host=self.redirect)
|
||||
if self.redirect is not None
|
||||
else ""
|
||||
),
|
||||
options=(
|
||||
("\n" + indent * (base_indent + 1))
|
||||
+ ("\n" + indent * (base_indent + 1)).join(
|
||||
("\n" + indent * (base_indent + 1)).join(option.split("\n")) for option in self.options
|
||||
)
|
||||
),
|
||||
certificates_path=self.certificates_path,
|
||||
)
|
||||
res = "\n".join(line for line in res.split("\n") if line.strip() != "")
|
||||
return res
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert server class to dict for nginx.conf.j2 jinja2-template"""
|
||||
return {
|
||||
"name": self.name,
|
||||
"all_names": self.all_names,
|
||||
"redirect": self.redirect,
|
||||
"server_options": self.server_options,
|
||||
"location_options": self.location_options,
|
||||
"certificate_dir": self.certificate_dir,
|
||||
"port": self.port,
|
||||
"ssl_port": self.ssl_port,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class CLIParams:
|
||||
"""add_servers CLI parameters"""
|
||||
|
||||
nginx: str
|
||||
nginx_template: str
|
||||
domains_list_txt: str
|
||||
servers_config_json: str
|
||||
servers_config: str
|
||||
certificates_path: str
|
||||
http_only: bool
|
||||
dry_run: bool
|
||||
output: str | None
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Parse arguments and add domains to nginx.conf"""
|
||||
parser = argparse.ArgumentParser("add-servers", description="Add domain servers to a given nginx.conf file")
|
||||
parser.add_argument("--nginx", "-f", required=True, help="Path to nginx.conf file to edit inplace")
|
||||
parser.add_argument("--nginx-template", "-f", required=True, help="Path to nginx.conf.j2 template file")
|
||||
parser.add_argument(
|
||||
"--domains_list_txt",
|
||||
"-d",
|
||||
required=True,
|
||||
help="Path to domains list with ssl certificates ",
|
||||
help="Path to file with domains which have ssl certificates",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--servers_config_json",
|
||||
"--servers-config",
|
||||
"-s",
|
||||
required=False,
|
||||
default=None,
|
||||
help='Path to domains json {"domain": {"redirect": "redirection_path", "options": []}};',
|
||||
help="Path to servers configuration yaml file",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--certificates_path",
|
||||
"--certificates-path",
|
||||
"-c",
|
||||
required=False,
|
||||
default="/etc/letsencrypt/live",
|
||||
@@ -126,85 +92,77 @@ def main() -> None:
|
||||
action="store_true",
|
||||
help="Remove certificates usage from servers section",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Print servers section and quit withot making changes",
|
||||
)
|
||||
parser.add_argument("--output", "-o", help="Path to nginx.conf output file")
|
||||
|
||||
args: CLIParams = parser.parse_args()
|
||||
|
||||
replacement_substring = "# <place_for_servers>"
|
||||
|
||||
with open(args.nginx, "r", encoding="utf-8") as file:
|
||||
nginx_config = file.read()
|
||||
if nginx_config.count(replacement_substring) != 1:
|
||||
print(
|
||||
f"Error. File must contain exactly one '{replacement_substring}' substring to replace with servers. Exiting"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
if args.domains_list_txt is not None:
|
||||
with open(args.domains_list_txt, "r", encoding="utf-8") as file:
|
||||
domains_certs_list = [
|
||||
domains_with_certs = [
|
||||
domain.strip() for domain in file.readlines() if domain.strip() != "" and not domain.startswith("#")
|
||||
]
|
||||
else:
|
||||
domains_certs_list = []
|
||||
domains_with_certs = []
|
||||
|
||||
nginx_servers: list[Server] = []
|
||||
if args.servers_config_json is not None:
|
||||
with open(args.servers_config_json, "r", encoding="utf-8") as file:
|
||||
for server_name, params in json.load(file).items():
|
||||
server_name: str
|
||||
params: dict[str, str]
|
||||
nginx_servers.append(
|
||||
Server(
|
||||
(server_name if "*" not in server_name else f"{server_name} {server_name.replace('*.', '')}"),
|
||||
params.get("redirect"),
|
||||
(
|
||||
None
|
||||
if args.http_only
|
||||
else server_name.replace("*.", "")
|
||||
if server_name in domains_certs_list
|
||||
else server_name[server_name.find(".") + 1 :]
|
||||
if "." in server_name
|
||||
and f"*.{server_name[server_name.find('.') + 1:]}" in domains_certs_list
|
||||
else None
|
||||
),
|
||||
params.get("options"),
|
||||
args.certificates_path,
|
||||
)
|
||||
if args.servers_config is not None:
|
||||
with open(args.servers_config, "r", encoding="utf-8") as file:
|
||||
data: dict = yaml.safe_load(file)
|
||||
resolver: str = data.get("resolver", "127.0.0.1")
|
||||
acme_challenge_location: str | None = data.get("acme_challenge_location")
|
||||
servers: dict[str, dict[str, Any]] = data["servers"]
|
||||
|
||||
for server_name, params in servers.items():
|
||||
nginx_servers.append(
|
||||
Server(
|
||||
name=(server_name if "*" not in server_name else f"{server_name} {server_name.replace('*.', '')}"),
|
||||
all_names=params.get("all_names"),
|
||||
redirect=params.get("redirect"),
|
||||
certificate_dir=_get_certificate_path(
|
||||
args.http_only, domains_with_certs, args.certificates_path, server_name
|
||||
),
|
||||
port=params.get("port"),
|
||||
ssl_port=params.get("ssl_port"),
|
||||
server_options=params.get("server_options"),
|
||||
location_options=params.get("location_options"),
|
||||
)
|
||||
for domain in domains_certs_list:
|
||||
)
|
||||
for domain in domains_with_certs:
|
||||
if not any(
|
||||
(
|
||||
domain == server.server_name
|
||||
or (
|
||||
domain == f"*.{server.server_name[server.server_name.find('.') + 1:]}"
|
||||
if "." in server.server_name
|
||||
else False
|
||||
)
|
||||
domain == server.name
|
||||
or (domain == f"*.{server.name[server.name.find('.') + 1:]}" if "." in server.name else False)
|
||||
)
|
||||
for server in nginx_servers
|
||||
):
|
||||
nginx_servers.append(
|
||||
Server(domain, None, domain if not args.http_only else None, certificates_path=args.certificates_path)
|
||||
)
|
||||
nginx_servers.append(Server(name=domain))
|
||||
|
||||
nginx_servers_part = "\n\n".join(server.format(i) for i, server in enumerate(nginx_servers))
|
||||
with open(args.nginx_template, "r", encoding="utf-8") as file:
|
||||
template = jinja2.Environment().from_string(file.read())
|
||||
|
||||
print(f"Using following servers part for nginx.conf:\n\n{nginx_servers_part}")
|
||||
result = template.render(
|
||||
resolver=resolver,
|
||||
acme_challenge_location=acme_challenge_location,
|
||||
servers=[server.to_dict() for server in nginx_servers],
|
||||
)
|
||||
|
||||
if not args.dry_run:
|
||||
config_backup_filename = f"{args.nginx}.bak"
|
||||
print(f"Backing up old nginx config {args.nginx} as {config_backup_filename}")
|
||||
print(result)
|
||||
|
||||
with open(config_backup_filename, "w", encoding="utf-8") as file:
|
||||
file.write(nginx_config)
|
||||
if args.output is not None:
|
||||
with open(args.output, "w", encoding="utf-8") as file:
|
||||
file.write(result)
|
||||
|
||||
with open(args.nginx, "w", encoding="utf-8") as file:
|
||||
file.write(nginx_config.replace(replacement_substring, nginx_servers_part.lstrip()))
|
||||
|
||||
def _get_certificate_path(
|
||||
http_only: bool, domains_with_certs: list[str], base_certs_path: str, server_name: str
|
||||
) -> str | None:
|
||||
if http_only:
|
||||
return None
|
||||
if server_name in domains_with_certs:
|
||||
return os.path.join(base_certs_path, server_name.replace("*.", ""))
|
||||
if "." in server_name and f"*.{server_name[server_name.find('.') + 1:]}" in domains_with_certs:
|
||||
return os.path.join(base_certs_path, server_name[server_name.find(".") + 1 :])
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user