Files
zqq61 4f53889a8e feat: Airwallex 发卡管理后台完整实现
- 后端: FastAPI + SQLAlchemy + SQLite, JWT认证, 代理支持的AirwallexClient
- 前端: React 18 + Vite + Ant Design 5, 中文界面
- 功能: 卡片管理, 持卡人管理, 交易记录, API令牌, 系统设置, 审计日志
- 第三方API: X-API-Key认证, 权限控制
- Docker部署: docker-compose编排前后端
2026-03-15 23:05:08 +08:00

392 lines
16 KiB
Python

"""
Base API class for the Airwallex SDK.
"""
import asyncio
import logging
from typing import (
Any,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
Coroutine,
Generator,
AsyncGenerator,
Generic,
cast,
get_args,
get_origin
)
from ..models.base import AirwallexModel
from ..utils import snake_to_pascal_case
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=AirwallexModel)
ClientType = TypeVar("ClientType")
class AirwallexAPIBase(Generic[T]):
"""
Base class for Airwallex API endpoints.
This class provides standard CRUD methods and pagination handling
for all API endpoints. It serves as the foundation for specific
API endpoint implementations.
"""
endpoint: str = ""
model_class: Type[T] = cast(Type[T], AirwallexModel) # Will be overridden by subclasses
def __init__(
self,
*,
client: Any,
data: Optional[Dict[str, Any]] = None,
parent: Optional["AirwallexAPIBase"] = None,
parent_path: Optional[str] = None # e.g. "/api/v1/accounts/{account_id}"
) -> None:
self.client = client
self.data: Dict[str, Any] = data or {}
self.parent: Optional["AirwallexAPIBase"] = parent
self.parent_path: Optional[str] = parent_path
def __getattr__(self, item: str) -> Any:
# If the attribute exists in the model's data, return it.
if item in self.data:
return self.data[item]
# If the model has an ID, we can try to access a subresource
if not getattr(self, 'id', None):
raise AttributeError(f"No such attribute '{item}' in {self.__class__.__name__} context.")
# Try to load an API module for this attribute.
try:
from importlib import import_module
base_package = self.client.__class__.__module__.split(".")[0]
module = import_module(f"{base_package}.api.{item.lower()}")
# We define modules in pascal case, but refer to them as attributes in snake case.
api_class = getattr(module, snake_to_pascal_case(item))
return api_class(client=self.client, parent=self, parent_path=self._build_url(self.id))
except (ModuleNotFoundError, AttributeError):
# Split snake case item into a path e.g. report_details -> report/details
path_item = "/".join(item.split("_"))
# If no module exists for this attribute and model has an id, then assume the attribute
# is a valid endpoint suffix. Return a callable that makes a GET request.
def dynamic_endpoint(*args, **kwargs):
"""
:param dataframe: If True, return a DataFrame instead of a list of dictionaries.
"""
url = self._build_url(resource_id=self.id, suffix=path_item)
if not str(self.client.__class__.__name__).startswith('Async'):
response = self.client._request("GET", url, params=kwargs)
data = self._parse_response_data(response.json())
return data
else:
async def async_endpoint():
response = await self.client._request("GET", url, params=kwargs)
data = self._parse_response_data(response.json())
return data
return async_endpoint()
return dynamic_endpoint
def __repr__(self) -> str:
identifier = self.data.get("id", "unknown")
return f"<{self.__class__.__name__} id={identifier}>"
def __call__(self, resource_id: Optional[Any] = None, **kwargs: Any) -> Union[
T,
Generator[T, None, None],
Coroutine[Any, Any, AsyncGenerator[T, None]]
]:
"""
If a resource_id is provided, fetch and return a single instance;
otherwise, return a generator that yields resources one by one.
For sync clients, returns a Generator[T, None, None].
For async clients, returns a coroutine that yields an AsyncGenerator[T, None].
"""
if resource_id is not None:
if not str(self.client.__class__.__name__).startswith('Async'):
return self.fetch(resource_id)
else:
return self.fetch_async(resource_id)
else:
if not str(self.client.__class__.__name__).startswith('Async'):
return self.paginate_generator(**kwargs)
else:
return self.paginate_async_generator(**kwargs)
@classmethod
def get_endpoint(cls) -> str:
"""Get the API endpoint path."""
return cls.endpoint if cls.endpoint else cls.__name__.lower()
@staticmethod
def _parse_response_data(
response: Union[List[Any], Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Parse response data into a list of dictionaries."""
# If response is a dictionary with an 'items' key, it's paginated
if isinstance(response, dict) and 'items' in response:
return response['items']
# If response is a dictionary, wrap it in a list
if isinstance(response, dict):
return [response]
# If response is already a list, return it
return response
@property
def base_path(self) -> str:
"""Get the base API path for this endpoint."""
if self.parent_path:
return f"{self.parent_path}/{self.get_endpoint()}"
return f"/api/v1/{self.get_endpoint()}"
def _build_url(self, resource_id: Optional[Any] = None, suffix: str = "") -> str:
"""Build a URL for a specific resource."""
url = self.base_path
if resource_id is not None:
url = f"{url}/{resource_id}"
if suffix:
url = f"{url}/{suffix}"
return url
def show(self, indent: int = 0, indent_step: int = 2) -> str:
"""
Return a nicely formatted string representation of this model and its data.
"""
pad = " " * indent
lines = [f"{pad}{self.__class__.__name__}:"]
for key, value in self.data.items():
if isinstance(value, AirwallexAPIBase):
lines.append(f"{pad}{' ' * indent_step}{key}:")
lines.append(value.show(indent + indent_step, indent_step))
elif isinstance(value, list):
lines.append(f"{pad}{' ' * indent_step}{key}: [")
for item in value:
if isinstance(item, AirwallexAPIBase):
lines.append(item.show(indent + indent_step, indent_step))
else:
lines.append(f"{pad}{' ' * (indent_step * 2)}{item}")
lines.append(f"{pad}{' ' * indent_step}]")
else:
lines.append(f"{pad}{' ' * indent_step}{key}: {value}")
return "\n".join(lines)
def to_model(self) -> T:
"""Convert the raw data to a Pydantic model."""
if not self.data:
raise ValueError("No data available to convert to a model")
return self.model_class.from_api_response(self.data)
# Synchronous API methods
def fetch(self, resource_id: Any) -> T:
"""Fetch a single resource by ID."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url(resource_id)
response = self.client._request("GET", url)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def list(self, **params: Any) -> List[T]:
"""List resources with optional filtering parameters."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url()
response = self.client._request("GET", url, params=params)
data_list = self._parse_response_data(response.json())
return [self.model_class.from_api_response(item) for item in data_list]
def create(self, payload: Union[Dict[str, Any], T]) -> T:
"""Create a new resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url()
response = self.client._request("POST", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def update(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
"""Update an existing resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url(resource_id)
response = self.client._request("PUT", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def delete(self, resource_id: Any) -> None:
"""Delete a resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url(resource_id)
self.client._request("DELETE", url)
def paginate(self, stop_page: Optional[int] = None, **params: Any) -> Generator[T, None, None]:
"""
Generate items one by one from paginated results.
Args:
stop_page: The page number to stop at (optional).
**params: Filter parameters to pass to the API.
Yields:
T: Each item from the paginated results.
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
page_num = params.get("page_num", 1)
page_size = params.get("page_size", 100)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
url = self._build_url()
response = self.client._request("GET", url, params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
for item in items:
yield self.model_class.from_api_response(item)
if not has_more or not items:
break
page_num += 1
if stop_page and page_num > stop_page:
break
# Asynchronous API methods
async def fetch_async(self, resource_id: Any) -> T:
"""Fetch a single resource by ID asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url(resource_id)
response = await self.client._request("GET", url)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def list_async(self, **params: Any) -> List[T]:
"""List resources with optional filtering parameters asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url()
response = await self.client._request("GET", url, params=params)
data_list = self._parse_response_data(response.json())
return [self.model_class.from_api_response(item) for item in data_list]
async def create_async(self, payload: Union[Dict[str, Any], T]) -> T:
"""Create a new resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url()
response = await self.client._request("POST", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def update_async(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
"""Update an existing resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url(resource_id)
response = await self.client._request("PUT", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def delete_async(self, resource_id: Any) -> None:
"""Delete a resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url(resource_id)
await self.client._request("DELETE", url)
async def paginate_async(self, stop_page: Optional[int] = None, **params: Any) -> AsyncGenerator[T, None]:
"""
Generate items one by one from paginated results, asynchronously.
Args:
stop_page: The page number to stop at (optional).
**params: Filter parameters to pass to the API.
Yields:
T: Each item from the paginated results.
"""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
page_num = params.get("page_num", 1)
page_size = params.get("page_size", 100)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
url = self._build_url()
response = await self.client._request("GET", url, params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
for item in items:
yield self.model_class.from_api_response(item)
if not has_more or not items:
break
page_num += 1
if stop_page and page_num > stop_page:
break