Initial commit
Some checks failed
Run linters on applied template / Python 3.13 lint and build (push) Failing after 34s

This is a FastAPI backend microservice template used with `copier` utility.

Features of applied template are:
- Configuration file processing logic
- Metrics and tracing (both optional) configuration available
- Debug endpoints
- Database migration commands, prepared Alembic environment
- Database usage example in ping_db endpoint
- gitea sanity check pipeline
This commit is contained in:
2025-11-29 21:42:27 +03:00
commit 5dd68b7114
52 changed files with 4563 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
__all__ = [
"__version__",
]
from .version import VERSION as __version__

View File

@@ -0,0 +1,136 @@
"""Executable application entrypoint is defined here."""
import os
import tempfile
import typing as tp
from pathlib import Path
import click
import uvicorn
from dotenv import load_dotenv
from .config import {{ProjectName}}Config, LoggingConfig
LogLevel = tp.Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR"]
def _run_uvicorn(configuration: dict[str, tp.Any]) -> tp.NoReturn:
uvicorn.run(
"{{project_slug}}.fastapi_init:app",
**configuration,
)
@click.group()
def cli():
"{{project_name}} service executable script"
@cli.command("config-example")
@click.option(
"--config_path",
envvar="CONFIG_PATH",
default="config.yaml",
type=click.Path(dir_okay=False, path_type=Path),
show_default=True,
show_envvar=True,
help="Path to YAML configuration file",
)
def get_config_example(config_path: Path):
config = {{ProjectName}}Config.get_example()
config.dump(config_path)
@cli.command("launch")
@click.option(
"--port",
"-p",
envvar="PORT",
type=int,
show_envvar=True,
help="Service port number",
)
@click.option(
"--host",
envvar="HOST",
show_envvar=True,
help="Service HOST address",
)
@click.option(
"--logger_verbosity",
"-v",
type=click.Choice(("TRACE", "DEBUG", "INFO", "WARNING", "ERROR")),
envvar="LOGGER_VERBOSITY",
show_envvar=True,
help="Logger verbosity",
)
@click.option(
"--debug",
envvar="DEBUG",
is_flag=True,
help="Enable debug mode (auto-reload on change, traceback returned to user, etc.)",
)
@click.option(
"--config_path",
envvar="CONFIG_PATH",
default="config.yaml",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
show_default=True,
show_envvar=True,
help="Path to YAML configuration file",
)
def launch(
port: int,
host: str,
logger_verbosity: LogLevel,
debug: bool,
config_path: Path,
):
"""
{{project_description}}
Performs configuration via config and command line + environment variables overrides.
"""
print(
"This is a simple method to run the API."
" You might want to use 'uvicorn {{project_slug}}.fastapi_init:app' instead"
)
config = {{ProjectName}}Config.load(config_path)
config.app.host = host or config.app.host
config.app.port = port or config.app.port
config.app.debug = debug or config.app.debug
config.logging = config.logging if logger_verbosity is None else LoggingConfig(level=logger_verbosity)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_yaml_config_path = temp_file.name
config.dump(temp_yaml_config_path)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_envfile_path = temp_file.name
with open(temp_envfile_path, "w", encoding="utf-8") as env_file:
env_file.write(f"CONFIG_PATH={temp_yaml_config_path}\n")
try:
uvicorn_config = {
"host": config.app.host,
"port": config.app.port,
"log_level": config.logging.level.lower(),
"env_file": temp_envfile_path,
}
if config.app.debug:
try:
_run_uvicorn(uvicorn_config | {"reload": True})
except: # pylint: disable=bare-except
print("Debug reload is not supported and will be disabled")
_run_uvicorn(uvicorn_config)
else:
_run_uvicorn(uvicorn_config)
finally:
if os.path.exists(temp_envfile_path):
os.remove(temp_envfile_path)
if os.path.exists(temp_yaml_config_path):
os.remove(temp_yaml_config_path)
if __name__ in ("__main__", "{{project_slug}}.__main__"):
load_dotenv(os.environ.get("ENVFILE", ".env"))
cli() # pylint: disable=no-value-for-parameter

View File

