All checks were successful
Run linters on applied template / Python 3.13 lint and build (push) Successful in 1m40s
Changes: - put ObservabilityMiddleware before ExceptionHandlerMiddleware to avoid repetitive code - add application startup and last metrics update metrics along with CPU usage metric and thread count - move host and port to new uvicorn section in config along with new reload and forwarded_allow_ips - add request_id and remove trace_id/span_id generation if tracing is disabled - move logging logic from utils to observability - pass trace_id/span_id in HEX form
47 lines
1.6 KiB
Django/Jinja
47 lines
1.6 KiB
Django/Jinja
"""Authentication dependency function is defined here."""
|
|
|
|
from dataclasses import dataclass
|
|
|
|
import jwt
|
|
from fastapi import Request
|
|
|
|
from {{project_slug}}.exceptions.auth import NotAuthorizedError
|
|
|
|
from . import logger_dep
|
|
|
|
|
|
@dataclass
|
|
class AuthenticationData:
|
|
api_key: str | None
|
|
jwt_payload: dict | None
|
|
|
|
|
|
def _from_request(request: Request, required: bool = True) -> AuthenticationData | None:
    """Extract authentication data from request headers, caching it in `request.state.auth_dep`.

    The `X-API-Key` header is stored as-is. An `Authorization` header using the Bearer scheme
    is decoded as a JWT; a value that cannot be decoded is logged and ignored. The outcome is
    computed once per request (None is cached when no credentials were found) and reused on
    later calls with the same request.

    Args:
        request: incoming request whose headers and state are used.
        required: when True, absence of any credentials raises instead of returning None.

    Returns:
        The authentication data, or None when no credentials were supplied and `required` is False.

    Raises:
        NotAuthorizedError: if `required` is True and neither an API key nor a decodable JWT is present.
    """
    if not hasattr(request.state, "auth_dep"):
        auth = AuthenticationData(api_key=request.headers.get("X-API-Key"), jwt_payload=None)

        # Authentication scheme names are case-insensitive (RFC 9110, section 11.1), so
        # "bearer <token>" and "BEARER <token>" must be accepted just like "Bearer <token>".
        bearer_prefix = "bearer "
        authorization = request.headers.get("Authorization")
        if authorization is not None and authorization[: len(bearer_prefix)].lower() == bearer_prefix:
            token = authorization[len(bearer_prefix) :]
            try:
                # NOTE(review): the signature is deliberately NOT verified here, so the payload is
                # unauthenticated at this point - confirm that it is validated elsewhere before trust.
                auth.jwt_payload = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})
            except Exception:  # pylint: disable=broad-except
                logger = logger_dep.from_request(request)
                # NOTE(review): the raw token value ends up in the warning log - confirm this is acceptable.
                logger.warning("failed to parse Authorization header as jwt", value=token)
                logger.debug("failed to parse Authorization header as jwt", exc_info=True)

        has_credentials = auth.api_key is not None or auth.jwt_payload is not None
        request.state.auth_dep = auth if has_credentials else None

    if required and request.state.auth_dep is None:
        raise NotAuthorizedError()
    return request.state.auth_dep
|
|
|
|
|
|
def from_request_optional(request: Request) -> AuthenticationData | None:
    """Return the request's authentication data, or None when no credentials were supplied."""
    auth = _from_request(request, required=False)
    return auth
|
|
|
|
|
|
def from_request(request: Request) -> AuthenticationData:
    """Return the request's authentication data; `_from_request` raises when none is present."""
    auth = _from_request(request, required=True)
    return auth
|