Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 62 additions & 23 deletions src/cert-generator/alidns/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@


def get_domain_rr():
tokens = os.environ["DEVS_DOMAIN"].split(".")
# Certbot 会为每个域名设置 CERTBOT_DOMAIN 环境变量
# 如果不存在(例如测试环境),则回退到 DEVS_DOMAIN
domain = os.environ.get("CERTBOT_DOMAIN") or os.environ.get("DEVS_DOMAIN", "")
tokens = domain.split(".")
del tokens[-2:]
if tokens[0] == "*":
tokens.pop(0)

# 检查列表是否为空,避免根域名导致的 IndexError
if tokens:
if tokens[0] == "*":
tokens.pop(0)

tokens.insert(0, "_acme-challenge")
return ".".join(tokens)


def get_domain_name():
tokens = os.environ["DEVS_DOMAIN"].split(".")
# Certbot 会为每个域名设置 CERTBOT_DOMAIN 环境变量
domain = os.environ.get("CERTBOT_DOMAIN") or os.environ.get("DEVS_DOMAIN", "")
tokens = domain.split(".")
return ".".join(tokens[-2:])


Expand All @@ -30,16 +39,22 @@ def check_if_valid_domain():


def insert_rr(domainName, rr):
print(os.environ["CERTBOT_VALIDATION"])
client.add_domain_record_with_options(
alidns_20150109_models.AddDomainRecordRequest(
domain_name=domainName,
rr=rr,
type="TXT",
value=os.environ["CERTBOT_VALIDATION"],
),
runtime,
)
validation = os.environ.get("CERTBOT_VALIDATION", "")
print(f"[DNS API] Adding TXT record: {rr}.{domainName} = {validation[:20]}...")
try:
result = client.add_domain_record_with_options(
alidns_20150109_models.AddDomainRecordRequest(
domain_name=domainName,
rr=rr,
type="TXT",
value=validation,
),
runtime,
)
print(f"[DNS API] Record added successfully, RecordId: {result.body.record_id}")
except Exception as e:
print(f"[DNS API] Failed to add record: {str(e)}")
raise


def delete_rr(record_id):
Expand All @@ -49,15 +64,22 @@ def delete_rr(record_id):


def update_rr(record_id, rr):
client.update_domain_record_with_options(
alidns_20150109_models.UpdateDomainRecordRequest(
record_id=record_id,
rr=rr,
type="TXT",
value=os.environ["CERTBOT_VALIDATION"],
),
runtime,
)
validation = os.environ.get("CERTBOT_VALIDATION", "")
print(f"[DNS API] Updating TXT record {record_id}: {rr} = {validation[:20]}...")
try:
client.update_domain_record_with_options(
alidns_20150109_models.UpdateDomainRecordRequest(
record_id=record_id,
rr=rr,
type="TXT",
value=validation,
),
runtime,
)
print(f"[DNS API] Record updated successfully")
except Exception as e:
print(f"[DNS API] Failed to update record: {str(e)}")
raise


def get_domain_record_id(domainName, rr):
Expand All @@ -75,6 +97,23 @@ def get_domain_record_id(domainName, rr):
return record_id


def get_domain_record_id_by_value(domainName, rr, value):
"""根据 RR 名和记录值精确匹配,返回 record_id。
用于 cleanup 时只删除当前 challenge 对应的那条 TXT 记录,
避免误删同名但不同值的其他 ACME challenge 记录。
"""
records = client.describe_domain_records_with_options(
alidns_20150109_models.DescribeDomainRecordsRequest(
domain_name=domainName, type_key_word="TXT", rrkey_word=rr
),
runtime,
)
for record in records.body.domain_records.record:
if record.rr == rr and record.value == value:
return record.record_id
return None


def get_alidns_endpoint():
if os.environ["FC_REGION"] == "ap-southeast-1":
return "alidns.ap-southeast-1.aliyuncs.com"
Expand Down
45 changes: 33 additions & 12 deletions src/cert-generator/authenticator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
# -*- coding: utf-8 -*-

import os
import time
import sys

from alidns.index import (
get_domain_record_id,
update_rr,
insert_rr,
get_domain_rr,
get_domain_name,
)

domainName = get_domain_name()
rr = get_domain_rr()

record_id = get_domain_record_id(domainName, rr)

if record_id:
update_rr(record_id, rr)
print("dns record updated.")
else:
try:
domainName = get_domain_name()
rr = get_domain_rr()
validation = os.environ.get("CERTBOT_VALIDATION", "")
current_domain = os.environ.get("CERTBOT_DOMAIN", os.environ.get("DEVS_DOMAIN", "unknown"))

