"""JSON API blueprint exposing subnets, servers, applications and ports."""

import ipaddress
import random
from datetime import datetime

import markdown
from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    jsonify,
    render_template,
    request,
)
from flask_login import login_required
from flask_wtf import CSRFProtect
from flask_wtf.csrf import ValidationError, validate_csrf

from app.core.extensions import db
from app.core.models import App, Port, Server, Subnet
from app.scripts.ip_scanner import scan
from app.utils.app_utils import validate_port_data

bp = Blueprint("api", __name__, url_prefix="/api")

# NOTE(review): this CSRFProtect instance is created locally; confirm it is
# the one initialised on the application, otherwise ``@csrf.exempt`` below has
# no effect on the app-wide CSRF protection.
csrf = CSRFProtect()

# Format used for every timestamp returned by this API.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# Range scanned by ``suggest_port`` (both bounds inclusive).
_DYNAMIC_PORT_MIN = 10000
_DYNAMIC_PORT_MAX = 65535
# Number of random draws before falling back to a sequential scan.
_RANDOM_PORT_ATTEMPTS = 50

# Range scanned by ``get_free_port``. The upper bound is exclusive, so the
# last candidate is 8999 even though the error message says "8000-9000".
_FREE_PORT_RANGE = range(8000, 9000)

# Port suggestions keyed by lower-cased application type.
_PORT_SUGGESTIONS = {
    "web": [
        {"port": 80, "type": "tcp", "desc": "HTTP"},
        {"port": 443, "type": "tcp", "desc": "HTTPS"},
    ],
    "database": [
        {"port": 3306, "type": "tcp", "desc": "MySQL"},
        {"port": 5432, "type": "tcp", "desc": "PostgreSQL"},
        {"port": 1521, "type": "tcp", "desc": "Oracle"},
    ],
    "mail": [
        {"port": 25, "type": "tcp", "desc": "SMTP"},
        {"port": 143, "type": "tcp", "desc": "IMAP"},
        {"port": 110, "type": "tcp", "desc": "POP3"},
    ],
    "file": [
        {"port": 21, "type": "tcp", "desc": "FTP"},
        {"port": 22, "type": "tcp", "desc": "SFTP/SSH"},
        {"port": 445, "type": "tcp", "desc": "SMB"},
    ],
}

# Returned by ``suggest_ports`` when the application type is unknown.
_DEFAULT_PORT_SUGGESTIONS = [
    {"port": 80, "type": "tcp", "desc": "HTTP"},
    {"port": 22, "type": "tcp", "desc": "SSH"},
]


def _format_timestamp(value):
    """Return *value* formatted with ``_TIMESTAMP_FORMAT``, or None if unset."""
    if value is None:
        return None
    return value.strftime(_TIMESTAMP_FORMAT)


def _serialize_port(port):
    """Return the dict representation of a port used in nested responses."""
    return {
        "id": port.id,
        "port_number": port.port_number,
        "protocol": port.protocol,
        "description": port.description,
    }


def _used_port_numbers(server):
    """Return the set of port numbers used by every app on *server*."""
    return {port.port_number for app in server.apps for port in app.ports}


def _csrf_token_is_valid(token):
    """Return True when *token* is a non-empty, valid CSRF token.

    ``flask_wtf.csrf.validate_csrf`` signals failure by raising
    ``ValidationError`` rather than returning a boolean, so the exception is
    translated into ``False`` here.
    """
    if not token:
        return False
    try:
        validate_csrf(token)
    except ValidationError:
        return False
    return True


@bp.route("/subnets", methods=["GET"])
def get_subnets():
    """Return all subnets grouped by their location (site)."""
    sites = {}
    for subnet in Subnet.query.all():
        sites.setdefault(subnet.location, []).append(
            {"id": subnet.id, "cidr": subnet.cidr, "location": subnet.location}
        )

    result = [
        {"name": site_name, "subnets": site_subnets}
        for site_name, site_subnets in sites.items()
    ]
    return jsonify(result)


@bp.route("/subnets/<int:subnet_id>", methods=["GET"])
@login_required
def get_subnet(subnet_id):
    """Return details for one subnet, including its servers (404 if missing)."""
    subnet = Subnet.query.get_or_404(subnet_id)

    servers = [
        {
            "id": server.id,
            "hostname": server.hostname,
            "ip_address": server.ip_address,
            "created_at": _format_timestamp(server.created_at),
        }
        for server in Server.query.filter_by(subnet_id=subnet_id).all()
    ]

    result = {
        "id": subnet.id,
        "cidr": subnet.cidr,
        "location": subnet.location,
        "used_ips": subnet.used_ips,
        "auto_scan": subnet.auto_scan,
        "created_at": _format_timestamp(subnet.created_at),
        "servers": servers,
    }
    return jsonify(result)


