python/Caddy/addEntry.py
2025-03-30 13:29:50 +02:00

331 lines
12 KiB
Python
Executable file
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import re
import socket
def get_host_ip():
"""Get the host IP address"""
try:
# Create a socket to determine the outgoing IP address
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # Connect to Google DNS (doesn't send data)
host_ip = s.getsockname()[0]
s.close()
return host_ip
except Exception:
return "127.0.0.1" # Fallback to localhost
def get_user_input(prompt, default=None):
"""Interactive prompt with default values"""
value = input(f"{prompt} [{default}]: ") or default
return value
def find_caddyfile():
"""Check if the Caddyfile exists, otherwise ask for the path"""
default_path = "./conf/Caddyfile"
if os.path.exists(default_path):
return default_path
print("⚠ No Caddyfile found!")
while True:
custom_path = get_user_input("Enter the path to your Caddyfile")
if os.path.exists(custom_path):
return custom_path
print("❌ File not found, please try again.")
def parse_existing_entries(caddyfile_path):
"""Parse the existing Caddyfile to extract all configured domains"""
existing_entries = {}
try:
with open(caddyfile_path, "r") as file:
content = file.read()
# First, normalize the content to make parsing more reliable
# This removes comments and normalizes whitespace
lines = []
in_comment = False
for line in content.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
# Handle inline comments
if '#' in line and not in_comment:
line = line[:line.index('#')].strip()
lines.append(line)
normalized_content = '\n'.join(lines)
# Use brace matching to properly extract domain blocks
blocks = []
current_position = 0
while current_position < len(normalized_content):
# Find the next domain block start
block_start = normalized_content.find('{', current_position)
if block_start == -1:
break
# Find corresponding domain definition
domain_start = normalized_content.rfind('\n', 0, block_start)
if domain_start == -1:
domain_start = 0
else:
domain_start += 1 # Skip the newline
domain_def = normalized_content[domain_start:block_start].strip()
# Find end of this block (accounting for nested braces)
brace_count = 1
block_end = block_start + 1
while brace_count > 0 and block_end < len(normalized_content):
if normalized_content[block_end] == '{':
brace_count += 1
elif normalized_content[block_end] == '}':
brace_count -= 1
block_end += 1
if brace_count == 0:
# We found a complete block
block_content = normalized_content[domain_start:block_end]
# Only process blocks with reverse_proxy directives
if 'reverse_proxy' in block_content:
blocks.append((domain_def, block_content))
current_position = block_end
# Process the extracted blocks
for domain_def, block_content in blocks:
# Extract target from reverse_proxy directive
proxy_match = re.search(r'reverse_proxy\s+(https?:\/\/[\d\.]+:\d+|[\d\.]+:\d+)', block_content)
if not proxy_match:
continue
target = proxy_match.group(1).strip()
# Process domains (handle comma-separated lists correctly)
domains = [d.strip() for d in domain_def.split(',')]
# Process each domain
for domain in domains:
# Skip if it looks like a directive rather than a domain
if '{' in domain or '}' in domain or not domain:
continue
# Skip literal "Host" that are likely from host header directives rather than domains
if domain == "Host" or domain == "{host}":
continue
# Verify domain format (basic check)
if not re.match(r'^[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9]$', domain) and not domain.startswith('*.'):
print(f"⚠️ Skipping invalid domain format: '{domain}'")
continue
# Determine proxy type
proxy_type = 'standard'
if "https://" in target and "tls_insecure_skip_verify" in block_content:
if "versions h1.1" in block_content:
proxy_type = 'opnsense'
else:
proxy_type = 'https_skip_verify'
# Store the entry
existing_entries[domain] = {
'target': target,
'content': block_content,
'proxy_type': proxy_type
}
# Debug output for special cases
if domain.lower() == "host":
print(f"⚠️ Warning: Found domain named 'host': {domain}")
except Exception as e:
print(f"❌ Error reading Caddyfile: {e}")
import traceback
print(traceback.format_exc())
return existing_entries
def format_caddy_entry(domains, target_ip, target_port, proxy_type):
"""Generate a properly formatted Caddy entry based on proxy type"""
domain_list = ", ".join(domains) # Multiple domains in a single line
if proxy_type == "standard":
return f"""
{domain_list} {{
tls {{
dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
}}
reverse_proxy {target_ip}:{target_port}
}}
"""
elif proxy_type == "https_skip_verify":
return f"""
{domain_list} {{
tls {{
dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
}}
reverse_proxy https://{target_ip}:{target_port} {{
transport http {{
tls
tls_insecure_skip_verify
}}
}}
}}
"""
elif proxy_type == "opnsense":
return f"""
{domain_list} {{
tls {{
dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
}}
reverse_proxy https://{target_ip}:{target_port} {{
transport http {{
tls
tls_insecure_skip_verify
versions h1.1 # Enforce HTTP/1.1
}}
header_up Host {{host}}
header_up X-Real-IP {{remote_host}}
header_up X-Forwarded-Proto {{scheme}}
header_up X-Forwarded-For {{remote}}
header_down Strict-Transport-Security "max-age=31536000; includeSubDomains; preload"
# Remove problematic headers
header_up -Connection
header_up -Upgrade
}}
}}
"""
def update_existing_entry(caddyfile_path, domain, new_entry):
"""Replace an existing entry for the given domain"""
try:
with open(caddyfile_path, "r") as file:
content = file.read()
# New improved pattern to correctly match complete domain blocks
# This regex matches the domain block from start to finish, including all braces
domain_pattern = fr'(?m)^(?:(?:{re.escape(domain)}|[^{{,\s]+(?:,\s*{re.escape(domain)})(?:,\s*[^{{,\s]+)*|{re.escape(domain)}(?:,\s*[^{{,\s]+)+))\s*{{(?:[^{{}}]|{{(?:[^{{}}]|{{[^{{}}]*}})*}})*}}'
pattern = re.compile(domain_pattern, re.DOTALL)
match = pattern.search(content)
if match:
# Replace the block containing this domain with the new entry
new_content = content[:match.start()] + new_entry.strip() + content[match.end():]
with open(caddyfile_path, "w") as file:
file.write(new_content)
print(f"✅ Updated entry for {domain}")
else:
print(f"⚠ Could not find exact entry for {domain}. Adding as new entry.")
with open(caddyfile_path, "a") as file:
file.write(new_entry)
except Exception as e:
print(f"❌ Error updating Caddyfile: {e}")
print(f"Error details: {str(e)}")
def add_caddy_entry(caddyfile_path):
"""Add new Caddy reverse proxy entries, showing existing entries first"""
host_ip = get_host_ip()
existing_entries = parse_existing_entries(caddyfile_path)
print("\n📌 Existing Caddy Entries:")
if existing_entries:
for domain, data in existing_entries.items():
print(f" 🔹 {domain}{data['target']}")
else:
print(" ⚠ No entries found.")
while True:
domain = get_user_input("\nEnter the domain you want to configure", "")
if not domain:
print("❌ No domain provided. Skipping entry.")
continue
# If domain exists, extract its current values
existing_ip = host_ip
existing_port = "8080"
proxy_type = "standard"
if domain in existing_entries:
print(f"⚠ The domain {domain} already exists.")
edit_existing = get_user_input("Do you want to edit this entry? (y/n)", "y").lower() == "y"
if not edit_existing:
continue
existing_target = existing_entries[domain]['target']
proxy_type = existing_entries[domain]['proxy_type']
target_without_protocol = existing_target.replace("https://", "").replace("http://", "")
if ":" in target_without_protocol:
existing_ip, existing_port = target_without_protocol.split(":")
else:
existing_ip = target_without_protocol
existing_port = "80"
# Show host IP as an option
target_ip_prompt = f"Enter the target IP (type 'host' for {host_ip})"
target_ip = get_user_input(target_ip_prompt, existing_ip)
# Replace 'host' with actual host IP
if target_ip.lower() == 'host':
target_ip = host_ip
target_port = get_user_input("Enter the target port", existing_port)
print("\nChoose the proxy mode:")
print("1⃣ Standard (No HTTPS changes)")
print("2⃣ Internal HTTPS (skip verify)")
print("3⃣ OPNsense Mode (skip verify + enforce HTTP/1.1)")
# Pre-fill proxy type based on detected configuration
mode_choice_default = "1"
if proxy_type == "https_skip_verify":
mode_choice_default = "2"
elif proxy_type == "opnsense":
mode_choice_default = "3"
mode_choice = get_user_input("Enter option (1/2/3)", mode_choice_default)
proxy_type = "standard"
if mode_choice == "2":
proxy_type = "https_skip_verify"
elif mode_choice == "3":
proxy_type = "opnsense"
new_entry = format_caddy_entry([domain], target_ip, target_port, proxy_type)
if domain in existing_entries:
update_existing_entry(caddyfile_path, domain, new_entry)
else:
try:
with open(caddyfile_path, "a") as file:
file.write(new_entry)
print(f"\n✅ New entry added: {domain}{target_ip}:{target_port}")
except Exception as e:
print(f"\n❌ Error writing to Caddyfile: {e}")
return
# Ask if another entry should be added
more_entries = get_user_input("\nDo you want to add or edit another entry? (y/n)", "n").lower() == "y"
if not more_entries:
break
# Restart Caddy container
restart_caddy = get_user_input("\nDo you want to restart the Caddy container? (y/n)", "y").lower() == "y"
if restart_caddy:
os.system("docker compose restart caddy")
print("🔄 Caddy container restarted!")
if __name__ == "__main__":
caddyfile_path = find_caddyfile()
add_caddy_entry(caddyfile_path)