print(f"[Authenticator] Processing domain: {current_domain}")
print(f"[Authenticator] Base domain: {domainName}")
print(f"[Authenticator] RR: {rr}")
print(f"[Authenticator] Validation: {validation[:20]}...")

# 始终新增记录,不要更新已有记录!
# 原因:*.example.com 和 example.com 共用同一个 _acme-challenge RR 名,
# 但 ACME 验证需要两条不同值的 TXT 记录同时存在。
# DNS 允许同名多条 TXT 记录,所以每次都应该 insert 而不是 update。
insert_rr(domainName, rr)
print("dns record added.")
print("[Authenticator] DNS record added.")

# 等待 DNS 记录传播(重要!)
# Certbot 会为每个域名分别调用此脚本,所以每个域名都会等待
# 阿里云 DNS 通常在 10-30 秒内生效
# 注意:多域名证书的总等待时间 = 域名数量 × 等待时间
wait_time = int(os.environ.get("DNS_PROPAGATION_WAIT", "30"))
print(f"[Authenticator] Waiting {wait_time} seconds for DNS propagation...")
time.sleep(wait_time)
print("[Authenticator] DNS propagation wait completed.")

except Exception as e:
print(f"[Authenticator] ERROR: {str(e)}")
sys.exit(1)
32 changes: 25 additions & 7 deletions src/cert-generator/cleanup.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
# -*- coding: utf-8 -*-

from alidns.index import get_domain_record_id, get_domain_rr, get_domain_name, delete_rr
import os

domainName = get_domain_name()
rr = get_domain_rr()

record_id = get_domain_record_id(domainName, rr)
from alidns.index import get_domain_record_id_by_value, get_domain_rr, get_domain_name, delete_rr

if record_id:
delete_rr(record_id)
print("dns record delted.")
try:
domainName = get_domain_name()
rr = get_domain_rr()
validation = os.environ.get("CERTBOT_VALIDATION", "")

print(f"[Cleanup] Domain: {domainName}")
print(f"[Cleanup] RR: {rr}")
print(f"[Cleanup] Validation: {validation[:20]}...")

# 按 RR + value 精确匹配删除,避免误删同名的其他 ACME challenge 记录
# (*.example.com 和 example.com 共用同一个 _acme-challenge RR 名)
record_id = get_domain_record_id_by_value(domainName, rr, validation)

if record_id:
delete_rr(record_id)
print(f"[Cleanup] DNS record {record_id} deleted.")
else:
print("[Cleanup] No matching DNS record found to delete.")

except Exception as e:
print(f"[Cleanup] WARNING: {str(e)}")
# cleanup 失败不影响证书生成,只记录警告
pass
135 changes: 121 additions & 14 deletions src/cert-generator/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from cas.index import upload_cert, get_cert_by_name
from alidns.index import check_if_valid_domain

try:
from certbot.crypto_util import get_names_from_cert
except ImportError:
get_names_from_cert = None


def check_if_less_than_seven_days(x):
d = datetime.datetime.strptime(x, "%Y-%m-%d")
Expand All @@ -18,25 +23,123 @@ def check_if_less_than_seven_days(x):
def handler(event, context):
evt = json.loads(event)
domainName = evt.get("domainName")

# 支持 domainName 为字符串或数组
if domainName is not None:
if domainName.startswith("https://") or domainName.startswith("http://"):
domainName = domainName.split("://")[1]
os.environ["DEVS_DOMAIN"] = domainName
# 如果是字符串,转换为列表
if isinstance(domainName, str):
domainName = domainName.strip()
if not domainName:
raise Exception("domainName must be a non-empty string")
if domainName.startswith("https://") or domainName.startswith("http://"):
domainName = domainName.split("://", 1)[1].strip()
if not domainName:
raise Exception("domainName must be a non-empty string after normalization")
domainNames = [domainName]
# 如果是列表,清理每个域名
elif isinstance(domainName, list):
if not domainName:
raise Exception("domainName list must be non-empty")
domainNames = []
for d in domainName:
if not isinstance(d, str):
raise Exception("all domainName items must be non-empty strings")
d = d.strip()
if not d:
raise Exception("all domainName items must be non-empty strings")
if d.startswith("https://") or d.startswith("http://"):
d = d.split("://", 1)[1].strip()
if not d:
raise Exception("all domainName items must be non-empty strings after normalization")
domainNames.append(d)
else:
raise Exception("domainName must be a string or array")

