import ipaddress
import random

from flask import Blueprint, jsonify, request, abort, current_app
from flask_login import login_required

from app.core.models import Subnet, Server, App, Port
from app.core.extensions import db
from app.scripts.ip_scanner import scan

bp = Blueprint('api', __name__, url_prefix='/api')

# Single timestamp layout used by every JSON payload in this module.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_timestamp(value):
    """Return *value* rendered with ``_TIMESTAMP_FORMAT``, or ``None`` if unset."""
    return value.strftime(_TIMESTAMP_FORMAT) if value is not None else None


def _port_payload(port):
    """Return the JSON-serializable dict describing a single ``Port`` row."""
    return {
        'id': port.id,
        'port_number': port.port_number,
        'protocol': port.protocol,
        'description': port.description,
    }


@bp.route('/subnets', methods=['GET'])
def get_subnets():
    """Return all subnets grouped by their ``location`` (site).

    The response is a JSON list of ``{'name': <location>, 'subnets': [...]}``
    objects, in the order each location is first encountered.
    """
    sites = {}
    for subnet in Subnet.query.all():
        sites.setdefault(subnet.location, []).append({
            'id': subnet.id,
            'cidr': subnet.cidr,
            'location': subnet.location,
        })

    return jsonify([
        {'name': site_name, 'subnets': entries}
        for site_name, entries in sites.items()
    ])


# The URL variable is required: the view takes ``subnet_id`` and a bare
# '/subnets/' rule would never supply it.
@bp.route('/subnets/<int:subnet_id>', methods=['GET'])
@login_required
def get_subnet(subnet_id):
    """Return the details of one subnet plus its servers; 404 if unknown."""
    subnet = Subnet.query.get_or_404(subnet_id)

    servers = [
        {
            'id': server.id,
            'hostname': server.hostname,
            'ip_address': server.ip_address,
            'created_at': _format_timestamp(server.created_at),
        }
        for server in Server.query.filter_by(subnet_id=subnet_id).all()
    ]

    return jsonify({
        'id': subnet.id,
        'cidr': subnet.cidr,
        'location': subnet.location,
        'used_ips': subnet.used_ips,
        'auto_scan': subnet.auto_scan,
        'created_at': _format_timestamp(subnet.created_at),
        'servers': servers,
    })


@bp.route('/subnets/<int:subnet_id>/scan', methods=['POST'])
@login_required
def api_subnet_scan(subnet_id):
    """Run ``scan`` over the subnet's CIDR and report what it returned.

    Responds with 404 if the subnet does not exist and with a 500 JSON error
    body if the scan raises.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        # NOTE(review): save_results=True presumably persists discovered
        # hosts; confirm against app.scripts.ip_scanner.
        results = scan(subnet.cidr, save_results=True)
        return jsonify({
            'success': True,
            'subnet': subnet.cidr,
            'hosts_found': len(results),
            'results': results,
        })
    except Exception as e:
        # API boundary: keep the JSON error contract, but record the
        # traceback so the failure is diagnosable server-side.
        current_app.logger.exception('Subnet scan failed for %s', subnet.cidr)
        return jsonify({
            'success': False,
            'message': f'Error scanning subnet: {str(e)}',
        }), 500


@bp.route('/servers', methods=['GET'])
@login_required
def get_servers():
    """Return every server as ``{'servers': [...]}``."""
    result = [
        {
            'id': server.id,
            'hostname': server.hostname,
            'ip_address': server.ip_address,
            'subnet_id': server.subnet_id,
            'created_at': _format_timestamp(server.created_at),
        }
        for server in Server.query.all()
    ]
    return jsonify({'servers': result})


@bp.route('/servers/<int:server_id>', methods=['GET'])
@login_required
def get_server(server_id):
    """Return one server with its apps and each app's ports; 404 if unknown."""
    server = Server.query.get_or_404(server_id)

    apps = [
        {
            'id': app.id,
            'name': app.name,
            'ports': [_port_payload(port) for port in app.ports],
            'created_at': _format_timestamp(app.created_at),
        }
        for app in server.apps
    ]

    return jsonify({
        'id': server.id,
        'hostname': server.hostname,
        'ip_address': server.ip_address,
        'subnet_id': server.subnet_id,
        'documentation': server.documentation,
        'apps': apps,
        'created_at': _format_timestamp(server.created_at),
    })