@@ -0,0 +1,204 @@
"""Application configuration class is defined here."""
from collections import OrderedDict
from dataclasses import asdict, dataclass, field, fields
from pathlib import Path
from types import NoneType, UnionType
from typing import Any, Literal, TextIO, Type
import yaml
from {{project_slug}}.db.config import DBConfig, MultipleDBsConfig
from {{project_slug}}.utils.secrets import SecretStr, representSecretStrYAML
from .utils.observability import LoggingLevel
@dataclass
class CORSConfig:
allow_origins: list[str]
allow_methods: list[str]
allow_headers: list[str]
allow_credentials: bool
@dataclass
class AppConfig:
host: str
port: int
debug: bool
cors: CORSConfig
@dataclass
class FileLogger:
filename: str
level: LoggingLevel
@dataclass
class LoggingConfig:
level: LoggingLevel
files: list[FileLogger] = field(default_factory=list)
def __post_init__(self):
if len(self.files) > 0 and isinstance(self.files[0], dict):
self.files = [FileLogger(**f) for f in self.files]
@dataclass
class PrometheusConfig:
host: str
port: int
urls_mapping: dict[str, str] = field(default_factory=dict)
@dataclass
class JaegerConfig:
endpoint: str
@dataclass
class ObservabilityConfig:
prometheus: PrometheusConfig | None = None
jaeger: JaegerConfig | None = None
@dataclass
class {{ProjectName}}Config:
app: AppConfig
db: MultipleDBsConfig
logging: LoggingConfig
observability: ObservabilityConfig
def to_order_dict(self) -> OrderedDict:
"""OrderDict transformer."""
def to_ordered_dict_recursive(obj) -> OrderedDict:
"""Recursive OrderDict transformer."""
if isinstance(obj, (dict, OrderedDict)):
return OrderedDict((k, to_ordered_dict_recursive(v)) for k, v in obj.items())
if isinstance(obj, list):
return [to_ordered_dict_recursive(item) for item in obj]
if hasattr(obj, "__dataclass_fields__"):
return OrderedDict(
(field, to_ordered_dict_recursive(getattr(obj, field))) for field in obj.__dataclass_fields__
)
return obj
return OrderedDict([(section, to_ordered_dict_recursive(getattr(self, section))) for section in asdict(self)])
def dump(self, file: str | Path | TextIO) -> None:
"""Export current configuration to a file"""
class OrderedDumper(yaml.SafeDumper):
def represent_dict_preserve_order(self, data):
return self.represent_dict(data.items())
def increase_indent(self, flow=False, indentless=False):
return super().increase_indent(flow, False)
OrderedDumper.add_representer(OrderedDict, OrderedDumper.represent_dict_preserve_order)
OrderedDumper.add_representer(SecretStr, representSecretStrYAML)
if isinstance(file, (str, Path)):
with open(str(file), "w", encoding="utf-8") as file_w:
yaml.dump(self.to_order_dict(), file_w, Dumper=OrderedDumper)
else:
yaml.dump(self.to_order_dict(), file, Dumper=OrderedDumper)
@classmethod
def get_example(cls) -> "{{ProjectName}}Config":
"""Generate an example of configuration."""
res = cls(
app=AppConfig(host="0.0.0.0", port=8000, debug=False, cors=CORSConfig(["*"], ["*"], ["*"], True)),
db=MultipleDBsConfig(
master=DBConfig(
host="localhost",
port=5432,
database="{{project_slug}}_db",
user="postgres",
password="postgres",
pool_size=15,
),
replicas=[
DBConfig(
host="localhost",
port=5433,
user="readonly",
password="readonly",
database="{{project_slug}}_db",
pool_size=8,
)
],
),
logging=LoggingConfig(level="INFO", files=[FileLogger(filename="logs/info.log", level="INFO")]),
observability=ObservabilityConfig(
prometheus=PrometheusConfig(host="0.0.0.0", port=9090, urls_mapping={"/api/debug/.*": "/api/debug/*"}),
jaeger=JaegerConfig(endpoint="http://127.0.0.1:4318/v1/traces"),
),
)
return res
@classmethod
def load(cls, file: str | Path | TextIO) -> "{{ProjectName}}Config":
"""Import config from the given filename or raise `ValueError` on error."""
try:
if isinstance(file, (str, Path)):
with open(file, "r", encoding="utf-8") as file_r:
data = yaml.safe_load(file_r)
else:
data: dict = yaml.safe_load(file)
return {{ProjectName}}Config._initialize_from_dict({{ProjectName}}Config, data)
except TypeError as exc:
raise ValueError(f"Seems like config file is invalid: {file}") from exc
except Exception as exc:
raise ValueError(f"Could not read app config file: {file}") from exc
@staticmethod
def _initialize_from_dict(t: Type, data: Any) -> Any:
"""Try to initialize given type field-by-field recursively with data from dictionary substituting {} and None
if no value provided.
"""
if isinstance(t, UnionType):
for inner_type in t.__args__:
if inner_type is NoneType and data is None:
return None
try:
return {{ProjectName}}Config._initialize_from_dict(inner_type, data)
except Exception: # pylint: disable=broad-except
pass
raise ValueError(f"Cannot instanciate type '{t}' from {data}")
if hasattr(t, "__origin__") and t.__origin__ is dict:
return data
if not isinstance(data, dict):
if hasattr(t, "__origin__") and t.__origin__ is Literal and data in t.__args__:
return data
return t(data)
init_dict = {}
for fld in fields(t):
inner_data = data.get(fld.name)
if inner_data is None:
if isinstance(fld.type, UnionType) and NoneType in fld.type.__args__:
init_dict[fld.name] = None
continue
inner_data = {}
else:
init_dict[fld.name] = {{ProjectName}}Config._initialize_from_dict(fld.type, inner_data)
return t(**init_dict)
@classmethod
def from_file(cls, config_path: str) -> "{{ProjectName}}Config":
"""Load configuration from the given path."""
if not config_path or not Path(config_path).is_file():
raise ValueError(f"Requested config is not a valid file: {config_path}")
return cls.load(config_path)

View File

@@ -0,0 +1,21 @@
"""SQL naming convention for Alembic is defined here."""
from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base
convention = {
"all_column_names": lambda constraint, _: "_".join([str(column.name) for column in constraint.columns.values()]),
"ix": "ix_%(table_name)s_%(all_column_names)s",
"uq": "%(table_name)s_%(all_column_names)s_key",
"ck": "ck_%(table_name)s_%(column_0_name)s",
"fk": "%(table_name)s_fk_%(all_column_names)s__%(referred_table_name)s",
"pk": "%(table_name)s_pk",
}
metadata = MetaData(naming_convention=convention)
DeclarativeBase = declarative_base(metadata=metadata)
__all__ = [
"DeclarativeBase",
"metadata",
]

View File

@@ -0,0 +1,114 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrator/
# template used to generate migration files
file_template = %%(year)d-%%(month).2d-%%(day).2d_%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
path_separator = os
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrator/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrator/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql+asyncpg://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_HOST)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s?target_session_attrs=read-write
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
hooks = black
black.type = console_scripts
black.entrypoint = black
black.options = -l 120 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,43 @@
"""Database configuration class is defined here."""
from dataclasses import dataclass
from typing import Any
from {{project_slug}}.utils.secrets import SecretStr
@dataclass
class DBConfig:
host: str
port: int
database: str
user: str
password: SecretStr
pool_size: int
def __post_init__(self):
self.password = SecretStr(self.password)
@dataclass
class MultipleDBsConfig:
master: DBConfig
replicas: list[DBConfig] | None = None
def __post_init__(self):
_dict_to_dataclass(self, "master", DBConfig)
if self.replicas is not None:
_list_dict_to_dataclasses(self, "replicas", DBConfig)
def _list_dict_to_dataclasses(config_entry: Any, field_name: str, need_type: type) -> None:
list_dict = getattr(config_entry, field_name)
for i in range(len(list_dict)): # pylint: disable=consider-using-enumerate
if isinstance(list_dict[i], dict):
list_dict[i] = need_type(**list_dict[i])
def _dict_to_dataclass(config_entry: Any, field_name: str, need_type: type) -> None:
value = getattr(config_entry, field_name)
if isinstance(value, dict):
setattr(config_entry, field_name, need_type(**value))

View File

@@ -0,0 +1,7 @@
"""Module responsible for managing database connections."""
from .manager import PostgresConnectionManager
__all__ = [
"PostgresConnectionManager",
]

