diff --git a/example.env b/example.env index 9b925a5..e443860 100644 --- a/example.env +++ b/example.env @@ -12,7 +12,7 @@ POSTGRES_TEST_DB=cars_test_db DATABASE_HOST_URL="postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432" DATABASE_URL="${DATABASE_HOST_URL}/${POSTGRES_DB}?sslmode=disable" -# Super token for trusted services. Send it in the api_token request header. +# Super token for trusted services. Send it as Authorization: Bearer ${API_TOKEN}. #API_TOKEN=change-me # Directory with latest camera snapshots named as {camera_id}.jpg diff --git a/src/dependencies.py b/src/dependencies.py index bf2c6e3..3beee06 100644 --- a/src/dependencies.py +++ b/src/dependencies.py @@ -1,7 +1,6 @@ """ Зависимости FastAPI: - - get_current_user — принимает API_TOKEN из заголовка api_token - или декодирует JWT, возвращает User из БД + - get_current_user — принимает API_TOKEN или JWT через Authorization: Bearer - require — фабрика зависимостей для проверки permissions - BASE_USER_PERMISSIONS — хардкод прав для роли 'user' """ @@ -14,7 +13,7 @@ from typing import Annotated import jwt -from fastapi import Depends, Header, HTTPException, status +from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from passlib.context import CryptContext from sqlalchemy.orm import Session @@ -29,7 +28,6 @@ JWT_SECRET: str = os.environ.get("JWT_SECRET", "CHANGE_ME_IN_PRODUCTION") JWT_ALGORITHM: str = "HS256" JWT_EXPIRE_SECONDS: int = int(os.environ.get("JWT_EXPIRE_SECONDS", 86400)) # 24 ч -API_TOKEN_HEADER_NAME = "api_token" API_TOKEN_USER_EMAIL = "api-token@parktrack.local" _API_TOKEN_AUTH_ATTR = "_authenticated_via_api_token" @@ -263,18 +261,19 @@ def get_effective_permissions(user: User) -> set[str]: def resolve_current_user( credentials: HTTPAuthorizationCredentials | None, db: Session, - api_token: str | None = None, ) -> User: - """Обязательная авторизация по API_TOKEN или JWT.""" - if _api_token_matches(api_token): - return _get_api_token_user(db) - + """Обязательная авторизация по Bearer API_TOKEN или Bearer JWT.""" if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_description": "Missing or invalid access token"}, ) - user_id = decode_access_token(credentials.credentials) + + token = credentials.credentials + if _api_token_matches(token): + return _get_api_token_user(db) + + user_id = decode_access_token(token) user = db.query(User).filter(User.user_id == user_id).one_or_none() if user is None or not user.is_active: raise HTTPException( @@ -287,10 +286,9 @@ def resolve_current_user( def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(_bearer_scheme)], db: Annotated[Session, Depends(get_db)], - api_token: Annotated[str | None, Header(alias=API_TOKEN_HEADER_NAME)] = None, ) -> User: """Обязательная авторизация. Бросает 401, если токен отсутствует или невалиден.""" - return resolve_current_user(credentials=credentials, db=db, api_token=api_token) + return resolve_current_user(credentials=credentials, db=db) CurrentUser = Annotated[User, Depends(get_current_user)] diff --git a/src/routers/zones.py b/src/routers/zones.py index 7e617f1..117435d 100644 --- a/src/routers/zones.py +++ b/src/routers/zones.py @@ -3,14 +3,14 @@ from datetime import datetime, timezone from typing import Annotated -from fastapi import APIRouter, Depends, Header, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import text from sqlalchemy.orm import Session from ..database import get_db from ..db_models import Camera, ParkingZone, Partner, User -from ..dependencies import API_TOKEN_HEADER_NAME, get_effective_permissions, require, resolve_current_user +from ..dependencies import get_effective_permissions, require, resolve_current_user from ..schemas.zones import ( CreateZoneRequest, UpdateZoneRequest, @@ -102,10 +102,9 @@ def list_zones( top: int = 100, offset: int = 0, credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(HTTPBearer(auto_error=False))] = None, - api_token: Annotated[str | None, Header(alias=API_TOKEN_HEADER_NAME)] = None, ): if view != "map": - current_user = resolve_current_user(credentials=credentials, db=db, api_token=api_token) + current_user = resolve_current_user(credentials=credentials, db=db) if "zones.view" not in get_effective_permissions(current_user): raise HTTPException( status.HTTP_403_FORBIDDEN,