# 使用第一个域名作为主域名(用于 DNS 验证和证书存储路径)
if not domainNames:
raise Exception("no valid domainName provided")
primary_domain = domainNames[0]
os.environ["DEVS_DOMAIN"] = primary_domain
else:
raise Exception("no domainName provided")

certName = evt.get("certName")
certificate_id = None
if certName is not None:
cert = get_cert_by_name(certName)
if cert is not None:
if cert.common != domainName:
raise Exception("domainName and cert common name do not match")
# 从证书内容中解析所有 SAN 域名(使用 certbot/acme 模块)
existing_domains = []
if hasattr(cert, 'cert') and cert.cert:
try:
if get_names_from_cert is None:
raise ImportError("certbot.crypto_util not available")
# 使用 certbot 的方法从 PEM 格式证书中提取所有域名(CN + SAN)
cert_bytes = cert.cert.encode('utf-8')
all_names = get_names_from_cert(cert_bytes)
existing_domains = all_names
print(f"Parsed {len(all_names)} domain(s) from certificate: {', '.join(all_names)}")

# 方案二:补充检查 CN 是否在列表中(双重保险)
if hasattr(cert, 'common') and cert.common:
if cert.common not in existing_domains:
existing_domains.insert(0, cert.common)
print(f"Added CN '{cert.common}' to domain list")
except Exception as e:
print(f"Warning: Failed to parse SAN from certificate: {e}")
# 降级方案:从 API 字段中解析域名
if hasattr(cert, 'common') and cert.common:
existing_domains.append(cert.common)

# 解析 sans 字段
if hasattr(cert, 'sans') and cert.sans:
print(f"DEBUG: sans field type: {type(cert.sans)}, value: {cert.sans}")
# sans 可能是字符串(逗号/空格/换行分隔)或列表
if isinstance(cert.sans, str):
# 尝试多种分隔符:逗号、空格、换行
sans_str = cert.sans.replace('\n', ',').replace(' ', ',')
sans_list = [s.strip() for s in sans_str.split(',') if s.strip()]
elif isinstance(cert.sans, list):
sans_list = cert.sans
else:
sans_list = []

# 添加所有SAN域名,去重
for san in sans_list:
if san and san not in existing_domains:
existing_domains.append(san)

if existing_domains:
print(f"Using fallback method, parsed domains: {', '.join(existing_domains)}")

if existing_domains:
# 使用现有证书的域名列表,保持顺序一致
domainNames = existing_domains
primary_domain = domainNames[0]
os.environ["DEVS_DOMAIN"] = primary_domain
print(f"Using domains from existing certificate: {', '.join(domainNames)}")

# 检查证书是否需要更新
if check_if_less_than_seven_days(cert.end_date):
certificate_id = cert.id
else:
print("Cert will not expire in 7 days")
return cert.id
if domainName is None:
raise Exception("no domainName provided")
check_if_valid_domain()

# 对所有请求的域名执行域名有效性检查,避免仅校验主域名
original_devs_domain = os.environ.get("DEVS_DOMAIN")
try:
for domain in domainNames:
os.environ["DEVS_DOMAIN"] = domain
check_if_valid_domain()
finally:
# 恢复为主域名,保持后续逻辑的兼容性
if original_devs_domain is not None:
os.environ["DEVS_DOMAIN"] = original_devs_domain
else:
os.environ["DEVS_DOMAIN"] = primary_domain

# 构建 certbot 参数,支持多域名
certbot_args = [
"certonly",
"--manual",
Expand All @@ -52,22 +155,26 @@ def handler(event, context):
"--key-type",
"rsa",
"--cert-name",
domainName,
primary_domain,
"--email",
"your_mail@mail.com",
"--server",
"https://acme-v02.api.letsencrypt.org/directory",
"--domains",
domainName,
]

# 为每个域名添加 -d 参数(注意:用 -d 而不是 --domains)
for domain in domainNames:
certbot_args.extend(["-d", domain])

print(f"Requesting certificate for domains: {', '.join(domainNames)}")
certbot.main.main(certbot_args)
print("cert generated")
exitCode = subprocess.call("/code/scripts/upload-certs.sh")
if exitCode == 0:
cert, key, cert_id = upload_cert(
"/etc/letsencrypt/live/" + domainName + "/fullchain.pem",
"/etc/letsencrypt/live/" + domainName + "/privkey.pkcs1.pem",
domainName,
"/etc/letsencrypt/live/" + primary_domain + "/fullchain.pem",
"/etc/letsencrypt/live/" + primary_domain + "/privkey.pkcs1.pem",
primary_domain,
certName,
certificate_id,
)
Expand Down