feat: Airwallex 发卡管理后台完整实现

- 后端: FastAPI + SQLAlchemy + SQLite, JWT认证, 代理支持的AirwallexClient
- 前端: React 18 + Vite + Ant Design 5, 中文界面
- 功能: 卡片管理, 持卡人管理, 交易记录, API令牌, 系统设置, 审计日志
- 第三方API: X-API-Key认证, 权限控制
- Docker部署: docker-compose编排前后端
This commit is contained in:
zqq61
2026-03-15 23:05:08 +08:00
commit 4f53889a8e
98 changed files with 10847 additions and 0 deletions

View File

@@ -0,0 +1,37 @@
"""
API modules for the Airwallex SDK.
"""
from .base import AirwallexAPIBase
from .account import Account
from .payment import Payment
from .beneficiary import Beneficiary
from .invoice import Invoice
from .financial_transaction import FinancialTransaction
from .account_detail import AccountDetail
# Issuing API
from .issuing_authorization import IssuingAuthorization
from .issuing_cardholder import IssuingCardholder
from .issuing_card import IssuingCard
from .issuing_digital_wallet_token import IssuingDigitalWalletToken
from .issuing_transaction_dispute import IssuingTransactionDispute
from .issuing_transaction import IssuingTransaction
from .issuing_config import IssuingConfig
__all__ = [
"AirwallexAPIBase",
"Account",
"Payment",
"Beneficiary",
"Invoice",
"FinancialTransaction",
"AccountDetail",
# Issuing API
"IssuingAuthorization",
"IssuingCardholder",
"IssuingCard",
"IssuingDigitalWalletToken",
"IssuingTransactionDispute",
"IssuingTransaction",
"IssuingConfig",
]

View File

@@ -0,0 +1,107 @@
"""
Airwallex Account API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, cast
from ..models.account import Account, AccountCreateRequest, AccountUpdateRequest
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Account)
class Account(AirwallexAPIBase[Account]):
"""
Operations for Airwallex accounts.
Accounts represent the global accounts that can hold balances
in multiple currencies.
"""
endpoint = "accounts"
model_class = cast(Type[Account], Account)
def fetch_balance(self, account_id: str) -> Account:
"""
Fetch the balance for a specific account.
Args:
account_id: The ID of the account to fetch the balance for.
Returns:
Account: Account with balance information.
"""
url = self._build_url(account_id, "balance")
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
data = response.json()
account_data = {"id": account_id, "balance": data}
return self.model_class.from_api_response(account_data)
else:
raise ValueError("Use fetch_balance_async for async clients")
async def fetch_balance_async(self, account_id: str) -> Account:
"""
Fetch the balance for a specific account asynchronously.
Args:
account_id: The ID of the account to fetch the balance for.
Returns:
Account: Account with balance information.
"""
url = self._build_url(account_id, "balance")
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
data = response.json()
account_data = {"id": account_id, "balance": data}
return self.model_class.from_api_response(account_data)
else:
raise ValueError("Use fetch_balance for sync clients")
def create_from_model(self, account: AccountCreateRequest) -> Account:
"""
Create a new account using a Pydantic model.
Args:
account: AccountCreateRequest model with account creation details.
Returns:
Account: The newly created account.
"""
return self.create(account)
async def create_from_model_async(self, account: AccountCreateRequest) -> Account:
"""
Create a new account using a Pydantic model asynchronously.
Args:
account: AccountCreateRequest model with account creation details.
Returns:
Account: The newly created account.
"""
return await self.create_async(account)
def update_from_model(self, account_id: str, account: AccountUpdateRequest) -> Account:
"""
Update an account using a Pydantic model.
Args:
account_id: The ID of the account to update.
account: AccountUpdateRequest model with account update details.
Returns:
Account: The updated account.
"""
return self.update(account_id, account)
async def update_from_model_async(self, account_id: str, account: AccountUpdateRequest) -> Account:
"""
Update an account using a Pydantic model asynchronously.
Args:
account_id: The ID of the account to update.
account: AccountUpdateRequest model with account update details.
Returns:
Account: The updated account.
"""
return await self.update_async(account_id, account)

View File

@@ -0,0 +1,469 @@
"""
Airwallex Account Detail API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.account_detail import (
AccountDetailModel, AccountCreateRequest, AccountUpdateRequest,
Amendment, AmendmentCreateRequest, WalletInfo, TermsAndConditionsRequest
)
from .base import AirwallexAPIBase
T = TypeVar("T", bound=AccountDetailModel)
class AccountDetail(AirwallexAPIBase[AccountDetailModel]):
"""
Operations for Airwallex account details.
Account details represent the complete information about an Airwallex account,
including business details, persons, and compliance information.
"""
endpoint = "accounts"
model_class = cast(Type[AccountDetailModel], AccountDetailModel)
def get_my_account(self) -> AccountDetailModel:
"""
Retrieve account details for your own Airwallex account.
Returns:
AccountDetailModel: Your account details.
"""
url = "/api/v1/account"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_my_account_async for async clients")
async def get_my_account_async(self) -> AccountDetailModel:
"""
Retrieve account details for your own Airwallex account asynchronously.
Returns:
AccountDetailModel: Your account details.
"""
url = "/api/v1/account"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_my_account for sync clients")
def get_amendment(self, amendment_id: str) -> Amendment:
"""
Get an account amendment.
Args:
amendment_id: The ID of the amendment to retrieve.
Returns:
Amendment: The amendment.
"""
url = f"/api/v1/account/amendments/{amendment_id}"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return Amendment.from_api_response(response.json())
else:
raise ValueError("Use get_amendment_async for async clients")
async def get_amendment_async(self, amendment_id: str) -> Amendment:
"""
Get an account amendment asynchronously.
Args:
amendment_id: The ID of the amendment to retrieve.
Returns:
Amendment: The amendment.
"""
url = f"/api/v1/account/amendments/{amendment_id}"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return Amendment.from_api_response(response.json())
else:
raise ValueError("Use get_amendment for sync clients")
def create_amendment(self, amendment: AmendmentCreateRequest) -> Amendment:
"""
Create an account amendment.
Args:
amendment: AmendmentCreateRequest model with amendment details.
Returns:
Amendment: The created amendment.
"""
url = "/api/v1/account/amendments/create"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=amendment.to_api_dict())
return Amendment.from_api_response(response.json())
else:
raise ValueError("Use create_amendment_async for async clients")
async def create_amendment_async(self, amendment: AmendmentCreateRequest) -> Amendment:
"""
Create an account amendment asynchronously.
Args:
amendment: AmendmentCreateRequest model with amendment details.
Returns:
Amendment: The created amendment.
"""
url = "/api/v1/account/amendments/create"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=amendment.to_api_dict())
return Amendment.from_api_response(response.json())
else:
raise ValueError("Use create_amendment for sync clients")
def get_wallet_info(self) -> WalletInfo:
"""
Retrieve account wallet information.
Returns:
WalletInfo: The wallet information.
"""
url = "/api/v1/account/wallet_info"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return WalletInfo.from_api_response(response.json())
else:
raise ValueError("Use get_wallet_info_async for async clients")
async def get_wallet_info_async(self) -> WalletInfo:
"""
Retrieve account wallet information asynchronously.
Returns:
WalletInfo: The wallet information.
"""
url = "/api/v1/account/wallet_info"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return WalletInfo.from_api_response(response.json())
else:
raise ValueError("Use get_wallet_info for sync clients")
def create_account(self, account: AccountCreateRequest) -> AccountDetailModel:
"""
Create a new Airwallex account.
Args:
account: AccountCreateRequest model with account creation details.
Returns:
AccountDetailModel: The created account.
"""
url = "/api/v1/accounts/create"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=account.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_account_async for async clients")
async def create_account_async(self, account: AccountCreateRequest) -> AccountDetailModel:
"""
Create a new Airwallex account asynchronously.
Args:
account: AccountCreateRequest model with account creation details.
Returns:
AccountDetailModel: The created account.
"""
url = "/api/v1/accounts/create"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=account.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_account for sync clients")
def update_account(self, account_id: str, account: AccountUpdateRequest) -> AccountDetailModel:
"""
Update a connected account.
Args:
account_id: The ID of the account to update.
account: AccountUpdateRequest model with account update details.
Returns:
AccountDetailModel: The updated account.
"""
url = f"/api/v1/accounts/{account_id}/update"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=account.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_account_async for async clients")
async def update_account_async(self, account_id: str, account: AccountUpdateRequest) -> AccountDetailModel:
"""
Update a connected account asynchronously.
Args:
account_id: The ID of the account to update.
account: AccountUpdateRequest model with account update details.
Returns:
AccountDetailModel: The updated account.
"""
url = f"/api/v1/accounts/{account_id}/update"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=account.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_account for sync clients")
def submit_account(self, account_id: str) -> AccountDetailModel:
"""
Submit account for activation.
Args:
account_id: The ID of the account to submit.
Returns:
AccountDetailModel: The submitted account.
"""
url = f"/api/v1/accounts/{account_id}/submit"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use submit_account_async for async clients")
async def submit_account_async(self, account_id: str) -> AccountDetailModel:
"""
Submit account for activation asynchronously.
Args:
account_id: The ID of the account to submit.
Returns:
AccountDetailModel: The submitted account.
"""
url = f"/api/v1/accounts/{account_id}/submit"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use submit_account for sync clients")
def get_account(self, account_id: str) -> AccountDetailModel:
"""
Get account by ID.
Args:
account_id: The ID of the account to retrieve.
Returns:
AccountDetailModel: The account.
"""
url = f"/api/v1/accounts/{account_id}"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_account_async for async clients")
async def get_account_async(self, account_id: str) -> AccountDetailModel:
"""
Get account by ID asynchronously.
Args:
account_id: The ID of the account to retrieve.
Returns:
AccountDetailModel: The account.
"""
url = f"/api/v1/accounts/{account_id}"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_account for sync clients")
def list_accounts(
self,
account_status: Optional[str] = None,
email: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
identifier: Optional[str] = None,
metadata: Optional[str] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
to_created_at: Optional[Union[str, datetime]] = None
) -> List[AccountDetailModel]:
"""
Get list of connected accounts with filtering options.
Args:
account_status: Filter by account status (CREATED, SUBMITTED, ACTION_REQUIRED, ACTIVE, SUSPENDED)
email: Filter by email
from_created_at: Filter by creation date (start, inclusive)
identifier: Filter by identifier
metadata: Filter by metadata (key:value format)
page_num: Page number (0-indexed)
page_size: Number of results per page (default 100, max 500)
to_created_at: Filter by creation date (end, inclusive)
Returns:
List[AccountDetailModel]: List of matching accounts.
"""
url = "/api/v1/accounts"
params = {}
if account_status:
params["account_status"] = account_status
if email:
params["email"] = email
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if identifier:
params["identifier"] = identifier
if metadata:
params["metadata"] = metadata
if page_num is not None:
params["page_num"] = page_num
if page_size is not None:
params["page_size"] = page_size
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url, params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_accounts_async for async clients")
async def list_accounts_async(
self,
account_status: Optional[str] = None,
email: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
identifier: Optional[str] = None,
metadata: Optional[str] = None,
page_num: Optional[int] = None,
page_size: Optional[int] = None,
to_created_at: Optional[Union[str, datetime]] = None
) -> List[AccountDetailModel]:
"""
Get list of connected accounts with filtering options asynchronously.
Args:
account_status: Filter by account status (CREATED, SUBMITTED, ACTION_REQUIRED, ACTIVE, SUSPENDED)
email: Filter by email
from_created_at: Filter by creation date (start, inclusive)
identifier: Filter by identifier
metadata: Filter by metadata (key:value format)
page_num: Page number (0-indexed)
page_size: Number of results per page (default 100, max 500)
to_created_at: Filter by creation date (end, inclusive)
Returns:
List[AccountDetailModel]: List of matching accounts.
"""
url = "/api/v1/accounts"
params = {}
if account_status:
params["account_status"] = account_status
if email:
params["email"] = email
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if identifier:
params["identifier"] = identifier
if metadata:
params["metadata"] = metadata
if page_num is not None:
params["page_num"] = page_num
if page_size is not None:
params["page_size"] = page_size
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url, params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_accounts for sync clients")
def agree_to_terms(self, account_id: str, request: TermsAndConditionsRequest) -> AccountDetailModel:
"""
Agree to terms and conditions.
Args:
account_id: The ID of the account agreeing to terms.
request: TermsAndConditionsRequest model with agreement details.
Returns:
AccountDetailModel: The updated account.
"""
url = f"/api/v1/accounts/{account_id}/terms_and_conditions/agree"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=request.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use agree_to_terms_async for async clients")
async def agree_to_terms_async(self, account_id: str, request: TermsAndConditionsRequest) -> AccountDetailModel:
"""
Agree to terms and conditions asynchronously.
Args:
account_id: The ID of the account agreeing to terms.
request: TermsAndConditionsRequest model with agreement details.
Returns:
AccountDetailModel: The updated account.
"""
url = f"/api/v1/accounts/{account_id}/terms_and_conditions/agree"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=request.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use agree_to_terms for sync clients")