@bp.route('/apps', methods=['GET'])
@login_required
def get_apps():
    """Return every application as ``{'apps': [...]}``."""
    result = [
        {
            'id': app.id,
            'name': app.name,
            'server_id': app.server_id,
            'created_at': _format_timestamp(app.created_at),
        }
        for app in App.query.all()
    ]
    return jsonify({'apps': result})


@bp.route('/apps/<int:app_id>', methods=['GET'])
@login_required
def get_app(app_id):
    """Return the details of one application; 404 if unknown."""
    app = App.query.get_or_404(app_id)

    return jsonify({
        'id': app.id,
        'name': app.name,
        'server_id': app.server_id,
        'documentation': app.documentation,
        'created_at': _format_timestamp(app.created_at),
        # Model instances are not JSON-serializable; emit plain dicts.
        'ports': [_port_payload(port) for port in app.ports],
    })


@bp.route('/status', methods=['GET'])
def status():
    """Liveness probe: always answers ``{'status': 'OK'}``."""
    return jsonify({'status': 'OK'})


@bp.route('/markdown-preview', methods=['POST'])
def markdown_preview():
    """Render the ``markdown`` field of the JSON body to HTML.

    Returns ``{'html': <rendered>}``. A missing or non-JSON body renders as
    empty input; a non-string ``markdown`` value yields a 400 error.
    """
    # Imported here because the module never binds the name ``markdown``
    # that this view calls.
    import markdown

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    md_content = data.get('markdown', '')
    if not isinstance(md_content, str):
        return jsonify({'error': 'markdown must be a string'}), 400

    # NOTE(review): this endpoint is unauthenticated and the rendered HTML is
    # not sanitized here; confirm the client sanitizes before injecting it.
    html = markdown.markdown(md_content)
    return jsonify({'html': html})