View File

@@ -0,0 +1,195 @@
"""Connection manager class and get_connection function are defined here."""
from asyncio import Lock
from contextlib import asynccontextmanager
from itertools import cycle
from typing import Any, AsyncIterator
import structlog
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from {{project_slug}}.db.config import DBConfig
class PostgresConnectionManager: # pylint: disable=too-many-instance-attributes
"""Connection manager for PostgreSQL database"""
def __init__( # pylint: disable=too-many-arguments
self,
master: DBConfig,
replicas: list[DBConfig] | None,
logger: structlog.stdlib.BoundLogger,
*,
engine_options: dict[str, Any] | None = None,
application_name: str | None = None,
) -> None:
"""Initialize connection manager entity."""
self._master_engine: AsyncEngine | None = None
self._replica_engines: list[AsyncEngine] = []
self._master = master
self._replicas = replicas or []
self._lock = Lock()
self._logger = logger
self._engine_options = engine_options or {}
self._application_name = application_name
# Iterator for round-robin through replicas
self._replica_cycle = None
async def update( # pylint: disable=too-many-arguments
self,
*,
master: DBConfig | None = None,
replicas: list[DBConfig] | None = None,
logger: structlog.stdlib.BoundLogger | None = None,
application_name: str | None = None,
engine_options: dict[str, Any] | None = None,
) -> None:
"""Update connection manager parameters and refresh connection."""
self._master = master or self._master
self._replicas = replicas or self._replicas
self._logger = logger or self._logger
self._application_name = application_name or self._application_name
self._engine_options = engine_options or self._engine_options
if self.initialized:
await self.refresh()
@property
def initialized(self) -> bool:
return self._master_engine is not None
async def refresh(self, no_force_refresh: bool = False) -> None:
"""Initialize or reinitialize connection engine.
Params:
no_force_refresh (bool): if set to True and ConnectionManager is already initialized,
no refresh is performed
"""
async with self._lock:
if no_force_refresh and self.initialized:
return
await self.shutdown(use_lock=False)
await self._logger.ainfo(
"creating postgres master connection pool",
max_size=self._master.pool_size,
user=self._master.user,
host=self._master.host,
port=self._master.port,
database=self._master.database,
)
self._master_engine = create_async_engine(
f"postgresql+asyncpg://{self._master.user}:{self._master.password.get_secret_value()}"
f"@{self._master.host}:{self._master.port}/{self._master.database}",
future=True,
pool_size=max(1, self._master.pool_size - 5),
max_overflow=min(self._master.pool_size - 1, 5),
**self._engine_options,
)
try:
async with self._master_engine.connect() as conn:
cur = await conn.execute(select(text("1")))
assert cur.fetchone()[0] == 1
except Exception as exc:
self._master_engine = None
raise RuntimeError("something wrong with database connection, aborting") from exc
if len(self._replicas) > 0:
for replica in self._replicas:
await self._logger.ainfo(
"creating postgres readonly connection pool",
max_size=replica.pool_size,
user=replica.user,
host=replica.host,
port=replica.port,
database=replica.database,
)
replica_engine = create_async_engine(
f"postgresql+asyncpg://{replica.user}:{replica.password.get_secret_value()}@"
f"{replica.host}:{replica.port}/{replica.database}",
future=True,
pool_size=max(1, self._master.pool_size - 5),
max_overflow=min(self._master.pool_size - 1, 5),
**self._engine_options,
)
try:
async with replica_engine.connect() as conn:
cur = await conn.execute(select(1))
assert cur.fetchone()[0] == 1
self._replica_engines.append(replica_engine)
except Exception as exc: # pylint: disable=broad-except
await replica_engine.dispose()
await self._logger.aexception("error connecting to replica", host=replica.host, error=repr(exc))
if self._replica_engines:
self._replica_cycle = cycle(self._replica_engines)
else:
self._replica_cycle = None
await self._logger.awarning("no available replicas, read queries will go to the master")
async def shutdown(self, use_lock: bool = True) -> None:
"""Dispose connection pool and deinitialize. Can be called multiple times."""
if use_lock:
async with self._lock:
await self.shutdown(use_lock=False)
return
if self.initialized:
self._logger.info("shutting down postgres connection engine")
await self._master_engine.dispose()
self._master_engine = None
for engine in self._replica_engines:
await engine.dispose()
self._replica_engines.clear()
@asynccontextmanager
async def get_connection(self) -> AsyncIterator[AsyncConnection]:
"""Get an async connection to the database with read-write ability."""
if not self.initialized:
await self.refresh(no_force_refresh=True)
async with self._master_engine.connect() as conn:
if self._application_name is not None:
await conn.execute(text(f'SET application_name TO "{self._application_name}"'))
await conn.commit()
yield conn
@asynccontextmanager
async def get_ro_connection(self) -> AsyncIterator[AsyncConnection]:
"""Get an async connection to the database which can be read-only and will attempt to use replica instances
of the database."""
if not self.initialized:
await self.refresh(no_force_refresh=True)
# If there are no replicas, use master
if self._replica_cycle is None:
async with self.get_connection() as conn:
yield conn
return
# Select the next replica (round-robin), `self._replica_cycle` is guaranteed to have values here
engine = next(self._replica_cycle) # pylint: disable=stop-iteration-return
conn = None
try:
conn = await engine.connect()
if self._application_name is not None:
await conn.execute(text(f'SET application_name TO "{self._application_name}"'))
await conn.commit()
except Exception as exc: # pylint: disable=broad-except
if conn is not None:
try:
conn.close()
except Exception: # pylint: disable=broad-except
pass
await self._logger.awarning(
"error connecting to replica, falling back to master", error=repr(exc), error_type=type(exc).__name__
)
# On exception from replica fallback to master connection
async with self.get_connection() as conn:
yield conn
return
try:
yield conn
finally:
await conn.close()

View File

View File