View File

@@ -0,0 +1,392 @@
"""
Base API class for the Airwallex SDK.
"""
import asyncio
import logging
from typing import (
Any,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
Coroutine,
Generator,
AsyncGenerator,
Generic,
cast,
get_args,
get_origin
)
from ..models.base import AirwallexModel
from ..utils import snake_to_pascal_case
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=AirwallexModel)
ClientType = TypeVar("ClientType")
class AirwallexAPIBase(Generic[T]):
"""
Base class for Airwallex API endpoints.
This class provides standard CRUD methods and pagination handling
for all API endpoints. It serves as the foundation for specific
API endpoint implementations.
"""
endpoint: str = ""
model_class: Type[T] = cast(Type[T], AirwallexModel) # Will be overridden by subclasses
def __init__(
self,
*,
client: Any,
data: Optional[Dict[str, Any]] = None,
parent: Optional["AirwallexAPIBase"] = None,
parent_path: Optional[str] = None # e.g. "/api/v1/accounts/{account_id}"
) -> None:
self.client = client
self.data: Dict[str, Any] = data or {}
self.parent: Optional["AirwallexAPIBase"] = parent
self.parent_path: Optional[str] = parent_path
def __getattr__(self, item: str) -> Any:
# If the attribute exists in the model's data, return it.
if item in self.data:
return self.data[item]
# If the model has an ID, we can try to access a subresource
if not getattr(self, 'id', None):
raise AttributeError(f"No such attribute '{item}' in {self.__class__.__name__} context.")
# Try to load an API module for this attribute.
try:
from importlib import import_module
base_package = self.client.__class__.__module__.split(".")[0]
module = import_module(f"{base_package}.api.{item.lower()}")
# We define modules in pascal case, but refer to them as attributes in snake case.
api_class = getattr(module, snake_to_pascal_case(item))
return api_class(client=self.client, parent=self, parent_path=self._build_url(self.id))
except (ModuleNotFoundError, AttributeError):
# Split snake case item into a path e.g. report_details -> report/details
path_item = "/".join(item.split("_"))
# If no module exists for this attribute and model has an id, then assume the attribute
# is a valid endpoint suffix. Return a callable that makes a GET request.
def dynamic_endpoint(*args, **kwargs):
"""
:param dataframe: If True, return a DataFrame instead of a list of dictionaries.
"""
url = self._build_url(resource_id=self.id, suffix=path_item)
if not str(self.client.__class__.__name__).startswith('Async'):
response = self.client._request("GET", url, params=kwargs)
data = self._parse_response_data(response.json())
return data
else:
async def async_endpoint():
response = await self.client._request("GET", url, params=kwargs)
data = self._parse_response_data(response.json())
return data
return async_endpoint()
return dynamic_endpoint
def __repr__(self) -> str:
identifier = self.data.get("id", "unknown")
return f"<{self.__class__.__name__} id={identifier}>"
def __call__(self, resource_id: Optional[Any] = None, **kwargs: Any) -> Union[
T,
Generator[T, None, None],
Coroutine[Any, Any, AsyncGenerator[T, None]]
]:
"""
If a resource_id is provided, fetch and return a single instance;
otherwise, return a generator that yields resources one by one.
For sync clients, returns a Generator[T, None, None].
For async clients, returns a coroutine that yields an AsyncGenerator[T, None].
"""
if resource_id is not None:
if not str(self.client.__class__.__name__).startswith('Async'):
return self.fetch(resource_id)
else:
return self.fetch_async(resource_id)
else:
if not str(self.client.__class__.__name__).startswith('Async'):
return self.paginate_generator(**kwargs)
else:
return self.paginate_async_generator(**kwargs)
@classmethod
def get_endpoint(cls) -> str:
"""Get the API endpoint path."""
return cls.endpoint if cls.endpoint else cls.__name__.lower()
@staticmethod
def _parse_response_data(
response: Union[List[Any], Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Parse response data into a list of dictionaries."""
# If response is a dictionary with an 'items' key, it's paginated
if isinstance(response, dict) and 'items' in response:
return response['items']
# If response is a dictionary, wrap it in a list
if isinstance(response, dict):
return [response]
# If response is already a list, return it
return response
@property
def base_path(self) -> str:
"""Get the base API path for this endpoint."""
if self.parent_path:
return f"{self.parent_path}/{self.get_endpoint()}"
return f"/api/v1/{self.get_endpoint()}"
def _build_url(self, resource_id: Optional[Any] = None, suffix: str = "") -> str:
"""Build a URL for a specific resource."""
url = self.base_path
if resource_id is not None:
url = f"{url}/{resource_id}"
if suffix:
url = f"{url}/{suffix}"
return url
def show(self, indent: int = 0, indent_step: int = 2) -> str:
"""
Return a nicely formatted string representation of this model and its data.
"""
pad = " " * indent
lines = [f"{pad}{self.__class__.__name__}:"]
for key, value in self.data.items():
if isinstance(value, AirwallexAPIBase):
lines.append(f"{pad}{' ' * indent_step}{key}:")
lines.append(value.show(indent + indent_step, indent_step))
elif isinstance(value, list):
lines.append(f"{pad}{' ' * indent_step}{key}: [")
for item in value:
if isinstance(item, AirwallexAPIBase):
lines.append(item.show(indent + indent_step, indent_step))
else:
lines.append(f"{pad}{' ' * (indent_step * 2)}{item}")
lines.append(f"{pad}{' ' * indent_step}]")
else:
lines.append(f"{pad}{' ' * indent_step}{key}: {value}")
return "\n".join(lines)
def to_model(self) -> T:
"""Convert the raw data to a Pydantic model."""
if not self.data:
raise ValueError("No data available to convert to a model")
return self.model_class.from_api_response(self.data)
# Synchronous API methods
def fetch(self, resource_id: Any) -> T:
"""Fetch a single resource by ID."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url(resource_id)
response = self.client._request("GET", url)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def list(self, **params: Any) -> List[T]:
"""List resources with optional filtering parameters."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url()
response = self.client._request("GET", url, params=params)
data_list = self._parse_response_data(response.json())
return [self.model_class.from_api_response(item) for item in data_list]
def create(self, payload: Union[Dict[str, Any], T]) -> T:
"""Create a new resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url()
response = self.client._request("POST", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def update(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
"""Update an existing resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url(resource_id)
response = self.client._request("PUT", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
def delete(self, resource_id: Any) -> None:
"""Delete a resource."""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
url = self._build_url(resource_id)
self.client._request("DELETE", url)
def paginate(self, stop_page: Optional[int] = None, **params: Any) -> Generator[T, None, None]:
"""
Generate items one by one from paginated results.
Args:
stop_page: The page number to stop at (optional).
**params: Filter parameters to pass to the API.
Yields:
T: Each item from the paginated results.
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
page_num = params.get("page_num", 1)
page_size = params.get("page_size", 100)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
url = self._build_url()
response = self.client._request("GET", url, params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
for item in items:
yield self.model_class.from_api_response(item)
if not has_more or not items:
break
page_num += 1
if stop_page and page_num > stop_page:
break
# Asynchronous API methods
async def fetch_async(self, resource_id: Any) -> T:
"""Fetch a single resource by ID asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url(resource_id)
response = await self.client._request("GET", url)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def list_async(self, **params: Any) -> List[T]:
"""List resources with optional filtering parameters asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url()
response = await self.client._request("GET", url, params=params)
data_list = self._parse_response_data(response.json())
return [self.model_class.from_api_response(item) for item in data_list]
async def create_async(self, payload: Union[Dict[str, Any], T]) -> T:
"""Create a new resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url()
response = await self.client._request("POST", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def update_async(self, resource_id: Any, payload: Union[Dict[str, Any], T]) -> T:
"""Update an existing resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
payload_dict = payload
# Convert Pydantic model to dict if needed
if isinstance(payload, AirwallexModel):
payload_dict = payload.to_api_dict()
url = self._build_url(resource_id)
response = await self.client._request("PUT", url, json=payload_dict)
data = self._parse_response_data(response.json())
# If the returned data is a list, take the first item.
if isinstance(data, list):
data = data[0] if data else {}
return self.model_class.from_api_response(data)
async def delete_async(self, resource_id: Any) -> None:
"""Delete a resource asynchronously."""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
url = self._build_url(resource_id)
await self.client._request("DELETE", url)
async def paginate_async(self, stop_page: Optional[int] = None, **params: Any) -> AsyncGenerator[T, None]:
"""
Generate items one by one from paginated results, asynchronously.
Args:
stop_page: The page number to stop at (optional).
**params: Filter parameters to pass to the API.
Yields:
T: Each item from the paginated results.
"""
if not str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires an async client.")
page_num = params.get("page_num", 1)
page_size = params.get("page_size", 100)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
url = self._build_url()
response = await self.client._request("GET", url, params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
for item in items:
yield self.model_class.from_api_response(item)
if not has_more or not items:
break
page_num += 1
if stop_page and page_num > stop_page:
break

View File

@@ -0,0 +1,156 @@
"""
Airwallex Beneficiary API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from ..models.beneficiary import Beneficiary as BeneficiaryModel, BeneficiaryCreateRequest, BeneficiaryUpdateRequest
from .base import AirwallexAPIBase
T = TypeVar("T", bound=BeneficiaryModel)
class Beneficiary(AirwallexAPIBase[BeneficiaryModel]):
"""
Operations for Airwallex beneficiaries.
Beneficiaries represent recipients of payments.
"""
endpoint = "beneficiaries"
model_class = cast(Type[BeneficiaryModel], BeneficiaryModel)
def create_from_model(self, beneficiary: BeneficiaryCreateRequest) -> BeneficiaryModel:
"""
Create a new beneficiary using a Pydantic model.
Args:
beneficiary: BeneficiaryCreateRequest model with beneficiary creation details.
Returns:
Beneficiary: The created beneficiary.
"""
return self.create(beneficiary)
async def create_from_model_async(self, beneficiary: BeneficiaryCreateRequest) -> BeneficiaryModel:
"""
Create a new beneficiary using a Pydantic model asynchronously.
Args:
beneficiary: BeneficiaryCreateRequest model with beneficiary creation details.
Returns:
Beneficiary: The created beneficiary.
"""
return await self.create_async(beneficiary)
def update_from_model(self, beneficiary_id: str, beneficiary: BeneficiaryUpdateRequest) -> BeneficiaryModel:
"""
Update a beneficiary using a Pydantic model.
Args:
beneficiary_id: The ID of the beneficiary to update.
beneficiary: BeneficiaryUpdateRequest model with beneficiary update details.
Returns:
Beneficiary: The updated beneficiary.
"""
return self.update(beneficiary_id, beneficiary)
async def update_from_model_async(self, beneficiary_id: str, beneficiary: BeneficiaryUpdateRequest) -> BeneficiaryModel:
"""
Update a beneficiary using a Pydantic model asynchronously.
Args:
beneficiary_id: The ID of the beneficiary to update.
beneficiary: BeneficiaryUpdateRequest model with beneficiary update details.
Returns:
Beneficiary: The updated beneficiary.
"""
return await self.update_async(beneficiary_id, beneficiary)
def validate(self, beneficiary: BeneficiaryCreateRequest) -> Dict[str, Any]:
"""
Validate a beneficiary without creating it.
Args:
beneficiary: BeneficiaryCreateRequest model with beneficiary details.
Returns:
Dict[str, Any]: Validation results.
"""
url = self._build_url(suffix="validate")
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=beneficiary.to_api_dict())
return response.json()
else:
raise ValueError("Use validate_async for async clients")
async def validate_async(self, beneficiary: BeneficiaryCreateRequest) -> Dict[str, Any]:
"""
Validate a beneficiary without creating it asynchronously.
Args:
beneficiary: BeneficiaryCreateRequest model with beneficiary details.
Returns:
Dict[str, Any]: Validation results.
"""
url = self._build_url(suffix="validate")
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=beneficiary.to_api_dict())
return response.json()
else:
raise ValueError("Use validate for sync clients")
def deactivate(self, beneficiary_id: str) -> BeneficiaryModel:
"""
Deactivate a beneficiary.
Args:
beneficiary_id: The ID of the beneficiary to deactivate.
Returns:
Beneficiary: The deactivated beneficiary.
"""
update_request = BeneficiaryUpdateRequest(status="disabled")
return self.update(beneficiary_id, update_request)
async def deactivate_async(self, beneficiary_id: str) -> BeneficiaryModel:
"""
Deactivate a beneficiary asynchronously.
Args:
beneficiary_id: The ID of the beneficiary to deactivate.
Returns:
Beneficiary: The deactivated beneficiary.
"""
update_request = BeneficiaryUpdateRequest(status="disabled")
return await self.update_async(beneficiary_id, update_request)
def activate(self, beneficiary_id: str) -> BeneficiaryModel:
"""
Activate a beneficiary.
Args:
beneficiary_id: The ID of the beneficiary to activate.
Returns:
Beneficiary: The activated beneficiary.
"""
update_request = BeneficiaryUpdateRequest(status="active")
return self.update(beneficiary_id, update_request)
async def activate_async(self, beneficiary_id: str) -> BeneficiaryModel:
"""
Activate a beneficiary asynchronously.
Args:
beneficiary_id: The ID of the beneficiary to activate.
Returns:
Beneficiary: The activated beneficiary.
"""
update_request = BeneficiaryUpdateRequest(status="active")
return await self.update_async(beneficiary_id, update_request)

View File

@@ -0,0 +1,123 @@
"""
Airwallex Financial Transaction API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.financial_transaction import FinancialTransaction
from .base import AirwallexAPIBase
T = TypeVar("T", bound=FinancialTransaction)
class FinancialTransaction(AirwallexAPIBase[FinancialTransaction]):
"""
Operations for Airwallex financial transactions.
Financial transactions represent the transactions that contributed to the Airwallex account balance.
"""
endpoint = "financial_transactions"
model_class = cast(Type[FinancialTransaction], FinancialTransaction)
def list_with_filters(
self,
batch_id: Optional[str] = None,
currency: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
to_created_at: Optional[Union[str, datetime]] = None,
source_id: Optional[str] = None,
status: Optional[str] = None,
page_num: int = 0,
page_size: int = 100
) -> List[FinancialTransaction]:
"""
List financial transactions with filtering options.
Args:
batch_id: Filter by batch ID
currency: Filter by currency (3-letter ISO-4217 code)
from_created_at: Filter by creation date (start, inclusive)
to_created_at: Filter by creation date (end, inclusive)
source_id: Filter by source ID
status: Filter by status (PENDING, SETTLED)
page_num: Page number (0-indexed) for pagination
page_size: Number of transactions per page (max 1000)
Returns:
List[FinancialTransaction]: List of matching financial transactions
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if batch_id:
params["batch_id"] = batch_id
if currency:
params["currency"] = currency
if source_id:
params["source_id"] = source_id
if status:
params["status"] = status
if from_created_at:
params["from_created_at"] = from_created_at
if to_created_at:
params["to_created_at"] = to_created_at
return self.list(**params)
async def list_with_filters_async(
self,
batch_id: Optional[str] = None,
currency: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
to_created_at: Optional[Union[str, datetime]] = None,
source_id: Optional[str] = None,
status: Optional[str] = None,
page_num: int = 0,
page_size: int = 100
) -> List[FinancialTransaction]:
"""
List financial transactions with filtering options asynchronously.
Args:
batch_id: Filter by batch ID
currency: Filter by currency (3-letter ISO-4217 code)
from_created_at: Filter by creation date (start, inclusive)
to_created_at: Filter by creation date (end, inclusive)
source_id: Filter by source ID
status: Filter by status (PENDING, SETTLED)
page_num: Page number (0-indexed) for pagination
page_size: Number of transactions per page (max 1000)
Returns:
List[FinancialTransaction]: List of matching financial transactions
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if batch_id:
params["batch_id"] = batch_id
if currency:
params["currency"] = currency
if source_id:
params["source_id"] = source_id
if status:
params["status"] = status
if from_created_at:
params["from_created_at"] = from_created_at
if to_created_at:
params["to_created_at"] = to_created_at
return await self.list_async(**params)

View File

@@ -0,0 +1,257 @@
"""
Airwallex Invoice API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.invoice import Invoice, InvoiceItem, InvoicePreviewRequest, InvoicePreviewResponse
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Invoice)
class Invoice(AirwallexAPIBase[Invoice]):
"""
Operations for Airwallex invoices.
Invoices record one-off sales transactions between you and your customers.
"""
endpoint = "invoices"
model_class = cast(Type[Invoice], Invoice)
def preview(self, preview_request: InvoicePreviewRequest) -> InvoicePreviewResponse:
"""
Preview an upcoming invoice.
This method allows you to preview the upcoming invoice of an existing subscription
or the first invoice before creating a new subscription.
Args:
preview_request: InvoicePreviewRequest model with preview details
Returns:
InvoicePreviewResponse: The preview of the upcoming invoice
"""
url = self._build_url(suffix="preview")
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=preview_request.to_api_dict())
return InvoicePreviewResponse.from_api_response(response.json())
else:
raise ValueError("Use preview_async for async clients")
async def preview_async(self, preview_request: InvoicePreviewRequest) -> InvoicePreviewResponse:
"""
Preview an upcoming invoice asynchronously.
This method allows you to preview the upcoming invoice of an existing subscription
or the first invoice before creating a new subscription.
Args:
preview_request: InvoicePreviewRequest model with preview details
Returns:
InvoicePreviewResponse: The preview of the upcoming invoice
"""
url = self._build_url(suffix="preview")
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=preview_request.to_api_dict())
return InvoicePreviewResponse.from_api_response(response.json())
else:
raise ValueError("Use preview for sync clients")
def list_items(self, invoice_id: str, page_num: int = 0, page_size: int = 20) -> List[InvoiceItem]:
"""
List all items for a specific invoice.
Args:
invoice_id: The ID of the invoice to fetch items for
page_num: Page number (0-indexed) for pagination
page_size: Number of items per page
Returns:
List[InvoiceItem]: List of invoice items
"""
url = f"{self._build_url(invoice_id)}/items"
params = {
"page_num": page_num,
"page_size": page_size
}
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url, params=params)
data = response.json()
if "items" in data:
return [InvoiceItem.from_api_response(item) for item in data["items"]]
return []
else:
raise ValueError("Use list_items_async for async clients")
async def list_items_async(self, invoice_id: str, page_num: int = 0, page_size: int = 20) -> List[InvoiceItem]:
"""
List all items for a specific invoice asynchronously.
Args:
invoice_id: The ID of the invoice to fetch items for
page_num: Page number (0-indexed) for pagination
page_size: Number of items per page
Returns:
List[InvoiceItem]: List of invoice items
"""
url = f"{self._build_url(invoice_id)}/items"
params = {
"page_num": page_num,
"page_size": page_size
}
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url, params=params)
data = response.json()
if "items" in data:
return [InvoiceItem.from_api_response(item) for item in data["items"]]
return []
else:
raise ValueError("Use list_items for sync clients")
def get_item(self, invoice_id: str, item_id: str) -> InvoiceItem:
"""
Retrieve a specific invoice item.
Args:
invoice_id: The ID of the invoice that contains the item
item_id: The ID of the invoice item to retrieve
Returns:
InvoiceItem: The requested invoice item
"""
url = f"{self._build_url(invoice_id)}/items/{item_id}"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return InvoiceItem.from_api_response(response.json())
else:
raise ValueError("Use get_item_async for async clients")
async def get_item_async(self, invoice_id: str, item_id: str) -> InvoiceItem:
"""
Retrieve a specific invoice item asynchronously.
Args:
invoice_id: The ID of the invoice that contains the item
item_id: The ID of the invoice item to retrieve
Returns:
InvoiceItem: The requested invoice item
"""
url = f"{self._build_url(invoice_id)}/items/{item_id}"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return InvoiceItem.from_api_response(response.json())
else:
raise ValueError("Use get_item for sync clients")
def list_with_filters(
self,
customer_id: Optional[str] = None,
subscription_id: Optional[str] = None,
status: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
to_created_at: Optional[Union[str, datetime]] = None,
page_num: int = 0,
page_size: int = 20
) -> List[Invoice]:
"""
List invoices with filtering options.
Args:
customer_id: Filter by customer ID
subscription_id: Filter by subscription ID
status: Filter by status (SENT, PAID, PAYMENT_FAILED)
from_created_at: Filter by creation date (start, inclusive)
to_created_at: Filter by creation date (end, exclusive)
page_num: Page number (0-indexed) for pagination
page_size: Number of invoices per page
Returns:
List[Invoice]: List of matching invoices
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if customer_id:
params["customer_id"] = customer_id
if subscription_id:
params["subscription_id"] = subscription_id
if status:
params["status"] = status
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
return self.list(**params)
async def list_with_filters_async(
self,
customer_id: Optional[str] = None,
subscription_id: Optional[str] = None,
status: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
to_created_at: Optional[Union[str, datetime]] = None,
page_num: int = 0,
page_size: int = 20
) -> List[Invoice]:
"""
List invoices with filtering options asynchronously.
Args:
customer_id: Filter by customer ID
subscription_id: Filter by subscription ID
status: Filter by status (SENT, PAID, PAYMENT_FAILED)
from_created_at: Filter by creation date (start, inclusive)
to_created_at: Filter by creation date (end, exclusive)
page_num: Page number (0-indexed) for pagination
page_size: Number of invoices per page
Returns:
List[Invoice]: List of matching invoices
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if customer_id:
params["customer_id"] = customer_id
if subscription_id:
params["subscription_id"] = subscription_id
if status:
params["status"] = status
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
return await self.list_async(**params)

View File

@@ -0,0 +1,313 @@
"""
Airwallex Issuing Authorization API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.issuing_authorization import Authorization, AuthorizationListResponse
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Authorization)
class IssuingAuthorization(AirwallexAPIBase[Authorization]):
"""
Operations for Airwallex issuing authorizations.
Authorizations represent pre-auth and capture processed against individual cards.
"""
endpoint = "issuing/authorizations"
model_class = cast(Type[Authorization], Authorization)
def list_with_filters(
self,
billing_currency: Optional[str] = None,
card_id: Optional[str] = None,
digital_wallet_token_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
lifecycle_id: Optional[str] = None,
page_num: int = 0,
page_size: int = 10,
retrieval_ref: Optional[str] = None,
status: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
) -> List[Authorization]:
"""
List authorizations with filtering options.
Args:
billing_currency: Currency in which transition was billed (3-letter ISO-4217 code)
card_id: Unique Identifier for card
digital_wallet_token_id: Unique Identifier for digital token
from_created_at: Start of Transaction Date in ISO8601 format (inclusive)
lifecycle_id: Unique Identifier for lifecycle
page_num: Page number, starts from 0
page_size: Number of results per page
retrieval_ref: Retrieval reference number
status: Authorization status (CLEARED, EXPIRED, FAILED, PENDING, REVERSED)
to_created_at: End of Transaction Date in ISO8601 format (exclusive)
Returns:
List[Authorization]: List of matching authorizations
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if billing_currency:
params["billing_currency"] = billing_currency
if card_id:
params["card_id"] = card_id
if digital_wallet_token_id:
params["digital_wallet_token_id"] = digital_wallet_token_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if lifecycle_id:
params["lifecycle_id"] = lifecycle_id
if retrieval_ref:
params["retrieval_ref"] = retrieval_ref
if status:
params["status"] = status
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
billing_currency: Optional[str] = None,
card_id: Optional[str] = None,
digital_wallet_token_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
lifecycle_id: Optional[str] = None,
page_num: int = 0,
page_size: int = 10,
retrieval_ref: Optional[str] = None,
status: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
) -> List[Authorization]:
"""
List authorizations with filtering options asynchronously.
Args:
billing_currency: Currency in which transition was billed (3-letter ISO-4217 code)
card_id: Unique Identifier for card
digital_wallet_token_id: Unique Identifier for digital token
from_created_at: Start of Transaction Date in ISO8601 format (inclusive)
lifecycle_id: Unique Identifier for lifecycle
page_num: Page number, starts from 0
page_size: Number of results per page
retrieval_ref: Retrieval reference number
status: Authorization status (CLEARED, EXPIRED, FAILED, PENDING, REVERSED)
to_created_at: End of Transaction Date in ISO8601 format (exclusive)
Returns:
List[Authorization]: List of matching authorizations
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if billing_currency:
params["billing_currency"] = billing_currency
if card_id:
params["card_id"] = card_id
if digital_wallet_token_id:
params["digital_wallet_token_id"] = digital_wallet_token_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if lifecycle_id:
params["lifecycle_id"] = lifecycle_id
if retrieval_ref:
params["retrieval_ref"] = retrieval_ref
if status:
params["status"] = status
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")
def paginate(self, **params: Any) -> List[Authorization]:
"""
Fetch all pages of authorizations.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Authorization]: All authorizations matching the filters
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]
async def paginate_async(self, **params: Any) -> List[Authorization]:
"""
Fetch all pages of authorizations asynchronously.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Authorization]: All authorizations matching the filters
"""
if not self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires an async client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]
def paginate_generator(self, **params: Any):
"""
Generate items one by one from paginated results.
Args:
**params: Filter parameters to pass to the API
Yields:
Authorization: Authorization objects one by one
"""
if self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires a sync client.")
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
for item in items:
yield self.model_class.from_api_response(item)
if not has_more:
break
page_num += 1
async def paginate_async_generator(self, **params: Any):
"""
Generate items one by one from paginated results asynchronously.
Args:
**params: Filter parameters to pass to the API
Yields:
Authorization: Authorization objects one by one
"""
if not self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires an async client.")
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
for item in items:
yield self.model_class.from_api_response(item)
if not has_more:
break
page_num += 1

View File

@@ -0,0 +1,411 @@
"""
Airwallex Issuing Card API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.issuing_card import Card, CardCreateRequest, CardUpdateRequest, CardDetails, CardLimits
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Card)
class IssuingCard(AirwallexAPIBase[Card]):
"""
Operations for Airwallex issuing cards.
Cards represent virtual or physical payment cards associated with cardholders.
"""
endpoint = "issuing/cards"
model_class = cast(Type[Card], Card)
def create_card(self, card: CardCreateRequest) -> Card:
"""
Create a new card.
Args:
card: CardCreateRequest model with card details
Returns:
Card: The created card
"""
url = f"{self.base_path}/create"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=card.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_card_async for async clients")
async def create_card_async(self, card: CardCreateRequest) -> Card:
"""
Create a new card asynchronously.
Args:
card: CardCreateRequest model with card details
Returns:
Card: The created card
"""
url = f"{self.base_path}/create"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=card.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_card for sync clients")
def get_card_details(self, card_id: str) -> CardDetails:
"""
Get sensitive card details.
Args:
card_id: The ID of the card
Returns:
CardDetails: Sensitive card details
"""
url = f"{self._build_url(card_id)}/details"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return CardDetails.from_api_response(response.json())
else:
raise ValueError("Use get_card_details_async for async clients")
async def get_card_details_async(self, card_id: str) -> CardDetails:
"""
Get sensitive card details asynchronously.
Args:
card_id: The ID of the card
Returns:
CardDetails: Sensitive card details
"""
url = f"{self._build_url(card_id)}/details"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return CardDetails.from_api_response(response.json())
else:
raise ValueError("Use get_card_details for sync clients")
def activate_card(self, card_id: str) -> None:
"""
Activate a physical card.
Args:
card_id: The ID of the card to activate
"""
url = f"{self._build_url(card_id)}/activate"
if not self.client.__class__.__name__.startswith('Async'):
self.client._request("POST", url)
else:
raise ValueError("Use activate_card_async for async clients")
async def activate_card_async(self, card_id: str) -> None:
"""
Activate a physical card asynchronously.
Args:
card_id: The ID of the card to activate
"""
url = f"{self._build_url(card_id)}/activate"
if self.client.__class__.__name__.startswith('Async'):
await self.client._request("POST", url)
else:
raise ValueError("Use activate_card for sync clients")
def get_card_limits(self, card_id: str) -> CardLimits:
"""
Get card remaining limits.
Args:
card_id: The ID of the card
Returns:
CardLimits: Card remaining limits
"""
url = f"{self._build_url(card_id)}/limits"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", url)
return CardLimits.from_api_response(response.json())
else:
raise ValueError("Use get_card_limits_async for async clients")
async def get_card_limits_async(self, card_id: str) -> CardLimits:
"""
Get card remaining limits asynchronously.
Args:
card_id: The ID of the card
Returns:
CardLimits: Card remaining limits
"""
url = f"{self._build_url(card_id)}/limits"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", url)
return CardLimits.from_api_response(response.json())
else:
raise ValueError("Use get_card_limits for sync clients")
def update_card(self, card_id: str, update_data: CardUpdateRequest) -> Card:
"""
Update a card.
Args:
card_id: The ID of the card to update
update_data: CardUpdateRequest model with update details
Returns:
Card: The updated card
"""
url = f"{self._build_url(card_id)}/update"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_card_async for async clients")
async def update_card_async(self, card_id: str, update_data: CardUpdateRequest) -> Card:
"""
Update a card asynchronously.
Args:
card_id: The ID of the card to update
update_data: CardUpdateRequest model with update details
Returns:
Card: The updated card
"""
url = f"{self._build_url(card_id)}/update"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_card for sync clients")
def list_with_filters(
self,
card_status: Optional[str] = None,
cardholder_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_updated_at: Optional[Union[str, datetime]] = None,
nick_name: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_updated_at: Optional[Union[str, datetime]] = None,
page_num: int = 0,
page_size: int = 10
) -> List[Card]:
"""
List cards with filtering options.
Args:
card_status: Filter by status
cardholder_id: Filter by cardholder ID
from_created_at: Filter by creation date (start, inclusive)
from_updated_at: Filter by update date (start, inclusive)
nick_name: Filter by card nickname
to_created_at: Filter by creation date (end, inclusive)
to_updated_at: Filter by update date (end, inclusive)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[Card]: List of matching cards
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if card_status:
params["card_status"] = card_status
if cardholder_id:
params["cardholder_id"] = cardholder_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_updated_at:
if isinstance(from_updated_at, datetime):
from_updated_at = from_updated_at.isoformat()
params["from_updated_at"] = from_updated_at
if nick_name:
params["nick_name"] = nick_name
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_updated_at:
if isinstance(to_updated_at, datetime):
to_updated_at = to_updated_at.isoformat()
params["to_updated_at"] = to_updated_at
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
card_status: Optional[str] = None,
cardholder_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_updated_at: Optional[Union[str, datetime]] = None,
nick_name: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_updated_at: Optional[Union[str, datetime]] = None,
page_num: int = 0,
page_size: int = 10
) -> List[Card]:
"""
List cards with filtering options asynchronously.
Args:
card_status: Filter by status
cardholder_id: Filter by cardholder ID
from_created_at: Filter by creation date (start, inclusive)
from_updated_at: Filter by update date (start, inclusive)
nick_name: Filter by card nickname
to_created_at: Filter by creation date (end, inclusive)
to_updated_at: Filter by update date (end, inclusive)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[Card]: List of matching cards
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if card_status:
params["card_status"] = card_status
if cardholder_id:
params["cardholder_id"] = cardholder_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_updated_at:
if isinstance(from_updated_at, datetime):
from_updated_at = from_updated_at.isoformat()
params["from_updated_at"] = from_updated_at
if nick_name:
params["nick_name"] = nick_name
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_updated_at:
if isinstance(to_updated_at, datetime):
to_updated_at = to_updated_at.isoformat()
params["to_updated_at"] = to_updated_at
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")
def paginate(self, **params: Any) -> List[Card]:
"""
Fetch all pages of cards.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Card]: All cards matching the filters
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]
async def paginate_async(self, **params: Any) -> List[Card]:
"""
Fetch all pages of cards asynchronously.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Card]: All cards matching the filters
"""
if not self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires an async client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]

View File

@@ -0,0 +1,234 @@
"""
Airwallex Issuing Cardholder API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from ..models.issuing_cardholder import Cardholder, CardholderCreateRequest, CardholderUpdateRequest
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Cardholder)
class IssuingCardholder(AirwallexAPIBase[Cardholder]):
"""
Operations for Airwallex issuing cardholders.
Cardholders are authorized representatives that can be issued cards.
"""
endpoint = "issuing/cardholders"
model_class = cast(Type[Cardholder], Cardholder)
def create_cardholder(self, cardholder: CardholderCreateRequest) -> Cardholder:
"""
Create a new cardholder.
Args:
cardholder: CardholderCreateRequest model with cardholder details
Returns:
Cardholder: The created cardholder
"""
url = f"{self.base_path}/create"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=cardholder.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_cardholder_async for async clients")
async def create_cardholder_async(self, cardholder: CardholderCreateRequest) -> Cardholder:
"""
Create a new cardholder asynchronously.
Args:
cardholder: CardholderCreateRequest model with cardholder details
Returns:
Cardholder: The created cardholder
"""
url = f"{self.base_path}/create"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=cardholder.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_cardholder for sync clients")
def list_with_filters(
self,
cardholder_status: Optional[str] = None,
page_num: int = 0,
page_size: int = 10
) -> List[Cardholder]:
"""
List cardholders with filtering options.
Args:
cardholder_status: Filter by status (PENDING, READY, INCOMPLETE, DISABLED)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[Cardholder]: List of matching cardholders
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if cardholder_status:
params["cardholder_status"] = cardholder_status
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
cardholder_status: Optional[str] = None,
page_num: int = 0,
page_size: int = 10
) -> List[Cardholder]:
"""
List cardholders with filtering options asynchronously.
Args:
cardholder_status: Filter by status (PENDING, READY, INCOMPLETE, DISABLED)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[Cardholder]: List of matching cardholders
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if cardholder_status:
params["cardholder_status"] = cardholder_status
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")
def update_cardholder(self, cardholder_id: str, update_data: CardholderUpdateRequest) -> Cardholder:
"""
Update a cardholder.
Args:
cardholder_id: The ID of the cardholder to update
update_data: CardholderUpdateRequest model with update details
Returns:
Cardholder: The updated cardholder
"""
url = f"{self._build_url(cardholder_id)}/update"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_cardholder_async for async clients")
async def update_cardholder_async(self, cardholder_id: str, update_data: CardholderUpdateRequest) -> Cardholder:
"""
Update a cardholder asynchronously.
Args:
cardholder_id: The ID of the cardholder to update
update_data: CardholderUpdateRequest model with update details
Returns:
Cardholder: The updated cardholder
"""
url = f"{self._build_url(cardholder_id)}/update"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_cardholder for sync clients")
def paginate(self, **params: Any) -> List[Cardholder]:
"""
Fetch all pages of cardholders.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Cardholder]: All cardholders matching the filters
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]
async def paginate_async(self, **params: Any) -> List[Cardholder]:
"""
Fetch all pages of cardholders asynchronously.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Cardholder]: All cardholders matching the filters
"""
if not self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires an async client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]

View File

@@ -0,0 +1,80 @@
"""
Airwallex Issuing Config API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from ..models.issuing_config import IssuingConfig, IssuingConfigUpdateRequest
from .base import AirwallexAPIBase
T = TypeVar("T", bound=IssuingConfig)
class IssuingConfig(AirwallexAPIBase[IssuingConfig]):
"""
Operations for Airwallex issuing configuration.
Configuration for issuance settings and controls.
"""
endpoint = "issuing/config"
model_class = cast(Type[IssuingConfig], IssuingConfig)
def get_config(self) -> IssuingConfig:
"""
Get the current issuing configuration.
Returns:
IssuingConfig: The current issuing configuration
"""
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_config_async for async clients")
async def get_config_async(self) -> IssuingConfig:
"""
Get the current issuing configuration asynchronously.
Returns:
IssuingConfig: The current issuing configuration
"""
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use get_config for sync clients")
def update_config(self, update_data: IssuingConfigUpdateRequest) -> IssuingConfig:
"""
Update the issuing configuration.
Args:
update_data: IssuingConfigUpdateRequest model with update details
Returns:
IssuingConfig: The updated issuing configuration
"""
url = f"{self._build_url()}/update"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_config_async for async clients")
async def update_config_async(self, update_data: IssuingConfigUpdateRequest) -> IssuingConfig:
"""
Update the issuing configuration asynchronously.
Args:
update_data: IssuingConfigUpdateRequest model with update details
Returns:
IssuingConfig: The updated issuing configuration
"""
url = f"{self._build_url()}/update"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_config for sync clients")