# Well-known port suggestions keyed by application type.
_PORT_SUGGESTIONS = {
    'web': [
        {'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
        {'port': 443, 'type': 'tcp', 'desc': 'HTTPS'},
    ],
    'database': [
        {'port': 3306, 'type': 'tcp', 'desc': 'MySQL'},
        {'port': 5432, 'type': 'tcp', 'desc': 'PostgreSQL'},
        {'port': 1521, 'type': 'tcp', 'desc': 'Oracle'},
    ],
    'mail': [
        {'port': 25, 'type': 'tcp', 'desc': 'SMTP'},
        {'port': 143, 'type': 'tcp', 'desc': 'IMAP'},
        {'port': 110, 'type': 'tcp', 'desc': 'POP3'},
    ],
    'file': [
        {'port': 21, 'type': 'tcp', 'desc': 'FTP'},
        {'port': 22, 'type': 'tcp', 'desc': 'SFTP/SSH'},
        {'port': 445, 'type': 'tcp', 'desc': 'SMB'},
    ],
}

# Returned when the requested type is missing or unknown.
_DEFAULT_PORT_SUGGESTIONS = [
    {'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
    {'port': 22, 'type': 'tcp', 'desc': 'SSH'},
]

# Inclusive bounds of the range random port suggestions are drawn from.
_SUGGEST_PORT_MIN = 10000
_SUGGEST_PORT_MAX = 65535
# Number of random draws before falling back to a sequential search.
_SUGGEST_PORT_ATTEMPTS = 50


def _port_to_dict(port):
    """Return the JSON-serializable dict describing a single ``Port`` row."""
    return {
        'id': port.id,
        'port_number': port.port_number,
        'protocol': port.protocol,
        'description': port.description,
    }


@bp.route('/ports/suggest', methods=['GET'])
def suggest_ports():
    """Return common ports for the ``type`` query argument.

    The type is matched case-insensitively; unknown or missing types get the
    default HTTP/SSH pair.
    """
    app_type = request.args.get('type', '').lower()
    return jsonify(_PORT_SUGGESTIONS.get(app_type, _DEFAULT_PORT_SUGGESTIONS))


# The URL variable is required: the view takes ``server_id`` and a bare
# '/servers//suggest_port' rule would never supply it.
@bp.route('/servers/<int:server_id>/suggest_port', methods=['GET'])
@login_required
def suggest_port(server_id):
    """Suggest a port in 10000-65535 not used by any app on the server.

    Returns ``{'port': <int>}``, or ``{'port': None}`` when every port in the
    range is already taken. Responds with 404 if the server does not exist.
    """
    server = Server.query.get_or_404(server_id)

    # A set keeps the membership tests below O(1).
    used_ports = {
        port.port_number
        for app in server.apps
        for port in app.ports
    }

    # Prefer a random pick so repeated calls do not all suggest the same port.
    for _ in range(_SUGGEST_PORT_ATTEMPTS):
        candidate = random.randint(_SUGGEST_PORT_MIN, _SUGGEST_PORT_MAX)
        if candidate not in used_ports:
            return jsonify({'port': candidate})

    # Random draws all collided: fall back to the first free port in order.
    available_port = next(
        (
            candidate
            for candidate in range(_SUGGEST_PORT_MIN, _SUGGEST_PORT_MAX + 1)
            if candidate not in used_ports
        ),
        None,
    )
    return jsonify({'port': available_port})


@bp.route('/apps/<int:app_id>/ports', methods=['GET'])
@login_required
def get_app_ports(app_id):
    """Return all ports of an app as ``{'ports': [...]}``; 404 if unknown."""
    app = App.query.get_or_404(app_id)
    return jsonify({'ports': [_port_to_dict(port) for port in app.ports]})


@bp.route('/apps/<int:app_id>/ports', methods=['POST'])
@login_required
def add_app_port(app_id):
    """Add a port to an app from a JSON body.

    Body fields: ``port_number`` (required, integer 1-65535), ``protocol``
    (default ``'TCP'``) and ``description`` (default ``''``).

    Returns the created port as JSON. Responds with 400 when the body is
    missing or invalid or the port already exists for this app, and with 404
    when the app does not exist.
    """
    App.query.get_or_404(app_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'port_number' not in data:
        return jsonify({'error': 'Missing port number'}), 400

    raw_port = data.get('port_number')
    # bool is an int subclass; reject it so ``true`` is not stored as port 1.
    if isinstance(raw_port, bool):
        return jsonify({'error': 'Invalid port number'}), 400
    try:
        port_number = int(raw_port)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid port number'}), 400
    if not 1 <= port_number <= 65535:
        return jsonify({'error': 'Invalid port number'}), 400

    protocol = data.get('protocol', 'TCP')
    description = data.get('description', '')

    existing_port = Port.query.filter_by(
        app_id=app_id, port_number=port_number
    ).first()
    if existing_port:
        return jsonify({'error': 'Port already exists for this app'}), 400

    new_port = Port(
        app_id=app_id,
        port_number=port_number,
        protocol=protocol,
        description=description,
    )
    db.session.add(new_port)
    db.session.commit()

    return jsonify(_port_to_dict(new_port))


@bp.route('/ports/<int:port_id>', methods=['DELETE'])
@login_required
def delete_port(port_id):
    """Delete a port by id; 404 if it does not exist."""
    port = Port.query.get_or_404(port_id)
    db.session.delete(port)
    db.session.commit()
    return jsonify({'success': True})


# NOTE(review): unlike most routes in this blueprint this one has no
# @login_required; confirm that unauthenticated access is intended.
@bp.route('/subnets/<int:subnet_id>/servers', methods=['GET'])
def get_subnet_servers(subnet_id):
    """Return the servers of a subnet as a JSON list.

    An unknown ``subnet_id`` yields an empty list rather than a 404.
    """
    servers = Server.query.filter_by(subnet_id=subnet_id).all()
    return jsonify([
        {
            'id': server.id,
            'hostname': server.hostname,
            'ip_address': server.ip_address,
        }
        for server in servers
    ])