#!/usr/bin/env python3 import os import time import base64 import logging import subprocess from kubernetes import client, config, watch from kubernetes.client.rest import ApiException from Crypto.PublicKey import RSA # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 配置 Kubernetes 客户端 try: config.load_incluster_config() # 如果在 Pod 内运行 except Exception as e: logging.error("Failed to load kube config: %s", e) exit(1) v1 = client.CoreV1Api() # 配置 Nginx 主机和目录 CLUSTER_HOST = "10.0.0.4" NGINX_HOST = "10.0.0.1" NGINX_USER = "nginx" NGINX_CERT_DIR = "/etc/nginx/certs" NGINX_CONF_DIR = "/etc/nginx/conf.d" NAMESPACE = "basic" MAX_RETRIES = 10 # 最大重试次数 RETRY_DELAY = 20 # 每次重试的时间间隔(秒) # 创建目录用于存放临时证书和密钥 TEMP_DIR = "/tmp/nginx_certs" RSA_DIR = "/root/.ssh" # 新增的文件夹,用于存放 SSH 密钥对 PUBLIC_KEY_COMMENT = "generated-key" os.makedirs(TEMP_DIR, exist_ok=True) os.makedirs(RSA_DIR, exist_ok=True) def generate_ssh_key_pair(): """ 生成 SSH 密钥对(私钥为 PEM,公钥为 OpenSSH 单行格式),并保存到 RSA_DIR。 私钥权限设为 600,公钥权限设为 644。 """ try: logging.info("Generating SSH key pair...") os.makedirs(RSA_DIR, exist_ok=True) # 生成 RSA 私钥 key = RSA.generate(2048) # 私钥(PEM,多行) private_key_bytes = key.export_key(format='PEM') # 公钥(OpenSSH 单行格式,例如: b"ssh-rsa AAAAB3NzaC1yc2E..."} public_key_bytes = key.publickey().export_key(format='OpenSSH') # 给公钥添加注释(可选),并确保以换行结尾 if PUBLIC_KEY_COMMENT: public_key_bytes = public_key_bytes + b' ' + PUBLIC_KEY_COMMENT.encode('utf-8') public_key_bytes = public_key_bytes + b'\n' private_key_path = os.path.join(RSA_DIR, "id_rsa") public_key_path = os.path.join(RSA_DIR, "id_rsa.pub") # 写入私钥与公钥 with open(private_key_path, "wb") as private_file: private_file.write(private_key_bytes) with open(public_key_path, "wb") as public_file: public_file.write(public_key_bytes) # 设置合理的文件权限 try: os.chmod(private_key_path, 0o600) os.chmod(public_key_path, 0o644) except PermissionError: logging.warning("No permission to chmod the key files; please set permissions manually if needed.") logging.info(f"SSH key pair saved to {private_key_path} and {public_key_path}") except Exception as e: logging.error("Failed to generate SSH key pair: %s", e) def upload_cert_with_retry(tmp_crt, tmp_key, domain): """ 尝试多次上传证书文件到 Nginx 服务器,直到成功或达到最大重试次数 """ retries = 0 while retries < MAX_RETRIES: try: logging.info(f"Attempting to upload certificate for {domain}, attempt {retries + 1}...") # 上传证书和私钥,使用 ssh + sudo tee for local_file, remote_file in [ (tmp_crt, f"{NGINX_CERT_DIR}/{domain}.pem"), (tmp_key, f"{NGINX_CERT_DIR}/{domain}.key") ]: ssh_command = [ "ssh", f"{NGINX_USER}@{NGINX_HOST}", f"sudo tee {remote_file} > /dev/null" ] with open(local_file, "rb") as f: subprocess.run(ssh_command, stdin=f, check=True) logging.info(f"Uploaded certificate and key for {domain} to Nginx server.") return # 上传成功,退出循环 except subprocess.CalledProcessError as e: retries += 1 logging.error(f"Attempt {retries} failed: {e}. Retrying in {RETRY_DELAY} seconds...") time.sleep(RETRY_DELAY) logging.error(f"Failed to upload certificate for {domain} after {MAX_RETRIES} attempts.") def upload_cert(domain, crt_data, key_data): """ 上传证书和私钥到 Nginx 中转服务器,并生成 Nginx 配置文件 """ try: # 临时保存证书和私钥 tmp_crt = os.path.join(TEMP_DIR, f"{domain}.crt") tmp_key = os.path.join(TEMP_DIR, f"{domain}.key") with open(tmp_crt, "wb") as f: f.write(base64.b64decode(crt_data)) with open(tmp_key, "wb") as f: f.write(base64.b64decode(key_data)) logging.info(f"Certificate and key for {domain} saved locally.") # 上传证书和私钥 upload_cert_with_retry(tmp_crt, tmp_key, domain) # 生成 Nginx 配置文件 conf_content = f""" server {{ listen 443 ssl http2; server_name {domain}; ssl_certificate /etc/nginx/certs/{domain}.pem; ssl_certificate_key /etc/nginx/certs/{domain}.key; location / {{ proxy_pass https://{CLUSTER_HOST}; proxy_ssl_server_name on; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; }} }} """ tmp_conf = os.path.join(TEMP_DIR, f"{domain}.conf") with open(tmp_conf, "w") as f: f.write(conf_content) # 上传 Nginx 配置文件 ssh_command = [ "ssh", f"{NGINX_USER}@{NGINX_HOST}", f"sudo tee {NGINX_CONF_DIR}/{domain}.conf > /dev/null" ] logging.info(f"Uploading Nginx config for {domain} using sudo tee...") with open(tmp_conf, "rb") as f: subprocess.run(ssh_command, stdin=f, check=True) logging.info(f"Uploaded Nginx config for {domain} to Nginx server via sudo tee.") # 重载 Nginx reload_command = ["ssh", f"{NGINX_USER}@{NGINX_HOST}", "sudo nginx -s reload"] subprocess.run(reload_command, check=True) logging.info(f"Nginx reloaded successfully for {domain}.") except subprocess.CalledProcessError as e: logging.error("Error during SSH commands: %s", e) except Exception as e: logging.error("Unexpected error: %s", e) def watch_secrets_all_ns(): """ 监听所有命名空间中的 Secrets,当发现以 -tls 结尾的 cert-manager 生成证书时上传到 Nginx """ logging.info("Starting to watch all Kubernetes secrets across namespaces.") w = watch.Watch() try: # 监听所有命名空间 for event in w.stream(v1.list_secret_for_all_namespaces): secret = event['object'] secret_name = secret.metadata.name annotations = secret.metadata.annotations or {} # 仅处理 cert-manager 生成的 TLS Secret if secret_name.endswith("-tls") and annotations.get("cert-manager.io/issuer-name"): if "tls.crt" in secret.data and "tls.key" in secret.data: domain = secret_name[:-4] # 去掉 -tls 后缀 logging.info(f"Found TLS secret {secret_name} for domain {domain}, uploading...") upload_cert(domain, secret.data["tls.crt"], secret.data["tls.key"]) except ApiException as e: logging.error("Kubernetes API exception: %s", e) except Exception as e: logging.error("Unexpected error: %s", e) exit(1) if __name__ == "__main__": # 生成 SSH 密钥对 generate_ssh_key_pair() # 开始监听 Secrets 变化 watch_secrets_all_ns()