View File

@@ -0,0 +1,249 @@
"""
Airwallex Issuing Digital Wallet Token API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.issuing_digital_wallet_token import DigitalWalletToken
from .base import AirwallexAPIBase
T = TypeVar("T", bound=DigitalWalletToken)
class IssuingDigitalWalletToken(AirwallexAPIBase[DigitalWalletToken]):
"""
Operations for Airwallex issuing digital wallet tokens.
Digital wallet tokens represent tokenized cards in digital wallets.
"""
endpoint = "issuing/digital_wallet_tokens"
model_class = cast(Type[DigitalWalletToken], DigitalWalletToken)
def list_with_filters(
self,
card_id: Optional[str] = None,
cardholder_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_token_expires_on: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_token_expires_on: Optional[str] = None,
token_reference_id: Optional[str] = None,
token_statuses: Optional[str] = None,
token_types: Optional[str] = None,
page_num: int = 0,
page_size: int = 10
) -> List[DigitalWalletToken]:
"""
List digital wallet tokens with filtering options.
Args:
card_id: Filter by card ID
cardholder_id: Filter by cardholder ID
from_created_at: Filter by creation date (start, inclusive)
from_token_expires_on: Filter by expiration date (start, inclusive, format MMyy)
to_created_at: Filter by creation date (end, inclusive)
to_token_expires_on: Filter by expiration date (end, inclusive, format MMyy)
token_reference_id: Filter by token reference ID
token_statuses: Filter by token statuses (comma-separated)
token_types: Filter by token types (comma-separated)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[DigitalWalletToken]: List of matching digital wallet tokens
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if card_id:
params["card_id"] = card_id
if cardholder_id:
params["cardholder_id"] = cardholder_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_token_expires_on:
params["from_token_expires_on"] = from_token_expires_on
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_token_expires_on:
params["to_token_expires_on"] = to_token_expires_on
if token_reference_id:
params["token_reference_id"] = token_reference_id
if token_statuses:
params["token_statuses"] = token_statuses
if token_types:
params["token_types"] = token_types
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
card_id: Optional[str] = None,
cardholder_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_token_expires_on: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_token_expires_on: Optional[str] = None,
token_reference_id: Optional[str] = None,
token_statuses: Optional[str] = None,
token_types: Optional[str] = None,
page_num: int = 0,
page_size: int = 10
) -> List[DigitalWalletToken]:
"""
List digital wallet tokens with filtering options asynchronously.
Args:
card_id: Filter by card ID
cardholder_id: Filter by cardholder ID
from_created_at: Filter by creation date (start, inclusive)
from_token_expires_on: Filter by expiration date (start, inclusive, format MMyy)
to_created_at: Filter by creation date (end, inclusive)
to_token_expires_on: Filter by expiration date (end, inclusive, format MMyy)
token_reference_id: Filter by token reference ID
token_statuses: Filter by token statuses (comma-separated)
token_types: Filter by token types (comma-separated)
page_num: Page number, starts from 0
page_size: Number of results per page
Returns:
List[DigitalWalletToken]: List of matching digital wallet tokens
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if card_id:
params["card_id"] = card_id
if cardholder_id:
params["cardholder_id"] = cardholder_id
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_token_expires_on:
params["from_token_expires_on"] = from_token_expires_on
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_token_expires_on:
params["to_token_expires_on"] = to_token_expires_on
if token_reference_id:
params["token_reference_id"] = token_reference_id
if token_statuses:
params["token_statuses"] = token_statuses
if token_types:
params["token_types"] = token_types
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")
def paginate(self, **params: Any) -> List[DigitalWalletToken]:
"""
Fetch all pages of digital wallet tokens.
Args:
**params: Filter parameters to pass to the API
Returns:
List[DigitalWalletToken]: All digital wallet tokens matching the filters
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]
async def paginate_async(self, **params: Any) -> List[DigitalWalletToken]:
"""
Fetch all pages of digital wallet tokens asynchronously.
Args:
**params: Filter parameters to pass to the API
Returns:
List[DigitalWalletToken]: All digital wallet tokens matching the filters
"""
if not self.client.__class__.__name__.startswith('Async'):
raise ValueError("This method requires an async client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]

View File

@@ -0,0 +1,192 @@
"""
Airwallex Issuing Transaction API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.issuing_transaction import Transaction
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Transaction)
class IssuingTransaction(AirwallexAPIBase[Transaction]):
"""
Operations for Airwallex issuing transactions.
Transactions represent payments processed against cards.
"""
endpoint = "issuing/transactions"
model_class = cast(Type[Transaction], Transaction)
def list_with_filters(
self,
billing_currency: Optional[str] = None,
card_id: Optional[str] = None,
digital_wallet_token_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
lifecycle_id: Optional[str] = None,
page_num: int = 0,
page_size: int = 10,
retrieval_ref: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
transaction_type: Optional[str] = None
) -> List[Transaction]:
"""
List transactions with filtering options.
Args:
billing_currency: Currency in which transition was billed (3-letter ISO-4217 code)
card_id: Unique Identifier for card
digital_wallet_token_id: Unique Identifier for digital token
from_created_at: Start of Transaction Date in ISO8601 format (inclusive)
lifecycle_id: Unique Identifier for lifecycle
page_num: Page number, starts from 0
page_size: Number of results per page
retrieval_ref: Retrieval reference number
to_created_at: End of Transaction Date in ISO8601 format (inclusive)
transaction_type: Transaction type (AUTHORIZATION, CLEARING, REFUND, REVERSAL, ORIGINAL_CREDIT)
Returns:
List[Transaction]: List of matching transactions
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if billing_currency:
params["billing_currency"] = billing_currency
if card_id:
params["card_id"] = card_id
if digital_wallet_token_id:
params["digital_wallet_token_id"] = digital_wallet_token_id
if from_created_at:
params["from_created_at"] = from_created_at
if lifecycle_id:
params["lifecycle_id"] = lifecycle_id
if retrieval_ref:
params["retrieval_ref"] = retrieval_ref
if to_created_at:
params["to_created_at"] = to_created_at
if transaction_type:
params["transaction_type"] = transaction_type
if not str(self.client.__class__.__name__).startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
billing_currency: Optional[str] = None,
card_id: Optional[str] = None,
digital_wallet_token_id: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
lifecycle_id: Optional[str] = None,
page_num: int = 0,
page_size: int = 10,
retrieval_ref: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
transaction_type: Optional[str] = None
) -> List[Transaction]:
"""
List transactions with filtering options asynchronously.
Args:
billing_currency: Currency in which transition was billed (3-letter ISO-4217 code)
card_id: Unique Identifier for card
digital_wallet_token_id: Unique Identifier for digital token
from_created_at: Start of Transaction Date in ISO8601 format (inclusive)
lifecycle_id: Unique Identifier for lifecycle
page_num: Page number, starts from 0
page_size: Number of results per page
retrieval_ref: Retrieval reference number
to_created_at: End of Transaction Date in ISO8601 format (inclusive)
transaction_type: Transaction type (AUTHORIZATION, CLEARING, REFUND, REVERSAL, ORIGINAL_CREDIT)
Returns:
List[Transaction]: List of matching transactions
"""
params = {
"page_num": page_num,
"page_size": page_size
}
if billing_currency:
params["billing_currency"] = billing_currency
if card_id:
params["card_id"] = card_id
if digital_wallet_token_id:
params["digital_wallet_token_id"] = digital_wallet_token_id
if from_created_at:
params["from_created_at"] = from_created_at
if lifecycle_id:
params["lifecycle_id"] = lifecycle_id
if retrieval_ref:
params["retrieval_ref"] = retrieval_ref
if to_created_at:
params["to_created_at"] = to_created_at
if transaction_type:
params["transaction_type"] = transaction_type
if str(self.client.__class__.__name__).startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")
def paginate(self, **params: Any) -> List[Transaction]:
"""
Fetch all pages of transactions.
Args:
**params: Filter parameters to pass to the API
Returns:
List[Transaction]: All transactions matching the filters
"""
if str(self.client.__class__.__name__).startswith('Async'):
raise ValueError("This method requires a sync client.")
all_items: List[Dict[str, Any]] = []
page_num = params.get("page_num", 0)
page_size = params.get("page_size", 10)
while True:
params["page_num"] = page_num
params["page_size"] = page_size
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
items = data.get("items", [])
has_more = data.get("has_more", False)
if not items:
break
all_items.extend(items)
if not has_more:
break
page_num += 1
return [self.model_class.from_api_response(item) for item in all_items]

