diff --git a/src/cert-generator/alidns/index.py b/src/cert-generator/alidns/index.py index 7cb53f0..6c0c343 100644 --- a/src/cert-generator/alidns/index.py +++ b/src/cert-generator/alidns/index.py @@ -9,16 +9,25 @@ def get_domain_rr(): - tokens = os.environ["DEVS_DOMAIN"].split(".") + # Certbot 会为每个域名设置 CERTBOT_DOMAIN 环境变量 + # 如果不存在(例如测试环境),则回退到 DEVS_DOMAIN + domain = os.environ.get("CERTBOT_DOMAIN") or os.environ.get("DEVS_DOMAIN", "") + tokens = domain.split(".") del tokens[-2:] - if tokens[0] == "*": - tokens.pop(0) + + # 检查列表是否为空,避免根域名导致的 IndexError + if tokens: + if tokens[0] == "*": + tokens.pop(0) + tokens.insert(0, "_acme-challenge") return ".".join(tokens) def get_domain_name(): - tokens = os.environ["DEVS_DOMAIN"].split(".") + # Certbot 会为每个域名设置 CERTBOT_DOMAIN 环境变量 + domain = os.environ.get("CERTBOT_DOMAIN") or os.environ.get("DEVS_DOMAIN", "") + tokens = domain.split(".") return ".".join(tokens[-2:]) @@ -30,16 +39,22 @@ def check_if_valid_domain(): def insert_rr(domainName, rr): - print(os.environ["CERTBOT_VALIDATION"]) - client.add_domain_record_with_options( - alidns_20150109_models.AddDomainRecordRequest( - domain_name=domainName, - rr=rr, - type="TXT", - value=os.environ["CERTBOT_VALIDATION"], - ), - runtime, - ) + validation = os.environ.get("CERTBOT_VALIDATION", "") + print(f"[DNS API] Adding TXT record: {rr}.{domainName} = {validation[:20]}...") + try: + result = client.add_domain_record_with_options( + alidns_20150109_models.AddDomainRecordRequest( + domain_name=domainName, + rr=rr, + type="TXT", + value=validation, + ), + runtime, + ) + print(f"[DNS API] Record added successfully, RecordId: {result.body.record_id}") + except Exception as e: + print(f"[DNS API] Failed to add record: {str(e)}") + raise def delete_rr(record_id): @@ -49,15 +64,22 @@ def delete_rr(record_id): def update_rr(record_id, rr): - client.update_domain_record_with_options( - alidns_20150109_models.UpdateDomainRecordRequest( - record_id=record_id, - rr=rr, - type="TXT", - value=os.environ["CERTBOT_VALIDATION"], - ), - runtime, - ) + validation = os.environ.get("CERTBOT_VALIDATION", "") + print(f"[DNS API] Updating TXT record {record_id}: {rr} = {validation[:20]}...") + try: + client.update_domain_record_with_options( + alidns_20150109_models.UpdateDomainRecordRequest( + record_id=record_id, + rr=rr, + type="TXT", + value=validation, + ), + runtime, + ) + print(f"[DNS API] Record updated successfully") + except Exception as e: + print(f"[DNS API] Failed to update record: {str(e)}") + raise def get_domain_record_id(domainName, rr): @@ -75,6 +97,23 @@ def get_domain_record_id(domainName, rr): return record_id +def get_domain_record_id_by_value(domainName, rr, value): + """根据 RR 名和记录值精确匹配,返回 record_id。 + 用于 cleanup 时只删除当前 challenge 对应的那条 TXT 记录, + 避免误删同名但不同值的其他 ACME challenge 记录。 + """ + records = client.describe_domain_records_with_options( + alidns_20150109_models.DescribeDomainRecordsRequest( + domain_name=domainName, type_key_word="TXT", rrkey_word=rr + ), + runtime, + ) + for record in records.body.domain_records.record: + if record.rr == rr and record.value == value: + return record.record_id + return None + + def get_alidns_endpoint(): if os.environ["FC_REGION"] == "ap-southeast-1": return "alidns.ap-southeast-1.aliyuncs.com" diff --git a/src/cert-generator/authenticator.py b/src/cert-generator/authenticator.py index c4e98c0..2f2acbd 100644 --- a/src/cert-generator/authenticator.py +++ b/src/cert-generator/authenticator.py @@ -1,21 +1,42 @@ # -*- coding: utf-8 -*- +import os +import time +import sys + from alidns.index import ( - get_domain_record_id, - update_rr, insert_rr, get_domain_rr, get_domain_name, ) -domainName = get_domain_name() -rr = get_domain_rr() - -record_id = get_domain_record_id(domainName, rr) - -if record_id: - update_rr(record_id, rr) - print("dns record updated.") -else: +try: + domainName = get_domain_name() + rr = get_domain_rr() + validation = os.environ.get("CERTBOT_VALIDATION", "") + current_domain = os.environ.get("CERTBOT_DOMAIN", os.environ.get("DEVS_DOMAIN", "unknown")) + + print(f"[Authenticator] Processing domain: {current_domain}") + print(f"[Authenticator] Base domain: {domainName}") + print(f"[Authenticator] RR: {rr}") + print(f"[Authenticator] Validation: {validation[:20]}...") + + # 始终新增记录,不要更新已有记录! + # 原因:*.example.com 和 example.com 共用同一个 _acme-challenge RR 名, + # 但 ACME 验证需要两条不同值的 TXT 记录同时存在。 + # DNS 允许同名多条 TXT 记录,所以每次都应该 insert 而不是 update。 insert_rr(domainName, rr) - print("dns record added.") + print("[Authenticator] DNS record added.") + + # 等待 DNS 记录传播(重要!) + # Certbot 会为每个域名分别调用此脚本,所以每个域名都会等待 + # 阿里云 DNS 通常在 10-30 秒内生效 + # 注意:多域名证书的总等待时间 = 域名数量 × 等待时间 + wait_time = int(os.environ.get("DNS_PROPAGATION_WAIT", "30")) + print(f"[Authenticator] Waiting {wait_time} seconds for DNS propagation...") + time.sleep(wait_time) + print("[Authenticator] DNS propagation wait completed.") + +except Exception as e: + print(f"[Authenticator] ERROR: {str(e)}") + sys.exit(1) diff --git a/src/cert-generator/cleanup.py b/src/cert-generator/cleanup.py index ce99620..0352dac 100644 --- a/src/cert-generator/cleanup.py +++ b/src/cert-generator/cleanup.py @@ -1,12 +1,30 @@ # -*- coding: utf-8 -*- -from alidns.index import get_domain_record_id, get_domain_rr, get_domain_name, delete_rr +import os -domainName = get_domain_name() -rr = get_domain_rr() -record_id = get_domain_record_id(domainName, rr) +from alidns.index import get_domain_record_id_by_value, get_domain_rr, get_domain_name, delete_rr -if record_id: - delete_rr(record_id) - print("dns record delted.") +try: + domainName = get_domain_name() + rr = get_domain_rr() + validation = os.environ.get("CERTBOT_VALIDATION", "") + + print(f"[Cleanup] Domain: {domainName}") + print(f"[Cleanup] RR: {rr}") + print(f"[Cleanup] Validation: {validation[:20]}...") + + # 按 RR + value 精确匹配删除,避免误删同名的其他 ACME challenge 记录 + # (*.example.com 和 example.com 共用同一个 _acme-challenge RR 名) + record_id = get_domain_record_id_by_value(domainName, rr, validation) + + if record_id: + delete_rr(record_id) + print(f"[Cleanup] DNS record {record_id} deleted.") + else: + print("[Cleanup] No matching DNS record found to delete.") + +except Exception as e: + print(f"[Cleanup] WARNING: {str(e)}") + # cleanup 失败不影响证书生成,只记录警告 + pass diff --git a/src/cert-generator/index.py b/src/cert-generator/index.py index 849f998..254bb81 100644 --- a/src/cert-generator/index.py +++ b/src/cert-generator/index.py @@ -8,6 +8,11 @@ from cas.index import upload_cert, get_cert_by_name from alidns.index import check_if_valid_domain +try: + from certbot.crypto_util import get_names_from_cert +except ImportError: + get_names_from_cert = None + def check_if_less_than_seven_days(x): d = datetime.datetime.strptime(x, "%Y-%m-%d") @@ -18,25 +23,123 @@ def check_if_less_than_seven_days(x): def handler(event, context): evt = json.loads(event) domainName = evt.get("domainName") + + # 支持 domainName 为字符串或数组 if domainName is not None: - if domainName.startswith("https://") or domainName.startswith("http://"): - domainName = domainName.split("://")[1] - os.environ["DEVS_DOMAIN"] = domainName + # 如果是字符串,转换为列表 + if isinstance(domainName, str): + domainName = domainName.strip() + if not domainName: + raise Exception("domainName must be a non-empty string") + if domainName.startswith("https://") or domainName.startswith("http://"): + domainName = domainName.split("://", 1)[1].strip() + if not domainName: + raise Exception("domainName must be a non-empty string after normalization") + domainNames = [domainName] + # 如果是列表,清理每个域名 + elif isinstance(domainName, list): + if not domainName: + raise Exception("domainName list must be non-empty") + domainNames = [] + for d in domainName: + if not isinstance(d, str): + raise Exception("all domainName items must be non-empty strings") + d = d.strip() + if not d: + raise Exception("all domainName items must be non-empty strings") + if d.startswith("https://") or d.startswith("http://"): + d = d.split("://", 1)[1].strip() + if not d: + raise Exception("all domainName items must be non-empty strings after normalization") + domainNames.append(d) + else: + raise Exception("domainName must be a string or array") + + # 使用第一个域名作为主域名(用于 DNS 验证和证书存储路径) + if not domainNames: + raise Exception("no valid domainName provided") + primary_domain = domainNames[0] + os.environ["DEVS_DOMAIN"] = primary_domain + else: + raise Exception("no domainName provided") + certName = evt.get("certName") certificate_id = None if certName is not None: cert = get_cert_by_name(certName) if cert is not None: - if cert.common != domainName: - raise Exception("domainName and cert common name do not match") + # 从证书内容中解析所有 SAN 域名(使用 certbot/acme 模块) + existing_domains = [] + if hasattr(cert, 'cert') and cert.cert: + try: + if get_names_from_cert is None: + raise ImportError("certbot.crypto_util not available") + # 使用 certbot 的方法从 PEM 格式证书中提取所有域名(CN + SAN) + cert_bytes = cert.cert.encode('utf-8') + all_names = get_names_from_cert(cert_bytes) + existing_domains = all_names + print(f"Parsed {len(all_names)} domain(s) from certificate: {', '.join(all_names)}") + + # 方案二:补充检查 CN 是否在列表中(双重保险) + if hasattr(cert, 'common') and cert.common: + if cert.common not in existing_domains: + existing_domains.insert(0, cert.common) + print(f"Added CN '{cert.common}' to domain list") + except Exception as e: + print(f"Warning: Failed to parse SAN from certificate: {e}") + # 降级方案:从 API 字段中解析域名 + if hasattr(cert, 'common') and cert.common: + existing_domains.append(cert.common) + + # 解析 sans 字段 + if hasattr(cert, 'sans') and cert.sans: + print(f"DEBUG: sans field type: {type(cert.sans)}, value: {cert.sans}") + # sans 可能是字符串(逗号/空格/换行分隔)或列表 + if isinstance(cert.sans, str): + # 尝试多种分隔符:逗号、空格、换行 + sans_str = cert.sans.replace('\n', ',').replace(' ', ',') + sans_list = [s.strip() for s in sans_str.split(',') if s.strip()] + elif isinstance(cert.sans, list): + sans_list = cert.sans + else: + sans_list = [] + + # 添加所有SAN域名,去重 + for san in sans_list: + if san and san not in existing_domains: + existing_domains.append(san) + + if existing_domains: + print(f"Using fallback method, parsed domains: {', '.join(existing_domains)}") + + if existing_domains: + # 使用现有证书的域名列表,保持顺序一致 + domainNames = existing_domains + primary_domain = domainNames[0] + os.environ["DEVS_DOMAIN"] = primary_domain + print(f"Using domains from existing certificate: {', '.join(domainNames)}") + + # 检查证书是否需要更新 if check_if_less_than_seven_days(cert.end_date): certificate_id = cert.id else: print("Cert will not expire in 7 days") return cert.id - if domainName is None: - raise Exception("no domainName provided") - check_if_valid_domain() + + # 对所有请求的域名执行域名有效性检查,避免仅校验主域名 + original_devs_domain = os.environ.get("DEVS_DOMAIN") + try: + for domain in domainNames: + os.environ["DEVS_DOMAIN"] = domain + check_if_valid_domain() + finally: + # 恢复为主域名,保持后续逻辑的兼容性 + if original_devs_domain is not None: + os.environ["DEVS_DOMAIN"] = original_devs_domain + else: + os.environ["DEVS_DOMAIN"] = primary_domain + + # 构建 certbot 参数,支持多域名 certbot_args = [ "certonly", "--manual", @@ -52,22 +155,26 @@ def handler(event, context): "--key-type", "rsa", "--cert-name", - domainName, + primary_domain, "--email", "your_mail@mail.com", "--server", "https://acme-v02.api.letsencrypt.org/directory", - "--domains", - domainName, ] + + # 为每个域名添加 -d 参数(注意:用 -d 而不是 --domains) + for domain in domainNames: + certbot_args.extend(["-d", domain]) + + print(f"Requesting certificate for domains: {', '.join(domainNames)}") certbot.main.main(certbot_args) print("cert generated") exitCode = subprocess.call("/code/scripts/upload-certs.sh") if exitCode == 0: cert, key, cert_id = upload_cert( - "/etc/letsencrypt/live/" + domainName + "/fullchain.pem", - "/etc/letsencrypt/live/" + domainName + "/privkey.pkcs1.pem", - domainName, + "/etc/letsencrypt/live/" + primary_domain + "/fullchain.pem", + "/etc/letsencrypt/live/" + primary_domain + "/privkey.pkcs1.pem", + primary_domain, certName, certificate_id, )