Initial sanitized code sync

This commit is contained in:
zqq61
2026-03-15 20:48:19 +08:00
commit 17ee51ba04
126 changed files with 22546 additions and 0 deletions

589
tools/cf_email_subdomain.py Normal file
View File

@@ -0,0 +1,589 @@
#!/usr/bin/env python3
"""
Cloudflare 二级域名邮箱 DNS 批量配置工具
功能:选择域名,批量为二级域名创建邮箱所需的 DNS 记录MX + SPF + DKIM
支持并发创建、自动跳过已存在的记录
"""
import httpx
import sys
import time
import asyncio
# ============ 配置 ============
CF_API_TOKEN = "nJWYVC1s7P9PpEdELjRTlJaYSMs0XOML1auCv6R8"
CF_API_BASE = "https://api.cloudflare.com/client/v4"
PROXY = "http://192.168.1.2:10810"
MAX_CONCURRENCY = 5 # 最大并发数Cloudflare 限流,不宜太高)
MAX_RETRIES = 3 # 限流重试次数
# Cloudflare Email Routing MX 记录
MX_RECORDS = [
{"content": "route1.mx.cloudflare.net", "priority": 10},
{"content": "route2.mx.cloudflare.net", "priority": 20},
{"content": "route3.mx.cloudflare.net", "priority": 30},
]
SPF_VALUE = "v=spf1 include:_spf.mx.cloudflare.net ~all"
def sync_client():
return httpx.Client(
proxy=PROXY,
headers={
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
},
timeout=30,
)
def async_client():
return httpx.AsyncClient(
proxy=PROXY,
headers={
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
},
timeout=30,
)
def api_get_sync(client, path, params=None):
resp = client.get(f"{CF_API_BASE}{path}", params=params)
data = resp.json()
if not data["success"]:
print(f" API 错误: {data['errors']}")
return None
return data
def list_zones(client):
data = api_get_sync(client, "/zones", {"per_page": 50})
if not data:
return []
return data["result"]
def get_existing_records(client, zone_id):
"""获取 zone 下所有 DNS 记录,用于去重"""
all_records = []
page = 1
while True:
data = api_get_sync(client, f"/zones/{zone_id}/dns_records", {
"per_page": 100,
"page": page,
})
if not data:
break
all_records.extend(data["result"])
if page >= data["result_info"]["total_pages"]:
break
page += 1
return all_records
def get_dkim_record(client, zone_id, domain):
data = api_get_sync(client, f"/zones/{zone_id}/dns_records", {
"type": "TXT",
"name": f"cf2024-1._domainkey.{domain}",
})
if data and data["result"]:
return data["result"][0]["content"]
return None
def build_records_for_subdomain(domain, subdomain, dkim_value=None):
"""构建一个子域名所需的所有 DNS 记录"""
full_sub = f"{subdomain}.{domain}"
records = []
for mx in MX_RECORDS:
records.append({
"type": "MX",
"name": full_sub,
"content": mx["content"],
"priority": mx["priority"],
"ttl": 1,
"proxied": False,
"comment": f"Email routing for {full_sub}",
})
records.append({
"type": "TXT",
"name": full_sub,
"content": SPF_VALUE,
"ttl": 1,
"proxied": False,
"comment": f"SPF for {full_sub}",
})
if dkim_value:
records.append({
"type": "TXT",
"name": f"cf2024-1._domainkey.{full_sub}",
"content": dkim_value,
"ttl": 1,
"proxied": False,
"comment": f"DKIM for {full_sub}",
})
return records
def filter_existing(records_to_create, existing_records):
"""过滤掉已存在的记录"""
existing_set = set()
for r in existing_records:
key = (r["type"], r["name"], r["content"])
existing_set.add(key)
new_records = []
skipped = 0
for r in records_to_create:
key = (r["type"], r["name"], r["content"])
if key in existing_set:
skipped += 1
else:
new_records.append(r)
return new_records, skipped
async def create_record_async(sem, client, zone_id, record):
"""并发创建单条 DNS 记录,带限流重试"""
async with sem:
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = await client.post(
f"{CF_API_BASE}/zones/{zone_id}/dns_records",
json=record,
)
data = resp.json()
if data["success"]:
label = record["type"]
if record["type"] == "TXT":
if "spf1" in record["content"]:
label = "TXT(SPF)"
elif "DKIM" in record.get("comment", ""):
label = "TXT(DKIM)"
print(f" OK {label:10s} {record['name']} -> {record['content'][:50]}")
return "ok"
else:
# 检查是否限流 (code 971)
is_throttle = any(e.get("code") == 971 for e in data.get("errors", []))
if is_throttle and attempt < MAX_RETRIES:
print(f" THROTTLE {record['name']} 等待 5min 后重试 ({attempt}/{MAX_RETRIES})...")
await asyncio.sleep(300)
continue
print(f" FAIL {record['type']:10s} {record['name']} 错误: {data['errors']}")
return "throttle" if is_throttle else "fail"
except Exception as e:
if attempt < MAX_RETRIES:
wait = attempt * 2
print(f" ERR {record['name']} {e} 重试 ({attempt}/{MAX_RETRIES})...")
await asyncio.sleep(wait)
continue
print(f" ERR {record['type']:10s} {record['name']} 异常: {e}")
return "fail"
return "fail"
async def batch_create(zone_id, records):
"""批量并发创建 DNS 记录,每批之间等待 1 秒"""
total_success = 0
total_failed = 0
async with async_client() as client:
for i in range(0, len(records), MAX_CONCURRENCY):
batch = records[i:i + MAX_CONCURRENCY]
sem = asyncio.Semaphore(MAX_CONCURRENCY)
tasks = [create_record_async(sem, client, zone_id, r) for r in batch]
results = await asyncio.gather(*tasks)
total_success += sum(1 for r in results if r == "ok")
total_failed += sum(1 for r in results if r != "ok")
# 如果这批有限流,等 30 秒;否则等 1 秒
had_throttle = any(r == "throttle" for r in results)
if i + MAX_CONCURRENCY < len(records):
if had_throttle:
print(" 检测到限流,等待 5min...")
await asyncio.sleep(300)
else:
await asyncio.sleep(1)
return total_success, total_failed
def find_incomplete_subdomains(existing_records, domain, has_dkim=False):
"""找出邮箱 DNS 记录不完整的子域名"""
# 收集所有子域名的记录
sub_records = {} # subdomain -> {"mx": set(), "spf": bool, "dkim": bool}
mx_targets = {mx["content"] for mx in MX_RECORDS}
for r in existing_records:
name = r["name"]
# 跳过主域本身的记录
if name == domain:
continue
# 只关注 .domain 结尾的子域名
if not name.endswith(f".{domain}"):
continue
# 提取子域名部分
sub_part = name[: -(len(domain) + 1)]
# DKIM 记录: cf2024-1._domainkey.sub.domain
if sub_part.startswith("cf2024-1._domainkey."):
actual_sub = sub_part[len("cf2024-1._domainkey."):]
if actual_sub not in sub_records:
sub_records[actual_sub] = {"mx": set(), "spf": False, "dkim": False, "ids": []}
sub_records[actual_sub]["dkim"] = True
sub_records[actual_sub]["ids"].append(r["id"])
continue
# MX / TXT(SPF) 记录
if sub_part not in sub_records:
sub_records[sub_part] = {"mx": set(), "spf": False, "dkim": False, "ids": []}
if r["type"] == "MX" and r["content"] in mx_targets:
sub_records[sub_part]["mx"].add(r["content"])
sub_records[sub_part]["ids"].append(r["id"])
elif r["type"] == "TXT" and "spf1" in r.get("content", ""):
sub_records[sub_part]["spf"] = True
sub_records[sub_part]["ids"].append(r["id"])
# 判断完整性: 需要 3 MX + 1 SPF (+ 可选 DKIM)
incomplete = {}
complete = {}
for sub, info in sub_records.items():
mx_count = len(info["mx"])
is_complete = mx_count == 3 and info["spf"]
if has_dkim:
is_complete = is_complete and info["dkim"]
if not is_complete and info["ids"]: # 有记录但不完整
incomplete[sub] = info
elif is_complete:
complete[sub] = info
return incomplete, complete
def delete_records_sync(client, zone_id, record_ids):
"""同步删除 DNS 记录"""
success = 0
failed = 0
for rid in record_ids:
resp = client.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{rid}")
data = resp.json()
if data["success"]:
success += 1
else:
print(f" 删除失败 {rid}: {data['errors']}")
failed += 1
return success, failed
def cleanup_incomplete(client, zones):
"""检查并清理不完整的子域名邮箱记录"""
# 选择域名
print(f"\n选择要检查的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected = [zones[int(p) - 1] for p in parts]
break
print("无效选择。")
total_deleted = 0
total_incomplete = 0
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
print(f"\n{'=' * 55}")
print(f" 检查域名: {domain}")
print(f"{'=' * 55}")
existing = get_existing_records(client, zone_id)
incomplete, complete = find_incomplete_subdomains(existing, domain)
print(f" 完整子域名: {len(complete)}")
print(f" 不完整子域名: {len(incomplete)}")
if not incomplete:
print(" 没有不完整的记录,跳过。")
continue
for sub, info in sorted(incomplete.items()):
mx_count = len(info["mx"])
print(f" {sub}.{domain} MX:{mx_count}/3 SPF:{'' if info['spf'] else ''} DKIM:{'' if info['dkim'] else ''} 记录数:{len(info['ids'])}")
total_incomplete += len(incomplete)
if total_incomplete == 0:
print("\n所有域名记录都是完整的。")
return
confirm = input(f"\n确认删除以上 {total_incomplete} 个不完整子域名的所有相关记录?[y/N]: ").strip().lower()
if confirm != "y":
print("已取消。")
return
# 执行删除
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
existing = get_existing_records(client, zone_id)
incomplete, _ = find_incomplete_subdomains(existing, domain)
if not incomplete:
continue
print(f"\n 删除 {domain} 的不完整记录...")
all_ids = []
for info in incomplete.values():
all_ids.extend(info["ids"])
success, failed = delete_records_sync(client, zone_id, all_ids)
total_deleted += success
print(f" 删除完成: 成功 {success}, 失败 {failed}")
print(f"\n总计删除: {total_deleted} 条记录")
def export_email_suffixes(client, zones):
"""导出解析完整的可用邮箱后缀列表"""
print(f"\n选择要导出的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected = [zones[int(p) - 1] for p in parts]
break
print("无效选择。")
all_suffixes = []
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
print(f"\n 检查 {domain} ...")
existing = get_existing_records(client, zone_id)
incomplete, complete = find_incomplete_subdomains(existing, domain)
# 主域如果有完整邮箱记录也算
main_mx = set()
main_spf = False
for r in existing:
if r["name"] == domain:
if r["type"] == "MX" and r["content"] in {mx["content"] for mx in MX_RECORDS}:
main_mx.add(r["content"])
elif r["type"] == "TXT" and "spf1" in r.get("content", ""):
main_spf = True
if len(main_mx) == 3 and main_spf:
all_suffixes.append(domain)
# 完整的子域名
for sub in sorted(complete.keys()):
all_suffixes.append(f"{sub}.{domain}")
print(f" 完整: {len(complete)} 个子域名, 不完整: {len(incomplete)}")
print(f"\n{'=' * 55}")
print(f" 可用邮箱后缀: {len(all_suffixes)}")
print(f"{'=' * 55}")
# 打印数组格式
import json
print(f"\n{json.dumps(all_suffixes, ensure_ascii=False)}")
def main():
print("=" * 55)
print(" Cloudflare 二级域名邮箱 DNS 批量配置工具 (并发版)")
print("=" * 55)
client = sync_client()
# 主菜单
print("\n功能选择:")
print(" [1] 批量创建子域名邮箱 DNS")
print(" [2] 检查并清理不完整的记录")
print(" [3] 导出可用邮箱后缀列表")
while True:
mode = input("\n选择功能 [1-3]: ").strip()
if mode in ("1", "2", "3"):
break
print("无效选择。")
# 1. 列出域名
print("\n获取域名列表...")
zones = list_zones(client)
if not zones:
print("没有找到域名,请检查 API Token 权限。")
sys.exit(1)
print(f"\n找到 {len(zones)} 个域名:")
for i, z in enumerate(zones):
print(f" [{i + 1}] {z['name']} (状态: {z['status']})")
# 功能 2: 清理不完整记录
if mode == "2":
cleanup_incomplete(client, zones)
client.close()
return
# 功能 3: 导出可用邮箱后缀
if mode == "3":
export_email_suffixes(client, zones)
client.close()
return
# 2. 选择域名(支持多选)
print(f"\n选择域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected_zones = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected_zones = [zones[int(p) - 1] for p in parts]
break
print("无效选择,请重试。")
print(f"\n已选择 {len(selected_zones)} 个域名:")
for z in selected_zones:
print(f" - {z['name']}")
# 3. 输入二级域名数量和前缀
while True:
count = input("\n要创建多少个二级域名邮箱?: ").strip()
if count.isdigit() and int(count) > 0:
count = int(count)
break
print("请输入正整数。")
print("\n前缀模式:")
print(" [1] 数字前缀 (mail1, mail2, mail3...)")
print(" [2] 字母前缀 (a, b, c...)")
print(" [3] 自定义前缀 (输入逗号分隔的列表)")
while True:
prefix_choice = input("\n选择前缀模式 [1-3]: ").strip()
if prefix_choice in ("1", "2", "3"):
break
print("无效选择。")
subdomains = []
if prefix_choice == "1":
prefix = input("输入前缀 (默认 mail): ").strip() or "mail"
subdomains = [f"{prefix}{i}" for i in range(1, count + 1)]
elif prefix_choice == "2":
if count > 26:
print("字母前缀最多支持 26 个。")
sys.exit(1)
subdomains = [chr(ord('a') + i) for i in range(count)]
else:
custom = input("输入前缀列表 (逗号分隔): ").strip()
subdomains = [s.strip() for s in custom.split(",") if s.strip()]
if len(subdomains) != count:
print(f"警告:输入了 {len(subdomains)} 个前缀,与数量 {count} 不匹配,使用实际输入的。")
count = len(subdomains)
# 4. DKIM 策略
print("\nDKIM 策略:")
print(" [1] 复制主域 DKIM 记录")
print(" [2] 跳过 DKIM (只创建 MX + SPF)")
while True:
dkim_choice = input("\n选择 DKIM 策略 [1-2]: ").strip()
if dkim_choice in ("1", "2"):
break
print("无效选择。")
# 5. 对每个域名执行
grand_total_success = 0
grand_total_skipped = 0
grand_total_failed = 0
all_created_emails = []
for zi, zone in enumerate(selected_zones):
zone_id = zone["id"]
domain = zone["name"]
print(f"\n{'=' * 55}")
print(f" [{zi+1}/{len(selected_zones)}] 处理域名: {domain}")
print(f"{'=' * 55}")
# DKIM
dkim_value = None
if dkim_choice == "1":
print(" 获取主域 DKIM 记录...")
dkim_value = get_dkim_record(client, zone_id, domain)
if dkim_value:
print(f" 找到 DKIM 记录 (长度: {len(dkim_value)})")
else:
print(" 未找到主域 DKIM 记录,将跳过 DKIM。")
# 构建记录
all_records = []
for sub in subdomains:
all_records.extend(build_records_for_subdomain(domain, sub, dkim_value))
print(f" 待创建: {len(all_records)} 条记录")
# 去重
print(" 检查已有 DNS 记录 (去重)...")
existing = get_existing_records(client, zone_id)
print(f" 已有记录: {len(existing)}")
new_records, skipped = filter_existing(all_records, existing)
print(f" 跳过已存在: {skipped}")
print(f" 需新建: {len(new_records)}")
grand_total_skipped += skipped
if not new_records:
print(" 所有记录已存在,跳过此域名。")
continue
# 并发创建
print(f" 开始并发创建 (并发数: {MAX_CONCURRENCY})...")
start_time = time.time()
success, failed = asyncio.run(batch_create(zone_id, new_records))
elapsed = time.time() - start_time
grand_total_success += success
grand_total_failed += failed
print(f" 完成!成功: {success}, 失败: {failed}, 耗时: {elapsed:.1f}s")
for sub in subdomains:
all_created_emails.append(f"*@{sub}.{domain}")
client.close()
# 6. 总汇总
print(f"\n{'=' * 55}")
print(f" 全部完成!")
print(f" 域名数: {len(selected_zones)}")
print(f" 子域名数: {count} x {len(selected_zones)} = {count * len(selected_zones)}")
print(f" 成功: {grand_total_success}")
print(f" 跳过: {grand_total_skipped}")
print(f" 失败: {grand_total_failed}")
print(f"{'=' * 55}")
if grand_total_success > 0:
print("\n注意: 还需要在 Cloudflare Email Routing 中配置转发规则才能收邮件。")
if __name__ == "__main__":
main()