Version 0.4.0
All checks were successful
Run linters on applied template / Python 3.13 lint and build (push) Successful in 1m40s
All checks were successful
Run linters on applied template / Python 3.13 lint and build (push) Successful in 1m40s
Changes: - put ObservabilityMiddleware before ExceptionHandlerMiddleware to avoid repetative code - add application startup and last metrics update metrics along with CPU usage metric and threads count - move host and port to new uvicorn section at config along with new reload and forwarded_allow_ips - add request_id and remove trace_id/span_id generation if tracing is disabled - move logging logic from utils to observability - pass trace_id/span_id in HEX form
This commit is contained in:
46
{{project_slug}}/dependencies/auth_dep.py.jinja
Normal file
46
{{project_slug}}/dependencies/auth_dep.py.jinja
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Authentication dependency function is defined here."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import jwt
|
||||
from fastapi import Request
|
||||
|
||||
from {{project_slug}}.exceptions.auth import NotAuthorizedError
|
||||
|
||||
from . import logger_dep
|
||||
|
||||
|
||||
@dataclass
|
||||
class AuthenticationData:
|
||||
api_key: str | None
|
||||
jwt_payload: dict | None
|
||||
|
||||
|
||||
def _from_request(request: Request, required: bool = True) -> AuthenticationData | None:
|
||||
if not hasattr(request.state, "auth_dep"):
|
||||
auth = AuthenticationData(None, None)
|
||||
if (value := request.headers.get("X-API-Key")) is not None:
|
||||
auth.api_key = value
|
||||
if (value := request.headers.get("Authorization")) is not None and value.startswith("Bearer "):
|
||||
value = value[7:]
|
||||
try:
|
||||
auth.jwt_payload = jwt.decode(value, algorithms=["HS256"], options={"verify_signature": False})
|
||||
except Exception: # pylint: disable=broad-except
|
||||
logger = logger_dep.from_request(request)
|
||||
logger.warning("failed to parse Authorization header as jwt", value=value)
|
||||
logger.debug("failed to parse Authorization header as jwt", exc_info=True)
|
||||
if auth.api_key is not None or auth.jwt_payload is not None:
|
||||
request.state.auth_dep = auth
|
||||
else:
|
||||
request.state.auth_dep = None
|
||||
if required and request.state.auth_dep is None:
|
||||
raise NotAuthorizedError()
|
||||
return request.state.auth_dep
|
||||
|
||||
|
||||
def from_request_optional(request: Request) -> AuthenticationData | None:
|
||||
return _from_request(request, required=False)
|
||||
|
||||
|
||||
def from_request(request: Request) -> AuthenticationData:
|
||||
return _from_request(request, required=True)
|
||||
Reference in New Issue
Block a user