@@ -0,0 +1,96 @@
# pylint: disable=wrong-import-position
"""Environment preparation for Alembic is performed here."""
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from dotenv import load_dotenv
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from {{project_slug}}.config import {{ProjectName}}Config
from {{project_slug}}.db import DeclarativeBase
from {{project_slug}}.db.entities import * # pylint: disable=wildcard-import,unused-wildcard-import
envfile_path = os.environ.get("ENVFILE", ".env")
if os.path.isfile(envfile_path):
try:
load_dotenv(envfile_path)
except Exception as exc:
print(f"Got an error while loading envfile '{envfile_path}': {exc!r}")
config = context.config
section = config.config_ini_section
app_settings = {{ProjectName}}Config.from_file(os.getenv("CONFIG_PATH"))
config.set_section_option(section, "POSTGRES_DB", app_settings.db.master.database)
config.set_section_option(section, "POSTGRES_HOST", app_settings.db.master.host)
config.set_section_option(section, "POSTGRES_USER", app_settings.db.master.user)
config.set_section_option(section, "POSTGRES_PASSWORD", app_settings.db.master.password.get_secret_value())
config.set_section_option(section, "POSTGRES_PORT", str(app_settings.db.master.port))
fileConfig(config.config_file_name, disable_existing_loggers=False)
target_metadata = DeclarativeBase.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,27 @@
# pylint: disable=no-member,invalid-name,missing-function-docstring,too-many-statements
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1 @@
"""Dependencies-related functions are located here."""

View File

@@ -0,0 +1,25 @@
"""PostgresConnectionManager dependency functions are defined here."""
from fastapi import FastAPI, Request
from {{project_slug}}.db.connection.manager import PostgresConnectionManager
def init_dispencer(app: FastAPI, connection_manager: PostgresConnectionManager) -> None:
"""Initialize PostgresConnectionManager dispencer at app's state."""
if hasattr(app.state, "postgres_connection_manager"):
if not isinstance(app.state.postgres_connection_manager, PostgresConnectionManager):
raise ValueError(
"postgres_connection_manager attribute of app's state is already set"
f"with other value ({app.state.postgres_connection_manager})"
)
return
app.state.postgres_connection_manager = connection_manager
def obtain(request: Request) -> PostgresConnectionManager:
"""Get a PostgresConnectionManager from request's app state."""
if not hasattr(request.app.state, "postgres_connection_manager"):
raise ValueError("PostgresConnectionManager dispencer was not initialized at app preparation")
return request.app.state.postgres_connection_manager

View File

@@ -0,0 +1,33 @@
"""PostgresConnectionManager dependency functions are defined here."""
from fastapi import FastAPI, Request
from structlog.stdlib import BoundLogger
def init_dispencer(app: FastAPI, logger: BoundLogger) -> None:
"""Initialize BoundLogger dispencer at app's state."""
if hasattr(app.state, "logger"):
if not isinstance(app.state.logger, BoundLogger):
raise ValueError("logger attribute of app's state is already set" f"with other value ({app.state.logger})")
return
app.state.logger = logger
def attach_to_request(request: Request, logger: BoundLogger) -> None:
"""Set logger for a concrete request. If request had already had a logger, replace it."""
if hasattr(request.state, "logger"):
if not isinstance(request.state.logger, BoundLogger):
logger.warning("request.state.logger is already set with other value", value=request.state.logger)
request.state.logger = logger
def obtain(request: Request) -> BoundLogger:
"""Get a logger from request or app state."""
if hasattr(request.state, "logger"):
logger = request.state.logger
if isinstance(logger, BoundLogger):
return logger
if not hasattr(request.app.state, "logger"):
raise ValueError("BoundLogger dispencer was not initialized at app preparation")
return request.app.state.logger

View File

@@ -0,0 +1,5 @@
from .base import {{ProjectName}}Error
__all__ = [
"{{ProjectName}}Error",
]

View File

@@ -0,0 +1,10 @@
"""Base application exception class is defined here."""
class {{ProjectName}}Error(RuntimeError):
"""
Base application exception to inherit from.
"""
def __str__(self) -> str:
return "Unexpected error happened in {{project_name}}"

View File

@@ -0,0 +1,34 @@
"""Mapper from exceptions to HTTP responses is defined here"""
from typing import Callable, Type
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class ExceptionMapper:
def __init__(self):
self._known_exceptions: dict[Type, Callable[[Exception], JSONResponse]] = {}
def register_simple(self, exception_type: Type, status_code: int, detail: str) -> None:
self._known_exceptions[exception_type] = lambda _: JSONResponse(
{"code": status_code, "detail": detail}, status_code=status_code
)
def register_func(self, exception_type: Type, func: Callable[[Exception], JSONResponse]) -> None:
self._known_exceptions[exception_type] = func
def is_known(self, exc: Exception) -> bool:
return type(exc) in self._known_exceptions or isinstance(exc, HTTPException)
def apply(self, exc: Exception) -> JSONResponse:
if (res := self.apply_if_known(exc)) is not None:
return res
return JSONResponse({"code": 500, "detail": "unhandled exception"}, status_code=500)
def apply_if_known(self, exc: Exception) -> JSONResponse | None:
if isinstance(exc, HTTPException):
return JSONResponse({"code": exc.status_code, "detail": exc.detail}, status_code=exc.status_code)
if (t := type(exc)) in self._known_exceptions:
return self._known_exceptions[t](exc)
return None

View File

