Files
Airwallex/backend/app/routers/tokens.py
zqq61 4f53889a8e feat: Airwallex 发卡管理后台完整实现
- 后端: FastAPI + SQLAlchemy + SQLite, JWT认证, 代理支持的AirwallexClient
- 前端: React 18 + Vite + Ant Design 5, 中文界面
- 功能: 卡片管理, 持卡人管理, 交易记录, API令牌, 系统设置, 审计日志
- 第三方API: X-API-Key认证, 权限控制
- Docker部署: docker-compose编排前后端
2026-03-15 23:05:08 +08:00

115 lines
3.3 KiB
Python

"""API token management router."""
import secrets
import json
from datetime import datetime, timezone, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from app.auth import get_current_user, AdminUser
from app.database import get_db
from app.models.db_models import ApiToken
from app.services.audit_log import create_audit_log
# Router for the API-token endpoints; every route registered on it below is
# served under the "/api/tokens" prefix and tagged "tokens".
router = APIRouter(prefix="/api/tokens", tags=["tokens"])
@router.get("")
def list_tokens(
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """Return every stored API token, newest first, with the secret masked.

    Each entry exposes the token's id, name, permissions, active flag and
    timestamps. The token value itself is never returned in full: only its
    first 8 and last 4 characters are shown, joined by "...".

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        user: Result of the ``get_current_user`` dependency. It is not read
            here; presumably it is declared so the endpoint requires an
            authenticated admin (confirm in ``app.auth``).

    Returns:
        A list of plain dicts, one per ``ApiToken`` row.
    """

    def _iso_or_none(moment):
        # Timestamps are optional columns; serialise only when present.
        if moment:
            return moment.isoformat()
        return None

    rows = db.query(ApiToken).order_by(ApiToken.created_at.desc()).all()

    listing = []
    for row in rows:
        # Show just enough of the secret to let an operator recognise it.
        if row.token:
            masked = row.token[:8] + "..." + row.token[-4:]
        else:
            masked = ""

        # Permissions are persisted as a JSON string; empty/NULL means none.
        if row.permissions:
            granted = json.loads(row.permissions)
        else:
            granted = []

        listing.append(
            {
                "id": row.id,
                "name": row.name,
                "token": masked,
                "permissions": granted,
                "is_active": row.is_active,
                "created_at": _iso_or_none(row.created_at),
                "expires_at": _iso_or_none(row.expires_at),
                "last_used_at": _iso_or_none(row.last_used_at),
            }
        )
    return listing
def _resolve_token_expiry(expires_in_days):
    """Convert the optional ``expires_in_days`` field into an aware UTC datetime.

    Falsy values (missing, ``None``, ``0``, ``""``) mean "no expiry" and yield
    ``None``, which is how the endpoint has always treated them.

    Raises:
        HTTPException: 400 when the value is not a positive whole number of
            days, or is too large to be represented as a date.
    """
    if not expires_in_days:
        return None
    try:
        days = int(expires_in_days)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric strings, lists/dicts, NaN and infinity all land here.
        raise HTTPException(
            status_code=400,
            detail="expires_in_days must be a positive integer",
        ) from None
    if days <= 0:
        # A non-positive lifetime would create a token that is already expired.
        raise HTTPException(
            status_code=400,
            detail="expires_in_days must be a positive integer",
        )
    try:
        return datetime.now(timezone.utc) + timedelta(days=days)
    except OverflowError:
        # timedelta / datetime arithmetic overflows for absurdly large values.
        raise HTTPException(
            status_code=400,
            detail="expires_in_days is too large",
        ) from None


@router.post("")
def create_token(
    data: dict,
    request: Request,
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """Create a new API token and return it, including the full secret.

    The full token value is included only in this response; the listing
    endpoint in this module returns a masked form.

    Args:
        data: Request body. Recognised keys:
            ``name`` (string, defaults to "Unnamed Token"),
            ``permissions`` (list, defaults to ``[]``),
            ``expires_in_days`` (positive integer; omitted/falsy = no expiry).
        request: Incoming request, used for the client IP in the audit log.
        db: Database session supplied by the ``get_db`` dependency.
        user: Authenticated admin; ``user.username`` is recorded as the
            operator in the audit log.

    Returns:
        A dict with the new token's id, name, full token value, permissions,
        active flag, creation time and optional expiry time.

    Raises:
        HTTPException: 400 when ``name`` is not a string, ``permissions`` is
            not a list, or ``expires_in_days`` is not a usable positive
            number of days.
    """
    # `or` also covers an explicit null / empty name, not just a missing key.
    name = data.get("name") or "Unnamed Token"
    if not isinstance(name, str):
        raise HTTPException(status_code=400, detail="name must be a string")

    permissions = data.get("permissions")
    if permissions is None:
        # Explicit null is treated like an omitted key.
        permissions = []
    if not isinstance(permissions, list):
        # Keep the stored JSON in the list shape the listing endpoint returns.
        raise HTTPException(status_code=400, detail="permissions must be a list")

    # Validate before generating/persisting anything so bad input has no effect.
    expires_at = _resolve_token_expiry(data.get("expires_in_days"))

    raw_token = secrets.token_urlsafe(32)
    # NOTE(review): the secret is persisted as-is in ApiToken.token; storing a
    # hash instead would need a schema/lookup change outside this block.
    token = ApiToken(
        name=name,
        token=raw_token,
        permissions=json.dumps(permissions),
        is_active=True,
        created_at=datetime.now(timezone.utc),
        expires_at=expires_at,
    )
    db.add(token)
    db.commit()
    db.refresh(token)  # populate the generated primary key

    create_audit_log(
        db,
        action="create_token",
        resource_type="api_token",
        resource_id=str(token.id),
        operator=user.username,
        ip_address=request.client.host if request.client else "",
        details=f"Created token '{name}' with permissions {permissions}",
    )
    return {
        "id": token.id,
        "name": token.name,
        "token": raw_token,  # Only shown once on creation
        "permissions": permissions,
        "is_active": True,
        "created_at": token.created_at.isoformat(),
        "expires_at": token.expires_at.isoformat() if token.expires_at else None,
    }
@router.delete("/{token_id}")
def delete_token(
    token_id: int,
    request: Request,
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """Revoke an API token by marking it inactive.

    The row is kept in the database; only its ``is_active`` flag is cleared.
    The revocation is then written to the audit log.

    Args:
        token_id: Primary key of the token to revoke (from the URL path).
        request: Incoming request, used for the client IP in the audit log.
        db: Database session supplied by the ``get_db`` dependency.
        user: Authenticated admin; ``user.username`` is recorded as the
            operator in the audit log.

    Returns:
        ``{"message": "Token revoked"}`` on success.

    Raises:
        HTTPException: 404 when no token with ``token_id`` exists.
    """
    lookup = db.query(ApiToken).filter(ApiToken.id == token_id)
    record = lookup.first()
    if not record:
        raise HTTPException(status_code=404, detail="Token not found")

    # Soft revoke: flip the flag and persist it before auditing.
    record.is_active = False
    db.commit()

    # request.client may be absent, in which case an empty IP is logged.
    if request.client:
        caller_ip = request.client.host
    else:
        caller_ip = ""

    create_audit_log(
        db,
        action="revoke_token",
        resource_type="api_token",
        resource_id=str(token_id),
        operator=user.username,
        ip_address=caller_ip,
    )
    return {"message": "Token revoked"}