"""Blueprint exposing the ``/api`` endpoints for subnets, servers, apps and ports."""

import html
import ipaddress
import random
from datetime import datetime

import markdown
from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_login import login_required
from flask_wtf import CSRFProtect

# Project imports keep their original relative order to avoid disturbing any
# import-order dependencies between these modules.
from app.core.models import Subnet, Server, App, Port
from app.core.extensions import db
from app.scripts.ip_scanner import scan
from app.utils.app_utils import validate_port_data

bp = Blueprint("api", __name__, url_prefix="/api")
csrf = CSRFProtect()

# Format used for every ``created_at`` value returned by this API.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# Bounds and retry budget used by ``suggest_port``.
_RANDOM_PORT_MIN = 10000
_RANDOM_PORT_MAX = 65535
_RANDOM_PORT_ATTEMPTS = 50

# Candidate ports scanned by ``get_free_port``. The upper bound is exclusive,
# so 9000 itself is never offered even though the error message says
# "8000-9000".
_FREE_PORT_RANGE = range(8000, 9000)

# Static port suggestions keyed by lower-cased application type.
_PORT_SUGGESTIONS = {
    "web": [
        {"port": 80, "type": "tcp", "desc": "HTTP"},
        {"port": 443, "type": "tcp", "desc": "HTTPS"},
    ],
    "database": [
        {"port": 3306, "type": "tcp", "desc": "MySQL"},
        {"port": 5432, "type": "tcp", "desc": "PostgreSQL"},
        {"port": 1521, "type": "tcp", "desc": "Oracle"},
    ],
    "mail": [
        {"port": 25, "type": "tcp", "desc": "SMTP"},
        {"port": 143, "type": "tcp", "desc": "IMAP"},
        {"port": 110, "type": "tcp", "desc": "POP3"},
    ],
    "file": [
        {"port": 21, "type": "tcp", "desc": "FTP"},
        {"port": 22, "type": "tcp", "desc": "SFTP/SSH"},
        {"port": 445, "type": "tcp", "desc": "SMB"},
    ],
}

# Returned by ``suggest_ports`` when the requested type is unknown or missing.
_DEFAULT_PORT_SUGGESTIONS = [
    {"port": 80, "type": "tcp", "desc": "HTTP"},
    {"port": 22, "type": "tcp", "desc": "SSH"},
]


def _format_timestamp(value):
    """Render a datetime with ``_TIMESTAMP_FORMAT``; ``None`` is passed through."""
    return value.strftime(_TIMESTAMP_FORMAT) if value is not None else None


def _serialize_port(port):
    """Return the JSON-safe dict used for a ``Port`` row in API responses."""
    return {
        "id": port.id,
        "port_number": port.port_number,
        "protocol": port.protocol,
        "description": port.description,
    }


def _server_port_numbers(server):
    """Return the port numbers of every port of every app on ``server``."""
    return [port.port_number for app in server.apps for port in app.ports]


def _port_failure_response(app_id, is_ajax, message, status):
    """Answer a failed port submission.

    AJAX callers get a JSON error body with ``status``. Regular form posts get
    a plain redirect back to the application page: attaching a 4xx/5xx status
    to a redirect response would replace its 3xx code and browsers would not
    follow it, so the already-flashed message would never be displayed.
    """
    if is_ajax:
        return jsonify({"success": False, "error": message}), status
    return redirect(url_for("dashboard.app_view", app_id=app_id))


def _flash_conflict_link(error, server_id):
    """Flash a link to the application that already owns a conflicting port.

    The application name is taken from between the first pair of single
    quotes in ``error``. Nothing is flashed when the message is not a
    port-conflict message, has no quoted name, or no matching application
    exists on ``server_id``.
    """
    if "already in use by application" not in error:
        return
    pieces = error.split("'")
    if len(pieces) < 2:
        return
    app_name = pieces[1]
    conflict_app = App.query.filter_by(name=app_name, server_id=server_id).first()
    if conflict_app is None:
        return
    edit_url = url_for("dashboard.app_edit", app_id=conflict_app.id)
    # NOTE(review): the flashed text contains HTML, so the template presumably
    # renders flash messages unescaped -- confirm. Both interpolated values
    # are escaped here for that reason.
    edit_link = f'<a href="{html.escape(edit_url)}">Edit {html.escape(app_name)}</a>'
    flash(f"Would you like to edit the conflicting application? {edit_link}", "info")


@bp.route("/subnets", methods=["GET"])
def get_subnets():
    """Return all subnets grouped by their ``location`` (site)."""
    sites = {}
    for subnet in Subnet.query.all():
        sites.setdefault(subnet.location, []).append(
            {"id": subnet.id, "cidr": subnet.cidr, "location": subnet.location}
        )

    return jsonify(
        [{"name": site_name, "subnets": entries} for site_name, entries in sites.items()]
    )