@@ -0,0 +1,150 @@
"""FastAPI application initialization is performed here."""
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.responses import JSONResponse
from {{project_slug}}.config import {{ProjectName}}Config
from {{project_slug}}.db.connection.manager import PostgresConnectionManager
from {{project_slug}}.dependencies import connection_manager_dep, logger_dep
from {{project_slug}}.exceptions.mapper import ExceptionMapper
from {{project_slug}}.handlers.debug import DebugException, DebugExceptionWithParams
from {{project_slug}}.middlewares.exception_handler import ExceptionHandlerMiddleware
from {{project_slug}}.middlewares.observability import ObservabilityMiddleware
from {{project_slug}}.observability.metrics import init_metrics
from {{project_slug}}.observability.otel_agent import OpenTelemetryAgent
from {{project_slug}}.utils.observability import URLsMapper, configure_logging
from .handlers import list_of_routers
from .version import LAST_UPDATE, VERSION
def _get_exception_mapper(debug: bool) -> ExceptionMapper:
mapper = ExceptionMapper()
if debug:
mapper.register_simple(DebugException, 506, "That's how a debug exception look like")
mapper.register_func(
DebugExceptionWithParams,
lambda exc: JSONResponse(
{"error": "That's how a debug exception with params look like", "message": exc.message},
status_code=exc.status_code,
),
)
return mapper
def bind_routes(application: FastAPI, prefix: str, debug: bool) -> None:
"""Bind all routes to application."""
for router in list_of_routers:
if not debug:
to_remove = []
for i, route in enumerate(router.routes):
if "debug" in route.path:
to_remove.append(i)
for i in to_remove[::-1]:
del router.routes[i]
if len(router.routes) > 0:
application.include_router(router, prefix=(prefix if "/" not in {r.path for r in router.routes} else ""))
def get_app(prefix: str = "/api") -> FastAPI:
"""Create application and all dependable objects."""
if "CONFIG_PATH" not in os.environ:
raise ValueError("CONFIG_PATH environment variable is not set")
app_config: {{ProjectName}}Config = {{ProjectName}}Config.from_file(os.getenv("CONFIG_PATH"))
description = "{{project_description}}"
application = FastAPI(
title="{{project_name}}",
description=description,
docs_url=None,
openapi_url=f"{prefix}/openapi",
version=f"{VERSION} ({LAST_UPDATE})",
terms_of_service="http://swagger.io/terms/",
contact={"email": "idu@itmo.ru"},
license_info={"name": "Apache 2.0", "url": "http://www.apache.org/licenses/LICENSE-2.0.html"},
lifespan=lifespan,
)
bind_routes(application, prefix, app_config.app.debug)
@application.get(f"{prefix}/docs", include_in_schema=False)
async def custom_swagger_ui_html():
return get_swagger_ui_html(
openapi_url=application.openapi_url,
title=application.title + " - Swagger UI",
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
swagger_js_url="https://unpkg.com/swagger-ui-dist@5.11.7/swagger-ui-bundle.js",
swagger_css_url="https://unpkg.com/swagger-ui-dist@5.11.7/swagger-ui.css",
)
application.add_middleware(
CORSMiddleware,
allow_origins=app_config.app.cors.allow_origins,
allow_credentials=app_config.app.cors.allow_credentials,
allow_methods=app_config.app.cors.allow_methods,
allow_headers=app_config.app.cors.allow_headers,
)
application.state.config = app_config
exception_mapper = _get_exception_mapper(app_config.app.debug)
metrics = init_metrics()
urls_mapper = URLsMapper(app_config.observability.prometheus.urls_mapping)
application.add_middleware(
ObservabilityMiddleware,
exception_mapper=exception_mapper,
metrics=metrics,
urls_mapper=urls_mapper,
)
application.add_middleware(
ExceptionHandlerMiddleware,
debug=[False], # reinitialized on startup
exception_mapper=exception_mapper,
)
return application
@asynccontextmanager
async def lifespan(application: FastAPI):
"""Lifespan function.
Initializes database connection in pass_services_dependencies middleware.
"""
app_config: {{ProjectName}}Config = application.state.config
loggers_dict = {logger_config.filename: logger_config.level for logger_config in app_config.logging.files}
logger = configure_logging(app_config.logging.level, loggers_dict)
await logger.ainfo("application is being configured", config=app_config.to_order_dict())
connection_manager = PostgresConnectionManager(
master=app_config.db.master,
replicas=app_config.db.replicas,
logger=logger,
application_name=f"{{project_slug}}_{VERSION}",
)
connection_manager_dep.init_dispencer(application, connection_manager)
logger_dep.init_dispencer(application, logger)
for middleware in application.user_middleware:
if middleware.cls == ExceptionHandlerMiddleware:
middleware.kwargs["debug"][0] = app_config.app.debug
otel_agent = OpenTelemetryAgent(
app_config.observability.prometheus,
app_config.observability.jaeger,
)
yield
otel_agent.shutdown()
app = get_app()

View File

@@ -0,0 +1,20 @@
"""All FastAPI handlers are exported from this module."""
import importlib
from pathlib import Path
from .routers import routers_list
for file in sorted(Path(__file__).resolve().parent.iterdir()):
if file.name.endswith(".py"):
importlib.import_module(f".{file.name[:-3]}", __package__)
list_of_routers = [
*routers_list,
]
__all__ = [
"list_of_routers",
]

View File

@@ -0,0 +1,114 @@
"""Debug and testing endpoints are defined here. They should be included in router only in debug-mode."""
import asyncio
from typing import Literal
import aiohttp
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from opentelemetry import trace
from starlette import status
from {{project_slug}}.schemas import PingResponse
from {{project_slug}}.utils.observability import get_span_headers
from .routers import debug_errors_router
_tracer = trace.get_tracer(__name__)
class DebugException(Exception):
pass
class DebugExceptionWithParams(Exception):
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
@debug_errors_router.get(
"/status_code",
response_model=PingResponse,
status_code=status.HTTP_200_OK,
)
async def get_status_code(status_code: int, as_exception: bool = False):
"""
Return given status code. If `as_exception` is set to True, return as HTTPException.
"""
if as_exception:
raise HTTPException(
status_code=status_code, detail=f"debugging with status code {status_code} as http_exception"
)
return JSONResponse(PingResponse().model_dump(), status_code=status_code)
@debug_errors_router.get(
"/exception",
response_model=PingResponse,
status_code=status.HTTP_200_OK,
)
async def get_exception(error_type: Literal["RuntimeError", "DebugException", "DebugExceptionWithParams"]):
"""
Raise exception: `RuntimeError`, `DebugException` or `DebugExceptionWithParams` depending on given parameter.
"""
if error_type == "DebugException":
raise DebugException()
if error_type == "DebugExceptionWithParams":
raise DebugExceptionWithParams(522, "Message goes here")
raise RuntimeError("That's how runtime errors look like")
@debug_errors_router.get(
"/tracing_check",
status_code=status.HTTP_200_OK,
)
async def tracing_check():
"""
Add event to span, and sleep 2 seconds inside an inner span.
"""
span = trace.get_current_span()
span.add_event("successful log entry", attributes={"parameters": "go here"})
with _tracer.start_as_current_span("long operation") as inner_span:
inner_span.add_event("going to sleep")
await asyncio.sleep(2)
inner_span.add_event("woke up")
return JSONResponse({"finished": "ok"})
@debug_errors_router.get(
"/inner_get_tracing",
status_code=status.HTTP_200_OK,
)
async def inner_get(host: str = "http://127.0.0.1:8080"):
"""
Perform GET request with span proxying to get more complicated trace.
"""
def perform_get(session: aiohttp.ClientSession) -> asyncio.Task:
return asyncio.create_task(
session.get(
"/api/debug/tracing_check",
headers=get_span_headers(),
)
)
with _tracer.start_as_current_span("inner request"):
inner_results = []
async with aiohttp.ClientSession(host) as session:
requests_futures = [perform_get(session) for _ in range(2)]
results: list[aiohttp.ClientResponse] = await asyncio.gather(*requests_futures)
results.append(await perform_get(session))
for response in results:
inner_results.append(
{
"inner_response": await response.json(),
"headers": dict(response.headers.items()),
"status_code": response.status,
}
)
return JSONResponse({"inner_results": inner_results})

