"""API token management router."""
import json
import secrets
from datetime import datetime, timezone, timedelta
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.auth import get_current_user, AdminUser
from app.database import get_db
from app.models.db_models import ApiToken
from app.services.audit_log import create_audit_log

router = APIRouter(prefix="/api/tokens", tags=["tokens"])


def _client_ip(request: Request) -> str:
    """Return the caller's host, or an empty string when no client info exists."""
    return request.client.host if request.client else ""


def _resolve_expiry(expires_in_days, now: datetime) -> Optional[datetime]:
    """Turn the client-supplied ``expires_in_days`` into an expiry timestamp.

    Args:
        expires_in_days: Raw value from the request body. Any falsy value
            (missing, ``None``, ``0``, ``""``) means "never expires".
        now: Reference instant the expiry is measured from.

    Returns:
        ``now`` plus the requested number of days, or ``None`` for no expiry.

    Raises:
        HTTPException: 400 when the value is not convertible to an integer,
            is zero or negative after conversion, or is too large to
            represent as a date.
    """
    if not expires_in_days:
        return None
    try:
        days = int(expires_in_days)
        if days <= 0:
            # A non-positive lifetime would mint a token that is already
            # expired; treat it as a client error instead.
            raise ValueError("expires_in_days must be positive")
        # timedelta() and the datetime addition raise OverflowError for
        # absurdly large values, so both stay inside the try.
        return now + timedelta(days=days)
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(
            status_code=400,
            detail=(
                "Invalid expires_in_days: expected a positive number of days "
                "within the supported date range"
            ),
        ) from exc


@router.get("")
def list_tokens(
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """List all API tokens, newest first.

    The stored token value is never returned in full: only its first 8 and
    last 4 characters are exposed, joined by ``...``. Permissions are decoded
    from their JSON text column, and timestamps are rendered as ISO-8601
    strings (``None`` when unset).

    Returns:
        A list of dicts, one per token, ordered by ``created_at`` descending.
    """
    tokens = db.query(ApiToken).order_by(ApiToken.created_at.desc()).all()
    return [
        {
            "id": t.id,
            "name": t.name,
            "token": t.token[:8] + "..." + t.token[-4:] if t.token else "",
            "permissions": json.loads(t.permissions) if t.permissions else [],
            "is_active": t.is_active,
            "created_at": t.created_at.isoformat() if t.created_at else None,
            "expires_at": t.expires_at.isoformat() if t.expires_at else None,
            "last_used_at": t.last_used_at.isoformat() if t.last_used_at else None,
        }
        for t in tokens
    ]


@router.post("")
def create_token(
    data: dict,
    request: Request,
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """Create a new API token and write an audit-log entry for it.

    Recognised keys in ``data``:
        name: Display name; defaults to ``"Unnamed Token"``.
        permissions: JSON-serialisable value stored as JSON text; defaults
            to an empty list.
        expires_in_days: Optional positive number of days until expiry.
            Falsy or missing means the token does not expire.

    Returns:
        A dict describing the new token. This is the only response that
        contains the full token value.

    Raises:
        HTTPException: 400 when ``expires_in_days`` is present but invalid.
    """
    name = data.get("name", "Unnamed Token")
    permissions = data.get("permissions", [])

    # One timestamp for both fields keeps expires_at exactly N days after
    # created_at. Validation runs before anything is written to the session.
    now = datetime.now(timezone.utc)
    expires_at = _resolve_expiry(data.get("expires_in_days"), now)

    raw_token = secrets.token_urlsafe(32)
    token = ApiToken(
        name=name,
        token=raw_token,
        permissions=json.dumps(permissions),
        is_active=True,
        created_at=now,
        expires_at=expires_at,
    )
    db.add(token)
    db.commit()
    db.refresh(token)

    create_audit_log(
        db,
        action="create_token",
        resource_type="api_token",
        resource_id=str(token.id),
        operator=user.username,
        ip_address=_client_ip(request),
        details=f"Created token '{name}' with permissions {permissions}",
    )

    return {
        "id": token.id,
        "name": token.name,
        "token": raw_token,  # Only shown once on creation
        "permissions": permissions,
        "is_active": True,
        "created_at": token.created_at.isoformat(),
        "expires_at": token.expires_at.isoformat() if token.expires_at else None,
    }


@router.delete("/{token_id}")
def delete_token(
    token_id: int,
    request: Request,
    db: Session = Depends(get_db),
    user: AdminUser = Depends(get_current_user),
):
    """Revoke an API token.

    The row is kept and only flagged ``is_active = False``; an audit-log
    entry records who revoked it and from which address.

    Returns:
        ``{"message": "Token revoked"}`` on success.

    Raises:
        HTTPException: 404 when no token with ``token_id`` exists.
    """
    token = db.query(ApiToken).filter(ApiToken.id == token_id).first()
    if not token:
        raise HTTPException(status_code=404, detail="Token not found")

    token.is_active = False
    db.commit()

    create_audit_log(
        db,
        action="revoke_token",
        resource_type="api_token",
        resource_id=str(token_id),
        operator=user.username,
        ip_address=_client_ip(request),
    )
    return {"message": "Token revoked"}