Initial commit
Some checks failed
Run linters on applied template / Python 3.13 lint and build (push) Failing after 39s

This is a FastAPI backend microservice template used with `copier` utility.

Features of applied template are:
- Configuration file processing logic
- Metrics and tracing (both optional) configuration available
- Debug endpoints
- Database migration commands, prepared Alembic environment
- Database usage example in ping_db endpoint
- gitea sanity check pipeline
This commit is contained in:
2025-11-29 21:42:27 +03:00
commit 6e9ed87ec4
52 changed files with 4563 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Observability-related functionality is located here."""

View File

@@ -0,0 +1,47 @@
"""Application metrics are defined here."""
from dataclasses import dataclass
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, Histogram
@dataclass
class Metrics:
request_processing_duration: Histogram
"""Processing time histogram in seconds by `["method", "path"]`."""
requests_started: Counter
"""Total started requests counter by `["method", "path"]`."""
requests_finished: Counter
"""Total finished requests counter by `["method", "path", "status_code"]`."""
errors: Counter
"""Total errors (exceptions) counter by `["method", "path", "error_type", "status_code"]`."""
def init_metrics() -> Metrics:
meter = metrics.get_meter("{{project_name}}")
return Metrics(
request_processing_duration=meter.create_histogram(
"request_processing_duration",
"sec",
"Request processing duration time in seconds",
explicit_bucket_boundaries_advisory=[
0.05,
0.2,
0.3,
0.7,
1.0,
1.5,
2.5,
5.0,
10.0,
20.0,
40.0,
60.0,
120.0,
],
),
requests_started=meter.create_counter("requests_started_total", "1", "Total number of started requests"),
requests_finished=meter.create_counter("request_finished_total", "1", "Total number of finished requests"),
errors=meter.create_counter("request_errors_total", "1", "Total number of errors (exceptions) in requests"),
)

View File

@@ -0,0 +1,22 @@
"""Prometheus server configuration class is defined here."""
from threading import Thread
from wsgiref.simple_server import WSGIServer
from prometheus_client import start_http_server
class PrometheusServer: # pylint: disable=too-few-public-methods
def __init__(self, port: int = 9464, host: str = "0.0.0.0"):
self._host = host
self._port = port
self._server: WSGIServer
self._thread: Thread
self._server, self._thread = start_http_server(self._port)
def shutdown(self):
if self._server is not None:
self._server.shutdown()
self._server = None

View File

@@ -0,0 +1,46 @@
"""Open Telemetry agent initialization is defined here"""
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from {{project_slug}}.config import JaegerConfig, PrometheusConfig
from {{project_slug}}.version import VERSION as APP_VERSION
from .metrics_server import PrometheusServer
class OpenTelemetryAgent: # pylint: disable=too-few-public-methods
def __init__(self, prometheus_config: PrometheusConfig | None, jaeger_config: JaegerConfig | None):
self._resource = Resource.create(
attributes={
SERVICE_NAME: "{{project_name}}",
SERVICE_VERSION: APP_VERSION,
}
)
self._prometheus: PrometheusServer | None = None
self._span_exporter: OTLPSpanExporter | None = None
if prometheus_config is not None:
self._prometheus = PrometheusServer(port=prometheus_config.port, host=prometheus_config.host)
reader = PrometheusMetricReader()
provider = MeterProvider(resource=self._resource, metric_readers=[reader])
metrics.set_meter_provider(provider)
if jaeger_config is not None:
self._span_exporter = OTLPSpanExporter(endpoint=jaeger_config.endpoint)
tracer_provider = TracerProvider(resource=self._resource)
processor = BatchSpanProcessor(span_exporter=self._span_exporter)
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)
def shutdown(self) -> None:
"""Stop metrics and tracing services if they were started."""
if self._prometheus is not None:
self._prometheus.shutdown()