View File

@@ -0,0 +1,18 @@
"""API routers are defined here.
It is needed to import files which use these routers to initialize handlers.
"""
from fastapi import APIRouter
system_router = APIRouter(tags=["system"])
debug_errors_router = APIRouter(tags=["debug"], prefix="/debug")
routers_list = [
system_router,
debug_errors_router,
]
__all__ = [
"routers_list",
]

View File

@@ -0,0 +1,43 @@
"""System endpoints are defined here."""
import fastapi
from starlette import status
from {{project_slug}}.dependencies import connection_manager_dep
from {{project_slug}}.logic import system as system_logic
from {{project_slug}}.schemas import PingResponse
from .routers import system_router
@system_router.get("/", status_code=status.HTTP_307_TEMPORARY_REDIRECT, include_in_schema=False)
@system_router.get("/api/", status_code=status.HTTP_307_TEMPORARY_REDIRECT, include_in_schema=False)
async def redirect_to_swagger_docs():
"""Redirects to **/api/docs** from **/**"""
return fastapi.responses.RedirectResponse("/api/docs", status_code=status.HTTP_307_TEMPORARY_REDIRECT)
@system_router.get(
"/health_check/ping",
response_model=PingResponse,
status_code=status.HTTP_200_OK,
)
async def ping():
"""
Check that application is alive.
"""
return PingResponse()
@system_router.get(
"/health_check/ping_db",
response_model=PingResponse,
status_code=status.HTTP_200_OK,
)
async def ping_db(request: fastapi.Request, readonly: bool = False):
"""
Check that database connection is valid.
"""
connection_manager = connection_manager_dep.obtain(request)
return await system_logic.ping_db(connection_manager, readonly)

View File

@@ -0,0 +1 @@
"""Handlers logic functions are located here."""

View File

@@ -0,0 +1,16 @@
"""/system handlers logic is defined here."""
from sqlalchemy import select, text
from {{project_slug}}.db.connection.manager import PostgresConnectionManager
from {{project_slug}}.schemas.health_check import PingResponse
async def ping_db(connection_manager: PostgresConnectionManager, readonly: bool) -> PingResponse:
conn_future = connection_manager.get_connection() if not readonly else connection_manager.get_ro_connection()
query = select(text("1"))
async with conn_future as conn:
res = (await conn.execute(query)).scalar_one()
if res == 1:
return PingResponse()

View File

@@ -0,0 +1 @@
"""FastAPI middlewares are located here."""

View File

@@ -0,0 +1,79 @@
"""Exception handling middleware is defined here."""
import itertools
import traceback
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from {{project_slug}}.exceptions.mapper import ExceptionMapper
from .observability import ObservableException
class ExceptionHandlerMiddleware(BaseHTTPMiddleware): # pylint: disable=too-few-public-methods
"""Handle exceptions, so they become http response code 500 - Internal Server Error.
If debug is activated in app configuration, then stack trace is returned, otherwise only a generic error message.
Message is sent to logger error stream anyway.
"""
def __init__(
self,
app: FastAPI,
debug: list[bool],
exception_mapper: ExceptionMapper,
):
"""Passing debug as a list with single element is a hack to be able to change the value on the fly."""
super().__init__(app)
self._debug = debug
self._mapper = exception_mapper
async def dispatch(self, request: Request, call_next):
try:
return await call_next(request)
except Exception as exc: # pylint: disable=broad-except
additional_headers: dict[str, str] | None = None
if isinstance(exc, ObservableException):
additional_headers = {"X-Trace-Id": exc.trace_id, "X-Span-Id": str(exc.span_id)}
exc = exc.__cause__
status_code = 500
detail = "exception occured"
if isinstance(exc, HTTPException):
status_code = exc.status_code # pylint: disable=no-member
detail = exc.detail # pylint: disable=no-member
if self._debug[0]:
if (res := self._mapper.apply_if_known(exc)) is not None:
response = res
else:
response = JSONResponse(
{
"code": status_code,
"detail": detail,
"error": str(exc),
"error_type": type(exc).__name__,
"path": request.url.path,
"params": request.url.query,
"tracebacks": _get_tracebacks(exc),
},
status_code=status_code,
)
else:
response = self._mapper.apply(exc)
if additional_headers is not None:
response.headers.update(additional_headers)
return response
def _get_tracebacks(exc: Exception) -> list[list[str]]:
tracebacks: list[list[str]] = []
while exc is not None:
tracebacks.append(
list(itertools.chain.from_iterable(map(lambda x: x.split("\n"), traceback.format_tb(exc.__traceback__))))
)
tracebacks[-1].append(f"{exc.__class__.__module__}.{exc.__class__.__qualname__}: {exc}")
exc = exc.__cause__
return tracebacks

View File