@bp.route("/subnets/<int:subnet_id>/scan", methods=["POST"])
@login_required
def api_subnet_scan(subnet_id):
    """Scan a subnet and return the scan results.

    Responds with HTTP 500 and ``success: False`` if the scan raises.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        results = scan(subnet.cidr, save_results=True)
    except Exception as e:  # API boundary: report any scanner failure as JSON
        return (
            jsonify({"success": False, "message": f"Error scanning subnet: {str(e)}"}),
            500,
        )

    return jsonify(
        {
            "success": True,
            "subnet": subnet.cidr,
            "hosts_found": len(results),
            "results": results,
        }
    )


@bp.route("/servers", methods=["GET"])
@login_required
def get_servers():
    """Return every server as ``{"servers": [...]}``."""
    result = [
        {
            "id": server.id,
            "hostname": server.hostname,
            "ip_address": server.ip_address,
            "subnet_id": server.subnet_id,
            "created_at": _format_timestamp(server.created_at),
        }
        for server in Server.query.all()
    ]
    return jsonify({"servers": result})


@bp.route("/servers/<int:server_id>", methods=["GET"])
@login_required
def get_server(server_id):
    """Return one server with its apps and their ports (404 if missing)."""
    server = Server.query.get_or_404(server_id)

    apps = [
        {
            "id": app.id,
            "name": app.name,
            "ports": [_serialize_port(port) for port in app.ports],
            "created_at": _format_timestamp(app.created_at),
        }
        for app in server.apps
    ]

    result = {
        "id": server.id,
        "hostname": server.hostname,
        "ip_address": server.ip_address,
        "subnet_id": server.subnet_id,
        "documentation": server.documentation,
        "apps": apps,
        "created_at": _format_timestamp(server.created_at),
    }
    return jsonify(result)


@bp.route("/apps", methods=["GET"])
@login_required
def get_apps():
    """Return every application as ``{"apps": [...]}``."""
    result = [
        {
            "id": app.id,
            "name": app.name,
            "server_id": app.server_id,
            "created_at": _format_timestamp(app.created_at),
        }
        for app in App.query.all()
    ]
    return jsonify({"apps": result})


@bp.route("/apps/<int:app_id>", methods=["GET"])
@login_required
def get_app(app_id):
    """Return details for one application (404 if missing)."""
    app = App.query.get_or_404(app_id)

    result = {
        "id": app.id,
        "name": app.name,
        "server_id": app.server_id,
        "documentation": app.documentation,
        "created_at": _format_timestamp(app.created_at),
        # Model instances are not JSON serialisable; emit plain dicts instead.
        "ports": [_serialize_port(port) for port in app.ports],
    }
    return jsonify(result)


@bp.route("/status", methods=["GET"])
def status():
    """Return a static health-check payload."""
    return jsonify({"status": "OK"})


@bp.route("/markdown-preview", methods=["POST"])
@csrf.exempt  # Remove this line in production! Temporary fix for demo purposes
def markdown_preview():
    """Render the posted ``markdown`` field to HTML.

    A missing or non-JSON body is treated as empty markdown. Responds with
    HTTP 400 if ``markdown`` is present but not a string.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    md_content = payload.get("markdown") or ""
    if not isinstance(md_content, str):
        return jsonify({"error": "'markdown' must be a string"}), 400

    # NOTE(review): markdown.markdown() passes raw HTML through unchanged;
    # sanitise the result before inserting it into a page for other users.
    html = markdown.markdown(md_content)
    return jsonify({"html": html})


@bp.route("/ports/suggest", methods=["GET"])
def suggest_ports():
    """Return common ports for the ``type`` query argument.

    Unknown or missing types receive a generic default list.
    """
    app_type = request.args.get("type", "").lower()
    return jsonify(_PORT_SUGGESTIONS.get(app_type, _DEFAULT_PORT_SUGGESTIONS))


@bp.route("/servers/<int:server_id>/suggest_port", methods=["GET"])
@login_required
def suggest_port(server_id):
    """Suggest a random unused port (10000-65535) for a server.

    Returns ``{"port": null}`` when every port in the range is in use.
    """
    server = Server.query.get_or_404(server_id)
    used_ports = _used_port_numbers(server)

    available_port = None
    for _ in range(_RANDOM_PORT_ATTEMPTS):
        candidate = random.randint(_DYNAMIC_PORT_MIN, _DYNAMIC_PORT_MAX)
        if candidate not in used_ports:
            available_port = candidate
            break

    if available_port is None:
        # Random draws kept colliding; fall back to the first free port.
        available_port = next(
            (
                candidate
                for candidate in range(_DYNAMIC_PORT_MIN, _DYNAMIC_PORT_MAX + 1)
                if candidate not in used_ports
            ),
            None,
        )

    return jsonify({"port": available_port})