View File

@@ -0,0 +1,339 @@
"""
Airwallex Issuing Transaction Dispute API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from datetime import datetime
from ..models.issuing_transaction_dispute import TransactionDispute, TransactionDisputeCreateRequest, TransactionDisputeUpdateRequest
from .base import AirwallexAPIBase
T = TypeVar("T", bound=TransactionDispute)
class IssuingTransactionDispute(AirwallexAPIBase[TransactionDispute]):
"""
Operations for Airwallex issuing transaction disputes.
Transaction disputes represent disputes against card transactions.
"""
endpoint = "issuing/transaction_disputes"
model_class = cast(Type[TransactionDispute], TransactionDispute)
def create_dispute(self, dispute: TransactionDisputeCreateRequest) -> TransactionDispute:
"""
Create a new transaction dispute.
Args:
dispute: TransactionDisputeCreateRequest model with dispute details
Returns:
TransactionDispute: The created transaction dispute
"""
url = f"{self.base_path}/create"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=dispute.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_dispute_async for async clients")
async def create_dispute_async(self, dispute: TransactionDisputeCreateRequest) -> TransactionDispute:
"""
Create a new transaction dispute asynchronously.
Args:
dispute: TransactionDisputeCreateRequest model with dispute details
Returns:
TransactionDispute: The created transaction dispute
"""
url = f"{self.base_path}/create"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=dispute.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use create_dispute for sync clients")
def update_dispute(self, dispute_id: str, update_data: TransactionDisputeUpdateRequest) -> TransactionDispute:
"""
Update a transaction dispute.
Args:
dispute_id: The ID of the dispute to update
update_data: TransactionDisputeUpdateRequest model with update details
Returns:
TransactionDispute: The updated transaction dispute
"""
url = f"{self._build_url(dispute_id)}/update"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_dispute_async for async clients")
async def update_dispute_async(self, dispute_id: str, update_data: TransactionDisputeUpdateRequest) -> TransactionDispute:
"""
Update a transaction dispute asynchronously.
Args:
dispute_id: The ID of the dispute to update
update_data: TransactionDisputeUpdateRequest model with update details
Returns:
TransactionDispute: The updated transaction dispute
"""
url = f"{self._build_url(dispute_id)}/update"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=update_data.to_api_dict())
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use update_dispute for sync clients")
def submit_dispute(self, dispute_id: str) -> TransactionDispute:
"""
Submit a transaction dispute.
Args:
dispute_id: The ID of the dispute to submit
Returns:
TransactionDispute: The submitted transaction dispute
"""
url = f"{self._build_url(dispute_id)}/submit"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use submit_dispute_async for async clients")
async def submit_dispute_async(self, dispute_id: str) -> TransactionDispute:
"""
Submit a transaction dispute asynchronously.
Args:
dispute_id: The ID of the dispute to submit
Returns:
TransactionDispute: The submitted transaction dispute
"""
url = f"{self._build_url(dispute_id)}/submit"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use submit_dispute for sync clients")
def cancel_dispute(self, dispute_id: str) -> TransactionDispute:
"""
Cancel a transaction dispute.
Args:
dispute_id: The ID of the dispute to cancel
Returns:
TransactionDispute: The cancelled transaction dispute
"""
url = f"{self._build_url(dispute_id)}/cancel"
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use cancel_dispute_async for async clients")
async def cancel_dispute_async(self, dispute_id: str) -> TransactionDispute:
"""
Cancel a transaction dispute asynchronously.
Args:
dispute_id: The ID of the dispute to cancel
Returns:
TransactionDispute: The cancelled transaction dispute
"""
url = f"{self._build_url(dispute_id)}/cancel"
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url)
return self.model_class.from_api_response(response.json())
else:
raise ValueError("Use cancel_dispute for sync clients")
def list_with_filters(
self,
detailed_status: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_updated_at: Optional[Union[str, datetime]] = None,
page: Optional[str] = None,
page_size: int = 10,
reason: Optional[str] = None,
reference: Optional[str] = None,
status: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_updated_at: Optional[Union[str, datetime]] = None,
transaction_id: Optional[str] = None,
updated_by: Optional[str] = None
) -> List[TransactionDispute]:
"""
List transaction disputes with filtering options.
Args:
detailed_status: Filter by detailed status
from_created_at: Filter by creation date (start, inclusive)
from_updated_at: Filter by update date (start, inclusive)
page: Page bookmark for pagination
page_size: Number of results per page
reason: Filter by dispute reason
reference: Filter by reference
status: Filter by status
to_created_at: Filter by creation date (end, exclusive)
to_updated_at: Filter by update date (end, exclusive)
transaction_id: Filter by transaction ID
updated_by: Filter by who last updated the dispute
Returns:
List[TransactionDispute]: List of matching transaction disputes
"""
params = {
"page_size": page_size
}
if detailed_status:
params["detailed_status"] = detailed_status
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_updated_at:
if isinstance(from_updated_at, datetime):
from_updated_at = from_updated_at.isoformat()
params["from_updated_at"] = from_updated_at
if page:
params["page"] = page
if reason:
params["reason"] = reason
if reference:
params["reference"] = reference
if status:
params["status"] = status
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_updated_at:
if isinstance(to_updated_at, datetime):
to_updated_at = to_updated_at.isoformat()
params["to_updated_at"] = to_updated_at
if transaction_id:
params["transaction_id"] = transaction_id
if updated_by:
params["updated_by"] = updated_by
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters_async for async clients")
async def list_with_filters_async(
self,
detailed_status: Optional[str] = None,
from_created_at: Optional[Union[str, datetime]] = None,
from_updated_at: Optional[Union[str, datetime]] = None,
page: Optional[str] = None,
page_size: int = 10,
reason: Optional[str] = None,
reference: Optional[str] = None,
status: Optional[str] = None,
to_created_at: Optional[Union[str, datetime]] = None,
to_updated_at: Optional[Union[str, datetime]] = None,
transaction_id: Optional[str] = None,
updated_by: Optional[str] = None
) -> List[TransactionDispute]:
"""
List transaction disputes with filtering options asynchronously.
Args:
detailed_status: Filter by detailed status
from_created_at: Filter by creation date (start, inclusive)
from_updated_at: Filter by update date (start, inclusive)
page: Page bookmark for pagination
page_size: Number of results per page
reason: Filter by dispute reason
reference: Filter by reference
status: Filter by status
to_created_at: Filter by creation date (end, exclusive)
to_updated_at: Filter by update date (end, exclusive)
transaction_id: Filter by transaction ID
updated_by: Filter by who last updated the dispute
Returns:
List[TransactionDispute]: List of matching transaction disputes
"""
params = {
"page_size": page_size
}
if detailed_status:
params["detailed_status"] = detailed_status
if from_created_at:
if isinstance(from_created_at, datetime):
from_created_at = from_created_at.isoformat()
params["from_created_at"] = from_created_at
if from_updated_at:
if isinstance(from_updated_at, datetime):
from_updated_at = from_updated_at.isoformat()
params["from_updated_at"] = from_updated_at
if page:
params["page"] = page
if reason:
params["reason"] = reason
if reference:
params["reference"] = reference
if status:
params["status"] = status
if to_created_at:
if isinstance(to_created_at, datetime):
to_created_at = to_created_at.isoformat()
params["to_created_at"] = to_created_at
if to_updated_at:
if isinstance(to_updated_at, datetime):
to_updated_at = to_updated_at.isoformat()
params["to_updated_at"] = to_updated_at
if transaction_id:
params["transaction_id"] = transaction_id
if updated_by:
params["updated_by"] = updated_by
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("GET", self._build_url(), params=params)
data = response.json()
return [self.model_class.from_api_response(item) for item in data.get("items", [])]
else:
raise ValueError("Use list_with_filters for sync clients")

View File

@@ -0,0 +1,148 @@
"""
Airwallex Payment API.
"""
from typing import Dict, Any, List, Optional, Type, TypeVar, Union, cast
from ..models.payment import Payment, PaymentCreateRequest, PaymentUpdateRequest, PaymentQuote
from .base import AirwallexAPIBase
T = TypeVar("T", bound=Payment)
class Payment(AirwallexAPIBase[Payment]):
"""
Operations for Airwallex payments.
Payments represent money transfers to beneficiaries.
"""
endpoint = "payments"
model_class = cast(Type[Payment], Payment)
def create_from_model(self, payment: PaymentCreateRequest) -> Payment:
"""
Create a new payment using a Pydantic model.
Args:
payment: PaymentCreateRequest model with payment creation details.
Returns:
Payment: The created payment.
"""
return self.create(payment)
async def create_from_model_async(self, payment: PaymentCreateRequest) -> Payment:
"""
Create a new payment using a Pydantic model asynchronously.
Args:
payment: PaymentCreateRequest model with payment creation details.
Returns:
Payment: The created payment.
"""
return await self.create_async(payment)
def update_from_model(self, payment_id: str, payment: PaymentUpdateRequest) -> Payment:
"""
Update a payment using a Pydantic model.
Args:
payment_id: The ID of the payment to update.
payment: PaymentUpdateRequest model with payment update details.
Returns:
Payment: The updated payment.
"""
return self.update(payment_id, payment)
async def update_from_model_async(self, payment_id: str, payment: PaymentUpdateRequest) -> Payment:
"""
Update a payment using a Pydantic model asynchronously.
Args:
payment_id: The ID of the payment to update.
payment: PaymentUpdateRequest model with payment update details.
Returns:
Payment: The updated payment.
"""
return await self.update_async(payment_id, payment)
def cancel(self, payment_id: str) -> Payment:
"""
Cancel a payment.
Args:
payment_id: The ID of the payment to cancel.
Returns:
Payment: The cancelled payment.
"""
update_request = PaymentUpdateRequest(status="cancelled")
return self.update(payment_id, update_request)
async def cancel_async(self, payment_id: str) -> Payment:
"""
Cancel a payment asynchronously.
Args:
payment_id: The ID of the payment to cancel.
Returns:
Payment: The cancelled payment.
"""
update_request = PaymentUpdateRequest(status="cancelled")
return await self.update_async(payment_id, update_request)
def get_quote(self, source_currency: str, target_currency: str, amount: float, source_type: str = "source") -> PaymentQuote:
"""
Get a quote for a payment.
Args:
source_currency: Source currency code (ISO 4217)
target_currency: Target currency code (ISO 4217)
amount: Amount to convert
source_type: Whether the amount is in the source or target currency ('source' or 'target')
Returns:
PaymentQuote: The payment quote.
"""
url = self._build_url(suffix="quote")
payload = {
"source_currency": source_currency,
"target_currency": target_currency,
"amount": amount,
"source_type": source_type
}
if not self.client.__class__.__name__.startswith('Async'):
response = self.client._request("POST", url, json=payload)
return PaymentQuote.from_api_response(response.json())
else:
raise ValueError("Use get_quote_async for async clients")
async def get_quote_async(self, source_currency: str, target_currency: str, amount: float, source_type: str = "source") -> PaymentQuote:
"""
Get a quote for a payment asynchronously.
Args:
source_currency: Source currency code (ISO 4217)
target_currency: Target currency code (ISO 4217)
amount: Amount to convert
source_type: Whether the amount is in the source or target currency ('source' or 'target')
Returns:
PaymentQuote: The payment quote.
"""
url = self._build_url(suffix="quote")
payload = {
"source_currency": source_currency,
"target_currency": target_currency,
"amount": amount,
"source_type": source_type
}
if self.client.__class__.__name__.startswith('Async'):
response = await self.client._request("POST", url, json=payload)
return PaymentQuote.from_api_response(response.json())
else:
raise ValueError("Use get_quote for sync clients")