- 设置__module__为airwallex.client,让SDK的__getattr__正确加载api模块 - 修复issuing_cardholder等API属性找不到的问题
92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
"""Airwallex client with proxy support (HTTP and SOCKS5).
|
|
Uses http.client for authentication to avoid Cloudflare TLS fingerprint blocking.
|
|
"""
|
|
import http.client
|
|
import json
|
|
import logging
|
|
import ssl
|
|
import httpx
|
|
from datetime import datetime, timezone, timedelta
|
|
from urllib.parse import urlparse
|
|
from airwallex.client import AirwallexClient
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProxiedAirwallexClient(AirwallexClient):
    """AirwallexClient that routes API requests through an HTTP or SOCKS5 proxy.

    Authentication is performed with the stdlib ``http.client`` instead of
    httpx; per the original author's note this avoids Cloudflare TLS
    fingerprint blocking.  An optional ``x-login-as`` header is attached to
    both the authentication request and the regular API request headers.
    """

    # Override __module__ so the SDK's __getattr__ finds api modules under the
    # 'airwallex' package (otherwise attributes such as issuing_cardholder
    # are reported as not found).
    __module__ = "airwallex.client"

    def __init__(self, proxy_url: str | None = None, login_as: str | None = None, **kwargs):
        """Initialise the client, optionally swapping in a proxied httpx client.

        Args:
            proxy_url: Proxy URL handed to ``httpx.Client(proxy=...)``.  When
                falsy, the client set up by the base initializer is kept.
            login_as: Value sent as the ``x-login-as`` header, if truthy.
            **kwargs: Forwarded unchanged to ``AirwallexClient.__init__``.
        """
        self._proxy_url = proxy_url
        self._login_as = login_as
        super().__init__(**kwargs)

        if not proxy_url:
            logger.info("Airwallex client initialized without proxy")
            return

        # Close the client that exists after the base initializer ran and
        # replace it with one that goes through the proxy.
        self._client.close()
        self._client = httpx.Client(
            base_url=self.base_url,
            timeout=self.request_timeout,
            proxy=proxy_url,
        )
        # Keep only the part after the last "@" so proxy credentials are
        # never written to the log.
        safe_url = proxy_url.split("@")[-1] if "@" in proxy_url else proxy_url
        logger.info("Airwallex client initialized with proxy: %s", safe_url)

    def authenticate(self) -> None:
        """Authenticate using stdlib http.client to avoid CF blocking.

        Returns immediately while a previously obtained token is still within
        its expiry.  Otherwise POSTs an empty JSON object to ``self.auth_url``
        and stores the returned token with a 28 minute expiry.

        Raises:
            Exception: If the server answers with a status other than 200/201,
                or if the response body carries no ``token`` value.
        """
        if self._token and self._token_expiry and datetime.now(timezone.utc) < self._token_expiry:
            return

        parsed = urlparse(self.auth_url)
        host = parsed.hostname
        port = parsed.port or (443 if parsed.scheme == "https" else 80)
        # An empty request target is invalid, so fall back to "/"; keep any
        # query string that was part of the configured auth URL.
        target = parsed.path or "/"
        if parsed.query:
            target = f"{target}?{parsed.query}"

        logger.info("Authenticating at %s (via http.client, login_as: %s)", self.auth_url, self._login_as or "none")

        headers = {
            "Content-Type": "application/json",
            "x-client-id": self.client_id,
            "x-api-key": self.api_key,
        }
        if self._login_as:
            headers["x-login-as"] = self._login_as

        # NOTE(review): this connection is made directly and does not use
        # self._proxy_url -- confirm that bypassing the proxy for auth is
        # intended.
        # The connection is created before the try block so the cleanup below
        # never refers to an unbound name when construction itself fails.
        if parsed.scheme == "https":
            conn = http.client.HTTPSConnection(
                host,
                port,
                timeout=self.request_timeout,
                context=ssl.create_default_context(),
            )
        else:
            conn = http.client.HTTPConnection(host, port, timeout=self.request_timeout)

        try:
            conn.request("POST", target, json.dumps({}).encode("utf-8"), headers)
            resp = conn.getresponse()
            status = resp.status
            body = resp.read().decode("utf-8")
        finally:
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a failure to close must not mask the
                # outcome of the request itself.
                logger.debug("Ignoring error while closing auth connection", exc_info=True)

        if status not in (200, 201):
            logger.error("Auth failed: HTTP %d, body: %s", status, body[:300])
            raise Exception(f"Airwallex 认证失败 (HTTP {status}): {body[:200]}")

        token = json.loads(body).get("token")
        if not token:
            # Without this check a missing token would be stored as None and
            # still be reported as a successful authentication.
            logger.error("Auth response contained no token, body: %s", body[:300])
            raise Exception("Airwallex 认证失败: 响应中缺少 token")

        self._token = token
        self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=28)
        logger.info("Authentication successful, token expires in 28 minutes")

    @property
    def headers(self):
        """Add x-login-as to all API request headers if configured."""
        h = super().headers
        if self._login_as:
            h["x-login-as"] = self._login_as
        return h
|