Files
Airwallex/backend/app/proxy_client.py
zqq61 81d9c3a7e1 feat: 支持x-login-as三参数认证、auth_url跟随base_url
- 认证请求增加x-login-as header支持连接账户
- auth_url根据base_url动态构建,不再硬编码生产环境
- 默认Base URL改为demo环境
- 设置页面新增Account ID字段,带tooltip说明
2026-03-16 00:44:27 +08:00

80 lines
2.9 KiB
Python

"""Airwallex client with proxy support (HTTP and SOCKS5)."""
import logging
import httpx
from datetime import datetime, timezone, timedelta
from typing import Optional
from airwallex.client import AirwallexClient
logger = logging.getLogger(__name__)
class ProxiedAirwallexClient(AirwallexClient):
"""AirwallexClient that routes requests through an HTTP or SOCKS5 proxy.
Also supports x-login-as header for connected accounts.
"""
def __init__(self, proxy_url: str | None = None, login_as: str | None = None, **kwargs):
self._proxy_url = proxy_url
self._login_as = login_as
super().__init__(**kwargs)
# Replace the default httpx client with a proxied one
if proxy_url:
self._client.close()
self._client = httpx.Client(
base_url=self.base_url,
timeout=self.request_timeout,
proxy=proxy_url,
)
safe_url = proxy_url.split("@")[-1] if "@" in proxy_url else proxy_url
logger.info("Airwallex client initialized with proxy: %s", safe_url)
else:
logger.info("Airwallex client initialized without proxy")
def authenticate(self) -> None:
"""Override authenticate to use proxy and x-login-as for auth requests."""
if self._token and self._token_expiry and datetime.now(timezone.utc) < self._token_expiry:
return
logger.info("Authenticating with Airwallex API at %s (proxy: %s, login_as: %s)",
self.auth_url, bool(self._proxy_url), self._login_as or "none")
auth_kwargs: dict = {"timeout": self.request_timeout}
if self._proxy_url:
auth_kwargs["proxy"] = self._proxy_url
auth_headers = {
"Content-Type": "application/json",
"x-client-id": self.client_id,
"x-api-key": self.api_key,
}
if self._login_as:
auth_headers["x-login-as"] = self._login_as
auth_client = httpx.Client(**auth_kwargs)
try:
response = auth_client.post(
self.auth_url,
headers=auth_headers,
content="{}",
)
if response.status_code != 200:
body = response.text[:300]
logger.error("Auth failed: HTTP %d, body: %s", response.status_code, body)
response.raise_for_status()
data = response.json()
self._token = data.get("token")
self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=28)
logger.info("Authentication successful, token expires in 28 minutes")
finally:
auth_client.close()
@property
def headers(self):
"""Add x-login-as to all API request headers."""
h = super().headers
if self._login_as:
h["x-login-as"] = self._login_as
return h