- 后端: FastAPI + SQLAlchemy + SQLite, JWT认证, 代理支持的AirwallexClient - 前端: React 18 + Vite + Ant Design 5, 中文界面 - 功能: 卡片管理, 持卡人管理, 交易记录, API令牌, 系统设置, 审计日志 - 第三方API: X-API-Key认证, 权限控制 - Docker部署: docker-compose编排前后端
392 lines
16 KiB
Python
392 lines
16 KiB
Python
"""
|
|
Base API class for the Airwallex SDK.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
Coroutine,
|
|
Generator,
|
|
AsyncGenerator,
|
|
Generic,
|
|
cast,
|
|
get_args,
|
|
get_origin
|
|
)
|
|
|
|
from ..models.base import AirwallexModel
|
|
from ..utils import snake_to_pascal_case
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
# Type variable for the model type an endpoint class is parameterised with;
# bound to AirwallexModel.
T = TypeVar("T", bound=AirwallexModel)
|
|
# Unbound type variable; not referenced in the visible part of this module.
# NOTE(review): presumably intended for typing client objects - confirm.
ClientType = TypeVar("ClientType")
|
|
|
|
class AirwallexAPIBase(Generic[T]):
|
|
"""
|
|
Base class for Airwallex API endpoints.
|
|
|
|
This class provides standard CRUD methods and pagination handling
|
|
for all API endpoints. It serves as the foundation for specific
|
|
API endpoint implementations.
|
|
"""
|
|
# Path segment appended to the base path; when empty, get_endpoint() falls
# back to the lowercased class name.
endpoint: str = ""
|
|
# Model class whose from_api_response() builds the objects returned by the
# CRUD methods. Defaults to AirwallexModel itself.
model_class: Type[T] = cast(Type[T], AirwallexModel) # Will be overridden by subclasses
|
|
|
|
def __init__(
    self,
    *,
    client: Any,
    data: Optional[Dict[str, Any]] = None,
    parent: Optional["AirwallexAPIBase"] = None,
    parent_path: Optional[str] = None,  # e.g. "/api/v1/accounts/{account_id}"
) -> None:
    """
    Store the client and the optional resource context on the instance.

    Args:
        client: Object used to issue requests (its ``_request`` method is
            called by the CRUD methods).
        data: Raw resource data; any falsy value is replaced by a new
            empty dict.
        parent: API object this one is nested under, if any.
        parent_path: URL prefix of the parent resource, if any.
    """
    self.client = client
    if data:
        self.data: Dict[str, Any] = data
    else:
        self.data = {}
    self.parent: Optional["AirwallexAPIBase"] = parent
    self.parent_path: Optional[str] = parent_path
|
|
|
|
def __getattr__(self, item: str) -> Any:
    """
    Resolve an attribute that normal lookup could not find.

    Resolution order:
        1. A key of ``self.data`` returns the stored value.
        2. If the object has a truthy ``id``, the class named
           ``snake_to_pascal_case(item)`` in ``<client package>.api.<item>``
           is instantiated as a sub-resource under this resource's URL.
        3. If that import or lookup fails, a callable is returned that
           sends ``GET <resource url>/<item with "_" replaced by "/">``.

    Args:
        item: Name of the attribute being looked up.

    Returns:
        The data value, a sub-resource API object, or the dynamic GET
        callable.

    Raises:
        AttributeError: For dunder names, for ``id`` itself when it is not
            in the data, or when ``item`` is unknown and the object has no
            truthy ``id``.
    """
    # Dunder probes (copy, pickle, inspect, ...) must fail fast instead of
    # falling through to the import / HTTP fallbacks below.
    if item.startswith("__") and item.endswith("__"):
        raise AttributeError(item)

    # Read the instance dict directly: ``self.data`` would re-enter this
    # method without end if ``data`` has not been assigned yet.
    data = self.__dict__.get("data") or {}
    if item in data:
        return data[item]

    # Sub-resources need an id. Reaching this point for ``id`` itself means
    # there is none; asking getattr() for it again would recurse until
    # RecursionError instead of raising AttributeError.
    resource_id = None if item == "id" else getattr(self, "id", None)
    if not resource_id:
        raise AttributeError(f"No such attribute '{item}' in {self.__class__.__name__} context.")

    # Try to load an API module for this attribute.
    try:
        from importlib import import_module

        base_package = self.client.__class__.__module__.split(".")[0]
        module = import_module(f"{base_package}.api.{item.lower()}")
        # Classes are named in PascalCase but accessed as snake_case attributes.
        api_class = getattr(module, snake_to_pascal_case(item))
        return api_class(client=self.client, parent=self, parent_path=self._build_url(resource_id))
    except (ModuleNotFoundError, AttributeError):
        # Split the snake_case name into a path, e.g. report_details -> report/details
        path_item = "/".join(item.split("_"))

        # No module exists for this attribute, so treat it as an endpoint
        # suffix and hand back a callable that performs the GET request.
        def dynamic_endpoint(*args, **kwargs):
            """
            Send ``GET <resource url>/<path_item>`` and return the parsed payload.

            Positional arguments are accepted but ignored; keyword arguments
            are sent as query parameters. When the client's class name
            starts with ``Async`` a coroutine is returned instead of the data.
            """
            url = self._build_url(resource_id=resource_id, suffix=path_item)
            if not str(self.client.__class__.__name__).startswith('Async'):
                response = self.client._request("GET", url, params=kwargs)
                return self._parse_response_data(response.json())

            async def async_endpoint():
                response = await self.client._request("GET", url, params=kwargs)
                return self._parse_response_data(response.json())

            return async_endpoint()

        return dynamic_endpoint
|
|
|
|
def __repr__(self) -> str:
    """Return ``<ClassName id=...>``, showing ``unknown`` when the data has no ``id`` key."""
    class_name = self.__class__.__name__
    return f"<{class_name} id={self.data.get('id', 'unknown')}>"
|
|
|
|
def __call__(self, resource_id: Optional[Any] = None, **kwargs: Any) -> Union[
    T,
    Generator[T, None, None],
    Coroutine[Any, Any, AsyncGenerator[T, None]]
]:
    """
    Dispatch to a single-resource fetch or to a paginating generator.

    The client counts as asynchronous when its class name starts with
    ``Async``.

    Args:
        resource_id: When not None, the resource to fetch; ``kwargs`` are
            not used in that case.
        **kwargs: Forwarded to the paginating method when no id is given.

    Returns:
        The result of ``fetch`` / ``fetch_async`` when ``resource_id`` is
        given, otherwise the result of ``paginate_generator`` /
        ``paginate_async_generator``.
    """
    uses_async_client = str(self.client.__class__.__name__).startswith('Async')

    if resource_id is None:
        if uses_async_client:
            return self.paginate_async_generator(**kwargs)
        return self.paginate_generator(**kwargs)

    if uses_async_client:
        return self.fetch_async(resource_id)
    return self.fetch(resource_id)
|
|
|
|
@classmethod
def get_endpoint(cls) -> str:
    """Return ``cls.endpoint`` when it is non-empty, otherwise the lowercased class name."""
    return cls.endpoint or cls.__name__.lower()
|
|
|
|
@staticmethod
def _parse_response_data(
    response: Union[List[Any], Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Normalise a decoded response body.

    Args:
        response: Decoded JSON body of a response.

    Returns:
        ``response['items']`` for a dict that has an ``items`` key, a
        one-element list wrapping any other dict, and the value itself
        unchanged when it is not a dict.
    """
    # Anything that is not a dict is handed back untouched.
    if not isinstance(response, dict):
        return response
    # A dict with 'items' is treated as a paginated envelope.
    if 'items' in response:
        return response['items']
    # A plain dict is a single resource; wrap it.
    return [response]
