#!/usr/bin/env python3
"""Interactive helper for adding and editing reverse-proxy entries in a Caddyfile."""
import os
import re
import socket
import subprocess
import traceback

# Matches the upstream of a reverse_proxy directive (IP:port, optional scheme).
_PROXY_TARGET_RE = re.compile(r'reverse_proxy\s+(https?:\/\/[\d\.]+:\d+|[\d\.]+:\d+)')
# Basic sanity check for a host name used as a site address.
_DOMAIN_RE = re.compile(r'^[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9]$')


def get_host_ip():
    """Return the IP address of the interface used for outgoing traffic.

    A UDP socket is "connected" to a public address (no packet is sent) so
    the OS picks the outgoing interface. Falls back to ``127.0.0.1`` when
    that fails.
    """
    try:
        # The context manager closes the socket even when connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # Fallback to localhost


def get_user_input(prompt, default=None):
    """Prompt the user and return the answer, or *default* on empty input.

    The ``[default]`` hint is only shown when a default was supplied, so a
    missing default no longer renders as ``[None]``.
    """
    hint = "" if default is None else f" [{default}]"
    answer = input(f"{prompt}{hint}: ").strip()
    return answer or default


def find_caddyfile():
    """Return the path of the Caddyfile, asking the user if the default is missing."""
    default_path = "./conf/Caddyfile"
    if os.path.isfile(default_path):
        return default_path

    print("⚠ No Caddyfile found!")
    while True:
        custom_path = get_user_input("Enter the path to your Caddyfile")
        # Empty input yields None; guard it instead of passing it to os.path.
        if custom_path and os.path.isfile(custom_path):
            return custom_path
        print("❌ File not found, please try again.")


def _normalize_caddyfile(content):
    """Drop blank lines and comments and strip surrounding whitespace per line."""
    kept = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        if '#' in line:
            # Inline comment: keep only the part before it.
            line = line[:line.index('#')].strip()
        kept.append(line)
    return '\n'.join(kept)


def _extract_proxy_blocks(text):
    """Return ``(header, block_text)`` for each top-level block using reverse_proxy.

    *text* must already be normalized. Braces are matched by counting so
    nested blocks of any depth are handled.
    """
    blocks = []
    size = len(text)
    position = 0
    while position < size:
        open_at = text.find('{', position)
        if open_at == -1:
            break

        # The header is whatever precedes the brace on the same line.
        header_at = text.rfind('\n', 0, open_at) + 1
        header = text[header_at:open_at].strip()

        depth = 1
        end = open_at + 1
        while depth > 0 and end < size:
            char = text[end]
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
            end += 1

        if depth == 0:
            block_text = text[header_at:end]
            if 'reverse_proxy' in block_text:
                blocks.append((header, block_text))

        position = end
    return blocks


def _detect_proxy_type(target, block_text):
    """Classify a block as 'standard', 'https_skip_verify' or 'opnsense'."""
    if "https://" in target and "tls_insecure_skip_verify" in block_text:
        if "versions h1.1" in block_text:
            return 'opnsense'
        return 'https_skip_verify'
    return 'standard'


def parse_existing_entries(caddyfile_path):
    """Parse the Caddyfile and return the configured reverse-proxy domains.

    Returns a dict mapping each domain to a dict with the keys ``target``
    (the reverse_proxy upstream), ``content`` (the normalized block text) and
    ``proxy_type``. Errors are reported on stdout and whatever was parsed up
    to that point is returned.
    """
    existing_entries = {}

    try:
        with open(caddyfile_path, "r", encoding="utf-8") as file:
            content = file.read()

        normalized_content = _normalize_caddyfile(content)

        for domain_def, block_content in _extract_proxy_blocks(normalized_content):
            proxy_match = _PROXY_TARGET_RE.search(block_content)
            if not proxy_match:
                continue
            target = proxy_match.group(1).strip()
            proxy_type = _detect_proxy_type(target, block_content)

            # A header may list several comma-separated domains.
            for domain in (part.strip() for part in domain_def.split(',')):
                # Skip anything that looks like a directive or placeholder.
                if not domain or '{' in domain or '}' in domain:
                    continue

                # A literal "Host" stems from a header directive, not a site.
                if domain == "Host":
                    continue

                if not _DOMAIN_RE.match(domain) and not domain.startswith('*.'):
                    print(f"⚠️ Skipping invalid domain format: '{domain}'")
                    continue

                existing_entries[domain] = {
                    'target': target,
                    'content': block_content,
                    'proxy_type': proxy_type
                }

                if domain.lower() == "host":
                    print(f"⚠️ Warning: Found domain named 'host': {domain}")

    except Exception as e:
        # Best effort: report the problem and return what was parsed so far.
        print(f"❌ Error reading Caddyfile: {e}")
        print(traceback.format_exc())

    return existing_entries


