590 lines
20 KiB
Python
590 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Cloudflare 二级域名邮箱 DNS 批量配置工具
|
||
|
||
功能:选择域名,批量为二级域名创建邮箱所需的 DNS 记录(MX + SPF + DKIM)
|
||
支持并发创建、自动跳过已存在的记录
|
||
"""
|
||
|
||
import asyncio
import os
import sys
import time

import httpx
|
||
|
||
# ============ Configuration ============
# SECURITY: the fallback token below is committed in source. Prefer supplying
# the token through the CF_API_TOKEN environment variable and rotate the
# committed value, since anyone who can read this file can use it.
CF_API_TOKEN = os.environ.get(
    "CF_API_TOKEN", "nJWYVC1s7P9PpEdELjRTlJaYSMs0XOML1auCv6R8"
)
CF_API_BASE = "https://api.cloudflare.com/client/v4"
# Outbound HTTP proxy. Override with CF_PROXY; an empty CF_PROXY yields None,
# which is passed to httpx as "no explicit proxy".
PROXY = os.environ.get("CF_PROXY", "http://192.168.1.2:10810") or None
MAX_CONCURRENCY = 5  # maximum concurrent requests (Cloudflare rate-limits; keep it low)
MAX_RETRIES = 3  # maximum attempts per record (throttling and request errors)

# Cloudflare Email Routing MX records
MX_RECORDS = [
    {"content": "route1.mx.cloudflare.net", "priority": 10},
    {"content": "route2.mx.cloudflare.net", "priority": 20},
    {"content": "route3.mx.cloudflare.net", "priority": 30},
]

SPF_VALUE = "v=spf1 include:_spf.mx.cloudflare.net ~all"
|
||
|
||
|
||
def sync_client():
    """Build a blocking httpx client preconfigured for the Cloudflare API.

    The client is routed through PROXY, sends the bearer token and a JSON
    content type with every request, and uses a 30-second timeout.
    """
    default_headers = {
        "Authorization": f"Bearer {CF_API_TOKEN}",
        "Content-Type": "application/json",
    }
    return httpx.Client(proxy=PROXY, headers=default_headers, timeout=30)
|
||
|
||
|
||
def async_client():
    """Build an asynchronous httpx client preconfigured for the Cloudflare API.

    Uses the same proxy, headers and 30-second timeout as the blocking client.
    """
    default_headers = {
        "Authorization": f"Bearer {CF_API_TOKEN}",
        "Content-Type": "application/json",
    }
    return httpx.AsyncClient(proxy=PROXY, headers=default_headers, timeout=30)
|
||
|
||
|
||
def api_get_sync(client, path, params=None):
    """Issue a GET against the Cloudflare API and return the decoded JSON body.

    Args:
        client: Blocking HTTP client exposing ``get(url, params=...)``.
        path: API path appended to CF_API_BASE (for example ``"/zones"``).
        params: Optional query parameters.

    Returns:
        The decoded response dict when its ``success`` flag is truthy;
        otherwise the reported errors are printed and ``None`` is returned.
    """
    payload = client.get(f"{CF_API_BASE}{path}", params=params).json()
    if payload["success"]:
        return payload
    print(f" API 错误: {payload['errors']}")
    return None
|
||
|
||
|
||
def list_zones(client):
    """Return every zone visible to the API token.

    Walks all result pages (50 zones per page) instead of reading only the
    first one, so accounts with more than 50 zones are listed completely.

    Args:
        client: Blocking HTTP client passed through to ``api_get_sync``.

    Returns:
        A list of zone dicts. It is empty when the first request fails, and
        contains the pages gathered so far when a later request fails.
    """
    zones = []
    page = 1
    while True:
        data = api_get_sync(client, "/zones", {"per_page": 50, "page": page})
        if not data:
            break
        zones.extend(data["result"])
        # Treat missing pagination info as a single page rather than failing.
        info = data.get("result_info") or {}
        if page >= info.get("total_pages", 1):
            break
        page += 1
    return zones
|
||
|
||
|
||
def get_existing_records(client, zone_id):
    """Collect the DNS records of a zone, used for de-duplication.

    Pages through the zone's records 100 at a time. If a page request fails,
    paging stops and the records gathered so far are returned.
    """
    endpoint = f"/zones/{zone_id}/dns_records"
    collected = []
    page_no = 1
    payload = api_get_sync(client, endpoint, {"per_page": 100, "page": page_no})
    while payload:
        collected.extend(payload["result"])
        if page_no >= payload["result_info"]["total_pages"]:
            break
        page_no += 1
        payload = api_get_sync(client, endpoint, {"per_page": 100, "page": page_no})
    return collected
|
||
|
||
|
||
def get_dkim_record(client, zone_id, domain):
    """Look up the apex DKIM TXT value published at ``cf2024-1._domainkey``.

    Returns:
        The content of the first matching TXT record, or ``None`` when the
        request fails or no record matches.
    """
    query = {"type": "TXT", "name": f"cf2024-1._domainkey.{domain}"}
    data = api_get_sync(client, f"/zones/{zone_id}/dns_records", query)
    if not data:
        return None
    matches = data["result"]
    if not matches:
        return None
    return matches[0]["content"]
|
||
|
||
|
||
def build_records_for_subdomain(domain, subdomain, dkim_value=None):
    """Build the DNS record payloads one subdomain needs for email.

    Produces one MX record per entry in MX_RECORDS, one SPF TXT record and,
    when ``dkim_value`` is truthy, one DKIM TXT record under the
    ``cf2024-1._domainkey`` selector.

    Returns:
        A list of record dicts in the order MX..., SPF, DKIM.
    """
    host = f"{subdomain}.{domain}"

    records = [
        {
            "type": "MX",
            "name": host,
            "content": target["content"],
            "priority": target["priority"],
            "ttl": 1,
            "proxied": False,
            "comment": f"Email routing for {host}",
        }
        for target in MX_RECORDS
    ]

    # (record name, TXT content, comment) for each TXT record to emit.
    txt_specs = [(host, SPF_VALUE, f"SPF for {host}")]
    if dkim_value:
        txt_specs.append(
            (f"cf2024-1._domainkey.{host}", dkim_value, f"DKIM for {host}")
        )

    for txt_name, txt_content, txt_comment in txt_specs:
        records.append({
            "type": "TXT",
            "name": txt_name,
            "content": txt_content,
            "ttl": 1,
            "proxied": False,
            "comment": txt_comment,
        })

    return records
|
||
|
||
|
||
def _record_key(record):
    """Return a normalized ``(type, name, content)`` key for DNS-equivalence checks."""
    rtype = record["type"]
    name = record["name"]
    content = record["content"]
    if isinstance(name, str):
        # DNS names are case-insensitive and may carry a trailing root dot.
        name = name.rstrip(".").lower()
    if isinstance(content, str):
        if rtype == "TXT":
            # NOTE(review): the API may hand TXT content back wrapped in double
            # quotes; compare the unquoted value so such records still match.
            if len(content) >= 2 and content[0] == '"' and content[-1] == '"':
                content = content[1:-1]
        elif rtype == "MX":
            # An MX target is a host name, so it compares case-insensitively.
            content = content.rstrip(".").lower()
    return rtype, name, content


def filter_existing(records_to_create, existing_records):
    """Drop records that already exist in the zone.

    Records are matched on type, name and content. Names (and MX targets) are
    compared case-insensitively and TXT content is compared without
    surrounding double quotes, so DNS-equivalent records are not re-created.

    Args:
        records_to_create: Record dicts that are candidates for creation.
        existing_records: Record dicts already present in the zone.

    Returns:
        A tuple ``(new_records, skipped)``: the candidates that still need to
        be created (original order) and the number of candidates skipped.
    """
    existing_keys = {_record_key(r) for r in existing_records}

    new_records = [
        r for r in records_to_create if _record_key(r) not in existing_keys
    ]
    skipped = len(records_to_create) - len(new_records)
    return new_records, skipped
|
||
|
||
|
||
async def create_record_async(sem, client, zone_id, record):
    """Create one DNS record, retrying on throttling and request errors.

    The semaphore is held for the whole call, including the waits between
    attempts. At most MAX_RETRIES attempts are made.

    Returns:
        ``"ok"`` on success, ``"throttle"`` when the final failure carried
        error code 971 (handled here as rate limiting), otherwise ``"fail"``.
    """
    async with sem:
        attempt = 0
        while attempt < MAX_RETRIES:
            attempt += 1
            may_retry = attempt < MAX_RETRIES
            try:
                response = await client.post(
                    f"{CF_API_BASE}/zones/{zone_id}/dns_records",
                    json=record,
                )
                payload = response.json()

                if not payload["success"]:
                    # Error code 971 is treated as rate limiting.
                    throttled = any(
                        err.get("code") == 971
                        for err in payload.get("errors", [])
                    )
                    if throttled and may_retry:
                        print(f" THROTTLE {record['name']} 等待 5min 后重试 ({attempt}/{MAX_RETRIES})...")
                        await asyncio.sleep(300)
                        continue
                    print(f" FAIL {record['type']:10s} {record['name']} 错误: {payload['errors']}")
                    return "throttle" if throttled else "fail"

                # Pick a display label: TXT records are shown as SPF or DKIM.
                kind = record["type"]
                if kind == "TXT" and "spf1" in record["content"]:
                    kind = "TXT(SPF)"
                elif kind == "TXT" and "DKIM" in record.get("comment", ""):
                    kind = "TXT(DKIM)"
                print(f" OK {kind:10s} {record['name']} -> {record['content'][:50]}")
                return "ok"
            except Exception as exc:
                if not may_retry:
                    print(f" ERR {record['type']:10s} {record['name']} 异常: {exc}")
                    return "fail"
                print(f" ERR {record['name']} {exc} 重试 ({attempt}/{MAX_RETRIES})...")
                # Back-off grows with the attempt number: 2s, 4s, ...
                await asyncio.sleep(attempt * 2)
        return "fail"
|
||
|
||
|
||
async def batch_create(zone_id, records):
    """Create DNS records in concurrent batches of MAX_CONCURRENCY.

    Between batches the coroutine pauses for 1 second, or for 300 seconds
    when any record in the batch ended as ``"throttle"``. No pause follows
    the final batch.

    Returns:
        A tuple ``(succeeded, failed)`` counting record outcomes.
    """
    succeeded = 0
    failed = 0
    total = len(records)
    async with async_client() as client:
        for start in range(0, total, MAX_CONCURRENCY):
            stop = start + MAX_CONCURRENCY
            sem = asyncio.Semaphore(MAX_CONCURRENCY)
            outcomes = await asyncio.gather(
                *[
                    create_record_async(sem, client, zone_id, rec)
                    for rec in records[start:stop]
                ]
            )
            ok_in_batch = outcomes.count("ok")
            succeeded += ok_in_batch
            failed += len(outcomes) - ok_in_batch

            if stop >= total:
                # Last batch: nothing left to wait for.
                continue
            if "throttle" in outcomes:
                print(" 检测到限流,等待 5min...")
                await asyncio.sleep(300)
            else:
                await asyncio.sleep(1)
    return succeeded, failed
|
||
|
||
|
||
def find_incomplete_subdomains(existing_records, domain, has_dkim=False):
    """Classify subdomains by whether their email DNS records are complete.

    A subdomain is complete when it has every MX target listed in MX_RECORDS
    and an SPF TXT record, plus a ``cf2024-1._domainkey`` record when
    ``has_dkim`` is true.

    Only records this tool could have created are collected in ``ids`` (the
    list callers use for deletion): Cloudflare MX targets, SPF records that
    include ``_spf.mx.cloudflare.net`` and ``cf2024-1._domainkey`` records.
    An SPF record belonging to another mail provider still counts towards
    completeness but is never queued for deletion.

    Args:
        existing_records: Record dicts of the zone.
        domain: The zone's apex name; records at the apex are ignored.
        has_dkim: Whether a DKIM record is required for completeness.

    Returns:
        A tuple ``(incomplete, complete)`` of dicts mapping the subdomain
        label to ``{"mx": set, "spf": bool, "dkim": bool, "ids": list}``.
        Subdomains with no collected record ids appear in neither dict
        unless they are complete.
    """
    mx_targets = {mx["content"] for mx in MX_RECORDS}
    suffix = f".{domain}"
    dkim_prefix = "cf2024-1._domainkey."
    cloudflare_spf_marker = "_spf.mx.cloudflare.net"
    sub_records = {}  # subdomain -> {"mx": set(), "spf": bool, "dkim": bool, "ids": []}

    def entry_for(sub):
        return sub_records.setdefault(
            sub, {"mx": set(), "spf": False, "dkim": False, "ids": []}
        )

    for r in existing_records:
        name = r["name"]
        # Skip the apex itself and anything outside this zone's name space.
        if name == domain or not name.endswith(suffix):
            continue

        sub_part = name[: -len(suffix)]

        # DKIM record: cf2024-1._domainkey.<sub>.<domain>
        if sub_part.startswith(dkim_prefix):
            info = entry_for(sub_part[len(dkim_prefix):])
            info["dkim"] = True
            info["ids"].append(r["id"])
            continue

        info = entry_for(sub_part)
        content = r.get("content", "")

        if r["type"] == "MX" and content in mx_targets:
            info["mx"].add(content)
            info["ids"].append(r["id"])
        elif r["type"] == "TXT" and "spf1" in content:
            info["spf"] = True
            # Queue only Cloudflare SPF for deletion, mirroring the MX check
            # above; a foreign provider's SPF must survive a cleanup.
            if cloudflare_spf_marker in content:
                info["ids"].append(r["id"])

    incomplete = {}
    complete = {}
    for sub, info in sub_records.items():
        is_complete = len(info["mx"]) == len(mx_targets) and info["spf"]
        if has_dkim:
            is_complete = is_complete and info["dkim"]

        if is_complete:
            complete[sub] = info
        elif info["ids"]:  # has email records, but not the full set
            incomplete[sub] = info

    return incomplete, complete
|
||
|
||
|
||
def delete_records_sync(client, zone_id, record_ids):
    """Delete DNS records one by one and report the outcome.

    A transport error or an undecodable response for one record is counted
    as a failure and the loop continues, so a single bad request no longer
    aborts the remaining deletions or loses the counts.

    Args:
        client: Blocking HTTP client exposing ``delete(url)``.
        zone_id: Identifier of the zone owning the records.
        record_ids: Iterable of DNS record identifiers to delete.

    Returns:
        A tuple ``(success, failed)`` with the number of records in each state.
    """
    success = 0
    failed = 0
    for rid in record_ids:
        url = f"{CF_API_BASE}/zones/{zone_id}/dns_records/{rid}"
        try:
            data = client.delete(url).json()
        except (httpx.HTTPError, ValueError) as exc:
            # ValueError covers JSON decoding failures on non-JSON bodies.
            print(f" 删除失败 {rid}: {exc}")
            failed += 1
            continue

        if data.get("success"):
            success += 1
        else:
            print(f" 删除失败 {rid}: {data.get('errors')}")
            failed += 1
    return success, failed
|
||
|
||
|
||
def cleanup_incomplete(client, zones):
    """Interactively find and delete incomplete subdomain email records.

    The user picks zones, each zone is scanned once and its incomplete
    subdomains are listed. After confirmation, exactly the record ids that
    were listed are deleted; the zone is not re-scanned, so the deletion can
    never touch records the user did not see.

    Args:
        client: Blocking HTTP client used for all API calls.
        zones: Zone dicts (with ``id`` and ``name``) to choose from.
    """
    # Zone selection
    print(f"\n选择要检查的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
    while True:
        choice = input(">>> ").strip().lower()
        if choice == "all":
            selected = zones[:]
            break
        parts = [p.strip() for p in choice.split(",") if p.strip()]
        if parts and all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts):
            selected = [zones[int(p) - 1] for p in parts]
            break
        print("无效选择。")

    # Collapse repeated selections so a zone is scanned and cleaned only once.
    selected = list({zone["id"]: zone for zone in selected}.values())

    total_deleted = 0
    total_incomplete = 0
    pending = []  # (zone_id, domain, record_ids) exactly as shown to the user

    for zone in selected:
        zone_id = zone["id"]
        domain = zone["name"]
        print(f"\n{'=' * 55}")
        print(f" 检查域名: {domain}")
        print(f"{'=' * 55}")

        existing = get_existing_records(client, zone_id)
        incomplete, complete = find_incomplete_subdomains(existing, domain)

        print(f" 完整子域名: {len(complete)} 个")
        print(f" 不完整子域名: {len(incomplete)} 个")

        if not incomplete:
            print(" 没有不完整的记录,跳过。")
            continue

        record_ids = []
        for sub, info in sorted(incomplete.items()):
            mx_count = len(info["mx"])
            print(f" {sub}.{domain} MX:{mx_count}/{len(MX_RECORDS)} SPF:{'有' if info['spf'] else '无'} DKIM:{'有' if info['dkim'] else '无'} 记录数:{len(info['ids'])}")
            record_ids.extend(info["ids"])

        total_incomplete += len(incomplete)
        pending.append((zone_id, domain, record_ids))

    if total_incomplete == 0:
        print("\n所有域名记录都是完整的。")
        return

    confirm = input(f"\n确认删除以上 {total_incomplete} 个不完整子域名的所有相关记录?[y/N]: ").strip().lower()
    if confirm != "y":
        print("已取消。")
        return

    # Delete only what was listed above.
    for zone_id, domain, record_ids in pending:
        print(f"\n 删除 {domain} 的不完整记录...")
        success, failed = delete_records_sync(client, zone_id, record_ids)
        total_deleted += success
        print(f" 删除完成: 成功 {success}, 失败 {failed}")

    print(f"\n总计删除: {total_deleted} 条记录")
|
||
|
||
|
||
def export_email_suffixes(client, zones):
    """Print the mail suffixes whose DNS records are complete, as a JSON array.

    For each selected zone the apex is included when it carries every MX
    target from MX_RECORDS plus an SPF TXT record, followed by every
    subdomain reported as complete by ``find_incomplete_subdomains``.

    Args:
        client: Blocking HTTP client used for all API calls.
        zones: Zone dicts (with ``id`` and ``name``) to choose from.
    """
    import json

    print(f"\n选择要导出的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
    while True:
        choice = input(">>> ").strip().lower()
        if choice == "all":
            selected = zones[:]
            break
        parts = [p.strip() for p in choice.split(",") if p.strip()]
        if parts and all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts):
            selected = [zones[int(p) - 1] for p in parts]
            break
        print("无效选择。")

    # Collapse repeated selections so no suffix is exported twice.
    selected = list({zone["id"]: zone for zone in selected}.values())

    # Built once; the membership test below runs for every record.
    mx_targets = {mx["content"] for mx in MX_RECORDS}
    all_suffixes = []

    for zone in selected:
        zone_id = zone["id"]
        domain = zone["name"]
        print(f"\n 检查 {domain} ...")

        existing = get_existing_records(client, zone_id)
        incomplete, complete = find_incomplete_subdomains(existing, domain)

        # The apex counts too when it has the full MX set and an SPF record.
        apex_records = [r for r in existing if r["name"] == domain]
        main_mx = {
            r["content"]
            for r in apex_records
            if r["type"] == "MX" and r.get("content") in mx_targets
        }
        main_spf = any(
            r["type"] == "TXT" and "spf1" in r.get("content", "")
            for r in apex_records
        )
        if len(main_mx) == len(mx_targets) and main_spf:
            all_suffixes.append(domain)

        # Complete subdomains, in sorted order.
        all_suffixes.extend(f"{sub}.{domain}" for sub in sorted(complete))

        print(f" 完整: {len(complete)} 个子域名, 不完整: {len(incomplete)} 个")

    print(f"\n{'=' * 55}")
    print(f" 可用邮箱后缀: {len(all_suffixes)} 个")
    print(f"{'=' * 55}")

    # Emit the result as a JSON array.
    print(f"\n{json.dumps(all_suffixes, ensure_ascii=False)}")
|
||
|
||
|
||
def main():
    """Run the interactive tool.

    Offers three modes: bulk-create email DNS records for subdomains, clean
    up incomplete records, or export the usable mail suffixes. The HTTP
    client is used as a context manager so it is closed on every exit path,
    including ``sys.exit()`` and unexpected exceptions.
    """
    print("=" * 55)
    print(" Cloudflare 二级域名邮箱 DNS 批量配置工具 (并发版)")
    print("=" * 55)

    with sync_client() as client:
        # Main menu
        print("\n功能选择:")
        print(" [1] 批量创建子域名邮箱 DNS")
        print(" [2] 检查并清理不完整的记录")
        print(" [3] 导出可用邮箱后缀列表")

        while True:
            mode = input("\n选择功能 [1-3]: ").strip()
            if mode in ("1", "2", "3"):
                break
            print("无效选择。")

        # 1. List zones
        print("\n获取域名列表...")
        zones = list_zones(client)
        if not zones:
            print("没有找到域名,请检查 API Token 权限。")
            sys.exit(1)

        print(f"\n找到 {len(zones)} 个域名:")
        for i, z in enumerate(zones, 1):
            print(f" [{i}] {z['name']} (状态: {z['status']})")

        # Mode 2: clean up incomplete records
        if mode == "2":
            cleanup_incomplete(client, zones)
            return

        # Mode 3: export usable mail suffixes
        if mode == "3":
            export_email_suffixes(client, zones)
            return

        # 2. Choose zones (multiple selection supported)
        print(f"\n选择域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:")
        while True:
            choice = input(">>> ").strip().lower()
            if choice == "all":
                selected_zones = zones[:]
                break
            parts = [p.strip() for p in choice.split(",") if p.strip()]
            if parts and all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts):
                selected_zones = [zones[int(p) - 1] for p in parts]
                break
            print("无效选择,请重试。")

        print(f"\n已选择 {len(selected_zones)} 个域名:")
        for z in selected_zones:
            print(f" - {z['name']}")

        # 3. Number of subdomains and their prefixes
        while True:
            count = input("\n要创建多少个二级域名邮箱?: ").strip()
            if count.isdigit() and int(count) > 0:
                count = int(count)
                break
            print("请输入正整数。")

        print("\n前缀模式:")
        print(" [1] 数字前缀 (mail1, mail2, mail3...)")
        print(" [2] 字母前缀 (a, b, c...)")
        print(" [3] 自定义前缀 (输入逗号分隔的列表)")

        while True:
            prefix_choice = input("\n选择前缀模式 [1-3]: ").strip()
            if prefix_choice in ("1", "2", "3"):
                break
            print("无效选择。")

        if prefix_choice == "1":
            prefix = input("输入前缀 (默认 mail): ").strip() or "mail"
            subdomains = [f"{prefix}{i}" for i in range(1, count + 1)]
        elif prefix_choice == "2":
            if count > 26:
                print("字母前缀最多支持 26 个。")
                sys.exit(1)
            subdomains = [chr(ord("a") + i) for i in range(count)]
        else:
            custom = input("输入前缀列表 (逗号分隔): ").strip()
            subdomains = [s.strip() for s in custom.split(",") if s.strip()]
            if len(subdomains) != count:
                print(f"警告:输入了 {len(subdomains)} 个前缀,与数量 {count} 不匹配,使用实际输入的。")
                count = len(subdomains)

        # 4. DKIM strategy
        print("\nDKIM 策略:")
        print(" [1] 复制主域 DKIM 记录")
        print(" [2] 跳过 DKIM (只创建 MX + SPF)")

        while True:
            dkim_choice = input("\n选择 DKIM 策略 [1-2]: ").strip()
            if dkim_choice in ("1", "2"):
                break
            print("无效选择。")

        # 5. Process every selected zone
        grand_total_success = 0
        grand_total_skipped = 0
        grand_total_failed = 0

        for zi, zone in enumerate(selected_zones, 1):
            zone_id = zone["id"]
            domain = zone["name"]
            print(f"\n{'=' * 55}")
            print(f" [{zi}/{len(selected_zones)}] 处理域名: {domain}")
            print(f"{'=' * 55}")

            # DKIM: copy the apex record's value when requested and present.
            dkim_value = None
            if dkim_choice == "1":
                print(" 获取主域 DKIM 记录...")
                dkim_value = get_dkim_record(client, zone_id, domain)
                if dkim_value:
                    print(f" 找到 DKIM 记录 (长度: {len(dkim_value)})")
                else:
                    print(" 未找到主域 DKIM 记录,将跳过 DKIM。")

            # Build the candidate records
            all_records = []
            for sub in subdomains:
                all_records.extend(build_records_for_subdomain(domain, sub, dkim_value))
            print(f" 待创建: {len(all_records)} 条记录")

            # De-duplicate against what the zone already has
            print(" 检查已有 DNS 记录 (去重)...")
            existing = get_existing_records(client, zone_id)
            print(f" 已有记录: {len(existing)} 条")

            new_records, skipped = filter_existing(all_records, existing)
            print(f" 跳过已存在: {skipped} 条")
            print(f" 需新建: {len(new_records)} 条")

            grand_total_skipped += skipped

            if not new_records:
                print(" 所有记录已存在,跳过此域名。")
                continue

            # Concurrent creation
            print(f" 开始并发创建 (并发数: {MAX_CONCURRENCY})...")
            start_time = time.time()
            success, failed = asyncio.run(batch_create(zone_id, new_records))
            elapsed = time.time() - start_time

            grand_total_success += success
            grand_total_failed += failed

            print(f" 完成!成功: {success}, 失败: {failed}, 耗时: {elapsed:.1f}s")

    # 6. Overall summary
    print(f"\n{'=' * 55}")
    print(" 全部完成!")
    print(f" 域名数: {len(selected_zones)} 个")
    print(f" 子域名数: {count} x {len(selected_zones)} = {count * len(selected_zones)} 个")
    print(f" 成功: {grand_total_success} 条")
    print(f" 跳过: {grand_total_skipped} 条")
    print(f" 失败: {grand_total_failed} 条")
    print(f"{'=' * 55}")

    if grand_total_success > 0:
        print("\n注意: 还需要在 Cloudflare Email Routing 中配置转发规则才能收邮件。")
|
||
|
||
|
||
# Run the interactive tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|