|
|
|
|
@property
def base_path(self) -> str:
    """
    Return the path this endpoint lives under.

    The endpoint name is appended to ``parent_path`` when that is set,
    otherwise to ``/api/v1``.
    """
    endpoint_name = self.get_endpoint()
    if not self.parent_path:
        return f"/api/v1/{endpoint_name}"
    return f"{self.parent_path}/{endpoint_name}"
|
|
|
|
def _build_url(self, resource_id: Optional[Any] = None, suffix: str = "") -> str:
    """
    Compose a URL from ``base_path``, an optional id and an optional suffix.

    Args:
        resource_id: Appended as a path segment unless it is None.
        suffix: Appended as the final path segment when non-empty.

    Returns:
        ``base_path`` followed by ``/<resource_id>`` and ``/<suffix>`` for
        whichever of the two were supplied.
    """
    segments = (
        (resource_id is not None, resource_id),
        (bool(suffix), suffix),
    )
    url = self.base_path
    for wanted, segment in segments:
        if wanted:
            url = f"{url}/{segment}"
    return url
|
|
|
|
def show(self, indent: int = 0, indent_step: int = 2) -> str:
    """
    Render this object and its data as an indented, multi-line string.

    Args:
        indent: Number of spaces placed before the header line.
        indent_step: Extra spaces added for each nesting level.

    Returns:
        A header line with the class name, then one entry per data key.
        Values that are AirwallexAPIBase instances are rendered through
        their own ``show``; lists are wrapped in ``[`` / ``]``.
    """
    outer = " " * indent
    inner = outer + " " * indent_step
    deep = outer + " " * (indent_step * 2)
    nested_indent = indent + indent_step

    rendered = [f"{outer}{self.__class__.__name__}:"]
    for name, entry in self.data.items():
        if isinstance(entry, AirwallexAPIBase):
            rendered.append(f"{inner}{name}:")
            rendered.append(entry.show(nested_indent, indent_step))
            continue
        if not isinstance(entry, list):
            rendered.append(f"{inner}{name}: {entry}")
            continue
        rendered.append(f"{inner}{name}: [")
        for element in entry:
            if isinstance(element, AirwallexAPIBase):
                rendered.append(element.show(nested_indent, indent_step))
            else:
                rendered.append(f"{deep}{element}")
        rendered.append(f"{inner}]")
    return "\n".join(rendered)