@bp.route("/subnets/<int:subnet_id>", methods=["GET"])
@login_required
def get_subnet(subnet_id):
    """Return one subnet and its servers; 404 if the subnet does not exist."""
    subnet = Subnet.query.get_or_404(subnet_id)

    servers = [
        {
            "id": server.id,
            "hostname": server.hostname,
            "ip_address": server.ip_address,
            "created_at": _format_timestamp(server.created_at),
        }
        for server in Server.query.filter_by(subnet_id=subnet_id).all()
    ]

    return jsonify(
        {
            "id": subnet.id,
            "cidr": subnet.cidr,
            "location": subnet.location,
            "used_ips": subnet.used_ips,
            "auto_scan": subnet.auto_scan,
            "created_at": _format_timestamp(subnet.created_at),
            "servers": servers,
        }
    )


@bp.route("/subnets/<int:subnet_id>/scan", methods=["POST"])
@login_required
def api_subnet_scan(subnet_id):
    """Run ``scan`` on the subnet's CIDR with ``save_results=True``.

    Returns the scan results as JSON, or a JSON error with status 500 if the
    scan raises.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        results = scan(subnet.cidr, save_results=True)
    except Exception as e:
        # Top-level boundary: log the traceback, report the failure as JSON.
        current_app.logger.exception("Subnet scan failed for %s", subnet.cidr)
        return (
            jsonify({"success": False, "message": f"Error scanning subnet: {str(e)}"}),
            500,
        )

    return jsonify(
        {
            "success": True,
            "subnet": subnet.cidr,
            "hosts_found": len(results),
            "results": results,
        }
    )


@bp.route("/servers", methods=["GET"])
@login_required
def get_servers():
    """Return every server under the ``servers`` key."""
    result = [
        {
            "id": server.id,
            "hostname": server.hostname,
            "ip_address": server.ip_address,
            "subnet_id": server.subnet_id,
            "created_at": _format_timestamp(server.created_at),
        }
        for server in Server.query.all()
    ]
    return jsonify({"servers": result})


@bp.route("/servers/<int:server_id>", methods=["GET"])
@login_required
def get_server(server_id):
    """Return one server with its apps and their ports; 404 if missing."""
    server = Server.query.get_or_404(server_id)

    apps = [
        {
            "id": app.id,
            "name": app.name,
            "ports": [_serialize_port(port) for port in app.ports],
            "created_at": _format_timestamp(app.created_at),
        }
        for app in server.apps
    ]

    return jsonify(
        {
            "id": server.id,
            "hostname": server.hostname,
            "ip_address": server.ip_address,
            "subnet_id": server.subnet_id,
            "documentation": server.documentation,
            "apps": apps,
            "created_at": _format_timestamp(server.created_at),
        }
    )


@bp.route("/apps", methods=["GET"])
@login_required
def get_apps():
    """Return every application under the ``apps`` key."""
    result = [
        {
            "id": app.id,
            "name": app.name,
            "server_id": app.server_id,
            "created_at": _format_timestamp(app.created_at),
        }
        for app in App.query.all()
    ]
    return jsonify({"apps": result})


@bp.route("/apps/<int:app_id>", methods=["GET"])
@login_required
def get_app(app_id):
    """Return one application with its ports; 404 if missing."""
    app = App.query.get_or_404(app_id)

    return jsonify(
        {
            "id": app.id,
            "name": app.name,
            "server_id": app.server_id,
            "documentation": app.documentation,
            "created_at": _format_timestamp(app.created_at),
            # Model instances are not JSON serializable; emit plain dicts in
            # the same shape ``get_server`` uses.
            "ports": [_serialize_port(port) for port in app.ports],
        }
    )


@bp.route("/status", methods=["GET"])
def status():
    """Return a static ``{"status": "OK"}`` payload."""
    return jsonify({"status": "OK"})


@bp.route("/ports/suggest", methods=["GET"])
def suggest_ports():
    """Return the static port suggestions for the ``type`` query parameter.

    Unknown or missing types get the default HTTP/SSH pair.
    """
    app_type = request.args.get("type", "").lower()
    return jsonify(_PORT_SUGGESTIONS.get(app_type, _DEFAULT_PORT_SUGGESTIONS))


@bp.route("/servers/<int:server_id>/suggest_port", methods=["GET"])
@login_required
def suggest_port(server_id):
    """Suggest a port in 10000-65535 not used by any app on the server.

    Tries a bounded number of random picks first, then falls back to the
    lowest unused port in the range. ``port`` is ``null`` when the whole
    range is taken.
    """
    server = Server.query.get_or_404(server_id)
    used_ports = set(_server_port_numbers(server))

    for _ in range(_RANDOM_PORT_ATTEMPTS):
        candidate = random.randint(_RANDOM_PORT_MIN, _RANDOM_PORT_MAX)
        if candidate not in used_ports:
            return jsonify({"port": candidate})

    # Random probing failed; scan the range sequentially instead.
    first_free = next(
        (
            candidate
            for candidate in range(_RANDOM_PORT_MIN, _RANDOM_PORT_MAX + 1)
            if candidate not in used_ports
        ),
        None,
    )
    return jsonify({"port": first_free})


@bp.route("/app/<int:app_id>/add-port", methods=["POST"])
@login_required
def add_app_port(app_id):
    """Add a port to an application from form data.

    Reads ``port_number``, ``protocol`` (default ``TCP``) and ``description``
    from the form. AJAX callers (``X-Requested-With: XMLHttpRequest`` or a
    best ``Accept`` type of ``application/json``) receive JSON; other callers
    are redirected to the application page with a flashed message.
    """
    app = App.query.get_or_404(app_id)

    is_ajax = (
        request.headers.get("X-Requested-With") == "XMLHttpRequest"
        or request.accept_mimetypes.best == "application/json"
    )

    try:
        port_number = request.form.get("port_number")
        protocol = request.form.get("protocol", "TCP")
        description = request.form.get("description", "")

        # Validate port data with server-side conflict check
        valid, clean_port, error = validate_port_data(
            port_number, protocol, description, app.server_id, app_id
        )

        if not valid:
            flash(error, "danger")
            # If the port belongs to another app, offer a direct edit link.
            _flash_conflict_link(error, app.server_id)
            return _port_failure_response(app_id, is_ajax, error, 400)

        # Reject an exact duplicate (same number and protocol) on this app.
        existing_port = Port.query.filter_by(
            app_id=app_id, port_number=clean_port, protocol=protocol
        ).first()
        if existing_port:
            error_msg = (
                f"Port {clean_port}/{protocol} already exists for this application"
            )
            flash(error_msg, "warning")
            return _port_failure_response(app_id, is_ajax, error_msg, 400)

        new_port = Port(
            app_id=app_id,
            port_number=clean_port,
            protocol=protocol,
            description=description,
        )
        db.session.add(new_port)
        db.session.commit()

        success_msg = f"Port {clean_port}/{protocol} added successfully"
        flash(success_msg, "success")

        # Regular form submissions are redirected; everything else gets JSON.
        if not is_ajax and request.content_type != "application/json":
            return redirect(url_for("dashboard.app_view", app_id=app_id))

        return jsonify(
            {
                "success": True,
                "message": success_msg,
                "port": _serialize_port(new_port),
            }
        )
    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Error adding port to app %s", app_id)
        error_msg = f"Error adding port: {str(e)}"
        flash(error_msg, "danger")
        return _port_failure_response(app_id, is_ajax, error_msg, 500)


@bp.route("/app/<int:app_id>/ports", methods=["GET"])
@login_required
def get_app_ports(app_id):
    """Return all ports of an application; 404 if the application is missing."""
    App.query.get_or_404(app_id)

    ports = Port.query.filter_by(app_id=app_id).all()
    return jsonify(
        {
            "app_id": app_id,
            "ports": [
                {
                    "id": port.id,
                    # The response key stays "number"; the value comes from
                    # ``port_number``, the attribute used elsewhere in this
                    # file.
                    "number": port.port_number,
                    "protocol": port.protocol,
                    "description": port.description,
                }
                for port in ports
            ],
        }
    )


@bp.route("/app/<int:app_id>/port/<int:port_id>/delete", methods=["POST"])
@login_required
def delete_app_port(app_id, port_id):
    """Delete a port from an application, then redirect to the application page.

    The port is left untouched (with a flashed error) if it belongs to a
    different application.
    """
    app = App.query.get_or_404(app_id)
    port = Port.query.get_or_404(port_id)
    app_view_url = url_for("dashboard.app_view", app_id=app_id)

    if port.app_id != app.id:
        flash("Port does not belong to this application", "danger")
        return redirect(app_view_url)

    # Capture the label first so the message does not read a deleted row.
    port_label = f"{port.port_number}/{port.protocol}"
    try:
        db.session.delete(port)
        db.session.commit()
        flash(f"Port {port_label} deleted successfully", "success")
    except Exception as e:
        db.session.rollback()
        flash(f"Error deleting port: {str(e)}", "danger")

    return redirect(app_view_url)


@bp.route("/subnets/<int:subnet_id>/servers", methods=["GET"])
def get_subnet_servers(subnet_id):
    """Return id, hostname and IP of every server in a subnet.

    An unknown subnet id yields an empty list rather than a 404.
    """
    servers = Server.query.filter_by(subnet_id=subnet_id).all()
    return jsonify(
        [
            {
                "id": server.id,
                "hostname": server.hostname,
                "ip_address": server.ip_address,
            }
            for server in servers
        ]
    )


@bp.route("/server/<int:server_id>/ports", methods=["GET"])
@login_required
def get_server_ports(server_id):
    """Return the port numbers used by the apps on a server; 404 if missing."""
    server = Server.query.get_or_404(server_id)

    # Ports are reached through the server's apps, as in ``suggest_port``.
    used_ports = _server_port_numbers(server)
    return jsonify({"server_id": server_id, "used_ports": used_ports})


@bp.route("/server/<int:server_id>/free-port", methods=["GET"])
@login_required
def get_free_port(server_id):
    """Return the lowest port in ``_FREE_PORT_RANGE`` unused on the server."""
    server = Server.query.get_or_404(server_id)
    used_ports = set(_server_port_numbers(server))

    for port_number in _FREE_PORT_RANGE:
        if port_number not in used_ports:
            return jsonify({"success": True, "port": port_number})

    return jsonify(
        {"success": False, "error": "No free ports available in the range 8000-9000"}
    )


@bp.route("/validate/app-name", methods=["GET"])
@login_required
def validate_app_name():
    """Validate an application name for a server in real time.

    Query parameters: ``name``, ``server_id`` and an optional ``app_id`` (the
    application being edited, excluded from the duplicate check). A
    non-numeric ``app_id`` is treated as absent.
    """
    name = request.args.get("name", "").strip()
    server_id = request.args.get("server_id")
    # ``type=int`` yields None instead of raising on a malformed value.
    app_id = request.args.get("app_id", type=int)

    if not name or not server_id:
        return jsonify({"valid": False, "message": "Missing required parameters"})

    # Exact-name duplicate on the same server.
    query = App.query.filter(App.name == name, App.server_id == server_id)
    if app_id is not None:
        query = query.filter(App.id != app_id)

    existing_app = query.first()
    if existing_app:
        return jsonify(
            {
                "valid": False,
                "message": f"Application '{name}' already exists on this server",
                "app_id": existing_app.id,
                "edit_url": url_for("dashboard.app_edit", app_id=existing_app.id),
            }
        )

    # Case-insensitive substring matches are reported as a warning only.
    similar_apps = (
        App.query.filter(App.name.ilike(f"%{name}%"), App.server_id == server_id)
        .limit(3)
        .all()
    )

    # The warning is skipped whenever the edited app is itself among the matches.
    editing_listed_app = app_id is not None and any(
        app.id == app_id for app in similar_apps
    )
    if similar_apps and not editing_listed_app:
        return jsonify(
            {
                "valid": True,
                "warning": "Similar application names found",
                "similar_apps": [
                    {"name": app.name, "id": app.id} for app in similar_apps
                ],
            }
        )

    return jsonify({"valid": True})


@bp.route("/validate/app-port", methods=["GET"])
@login_required
def validate_app_port():
    """Validate a port number/protocol for a server in real time.

    Query parameters: ``port_number``, ``protocol`` (default ``TCP``),
    ``server_id`` and an optional ``app_id`` to exclude from the conflict
    check.
    """
    port_number = request.args.get("port_number", "").strip()
    protocol = request.args.get("protocol", "TCP")
    server_id = request.args.get("server_id")
    app_id = request.args.get("app_id")

    if not port_number or not server_id:
        return jsonify({"valid": False, "message": "Missing required parameters"})

    try:
        clean_port = int(port_number)
    except ValueError:
        return jsonify({"valid": False, "message": "Invalid port number"})

    if clean_port < 1 or clean_port > 65535:
        return jsonify({"valid": False, "message": "Port must be between 1 and 65535"})

    # NOTE(review): ``is_port_in_use`` is not imported or defined in the part
    # of the file visible here -- confirm it is in scope (it presumably lives
    # next to ``validate_port_data``), otherwise this call raises NameError.
    in_use, app_name = is_port_in_use(
        clean_port, protocol, server_id, exclude_app_id=app_id or None
    )
    if not in_use:
        return jsonify({"valid": True})

    conflict_app = App.query.filter_by(name=app_name, server_id=server_id).first()
    return jsonify(
        {
            "valid": False,
            "message": (
                f"Port {clean_port}/{protocol} is already in use by "
                f"application '{app_name}'"
            ),
            "app_id": conflict_app.id if conflict_app else None,
            "edit_url": (
                url_for("dashboard.app_edit", app_id=conflict_app.id)
                if conflict_app
                else None
            ),
        }
    )