All checks were successful
Run linters on applied template / Python 3.13 lint and build (push) Successful in 1m40s
Changes: - put ObservabilityMiddleware before ExceptionHandlerMiddleware to avoid repetative code - add application startup and last metrics update metrics along with CPU usage metric and threads count - move host and port to new uvicorn section at config along with new reload and forwarded_allow_ips - add request_id and remove trace_id/span_id generation if tracing is disabled - move logging logic from utils to observability - pass trace_id/span_id in HEX form
35 lines
1.5 KiB
Django/Jinja
35 lines
1.5 KiB
Django/Jinja
"""PostgresConnectionManager dependency functions are defined here."""
|
|
|
|
from fastapi import FastAPI, Request
|
|
|
|
from {{project_slug}}.db.connection.manager import PostgresConnectionManager
|
|
|
|
|
|
def init_dispencer(app: FastAPI, connection_manager: PostgresConnectionManager) -> None:
|
|
"""Initialize PostgresConnectionManager dispencer at app's state."""
|
|
if hasattr(app.state, "postgres_connection_manager_dep"):
|
|
if not isinstance(app.state.postgres_connection_manager_dep, PostgresConnectionManager):
|
|
raise ValueError(
|
|
"postgres_connection_manager_dep attribute of app's state is already set"
|
|
f"with other value ({app.state.postgres_connection_manager_dep})"
|
|
)
|
|
return
|
|
|
|
app.state.postgres_connection_manager_dep = connection_manager
|
|
|
|
|
|
def from_app(app: FastAPI) -> PostgresConnectionManager:
|
|
"""Get a connection_manager from app state."""
|
|
if not hasattr(app.state, "postgres_connection_manager_dep"):
|
|
raise ValueError("PostgresConnectionManager dispencer was not initialized at app preparation")
|
|
return app.state.postgres_connection_manager_dep
|
|
|
|
|
|
async def from_request(request: Request) -> PostgresConnectionManager:
|
|
"""Get a PostgresConnectionManager from request or app state."""
|
|
if hasattr(request.state, "postgres_connection_manager_dep"):
|
|
connection_manager = request.state.postgres_connection_manager_dep
|
|
if isinstance(connection_manager, PostgresConnectionManager):
|
|
return connection_manager
|
|
return from_app(request.app)
|