|
|
|
|
def to_model(self) -> T:
    """
    Build a model instance from ``self.data``.

    Returns:
        The result of ``self.model_class.from_api_response(self.data)``.

    Raises:
        ValueError: If ``self.data`` is empty.
    """
    if self.data:
        return self.model_class.from_api_response(self.data)
    raise ValueError("No data available to convert to a model")
|
|
|
|
# Synchronous API methods
|
|
|
|
def fetch(self, resource_id: Any) -> T:
    """
    Retrieve one resource by id with a sync client.

    Args:
        resource_id: Identifier appended to the endpoint URL.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    if str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires a sync client.")

    response = self.client._request("GET", self._build_url(resource_id))
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
def list(self, **params: Any) -> List[T]:
    """
    Retrieve the endpoint's resources in a single request with a sync client.

    Args:
        **params: Sent as query parameters.

    Returns:
        One model per record in the parsed response.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    if str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires a sync client.")

    response = self.client._request("GET", self._build_url(), params=params)
    models = []
    for record in self._parse_response_data(response.json()):
        models.append(self.model_class.from_api_response(record))
    return models
|
|
|
|
def create(self, payload: Union[Dict[str, Any], T]) -> T:
    """
    POST a new resource with a sync client.

    Args:
        payload: Request body; an AirwallexModel is converted with
            ``to_api_dict()``, anything else is sent as given.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    if str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires a sync client.")

    # Models are serialised first; plain dicts pass through untouched.
    body = payload.to_api_dict() if isinstance(payload, AirwallexModel) else payload

    response = self.client._request("POST", self._build_url(), json=body)
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
def update(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
    """
    PUT new data for an existing resource with a sync client.

    Args:
        resource_id: Identifier appended to the endpoint URL.
        payload: Request body; an AirwallexModel is converted with
            ``to_api_dict()``, anything else is sent as given.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    if str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires a sync client.")

    # Models are serialised first; plain dicts pass through untouched.
    body = payload.to_api_dict() if isinstance(payload, AirwallexModel) else payload

    response = self.client._request("PUT", self._build_url(resource_id), json=body)
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
def delete(self, resource_id: Any) -> None:
    """
    Send a DELETE request for one resource with a sync client.

    Args:
        resource_id: Identifier appended to the endpoint URL.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    client_name = str(self.client.__class__.__name__)
    if client_name.startswith('Async'):
        raise ValueError("This method requires a sync client.")
    self.client._request("DELETE", self._build_url(resource_id))
|
|
|
|
def paginate(self, stop_page: Optional[int] = None, **params: Any) -> Generator[T, None, None]:
    """
    Yield model instances one by one from a paginated listing.

    This is a generator, so nothing runs (including the sync-client check)
    until iteration starts.

    Args:
        stop_page: Last page number to request; a falsy value means no
            limit. The starting page is always requested.
        **params: Query parameters. ``page_num`` (default 1) and
            ``page_size`` (default 100) seed the paging and are rewritten
            for every request.

    Yields:
        T: One model per item of each page.

    Raises:
        ValueError: If the client's class name starts with ``Async``.
    """
    if str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires a sync client.")

    # NOTE(review): Airwallex list endpoints appear to document a zero-based
    # page_num - confirm that starting at 1 does not skip the first page.
    page_num = params.get("page_num", 1)
    page_size = params.get("page_size", 100)

    # The URL does not depend on the page, so build it once.
    url = self._build_url()

    while True:
        params["page_num"] = page_num
        params["page_size"] = page_size
        response = self.client._request("GET", url, params=params)
        data = response.json()

        if isinstance(data, dict):
            # ``or []`` also covers an explicit ``"items": null``.
            items = data.get("items") or []
            has_more = data.get("has_more", False)
        else:
            # A bare list (or empty body) carries no paging envelope:
            # yield what is there and stop after this page.
            items = data or []
            has_more = False

        for item in items:
            yield self.model_class.from_api_response(item)

        if not has_more or not items:
            break

        page_num += 1

        if stop_page and page_num > stop_page:
            break
|
|
|
|
|
|
# Asynchronous API methods
|
|
|
|
async def fetch_async(self, resource_id: Any) -> T:
    """
    Retrieve one resource by id with an async client.

    Args:
        resource_id: Identifier appended to the endpoint URL.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    if not str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires an async client.")

    response = await self.client._request("GET", self._build_url(resource_id))
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
async def list_async(self, **params: Any) -> List[T]:
    """
    Retrieve the endpoint's resources in a single request with an async client.

    Args:
        **params: Sent as query parameters.

    Returns:
        One model per record in the parsed response.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    if not str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires an async client.")

    response = await self.client._request("GET", self._build_url(), params=params)
    models = []
    for record in self._parse_response_data(response.json()):
        models.append(self.model_class.from_api_response(record))
    return models
|
|
|
|
async def create_async(self, payload: Union[Dict[str, Any], T]) -> T:
    """
    POST a new resource with an async client.

    Args:
        payload: Request body; an AirwallexModel is converted with
            ``to_api_dict()``, anything else is sent as given.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    if not str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires an async client.")

    # Models are serialised first; plain dicts pass through untouched.
    body = payload.to_api_dict() if isinstance(payload, AirwallexModel) else payload

    response = await self.client._request("POST", self._build_url(), json=body)
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
async def update_async(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
    """
    PUT new data for an existing resource with an async client.

    Args:
        resource_id: Identifier appended to the endpoint URL.
        payload: Request body; an AirwallexModel is converted with
            ``to_api_dict()``, anything else is sent as given.

    Returns:
        A model built from the first parsed record, or from an empty dict
        when the parsed response is an empty list.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    if not str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires an async client.")

    # Models are serialised first; plain dicts pass through untouched.
    body = payload.to_api_dict() if isinstance(payload, AirwallexModel) else payload

    response = await self.client._request("PUT", self._build_url(resource_id), json=body)
    record = self._parse_response_data(response.json())
    # The parser normally returns a list; keep only its first entry.
    if isinstance(record, list):
        if record:
            record = record[0]
        else:
            record = {}
    return self.model_class.from_api_response(record)
|
|
|
|
async def delete_async(self, resource_id: Any) -> None:
    """
    Send a DELETE request for one resource with an async client.

    Args:
        resource_id: Identifier appended to the endpoint URL.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    client_name = str(self.client.__class__.__name__)
    if not client_name.startswith('Async'):
        raise ValueError("This method requires an async client.")
    await self.client._request("DELETE", self._build_url(resource_id))
|
|
|
|
async def paginate_async(self, stop_page: Optional[int] = None, **params: Any) -> AsyncGenerator[T, None]:
    """
    Yield model instances one by one from a paginated listing, asynchronously.

    This is an async generator, so nothing runs (including the
    async-client check) until iteration starts.

    Args:
        stop_page: Last page number to request; a falsy value means no
            limit. The starting page is always requested.
        **params: Query parameters. ``page_num`` (default 1) and
            ``page_size`` (default 100) seed the paging and are rewritten
            for every request.

    Yields:
        T: One model per item of each page.

    Raises:
        ValueError: If the client's class name does not start with ``Async``.
    """
    if not str(self.client.__class__.__name__).startswith('Async'):
        raise ValueError("This method requires an async client.")

    # NOTE(review): Airwallex list endpoints appear to document a zero-based
    # page_num - confirm that starting at 1 does not skip the first page.
    page_num = params.get("page_num", 1)
    page_size = params.get("page_size", 100)

    # The URL does not depend on the page, so build it once.
    url = self._build_url()

    while True:
        params["page_num"] = page_num
        params["page_size"] = page_size
        response = await self.client._request("GET", url, params=params)
        data = response.json()

        if isinstance(data, dict):
            # ``or []`` also covers an explicit ``"items": null``.
            items = data.get("items") or []
            has_more = data.get("has_more", False)
        else:
            # A bare list (or empty body) carries no paging envelope:
            # yield what is there and stop after this page.
            items = data or []
            has_more = False

        for item in items:
            yield self.model_class.from_api_response(item)

        if not has_more or not items:
            break

        page_num += 1

        if stop_page and page_num > stop_page:
            break