Files
Airwallex/backend/app/auth.py
zqq61 4f53889a8e feat: Airwallex 发卡管理后台完整实现
- 后端: FastAPI + SQLAlchemy + SQLite, JWT认证, 代理支持的AirwallexClient
- 前端: React 18 + Vite + Ant Design 5, 中文界面
- 功能: 卡片管理, 持卡人管理, 交易记录, API令牌, 系统设置, 审计日志
- 第三方API: X-API-Key认证, 权限控制
- Docker部署: docker-compose编排前后端
2026-03-15 23:05:08 +08:00

75 lines
2.3 KiB
Python

"""Authentication utilities: JWT tokens, password hashing, and dependencies."""
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from .config import settings
# Password-hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer scheme instance; get_current_user depends on it to obtain the request's credentials.
security = HTTPBearer()
class AdminUser(BaseModel):
    """Identity of the admin whose token passed validation."""

    # Value of the token's "sub" claim, as populated by get_current_user.
    username: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check whether a plaintext password corresponds to a stored hash.

    Args:
        plain_password: The password as typed by the user.
        hashed_password: The previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Produce a hash of ``password`` with the module's password context.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to encode in the token. The dict is not mutated; an
            ``exp`` claim is added to (or overrides the one in) a copy.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.JWT_EXPIRE_MINUTES`` minutes. Any explicit value,
            including ``timedelta(0)``, is honored as given.

    Returns:
        Encoded JWT string signed with ``settings.SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy, and an explicitly requested zero lifetime must not silently fall
    # back to the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.JWT_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> AdminUser:
    """FastAPI dependency that resolves the current admin from a bearer JWT.

    Args:
        credentials: Bearer credentials supplied by the ``security`` dependency.

    Returns:
        An ``AdminUser`` whose ``username`` is the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding the token raises ``JWTError`` or the payload has no
            ``sub`` claim.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise JWTError, so it is the only statement
    # guarded by the try block.
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError as err:
        # Chain explicitly so the underlying decode failure stays attached to
        # the 401 for server-side debugging.
        raise credentials_exception from err

    username: str | None = payload.get("sub")
    if username is None:
        raise credentials_exception
    return AdminUser(username=username)