Files
gpt-plus-gpt/tools/cf_email_subdomain.py
2026-03-15 20:48:19 +08:00

590 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Cloudflare 二级域名邮箱 DNS 批量配置工具
功能:选择域名,批量为二级域名创建邮箱所需的 DNS 记录MX + SPF + DKIM
支持并发创建、自动跳过已存在的记录
"""
import httpx
import sys
import time
import asyncio
# ============ 配置 ============
CF_API_TOKEN = "nJWYVC1s7P9PpEdELjRTlJaYSMs0XOML1auCv6R8"
CF_API_BASE = "https://api.cloudflare.com/client/v4"
PROXY = "http://192.168.1.2:10810"
MAX_CONCURRENCY = 5 # 最大并发数Cloudflare 限流,不宜太高)
MAX_RETRIES = 3 # 限流重试次数
# Cloudflare Email Routing MX 记录
MX_RECORDS = [
{"content": "route1.mx.cloudflare.net", "priority": 10},
{"content": "route2.mx.cloudflare.net", "priority": 20},
{"content": "route3.mx.cloudflare.net", "priority": 30},
]
SPF_VALUE = "v=spf1 include:_spf.mx.cloudflare.net ~all"
def sync_client():
return httpx.Client(
proxy=PROXY,
headers={
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
},
timeout=30,
)
def async_client():
return httpx.AsyncClient(
proxy=PROXY,
headers={
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
},
timeout=30,
)
def api_get_sync(client, path, params=None):
resp = client.get(f"{CF_API_BASE}{path}", params=params)
data = resp.json()
if not data["success"]:
print(f" API 错误: {data['errors']}")
return None
return data
def list_zones(client):
data = api_get_sync(client, "/zones", {"per_page": 50})
if not data:
return []
return data["result"]
def get_existing_records(client, zone_id):
"""获取 zone 下所有 DNS 记录,用于去重"""
all_records = []
page = 1
while True:
data = api_get_sync(client, f"/zones/{zone_id}/dns_records", {
"per_page": 100,
"page": page,
})
if not data:
break
all_records.extend(data["result"])
if page >= data["result_info"]["total_pages"]:
break
page += 1
return all_records
def get_dkim_record(client, zone_id, domain):
data = api_get_sync(client, f"/zones/{zone_id}/dns_records", {
"type": "TXT",
"name": f"cf2024-1._domainkey.{domain}",
})
if data and data["result"]:
return data["result"][0]["content"]
return None
def build_records_for_subdomain(domain, subdomain, dkim_value=None):
"""构建一个子域名所需的所有 DNS 记录"""
full_sub = f"{subdomain}.{domain}"
records = []
for mx in MX_RECORDS:
records.append({
"type": "MX",
"name": full_sub,
"content": mx["content"],
"priority": mx["priority"],
"ttl": 1,
"proxied": False,
"comment": f"Email routing for {full_sub}",
})
records.append({
"type": "TXT",
"name": full_sub,
"content": SPF_VALUE,
"ttl": 1,
"proxied": False,
"comment": f"SPF for {full_sub}",
})
if dkim_value:
records.append({
"type": "TXT",
"name": f"cf2024-1._domainkey.{full_sub}",
"content": dkim_value,
"ttl": 1,
"proxied": False,
"comment": f"DKIM for {full_sub}",
})
return records
def filter_existing(records_to_create, existing_records):
"""过滤掉已存在的记录"""
existing_set = set()
for r in existing_records:
key = (r["type"], r["name"], r["content"])
existing_set.add(key)
new_records = []
skipped = 0
for r in records_to_create:
key = (r["type"], r["name"], r["content"])
if key in existing_set:
skipped += 1
else:
new_records.append(r)
return new_records, skipped
async def create_record_async(sem, client, zone_id, record):
"""并发创建单条 DNS 记录,带限流重试"""
async with sem:
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = await client.post(
f"{CF_API_BASE}/zones/{zone_id}/dns_records",
json=record,
)
data = resp.json()
if data["success"]:
label = record["type"]
if record["type"] == "TXT":
if "spf1" in record["content"]:
label = "TXT(SPF)"
elif "DKIM" in record.get("comment", ""):
label = "TXT(DKIM)"
print(f" OK {label:10s} {record['name']} -> {record['content'][:50]}")
return "ok"
else:
# 检查是否限流 (code 971)
is_throttle = any(e.get("code") == 971 for e in data.get("errors", []))
if is_throttle and attempt < MAX_RETRIES:
print(f" THROTTLE {record['name']} 等待 5min 后重试 ({attempt}/{MAX_RETRIES})...")
await asyncio.sleep(300)
continue
print(f" FAIL {record['type']:10s} {record['name']} 错误: {data['errors']}")
return "throttle" if is_throttle else "fail"
except Exception as e:
if attempt < MAX_RETRIES:
wait = attempt * 2
print(f" ERR {record['name']} {e} 重试 ({attempt}/{MAX_RETRIES})...")
await asyncio.sleep(wait)
continue
print(f" ERR {record['type']:10s} {record['name']} 异常: {e}")
return "fail"
return "fail"
async def batch_create(zone_id, records):
"""批量并发创建 DNS 记录,每批之间等待 1 秒"""
total_success = 0
total_failed = 0
async with async_client() as client:
for i in range(0, len(records), MAX_CONCURRENCY):
batch = records[i:i + MAX_CONCURRENCY]
sem = asyncio.Semaphore(MAX_CONCURRENCY)
tasks = [create_record_async(sem, client, zone_id, r) for r in batch]
results = await asyncio.gather(*tasks)
total_success += sum(1 for r in results if r == "ok")
total_failed += sum(1 for r in results if r != "ok")
# 如果这批有限流,等 30 秒;否则等 1 秒
had_throttle = any(r == "throttle" for r in results)
if i + MAX_CONCURRENCY < len(records):
if had_throttle:
print(" 检测到限流,等待 5min...")
await asyncio.sleep(300)
else:
await asyncio.sleep(1)
return total_success, total_failed
def find_incomplete_subdomains(existing_records, domain, has_dkim=False):
"""找出邮箱 DNS 记录不完整的子域名"""
# 收集所有子域名的记录
sub_records = {} # subdomain -> {"mx": set(), "spf": bool, "dkim": bool}
mx_targets = {mx["content"] for mx in MX_RECORDS}
for r in existing_records:
name = r["name"]
# 跳过主域本身的记录
if name == domain:
continue
# 只关注 .domain 结尾的子域名
if not name.endswith(f".{domain}"):
continue
# 提取子域名部分
sub_part = name[: -(len(domain) + 1)]
# DKIM 记录: cf2024-1._domainkey.sub.domain
if sub_part.startswith("cf2024-1._domainkey."):
actual_sub = sub_part[len("cf2024-1._domainkey."):]
if actual_sub not in sub_records:
sub_records[actual_sub] = {"mx": set(), "spf": False, "dkim": False, "ids": []}
sub_records[actual_sub]["dkim"] = True
sub_records[actual_sub]["ids"].append(r["id"])
continue
# MX / TXT(SPF) 记录
if sub_part not in sub_records:
sub_records[sub_part] = {"mx": set(), "spf": False, "dkim": False, "ids": []}
if r["type"] == "MX" and r["content"] in mx_targets:
sub_records[sub_part]["mx"].add(r["content"])
sub_records[sub_part]["ids"].append(r["id"])
elif r["type"] == "TXT" and "spf1" in r.get("content", ""):
sub_records[sub_part]["spf"] = True
sub_records[sub_part]["ids"].append(r["id"])
# 判断完整性: 需要 3 MX + 1 SPF (+ 可选 DKIM)
incomplete = {}
complete = {}
for sub, info in sub_records.items():
mx_count = len(info["mx"])
is_complete = mx_count == 3 and info["spf"]
if has_dkim:
is_complete = is_complete and info["dkim"]
if not is_complete and info["ids"]: # 有记录但不完整
incomplete[sub] = info
elif is_complete:
complete[sub] = info
return incomplete, complete
def delete_records_sync(client, zone_id, record_ids):
"""同步删除 DNS 记录"""
success = 0
failed = 0
for rid in record_ids:
resp = client.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{rid}")
data = resp.json()
if data["success"]:
success += 1
else:
print(f" 删除失败 {rid}: {data['errors']}")
failed += 1
return success, failed
def cleanup_incomplete(client, zones):
"""检查并清理不完整的子域名邮箱记录"""
# 选择域名
print(f"\n选择要检查的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected = [zones[int(p) - 1] for p in parts]
break
print("无效选择。")
total_deleted = 0
total_incomplete = 0
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
print(f"\n{'=' * 55}")
print(f" 检查域名: {domain}")
print(f"{'=' * 55}")
existing = get_existing_records(client, zone_id)
incomplete, complete = find_incomplete_subdomains(existing, domain)
print(f" 完整子域名: {len(complete)}")
print(f" 不完整子域名: {len(incomplete)}")
if not incomplete:
print(" 没有不完整的记录,跳过。")
continue
for sub, info in sorted(incomplete.items()):
mx_count = len(info["mx"])
print(f" {sub}.{domain} MX:{mx_count}/3 SPF:{'' if info['spf'] else ''} DKIM:{'' if info['dkim'] else ''} 记录数:{len(info['ids'])}")
total_incomplete += len(incomplete)
if total_incomplete == 0:
print("\n所有域名记录都是完整的。")
return
confirm = input(f"\n确认删除以上 {total_incomplete} 个不完整子域名的所有相关记录?[y/N]: ").strip().lower()
if confirm != "y":
print("已取消。")
return
# 执行删除
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
existing = get_existing_records(client, zone_id)
incomplete, _ = find_incomplete_subdomains(existing, domain)
if not incomplete:
continue
print(f"\n 删除 {domain} 的不完整记录...")
all_ids = []
for info in incomplete.values():
all_ids.extend(info["ids"])
success, failed = delete_records_sync(client, zone_id, all_ids)
total_deleted += success
print(f" 删除完成: 成功 {success}, 失败 {failed}")
print(f"\n总计删除: {total_deleted} 条记录")
def export_email_suffixes(client, zones):
"""导出解析完整的可用邮箱后缀列表"""
print(f"\n选择要导出的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected = [zones[int(p) - 1] for p in parts]
break
print("无效选择。")
all_suffixes = []
for zone in selected:
zone_id = zone["id"]
domain = zone["name"]
print(f"\n 检查 {domain} ...")
existing = get_existing_records(client, zone_id)
incomplete, complete = find_incomplete_subdomains(existing, domain)
# 主域如果有完整邮箱记录也算
main_mx = set()
main_spf = False
for r in existing:
if r["name"] == domain:
if r["type"] == "MX" and r["content"] in {mx["content"] for mx in MX_RECORDS}:
main_mx.add(r["content"])
elif r["type"] == "TXT" and "spf1" in r.get("content", ""):
main_spf = True
if len(main_mx) == 3 and main_spf:
all_suffixes.append(domain)
# 完整的子域名
for sub in sorted(complete.keys()):
all_suffixes.append(f"{sub}.{domain}")
print(f" 完整: {len(complete)} 个子域名, 不完整: {len(incomplete)}")
print(f"\n{'=' * 55}")
print(f" 可用邮箱后缀: {len(all_suffixes)}")
print(f"{'=' * 55}")
# 打印数组格式
import json
print(f"\n{json.dumps(all_suffixes, ensure_ascii=False)}")
def main():
print("=" * 55)
print(" Cloudflare 二级域名邮箱 DNS 批量配置工具 (并发版)")
print("=" * 55)
client = sync_client()
# 主菜单
print("\n功能选择:")
print(" [1] 批量创建子域名邮箱 DNS")
print(" [2] 检查并清理不完整的记录")
print(" [3] 导出可用邮箱后缀列表")
while True:
mode = input("\n选择功能 [1-3]: ").strip()
if mode in ("1", "2", "3"):
break
print("无效选择。")
# 1. 列出域名
print("\n获取域名列表...")
zones = list_zones(client)
if not zones:
print("没有找到域名,请检查 API Token 权限。")
sys.exit(1)
print(f"\n找到 {len(zones)} 个域名:")
for i, z in enumerate(zones):
print(f" [{i + 1}] {z['name']} (状态: {z['status']})")
# 功能 2: 清理不完整记录
if mode == "2":
cleanup_incomplete(client, zones)
client.close()
return
# 功能 3: 导出可用邮箱后缀
if mode == "3":
export_email_suffixes(client, zones)
client.close()
return
# 2. 选择域名(支持多选)
print(f"\n选择域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
while True:
choice = input(">>> ").strip().lower()
if choice == "all":
selected_zones = zones[:]
break
parts = [p.strip() for p in choice.split(",") if p.strip()]
if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts:
selected_zones = [zones[int(p) - 1] for p in parts]
break
print("无效选择,请重试。")
print(f"\n已选择 {len(selected_zones)} 个域名:")
for z in selected_zones:
print(f" - {z['name']}")
# 3. 输入二级域名数量和前缀
while True:
count = input("\n要创建多少个二级域名邮箱?: ").strip()
if count.isdigit() and int(count) > 0:
count = int(count)
break
print("请输入正整数。")
print("\n前缀模式:")
print(" [1] 数字前缀 (mail1, mail2, mail3...)")
print(" [2] 字母前缀 (a, b, c...)")
print(" [3] 自定义前缀 (输入逗号分隔的列表)")
while True:
prefix_choice = input("\n选择前缀模式 [1-3]: ").strip()
if prefix_choice in ("1", "2", "3"):
break
print("无效选择。")
subdomains = []
if prefix_choice == "1":
prefix = input("输入前缀 (默认 mail): ").strip() or "mail"
subdomains = [f"{prefix}{i}" for i in range(1, count + 1)]
elif prefix_choice == "2":
if count > 26:
print("字母前缀最多支持 26 个。")
sys.exit(1)
subdomains = [chr(ord('a') + i) for i in range(count)]
else:
custom = input("输入前缀列表 (逗号分隔): ").strip()
subdomains = [s.strip() for s in custom.split(",") if s.strip()]
if len(subdomains) != count:
print(f"警告:输入了 {len(subdomains)} 个前缀,与数量 {count} 不匹配,使用实际输入的。")
count = len(subdomains)
# 4. DKIM 策略
print("\nDKIM 策略:")
print(" [1] 复制主域 DKIM 记录")
print(" [2] 跳过 DKIM (只创建 MX + SPF)")
while True:
dkim_choice = input("\n选择 DKIM 策略 [1-2]: ").strip()
if dkim_choice in ("1", "2"):
break
print("无效选择。")
# 5. 对每个域名执行
grand_total_success = 0
grand_total_skipped = 0
grand_total_failed = 0
all_created_emails = []
for zi, zone in enumerate(selected_zones):
zone_id = zone["id"]
domain = zone["name"]
print(f"\n{'=' * 55}")
print(f" [{zi+1}/{len(selected_zones)}] 处理域名: {domain}")
print(f"{'=' * 55}")
# DKIM
dkim_value = None
if dkim_choice == "1":
print(" 获取主域 DKIM 记录...")
dkim_value = get_dkim_record(client, zone_id, domain)
if dkim_value:
print(f" 找到 DKIM 记录 (长度: {len(dkim_value)})")
else:
print(" 未找到主域 DKIM 记录,将跳过 DKIM。")
# 构建记录
all_records = []
for sub in subdomains:
all_records.extend(build_records_for_subdomain(domain, sub, dkim_value))
print(f" 待创建: {len(all_records)} 条记录")
# 去重
print(" 检查已有 DNS 记录 (去重)...")
existing = get_existing_records(client, zone_id)
print(f" 已有记录: {len(existing)}")
new_records, skipped = filter_existing(all_records, existing)
print(f" 跳过已存在: {skipped}")
print(f" 需新建: {len(new_records)}")
grand_total_skipped += skipped
if not new_records:
print(" 所有记录已存在,跳过此域名。")
continue
# 并发创建
print(f" 开始并发创建 (并发数: {MAX_CONCURRENCY})...")
start_time = time.time()
success, failed = asyncio.run(batch_create(zone_id, new_records))
elapsed = time.time() - start_time
grand_total_success += success
grand_total_failed += failed
print(f" 完成!成功: {success}, 失败: {failed}, 耗时: {elapsed:.1f}s")
for sub in subdomains:
all_created_emails.append(f"*@{sub}.{domain}")
client.close()
# 6. 总汇总
print(f"\n{'=' * 55}")
print(f" 全部完成!")
print(f" 域名数: {len(selected_zones)}")
print(f" 子域名数: {count} x {len(selected_zones)} = {count * len(selected_zones)}")
print(f" 成功: {grand_total_success}")
print(f" 跳过: {grand_total_skipped}")
print(f" 失败: {grand_total_failed}")
print(f"{'=' * 55}")
if grand_total_success > 0:
print("\n注意: 还需要在 Cloudflare Email Routing 中配置转发规则才能收邮件。")
if __name__ == "__main__":
main()