#!/usr/bin/env python3 """ Cloudflare 二级域名邮箱 DNS 批量配置工具 功能:选择域名,批量为二级域名创建邮箱所需的 DNS 记录(MX + SPF + DKIM) 支持并发创建、自动跳过已存在的记录 """ import httpx import sys import time import asyncio # ============ 配置 ============ CF_API_TOKEN = "nJWYVC1s7P9PpEdELjRTlJaYSMs0XOML1auCv6R8" CF_API_BASE = "https://api.cloudflare.com/client/v4" PROXY = "http://192.168.1.2:10810" MAX_CONCURRENCY = 5 # 最大并发数(Cloudflare 限流,不宜太高) MAX_RETRIES = 3 # 限流重试次数 # Cloudflare Email Routing MX 记录 MX_RECORDS = [ {"content": "route1.mx.cloudflare.net", "priority": 10}, {"content": "route2.mx.cloudflare.net", "priority": 20}, {"content": "route3.mx.cloudflare.net", "priority": 30}, ] SPF_VALUE = "v=spf1 include:_spf.mx.cloudflare.net ~all" def sync_client(): return httpx.Client( proxy=PROXY, headers={ "Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json", }, timeout=30, ) def async_client(): return httpx.AsyncClient( proxy=PROXY, headers={ "Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json", }, timeout=30, ) def api_get_sync(client, path, params=None): resp = client.get(f"{CF_API_BASE}{path}", params=params) data = resp.json() if not data["success"]: print(f" API 错误: {data['errors']}") return None return data def list_zones(client): data = api_get_sync(client, "/zones", {"per_page": 50}) if not data: return [] return data["result"] def get_existing_records(client, zone_id): """获取 zone 下所有 DNS 记录,用于去重""" all_records = [] page = 1 while True: data = api_get_sync(client, f"/zones/{zone_id}/dns_records", { "per_page": 100, "page": page, }) if not data: break all_records.extend(data["result"]) if page >= data["result_info"]["total_pages"]: break page += 1 return all_records def get_dkim_record(client, zone_id, domain): data = api_get_sync(client, f"/zones/{zone_id}/dns_records", { "type": "TXT", "name": f"cf2024-1._domainkey.{domain}", }) if data and data["result"]: return data["result"][0]["content"] return None def build_records_for_subdomain(domain, subdomain, dkim_value=None): """构建一个子域名所需的所有 DNS 记录""" full_sub = f"{subdomain}.{domain}" records = [] for mx in MX_RECORDS: records.append({ "type": "MX", "name": full_sub, "content": mx["content"], "priority": mx["priority"], "ttl": 1, "proxied": False, "comment": f"Email routing for {full_sub}", }) records.append({ "type": "TXT", "name": full_sub, "content": SPF_VALUE, "ttl": 1, "proxied": False, "comment": f"SPF for {full_sub}", }) if dkim_value: records.append({ "type": "TXT", "name": f"cf2024-1._domainkey.{full_sub}", "content": dkim_value, "ttl": 1, "proxied": False, "comment": f"DKIM for {full_sub}", }) return records def filter_existing(records_to_create, existing_records): """过滤掉已存在的记录""" existing_set = set() for r in existing_records: key = (r["type"], r["name"], r["content"]) existing_set.add(key) new_records = [] skipped = 0 for r in records_to_create: key = (r["type"], r["name"], r["content"]) if key in existing_set: skipped += 1 else: new_records.append(r) return new_records, skipped async def create_record_async(sem, client, zone_id, record): """并发创建单条 DNS 记录,带限流重试""" async with sem: for attempt in range(1, MAX_RETRIES + 1): try: resp = await client.post( f"{CF_API_BASE}/zones/{zone_id}/dns_records", json=record, ) data = resp.json() if data["success"]: label = record["type"] if record["type"] == "TXT": if "spf1" in record["content"]: label = "TXT(SPF)" elif "DKIM" in record.get("comment", ""): label = "TXT(DKIM)" print(f" OK {label:10s} {record['name']} -> {record['content'][:50]}") return "ok" else: # 检查是否限流 (code 971) is_throttle = any(e.get("code") == 971 for e in data.get("errors", [])) if is_throttle and attempt < MAX_RETRIES: print(f" THROTTLE {record['name']} 等待 5min 后重试 ({attempt}/{MAX_RETRIES})...") await asyncio.sleep(300) continue print(f" FAIL {record['type']:10s} {record['name']} 错误: {data['errors']}") return "throttle" if is_throttle else "fail" except Exception as e: if attempt < MAX_RETRIES: wait = attempt * 2 print(f" ERR {record['name']} {e} 重试 ({attempt}/{MAX_RETRIES})...") await asyncio.sleep(wait) continue print(f" ERR {record['type']:10s} {record['name']} 异常: {e}") return "fail" return "fail" async def batch_create(zone_id, records): """批量并发创建 DNS 记录,每批之间等待 1 秒""" total_success = 0 total_failed = 0 async with async_client() as client: for i in range(0, len(records), MAX_CONCURRENCY): batch = records[i:i + MAX_CONCURRENCY] sem = asyncio.Semaphore(MAX_CONCURRENCY) tasks = [create_record_async(sem, client, zone_id, r) for r in batch] results = await asyncio.gather(*tasks) total_success += sum(1 for r in results if r == "ok") total_failed += sum(1 for r in results if r != "ok") # 如果这批有限流,等 30 秒;否则等 1 秒 had_throttle = any(r == "throttle" for r in results) if i + MAX_CONCURRENCY < len(records): if had_throttle: print(" 检测到限流,等待 5min...") await asyncio.sleep(300) else: await asyncio.sleep(1) return total_success, total_failed def find_incomplete_subdomains(existing_records, domain, has_dkim=False): """找出邮箱 DNS 记录不完整的子域名""" # 收集所有子域名的记录 sub_records = {} # subdomain -> {"mx": set(), "spf": bool, "dkim": bool} mx_targets = {mx["content"] for mx in MX_RECORDS} for r in existing_records: name = r["name"] # 跳过主域本身的记录 if name == domain: continue # 只关注 .domain 结尾的子域名 if not name.endswith(f".{domain}"): continue # 提取子域名部分 sub_part = name[: -(len(domain) + 1)] # DKIM 记录: cf2024-1._domainkey.sub.domain if sub_part.startswith("cf2024-1._domainkey."): actual_sub = sub_part[len("cf2024-1._domainkey."):] if actual_sub not in sub_records: sub_records[actual_sub] = {"mx": set(), "spf": False, "dkim": False, "ids": []} sub_records[actual_sub]["dkim"] = True sub_records[actual_sub]["ids"].append(r["id"]) continue # MX / TXT(SPF) 记录 if sub_part not in sub_records: sub_records[sub_part] = {"mx": set(), "spf": False, "dkim": False, "ids": []} if r["type"] == "MX" and r["content"] in mx_targets: sub_records[sub_part]["mx"].add(r["content"]) sub_records[sub_part]["ids"].append(r["id"]) elif r["type"] == "TXT" and "spf1" in r.get("content", ""): sub_records[sub_part]["spf"] = True sub_records[sub_part]["ids"].append(r["id"]) # 判断完整性: 需要 3 MX + 1 SPF (+ 可选 DKIM) incomplete = {} complete = {} for sub, info in sub_records.items(): mx_count = len(info["mx"]) is_complete = mx_count == 3 and info["spf"] if has_dkim: is_complete = is_complete and info["dkim"] if not is_complete and info["ids"]: # 有记录但不完整 incomplete[sub] = info elif is_complete: complete[sub] = info return incomplete, complete def delete_records_sync(client, zone_id, record_ids): """同步删除 DNS 记录""" success = 0 failed = 0 for rid in record_ids: resp = client.delete(f"{CF_API_BASE}/zones/{zone_id}/dns_records/{rid}") data = resp.json() if data["success"]: success += 1 else: print(f" 删除失败 {rid}: {data['errors']}") failed += 1 return success, failed def cleanup_incomplete(client, zones): """检查并清理不完整的子域名邮箱记录""" # 选择域名 print(f"\n选择要检查的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:") while True: choice = input(">>> ").strip().lower() if choice == "all": selected = zones[:] break parts = [p.strip() for p in choice.split(",") if p.strip()] if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts: selected = [zones[int(p) - 1] for p in parts] break print("无效选择。") total_deleted = 0 total_incomplete = 0 for zone in selected: zone_id = zone["id"] domain = zone["name"] print(f"\n{'=' * 55}") print(f" 检查域名: {domain}") print(f"{'=' * 55}") existing = get_existing_records(client, zone_id) incomplete, complete = find_incomplete_subdomains(existing, domain) print(f" 完整子域名: {len(complete)} 个") print(f" 不完整子域名: {len(incomplete)} 个") if not incomplete: print(" 没有不完整的记录,跳过。") continue for sub, info in sorted(incomplete.items()): mx_count = len(info["mx"]) print(f" {sub}.{domain} MX:{mx_count}/3 SPF:{'有' if info['spf'] else '无'} DKIM:{'有' if info['dkim'] else '无'} 记录数:{len(info['ids'])}") total_incomplete += len(incomplete) if total_incomplete == 0: print("\n所有域名记录都是完整的。") return confirm = input(f"\n确认删除以上 {total_incomplete} 个不完整子域名的所有相关记录?[y/N]: ").strip().lower() if confirm != "y": print("已取消。") return # 执行删除 for zone in selected: zone_id = zone["id"] domain = zone["name"] existing = get_existing_records(client, zone_id) incomplete, _ = find_incomplete_subdomains(existing, domain) if not incomplete: continue print(f"\n 删除 {domain} 的不完整记录...") all_ids = [] for info in incomplete.values(): all_ids.extend(info["ids"]) success, failed = delete_records_sync(client, zone_id, all_ids) total_deleted += success print(f" 删除完成: 成功 {success}, 失败 {failed}") print(f"\n总计删除: {total_deleted} 条记录") def export_email_suffixes(client, zones): """导出解析完整的可用邮箱后缀列表""" print(f"\n选择要导出的域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:") while True: choice = input(">>> ").strip().lower() if choice == "all": selected = zones[:] break parts = [p.strip() for p in choice.split(",") if p.strip()] if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts: selected = [zones[int(p) - 1] for p in parts] break print("无效选择。") all_suffixes = [] for zone in selected: zone_id = zone["id"] domain = zone["name"] print(f"\n 检查 {domain} ...") existing = get_existing_records(client, zone_id) incomplete, complete = find_incomplete_subdomains(existing, domain) # 主域如果有完整邮箱记录也算 main_mx = set() main_spf = False for r in existing: if r["name"] == domain: if r["type"] == "MX" and r["content"] in {mx["content"] for mx in MX_RECORDS}: main_mx.add(r["content"]) elif r["type"] == "TXT" and "spf1" in r.get("content", ""): main_spf = True if len(main_mx) == 3 and main_spf: all_suffixes.append(domain) # 完整的子域名 for sub in sorted(complete.keys()): all_suffixes.append(f"{sub}.{domain}") print(f" 完整: {len(complete)} 个子域名, 不完整: {len(incomplete)} 个") print(f"\n{'=' * 55}") print(f" 可用邮箱后缀: {len(all_suffixes)} 个") print(f"{'=' * 55}") # 打印数组格式 import json print(f"\n{json.dumps(all_suffixes, ensure_ascii=False)}") def main(): print("=" * 55) print(" Cloudflare 二级域名邮箱 DNS 批量配置工具 (并发版)") print("=" * 55) client = sync_client() # 主菜单 print("\n功能选择:") print(" [1] 批量创建子域名邮箱 DNS") print(" [2] 检查并清理不完整的记录") print(" [3] 导出可用邮箱后缀列表") while True: mode = input("\n选择功能 [1-3]: ").strip() if mode in ("1", "2", "3"): break print("无效选择。") # 1. 列出域名 print("\n获取域名列表...") zones = list_zones(client) if not zones: print("没有找到域名,请检查 API Token 权限。") sys.exit(1) print(f"\n找到 {len(zones)} 个域名:") for i, z in enumerate(zones): print(f" [{i + 1}] {z['name']} (状态: {z['status']})") # 功能 2: 清理不完整记录 if mode == "2": cleanup_incomplete(client, zones) client.close() return # 功能 3: 导出可用邮箱后缀 if mode == "3": export_email_suffixes(client, zones) client.close() return # 2. 选择域名(支持多选) print(f"\n选择域名 [1-{len(zones)}],多选用逗号分隔,输入 all 选择全部:") while True: choice = input(">>> ").strip().lower() if choice == "all": selected_zones = zones[:] break parts = [p.strip() for p in choice.split(",") if p.strip()] if all(p.isdigit() and 1 <= int(p) <= len(zones) for p in parts) and parts: selected_zones = [zones[int(p) - 1] for p in parts] break print("无效选择,请重试。") print(f"\n已选择 {len(selected_zones)} 个域名:") for z in selected_zones: print(f" - {z['name']}") # 3. 输入二级域名数量和前缀 while True: count = input("\n要创建多少个二级域名邮箱?: ").strip() if count.isdigit() and int(count) > 0: count = int(count) break print("请输入正整数。") print("\n前缀模式:") print(" [1] 数字前缀 (mail1, mail2, mail3...)") print(" [2] 字母前缀 (a, b, c...)") print(" [3] 自定义前缀 (输入逗号分隔的列表)") while True: prefix_choice = input("\n选择前缀模式 [1-3]: ").strip() if prefix_choice in ("1", "2", "3"): break print("无效选择。") subdomains = [] if prefix_choice == "1": prefix = input("输入前缀 (默认 mail): ").strip() or "mail" subdomains = [f"{prefix}{i}" for i in range(1, count + 1)] elif prefix_choice == "2": if count > 26: print("字母前缀最多支持 26 个。") sys.exit(1) subdomains = [chr(ord('a') + i) for i in range(count)] else: custom = input("输入前缀列表 (逗号分隔): ").strip() subdomains = [s.strip() for s in custom.split(",") if s.strip()] if len(subdomains) != count: print(f"警告:输入了 {len(subdomains)} 个前缀,与数量 {count} 不匹配,使用实际输入的。") count = len(subdomains) # 4. DKIM 策略 print("\nDKIM 策略:") print(" [1] 复制主域 DKIM 记录") print(" [2] 跳过 DKIM (只创建 MX + SPF)") while True: dkim_choice = input("\n选择 DKIM 策略 [1-2]: ").strip() if dkim_choice in ("1", "2"): break print("无效选择。") # 5. 对每个域名执行 grand_total_success = 0 grand_total_skipped = 0 grand_total_failed = 0 all_created_emails = [] for zi, zone in enumerate(selected_zones): zone_id = zone["id"] domain = zone["name"] print(f"\n{'=' * 55}") print(f" [{zi+1}/{len(selected_zones)}] 处理域名: {domain}") print(f"{'=' * 55}") # DKIM dkim_value = None if dkim_choice == "1": print(" 获取主域 DKIM 记录...") dkim_value = get_dkim_record(client, zone_id, domain) if dkim_value: print(f" 找到 DKIM 记录 (长度: {len(dkim_value)})") else: print(" 未找到主域 DKIM 记录,将跳过 DKIM。") # 构建记录 all_records = [] for sub in subdomains: all_records.extend(build_records_for_subdomain(domain, sub, dkim_value)) print(f" 待创建: {len(all_records)} 条记录") # 去重 print(" 检查已有 DNS 记录 (去重)...") existing = get_existing_records(client, zone_id) print(f" 已有记录: {len(existing)} 条") new_records, skipped = filter_existing(all_records, existing) print(f" 跳过已存在: {skipped} 条") print(f" 需新建: {len(new_records)} 条") grand_total_skipped += skipped if not new_records: print(" 所有记录已存在,跳过此域名。") continue # 并发创建 print(f" 开始并发创建 (并发数: {MAX_CONCURRENCY})...") start_time = time.time() success, failed = asyncio.run(batch_create(zone_id, new_records)) elapsed = time.time() - start_time grand_total_success += success grand_total_failed += failed print(f" 完成!成功: {success}, 失败: {failed}, 耗时: {elapsed:.1f}s") for sub in subdomains: all_created_emails.append(f"*@{sub}.{domain}") client.close() # 6. 总汇总 print(f"\n{'=' * 55}") print(f" 全部完成!") print(f" 域名数: {len(selected_zones)} 个") print(f" 子域名数: {count} x {len(selected_zones)} = {count * len(selected_zones)} 个") print(f" 成功: {grand_total_success} 条") print(f" 跳过: {grand_total_skipped} 条") print(f" 失败: {grand_total_failed} 条") print(f"{'=' * 55}") if grand_total_success > 0: print("\n注意: 还需要在 Cloudflare Email Routing 中配置转发规则才能收邮件。") if __name__ == "__main__": main()