def format_caddy_entry(domains, target_ip, target_port, proxy_type):
    """Return the Caddyfile text for a reverse-proxy site block.

    *domains* is a list of site addresses written as one comma-separated
    header. *proxy_type* is ``"standard"``, ``"https_skip_verify"`` or
    ``"opnsense"``.

    Raises:
        ValueError: if *proxy_type* is not one of the supported values.
    """
    domain_list = ", ".join(domains)  # Multiple domains in a single line

    if proxy_type == "standard":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy {target_ip}:{target_port}
}}
"""

    if proxy_type == "https_skip_verify":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy https://{target_ip}:{target_port} {{
        transport http {{
            tls
            tls_insecure_skip_verify
        }}
    }}
}}
"""

    if proxy_type == "opnsense":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy https://{target_ip}:{target_port} {{
        transport http {{
            tls
            tls_insecure_skip_verify
            versions h1.1  # Enforce HTTP/1.1
        }}
        header_up Host {{host}}
        header_up X-Real-IP {{remote_host}}
        header_up X-Forwarded-Proto {{scheme}}
        header_up X-Forwarded-For {{remote}}
        header_down Strict-Transport-Security "max-age=31536000; includeSubDomains; preload"
        # Remove problematic headers
        header_up -Connection
        header_up -Upgrade
    }}
}}
"""

    # Previously fell through and returned None, which crashed on write.
    raise ValueError(f"Unknown proxy type: {proxy_type!r}")


def _find_site_block(content, domain):
    """Locate the top-level block whose header lists *domain*.

    Returns ``(start, brace_at, end)`` where ``content[start:brace_at]`` is
    the header, ``content[brace_at:end]`` is the braced body, or ``None``
    when no block matches. Braces are counted, so nesting depth and the
    position of the domain inside a comma-separated header do not matter.
    Text after a ``#`` that starts a token is ignored.
    """
    depth = 0
    line_start = 0
    block_start = 0
    brace_at = 0
    index = 0
    size = len(content)

    while index < size:
        char = content[index]
        if char == '#' and (index == 0 or content[index - 1].isspace()):
            # Skip the comment so braces inside it are not counted.
            newline_at = content.find('\n', index)
            index = size if newline_at == -1 else newline_at
            continue
        if char == '\n':
            line_start = index + 1
        elif char == '{':
            if depth == 0:
                block_start = line_start
                brace_at = index
            depth += 1
        elif char == '}' and depth > 0:
            depth -= 1
            if depth == 0:
                header = content[block_start:brace_at]
                if domain in (part.strip() for part in header.split(',')):
                    return block_start, brace_at, index + 1
        index += 1

    return None


def update_existing_entry(caddyfile_path, domain, new_entry):
    """Replace the configuration of *domain* with *new_entry*.

    When the domain shares a block with other comma-separated domains, those
    keep their original block and *new_entry* is inserted right after it, so
    editing one domain never removes its siblings. If no block lists the
    domain, *new_entry* is appended to the file instead.
    """
    try:
        with open(caddyfile_path, "r", encoding="utf-8") as file:
            content = file.read()

        span = _find_site_block(content, domain)
        if span is None:
            print(f"⚠ Could not find exact entry for {domain}. Adding as new entry.")
            with open(caddyfile_path, "a", encoding="utf-8") as file:
                file.write(new_entry)
            return

        start, brace_at, end = span
        header_parts = (part.strip() for part in content[start:brace_at].split(','))
        siblings = [part for part in header_parts if part and part != domain]

        replacement = new_entry.strip()
        if siblings:
            # Keep the shared block for the remaining domains untouched.
            kept_block = ", ".join(siblings) + " " + content[brace_at:end]
            replacement = kept_block + "\n\n" + replacement

        new_content = content[:start] + replacement + content[end:]
        with open(caddyfile_path, "w", encoding="utf-8") as file:
            file.write(new_content)
        print(f"✅ Updated entry for {domain}")

    except Exception as e:
        print(f"❌ Error updating Caddyfile: {e}")


def _restart_caddy():
    """Restart the Caddy container and report whether it actually succeeded."""
    try:
        result = subprocess.run(["docker", "compose", "restart", "caddy"], check=False)
    except OSError as e:
        print(f"❌ Could not run docker: {e}")
        return False

    if result.returncode != 0:
        print(f"❌ Restarting Caddy failed (exit code {result.returncode}).")
        return False

    print("🔄 Caddy container restarted!")
    return True


def add_caddy_entry(caddyfile_path):
    """Interactively add or edit reverse-proxy entries in the Caddyfile.

    Lists the existing entries, then loops: asks for a domain, target and
    proxy mode, writes the entry, and re-reads the file so later iterations
    see entries created in this session. Finally offers to restart Caddy.
    """
    host_ip = get_host_ip()
    existing_entries = parse_existing_entries(caddyfile_path)

    print("\n📌 Existing Caddy Entries:")
    if existing_entries:
        for domain, data in existing_entries.items():
            print(f" 🔹 {domain} → {data['target']}")
    else:
        print(" ⚠ No entries found.")

    while True:
        domain = get_user_input("\nEnter the domain you want to configure", "")
        if not domain:
            print("❌ No domain provided. Skipping entry.")
            continue

        # Defaults for a new entry; overridden below when editing.
        existing_ip = host_ip
        existing_port = "8080"
        proxy_type = "standard"

        is_edit = domain in existing_entries
        if is_edit:
            print(f"⚠ The domain {domain} already exists.")
            edit_existing = get_user_input("Do you want to edit this entry? (y/n)", "y").lower() == "y"
            if not edit_existing:
                continue

            current = existing_entries[domain]
            proxy_type = current['proxy_type']
            bare_target = current['target'].replace("https://", "").replace("http://", "")
            if ":" in bare_target:
                existing_ip, _, existing_port = bare_target.rpartition(":")
            else:
                existing_ip = bare_target
                existing_port = "80"

        # Show host IP as an option
        target_ip = get_user_input(f"Enter the target IP (type 'host' for {host_ip})", existing_ip)
        if target_ip.lower() == 'host':
            target_ip = host_ip

        target_port = get_user_input("Enter the target port", existing_port)

        print("\nChoose the proxy mode:")
        print("1️⃣ Standard (No HTTPS changes)")
        print("2️⃣ Internal HTTPS (skip verify)")
        print("3️⃣ OPNsense Mode (skip verify + enforce HTTP/1.1)")

        # Pre-fill the choice from the detected configuration.
        mode_choice_default = {"https_skip_verify": "2", "opnsense": "3"}.get(proxy_type, "1")
        mode_choice = get_user_input("Enter option (1/2/3)", mode_choice_default)
        proxy_type = {"2": "https_skip_verify", "3": "opnsense"}.get(mode_choice, "standard")

        new_entry = format_caddy_entry([domain], target_ip, target_port, proxy_type)

        if is_edit:
            update_existing_entry(caddyfile_path, domain, new_entry)
        else:
            try:
                with open(caddyfile_path, "a", encoding="utf-8") as file:
                    file.write(new_entry)
                print(f"\n✅ New entry added: {domain} → {target_ip}:{target_port}")
            except Exception as e:
                print(f"\n❌ Error writing to Caddyfile: {e}")
                return

        # Re-read so an entry added in this session is edited, not duplicated.
        existing_entries = parse_existing_entries(caddyfile_path)

        more_entries = get_user_input("\nDo you want to add or edit another entry? (y/n)", "n").lower() == "y"
        if not more_entries:
            break

    restart_caddy = get_user_input("\nDo you want to restart the Caddy container? (y/n)", "y").lower() == "y"
    if restart_caddy:
        _restart_caddy()


if __name__ == "__main__":
    caddyfile_path = find_caddyfile()
    add_caddy_entry(caddyfile_path)