Files
Airwallex/backend/app/proxy_client.py
zqq61 b774270704 fix: 用http.client替代httpx认证,绕过CF指纹拦截
- 认证请求改用stdlib http.client,避免httpx TLS指纹被Cloudflare拦截
- login_as为空时不传x-login-as header
- 服务器直连api.airwallex.com已验证201成功
2026-03-16 00:55:34 +08:00

89 lines
3.3 KiB
Python

"""Airwallex client with proxy support (HTTP and SOCKS5).
Uses http.client for authentication to avoid Cloudflare TLS fingerprint blocking.
"""
import http.client
import json
import logging
import ssl
import httpx
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from airwallex.client import AirwallexClient
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ProxiedAirwallexClient(AirwallexClient):
    """AirwallexClient whose httpx client can use an HTTP or SOCKS5 proxy.

    When ``proxy_url`` is given, the ``httpx.Client`` created by the base
    class is closed and replaced with one configured with that proxy.
    Authentication is performed with the stdlib ``http.client`` to bypass
    Cloudflare fingerprinting of httpx.

    NOTE(review): ``authenticate`` opens its own direct connection to the
    auth host, so the login request does not go through ``proxy_url`` --
    confirm this is intended.
    """

    def __init__(self, proxy_url: str | None = None, login_as: str | None = None, **kwargs):
        """Create the client, optionally routing httpx traffic through a proxy.

        Args:
            proxy_url: Proxy URL handed to ``httpx.Client(proxy=...)``; when
                empty or ``None`` the base class's client is kept as is.
            login_as: Value for the ``x-login-as`` header; the header is
                omitted entirely when this is empty or ``None``.
            **kwargs: Forwarded unchanged to ``AirwallexClient.__init__``.
        """
        self._proxy_url = proxy_url
        self._login_as = login_as
        super().__init__(**kwargs)

        if not proxy_url:
            logger.info("Airwallex client initialized without proxy")
            return

        # Swap the base class's client for one that goes through the proxy.
        self._client.close()
        self._client = httpx.Client(
            base_url=self.base_url,
            timeout=self.request_timeout,
            proxy=proxy_url,
        )
        # Keep only what follows the last "@" so "user:pass@" credentials
        # embedded in the proxy URL never reach the logs.
        safe_url = proxy_url.rsplit("@", 1)[-1]
        logger.info("Airwallex client initialized with proxy: %s", safe_url)

    def authenticate(self) -> None:
        """Authenticate using stdlib http.client to avoid CF blocking.

        Returns immediately while a previously obtained token is still within
        its locally tracked lifetime. Otherwise POSTs an empty JSON object to
        ``self.auth_url`` and stores the returned token together with an
        expiry 28 minutes in the future.

        Raises:
            Exception: If ``auth_url`` has no host, the server answers with a
                status other than 200/201, or the response body is not a JSON
                object containing a non-empty ``"token"``.
            OSError: On connection-level failures raised by ``http.client``.
        """
        if self._token and self._token_expiry and datetime.now(timezone.utc) < self._token_expiry:
            return

        parsed = urlparse(self.auth_url)
        host = parsed.hostname
        if not host:
            raise Exception(f"Airwallex 认证失败: invalid auth_url {self.auth_url!r}")
        port = parsed.port or (443 if parsed.scheme == "https" else 80)
        # An empty path is not a valid request target, and the query string
        # (if any) is part of the target too.
        path = parsed.path or "/"
        if parsed.query:
            path = f"{path}?{parsed.query}"

        logger.info(
            "Authenticating at %s (via http.client, login_as: %s)",
            self.auth_url,
            self._login_as or "none",
        )

        headers = {
            "Content-Type": "application/json",
            "x-client-id": self.client_id,
            "x-api-key": self.api_key,
        }
        if self._login_as:
            headers["x-login-as"] = self._login_as

        # Build the connection before entering ``try`` so the ``finally``
        # clause below never refers to an unbound name.
        if parsed.scheme == "https":
            conn = http.client.HTTPSConnection(
                host,
                port,
                timeout=self.request_timeout,
                context=ssl.create_default_context(),
            )
        else:
            conn = http.client.HTTPConnection(host, port, timeout=self.request_timeout)

        try:
            conn.request("POST", path, json.dumps({}).encode("utf-8"), headers)
            resp = conn.getresponse()
            status = resp.status
            # ``errors="replace"`` keeps a non-UTF-8 error page from hiding
            # the real HTTP status behind a UnicodeDecodeError.
            body = resp.read().decode("utf-8", errors="replace")
        finally:
            try:
                conn.close()
            except OSError:
                # Best effort: a failure while closing must not mask the
                # outcome of the request itself.
                logger.debug("Ignoring error while closing auth connection", exc_info=True)

        if status not in (200, 201):
            logger.error("Auth failed: HTTP %d, body: %s", status, body[:300])
            raise Exception(f"Airwallex 认证失败 (HTTP {status}): {body[:200]}")

        try:
            data = json.loads(body)
        except json.JSONDecodeError as err:
            logger.error("Auth response is not valid JSON (HTTP %d): %s", status, body[:300])
            raise Exception(f"Airwallex 认证失败 (invalid JSON response): {body[:200]}") from err

        token = data.get("token") if isinstance(data, dict) else None
        if not token:
            # Do not record a "successful" login without a usable token.
            logger.error("Auth response did not contain a token (HTTP %d)", status)
            raise Exception("Airwallex 认证失败: response did not contain a token")

        self._token = token
        self._token_expiry = datetime.now(timezone.utc) + timedelta(minutes=28)
        logger.info("Authentication successful, token expires in 28 minutes")

    @property
    def headers(self):
        """Add x-login-as to all API request headers if configured.

        Returns the mapping produced by the base class's ``headers``
        property, with ``x-login-as`` set when ``login_as`` was provided.
        """
        h = super().headers
        if self._login_as:
            h["x-login-as"] = self._login_as
        return h