@bp.route("/app/<int:app_id>/add-port", methods=["POST"])
@login_required
def add_app_port(app_id):
    """Add a port to an application from form data.

    Reads ``port_number``, ``protocol`` (default ``TCP``) and ``description``
    from the form. Responds with HTTP 400 on invalid or duplicate ports and
    HTTP 500 (after rolling back the session) on any other failure.
    """
    App.query.get_or_404(app_id)  # 404 early if the application is unknown

    try:
        port_number = request.form.get("port_number")
        protocol = request.form.get("protocol", "TCP")
        description = request.form.get("description", "")

        valid, clean_port, error = validate_port_data(
            port_number, protocol, description
        )
        if not valid:
            return jsonify({"success": False, "error": error}), 400

        existing_port = Port.query.filter_by(
            app_id=app_id, port_number=clean_port
        ).first()
        if existing_port:
            return (
                jsonify(
                    {
                        "success": False,
                        "error": f"Port {clean_port} already exists for this application",
                    }
                ),
                400,
            )

        new_port = Port(
            app_id=app_id,
            port_number=clean_port,
            protocol=protocol,
            description=description,
        )
        db.session.add(new_port)
        db.session.commit()

        flash(f"Port {clean_port}/{protocol} added successfully", "success")

        return jsonify(
            {
                "success": True,
                "message": f"Port {clean_port}/{protocol} added successfully",
                "port": {
                    "id": new_port.id,
                    "number": new_port.port_number,
                    "protocol": new_port.protocol,
                    "description": new_port.description,
                },
            }
        )
    except Exception as e:  # API boundary: roll back and report as JSON
        db.session.rollback()
        current_app.logger.exception("Failed to add port to app %s", app_id)
        return jsonify({"success": False, "error": str(e)}), 500


@bp.route("/app/<int:app_id>/ports", methods=["GET"])
@login_required
def get_app_ports(app_id):
    """Return all ports for an application (404 if the app is missing)."""
    App.query.get_or_404(app_id)

    ports = Port.query.filter_by(app_id=app_id).all()

    result = {
        "app_id": app_id,
        "ports": [
            {
                "id": port.id,
                # The model attribute is ``port_number`` (see ``add_app_port``);
                # the response key stays ``number`` for existing clients.
                "number": port.port_number,
                "protocol": port.protocol,
                "description": port.description,
            }
            for port in ports
        ],
    }
    return jsonify(result)


@bp.route("/port/<int:port_id>/delete", methods=["POST"])
@login_required
def delete_port(port_id):
    """Delete a port.

    JSON requests must carry a valid ``csrf_token`` field, otherwise HTTP 403
    is returned. Responds with HTTP 500 (after rolling back the session) if
    the delete fails.
    """
    if request.is_json:
        payload = request.get_json(silent=True)
        token = payload.get("csrf_token") if isinstance(payload, dict) else None
        if not _csrf_token_is_valid(token):
            return jsonify({"success": False, "error": "CSRF validation failed"}), 403

    port = Port.query.get_or_404(port_id)
    # Read the number before deleting: the instance is no longer loadable
    # once the delete has been committed.
    port_number = port.port_number

    try:
        db.session.delete(port)
        db.session.commit()
    except Exception as e:  # API boundary: roll back and report as JSON
        db.session.rollback()
        current_app.logger.exception("Failed to delete port %s", port_id)
        return jsonify({"success": False, "error": str(e)}), 500

    return jsonify({"success": True, "message": f"Port {port_number} deleted"})


@bp.route("/subnets/<int:subnet_id>/servers", methods=["GET"])
def get_subnet_servers(subnet_id):
    """Return the servers of a subnet (empty list for an unknown subnet)."""
    servers = Server.query.filter_by(subnet_id=subnet_id).all()

    return jsonify(
        [
            {
                "id": server.id,
                "hostname": server.hostname,
                "ip_address": server.ip_address,
            }
            for server in servers
        ]
    )


@bp.route("/server/<int:server_id>/ports", methods=["GET"])
@login_required
def get_server_ports(server_id):
    """Return the sorted, de-duplicated port numbers used on a server."""
    server = Server.query.get_or_404(server_id)

    # Ports are attached to apps (see ``add_app_port``), so collect them
    # through the server's apps rather than filtering ports by server.
    used_ports = sorted(_used_port_numbers(server))

    return jsonify({"server_id": server_id, "used_ports": used_ports})


@bp.route("/server/<int:server_id>/free-port", methods=["GET"])
@login_required
def get_free_port(server_id):
    """Return the first free port for a server, scanning upwards from 8000.

    Responds with ``success: False`` when every port in the range is in use.
    """
    server = Server.query.get_or_404(server_id)
    used_ports = _used_port_numbers(server)

    for port_number in _FREE_PORT_RANGE:
        if port_number not in used_ports:
            return jsonify({"success": True, "port": port_number})

    return jsonify(
        {"success": False, "error": "No free ports available in the range 8000-9000"}
    )