@@ -0,0 +1,177 @@
"""Observability middleware is defined here."""
import time
from random import randint
import structlog
from fastapi import FastAPI, HTTPException, Request, Response
from opentelemetry import context as tracing_context
from opentelemetry import trace
from opentelemetry.semconv.attributes import exception_attributes, http_attributes, url_attributes
from opentelemetry.trace import NonRecordingSpan, Span, SpanContext, TraceFlags
from starlette.middleware.base import BaseHTTPMiddleware
from {{project_slug}}.dependencies import logger_dep
from {{project_slug}}.exceptions.mapper import ExceptionMapper
from {{project_slug}}.observability.metrics import Metrics
from {{project_slug}}.utils.observability import URLsMapper, get_handler_from_path
_tracer = trace.get_tracer_provider().get_tracer(__name__)
class ObservableException(RuntimeError):
"""Runtime Error with `trace_id` and `span_id` set. Guranteed to have `.__cause__` as its parent exception."""
def __init__(self, trace_id: str, span_id: int):
super().__init__()
self.trace_id = trace_id
self.span_id = span_id
class ObservabilityMiddleware(BaseHTTPMiddleware): # pylint: disable=too-few-public-methods
"""Middleware for global observability requests.
- Generate tracing span and adds response header 'X-Trace-Id' and X-Span-Id'
- Binds trace_id it to logger passing it in request state (`request.state.logger`)
- Collects metrics for Prometheus
In case when jaeger is not enabled, trace_id and span_id are generated randomly.
"""
def __init__(self, app: FastAPI, exception_mapper: ExceptionMapper, metrics: Metrics, urls_mapper: URLsMapper):
super().__init__(app)
self._exception_mapper = exception_mapper
self._metrics = metrics
self._urls_mapper = urls_mapper
async def dispatch(self, request: Request, call_next):
logger = logger_dep.obtain(request)
_try_get_parent_span_id(request)
with _tracer.start_as_current_span("http-request") as span:
trace_id = hex(span.get_span_context().trace_id or randint(1, 1 << 63))[2:]
span_id = span.get_span_context().span_id or randint(1, 1 << 32)
span.set_attributes(
{
http_attributes.HTTP_REQUEST_METHOD: request.method,
url_attributes.URL_PATH: request.url.path,
url_attributes.URL_QUERY: str(request.query_params),
"request_client": request.client.host,
}
)
logger = logger.bind(trace_id=trace_id, span_id=span_id)
request.state.logger = logger
await logger.ainfo(
"handling request",
client=request.client.host,
path_params=request.path_params,
method=request.method,
url=str(request.url),
)
path_for_metric = self._urls_mapper.map(request.url.path)
self._metrics.requests_started.add(1, {"method": request.method, "path": path_for_metric})
time_begin = time.monotonic()
try:
result = await call_next(request)
duration_seconds = time.monotonic() - time_begin
result.headers.update({"X-Trace-Id": trace_id, "X-Span-Id": str(span_id)})
await self._handle_success(
request=request,
result=result,
logger=logger,
span=span,
path_for_metric=path_for_metric,
duration_seconds=duration_seconds,
)
return result
except Exception as exc:
duration_seconds = time.monotonic() - time_begin
await self._handle_exception(
request=request, exc=exc, logger=logger, span=span, duration_seconds=duration_seconds
)
raise ObservableException(trace_id=trace_id, span_id=span_id) from exc
finally:
self._metrics.request_processing_duration.record(
duration_seconds, {"method": request.method, "path": path_for_metric}
)
async def _handle_success( # pylint: disable=too-many-arguments
self,
*,
request: Request,
result: Response,
logger: structlog.stdlib.BoundLogger,
span: Span,
path_for_metric: str,
duration_seconds: float,
) -> None:
await logger.ainfo("request handled successfully", time_consumed=round(duration_seconds, 3))
self._metrics.requests_finished.add(
1, {"method": request.method, "path": path_for_metric, "status_code": result.status_code}
)
span.set_attribute(http_attributes.HTTP_RESPONSE_STATUS_CODE, result.status_code)
async def _handle_exception( # pylint: disable=too-many-arguments
self,
*,
request: Request,
exc: Exception,
logger: structlog.stdlib.BoundLogger,
span: Span,
duration_seconds: float,
) -> None:
cause = exc
status_code = 500
if isinstance(exc, HTTPException):
status_code = getattr(exc, "status_code")
if exc.__cause__ is not None:
cause = exc.__cause__
self._metrics.errors.add(
1,
{
"method": request.method,
"path": get_handler_from_path(request.url.path),
"error_type": type(cause).__name__,
"status_code": status_code,
},
)
span.record_exception(exc)
if self._exception_mapper.is_known(exc):
log_func = logger.aerror
else:
log_func = logger.aexception
await log_func(
"failed to handle request", time_consumed=round(duration_seconds, 3), error_type=type(exc).__name__
)
span.set_attributes(
{
exception_attributes.EXCEPTION_TYPE: type(exc).__name__,
exception_attributes.EXCEPTION_MESSAGE: repr(exc),
http_attributes.HTTP_RESPONSE_STATUS_CODE: status_code,
}
)
def _try_get_parent_span_id(request: Request) -> None:
trace_id_str = request.headers.get("X-Trace-Id")
span_id_str = request.headers.get("X-Span-Id")
if trace_id_str is None or span_id_str is None:
return
if not trace_id_str.isnumeric() or not span_id_str.isnumeric():
return
span_context = SpanContext(
trace_id=int(trace_id_str), span_id=int(span_id_str), is_remote=True, trace_flags=TraceFlags(0x01)
)
ctx = trace.set_span_in_context(NonRecordingSpan(span_context))
tracing_context.attach(ctx)

View File

@@ -0,0 +1 @@
"""Observability-related functionality is located here."""

View File

@@ -0,0 +1,47 @@
"""Application metrics are defined here."""
from dataclasses import dataclass
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, Histogram
@dataclass
class Metrics:
request_processing_duration: Histogram
"""Processing time histogram in seconds by `["method", "path"]`."""
requests_started: Counter
"""Total started requests counter by `["method", "path"]`."""
requests_finished: Counter
"""Total finished requests counter by `["method", "path", "status_code"]`."""
errors: Counter
"""Total errors (exceptions) counter by `["method", "path", "error_type", "status_code"]`."""
def init_metrics() -> Metrics:
meter = metrics.get_meter("{{project_name}}")
return Metrics(
request_processing_duration=meter.create_histogram(
"request_processing_duration",
"sec",
"Request processing duration time in seconds",
explicit_bucket_boundaries_advisory=[
0.05,
0.2,
0.3,
0.7,
1.0,
1.5,
2.5,
5.0,
10.0,
20.0,
40.0,
60.0,
120.0,
],
),
requests_started=meter.create_counter("requests_started_total", "1", "Total number of started requests"),
requests_finished=meter.create_counter("request_finished_total", "1", "Total number of finished requests"),
errors=meter.create_counter("request_errors_total", "1", "Total number of errors (exceptions) in requests"),
)

View File

@@ -0,0 +1,22 @@
"""Prometheus server configuration class is defined here."""
from threading import Thread
from wsgiref.simple_server import WSGIServer
from prometheus_client import start_http_server
class PrometheusServer: # pylint: disable=too-few-public-methods
def __init__(self, port: int = 9464, host: str = "0.0.0.0"):
self._host = host
self._port = port
self._server: WSGIServer
self._thread: Thread
self._server, self._thread = start_http_server(self._port)
def shutdown(self):
if self._server is not None:
self._server.shutdown()
self._server = None

View File

@@ -0,0 +1,46 @@
"""Open Telemetry agent initialization is defined here"""
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from {{project_slug}}.config import JaegerConfig, PrometheusConfig
from {{project_slug}}.version import VERSION as APP_VERSION
from .metrics_server import PrometheusServer
class OpenTelemetryAgent: # pylint: disable=too-few-public-methods
def __init__(self, prometheus_config: PrometheusConfig | None, jaeger_config: JaegerConfig | None):
self._resource = Resource.create(
attributes={
SERVICE_NAME: "{{project_name}}",
SERVICE_VERSION: APP_VERSION,
}
)
self._prometheus: PrometheusServer | None = None
self._span_exporter: OTLPSpanExporter | None = None
if prometheus_config is not None:
self._prometheus = PrometheusServer(port=prometheus_config.port, host=prometheus_config.host)
reader = PrometheusMetricReader()
provider = MeterProvider(resource=self._resource, metric_readers=[reader])
metrics.set_meter_provider(provider)
if jaeger_config is not None:
self._span_exporter = OTLPSpanExporter(endpoint=jaeger_config.endpoint)
tracer_provider = TracerProvider(resource=self._resource)
processor = BatchSpanProcessor(span_exporter=self._span_exporter)
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)
def shutdown(self) -> None:
"""Stop metrics and tracing services if they were started."""
if self._prometheus is not None:
self._prometheus.shutdown()

View File

@@ -0,0 +1,5 @@
from .health_check import PingResponse
__all__ = [
"PingResponse",
]

View File

@@ -0,0 +1,9 @@
"""Ping endpoint response is defined here."""
from pydantic import BaseModel, Field
class PingResponse(BaseModel):
"""Ping response model, contains default message."""
message: str = Field(default="Pong!")

View File

@@ -0,0 +1,104 @@
"""Observability helper functions are defined here."""
import logging
import re
import sys
from pathlib import Path
from typing import Literal
import structlog
from opentelemetry import trace
LoggingLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
def configure_logging(
log_level: LoggingLevel, files: dict[str, LoggingLevel] | None = None, root_logger_level: LoggingLevel = "INFO"
) -> structlog.stdlib.BoundLogger:
level_name_mapping = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
files = files or {}
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger: structlog.stdlib.BoundLogger = structlog.get_logger("main")
logger.setLevel(level_name_mapping[log_level])
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(
structlog.stdlib.ProcessorFormatter(processor=structlog.dev.ConsoleRenderer(colors=True))
)
root_logger = logging.getLogger()
root_logger.addHandler(console_handler)
for filename, level in files.items():
try:
Path(filename).parent.mkdir(parents=True, exist_ok=True)
except Exception as exc:
print(f"Cannot create directory for log file {filename}, application will crash most likely. {exc!r}")
file_handler = logging.FileHandler(filename=filename, encoding="utf-8")
file_handler.setFormatter(structlog.stdlib.ProcessorFormatter(processor=structlog.processors.JSONRenderer()))
file_handler.setLevel(level_name_mapping[level])
root_logger.addHandler(file_handler)
root_logger.setLevel(root_logger_level)
return logger
def get_handler_from_path(path: str) -> str:
parts = path.split("/")
return "/".join(part if not part.rstrip(".0").isdigit() else "*" for part in parts)
class URLsMapper:
"""Helper to change URL from given regex pattern to the given static value.
For example, with map {"/api/debug/.*": "/api/debug/*"} all requests with URL starting with "/api/debug/"
will be placed in path "/api/debug/*" in metrics.
"""
def __init__(self, urls_map: dict[str, str]):
self._map: dict[re.Pattern, str] = {}
for pattern, value in urls_map.items():
self.add(pattern, value)
def add(self, pattern: str, mapped_to: str) -> None:
"""Add entry to the map. If pattern compilation is failed, ValueError is raised."""
regexp = re.compile(pattern)
self._map[regexp] = mapped_to
def map(self, url: str) -> str:
"""Check every map entry with `re.match` and return matched value. If not found, return original string."""
for regexp, mapped_to in self._map.items():
if regexp.match(url) is not None:
return mapped_to
return url
def get_span_headers() -> dict[str, str]:
ctx = trace.get_current_span().get_span_context()
return {
"X-Span-Id": str(ctx.span_id),
"X-Trace-Id": str(ctx.trace_id),
}

View File

@@ -0,0 +1,49 @@
"""Basic functionality to work with sensitive data in configs is defined here."""
import os
import re
from typing import Any
import yaml
_env_re = re.compile(r"^!env\((?P<env_var>.+)\)$")
class SecretStr(str):
"""String value which returns "<REDACTED>" on str() and repr() calls.
If given value matches pattern `^!env\\(.+\\)$` then try to get value from environment variables by the given name.
To get a value inside one should use `get_secret_value` method.
"""
def __new__(cls, other: Any):
if isinstance(other, SecretStr):
return super().__new__(cls, other.get_secret_value())
if isinstance(other, str):
if (m := _env_re.match(other)) is not None:
env_var = m.group("env_var")
if env_var in os.environ:
other = os.environ[env_var]
else:
print(
f"CAUTION: secret variable '{other}' looks like a mapping from env,"
f" but no '{env_var} value is found'"
)
return super().__new__(cls, other)
def __str__(self) -> str:
return "<REDACTED>"
def __repr__(self) -> str:
return "'<REDACTED!r>'"
def get_secret_value(self) -> str:
return super().__str__()
def representSecretStrYAML(dumper: yaml.Dumper, s: SecretStr):
return dumper.represent_str(s.get_secret_value())
yaml.add_representer(SecretStr, representSecretStrYAML)

View File

@@ -0,0 +1,4 @@
"""Version constants are defined here and should be manually updated on release."""
VERSION = "0.1.0"
LAST_UPDATE = "2025-11-29"