"""IPAM blueprint: subnet CRUD, manual scanning and per-location overview pages."""

import ipaddress
import json
from datetime import datetime

from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required, current_user

from app.core.models import Subnet, Server, App, Location
from app.core.extensions import db
from app.scripts.ip_scanner import scan

bp = Blueprint("ipam", __name__, url_prefix="/ipam")


def _usable_host_count(cidr):
    """Return the number of assignable addresses in ``cidr``.

    Networks with a prefix length below 31 lose two addresses (network and
    broadcast); for /31 and longer every address is counted, so the result
    is never zero or negative.

    Raises:
        ValueError: if ``cidr`` is not a valid network string.
    """
    network = ipaddress.ip_network(cidr, strict=False)
    if network.prefixlen < 31:
        return network.num_addresses - 2
    return network.num_addresses


def _serialize_server(server):
    """Build the nested dict (server -> apps -> ports) used by the location overview."""
    return {
        'id': server.id,
        'hostname': server.hostname,
        'ip_address': server.ip_address,
        'description': server.description,
        'apps': [
            {
                'id': app.id,
                'name': app.name,
                'url': app.url,
                'ports': [
                    {
                        'id': port.id,
                        'number': port.port_number,
                        'protocol': port.protocol,
                        'description': port.description,
                    }
                    for port in app.ports
                ],
            }
            for app in server.apps
        ],
    }


def _render_subnet_edit_form(subnet):
    """Render the subnet form in edit mode for ``subnet``."""
    return render_template(
        "ipam/subnet_form.html",
        title="Edit Subnet",
        subnet=subnet,
        edit_mode=True,
    )


@bp.route("/")
@login_required
def ipam_home():
    """Main IPAM dashboard: list all subnets with their usage percentage."""
    subnets = Subnet.query.all()

    # Attach a transient usage_percent attribute to each subnet for the template.
    for subnet in subnets:
        max_hosts = _usable_host_count(subnet.cidr)
        used_count = Server.query.filter_by(subnet_id=subnet.id).count()
        subnet.usage_percent = (used_count / max_hosts) * 100 if max_hosts > 0 else 0

    return render_template(
        "ipam/index.html", title="IPAM Dashboard", subnets=subnets, now=datetime.now()
    )


@bp.route("/subnet/new", methods=["GET", "POST"])
@login_required
def subnet_new():
    """Create a new subnet.

    GET renders the form; POST validates the submitted CIDR and location,
    stores the subnet and redirects to its detail page.
    """
    # Locations offered in the dropdown are limited to the current user's.
    locations = Location.query.filter_by(user_id=current_user.id).all()

    def render_form():
        return render_template(
            "ipam/subnet_form.html", title="New Subnet", locations=locations
        )

    if request.method != "POST":
        return render_form()

    cidr = request.form.get("cidr")
    location_id = request.form.get("location_id")
    auto_scan = request.form.get("auto_scan") == "on"

    # Basic validation
    if not cidr or not location_id:
        flash("CIDR notation and location are required", "danger")
        return render_form()

    # Reject malformed CIDRs up front: the dashboard and detail views parse
    # subnet.cidr with ipaddress.ip_network and would fail on bad data.
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        flash("Invalid CIDR format", "danger")
        return render_form()

    subnet = Subnet(
        cidr=cidr,
        location_id=location_id,
        user_id=current_user.id,
        active_hosts=json.dumps([]),  # stored as a JSON string
        last_scanned=None,
        auto_scan=auto_scan,
    )

    try:
        db.session.add(subnet)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error creating subnet: {str(e)}", "danger")
        return render_form()

    flash("Subnet created successfully", "success")
    return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))


# NOTE(review): none of the subnet_* views below check that the subnet belongs
# to current_user, although Subnet is created with a user_id — confirm whether
# per-user isolation is intended.
@bp.route("/subnet/<int:subnet_id>")
@login_required
def subnet_view(subnet_id):
    """View a subnet together with its servers, their apps and usage figures."""
    subnet = Subnet.query.get_or_404(subnet_id)

    servers = Server.query.filter_by(subnet_id=subnet_id).all()

    # Collect the applications of every server in this subnet.
    subnet_apps = []
    for server in servers:
        subnet_apps.extend(App.query.filter_by(server_id=server.id).all())

    # Same host-count rule as the dashboard, so /31 and /32 do not yield 0 or -1.
    total_ips = _usable_host_count(subnet.cidr)
    used_ips = len(servers)

    return render_template(
        "ipam/subnet_view.html",
        title=f"Subnet {subnet.cidr}",
        subnet=subnet,
        servers=servers,
        subnet_apps=subnet_apps,
        total_ips=total_ips,
        used_ips=used_ips,
    )


@bp.route("/subnet/<int:subnet_id>/edit", methods=["GET", "POST"])
@login_required
def subnet_edit(subnet_id):
    """Edit a subnet's CIDR, location and auto-scan flag."""
    subnet = Subnet.query.get_or_404(subnet_id)

    if request.method == "POST":
        cidr = request.form.get("cidr")
        # NOTE(review): subnet_new reads "location_id" and sets Subnet.location_id,
        # while this view reads "location" and assigns Subnet.location — confirm
        # against the model and the form template which one is correct.
        location = request.form.get("location")
        auto_scan = request.form.get("auto_scan") == "on"

        # Validate inputs
        if not all([cidr, location]):
            flash("All fields are required", "danger")
            return _render_subnet_edit_form(subnet)

        # Validate CIDR format
        try:
            ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            flash("Invalid CIDR format", "danger")
            return _render_subnet_edit_form(subnet)

        # Update subnet
        subnet.cidr = cidr
        subnet.location = location
        subnet.auto_scan = auto_scan

        try:
            db.session.commit()
            flash(f"Subnet {cidr} has been updated", "success")
            return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))
        except Exception as e:
            db.session.rollback()
            flash(f"Error updating subnet: {str(e)}", "danger")

    return _render_subnet_edit_form(subnet)


@bp.route("/subnet/<int:subnet_id>/delete", methods=["POST"])
@login_required
def subnet_delete(subnet_id):
    """Delete a subnet, refusing when servers are still assigned to it."""
    subnet = Subnet.query.get_or_404(subnet_id)

    servers_count = Server.query.filter_by(subnet_id=subnet_id).count()
    if servers_count > 0:
        flash(
            f"Cannot delete subnet {subnet.cidr}. It has {servers_count} servers assigned.",
            "danger",
        )
        return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))

    # Capture the label before deleting so the message never reads a deleted row.
    cidr = subnet.cidr
    db.session.delete(subnet)
    db.session.commit()

    flash(f"Subnet {cidr} has been deleted", "success")
    return redirect(url_for("ipam.ipam_home"))


@bp.route("/subnet/<int:subnet_id>/scan", methods=["POST"])
@login_required
def subnet_scan(subnet_id):
    """Manually scan a subnet and report the outcome via a flash message."""
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        # Call the scan function with manual_trigger=True
        scan(subnet, manual_trigger=True)
        db.session.commit()
        flash(f"Scan completed for subnet {subnet.cidr}", "success")
    except Exception as e:
        db.session.rollback()
        flash(f"Error scanning subnet: {str(e)}", "danger")

    return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))


@bp.route("/subnet/<int:subnet_id>/force-delete", methods=["POST"])
@login_required
def subnet_force_delete(subnet_id):
    """Force delete a subnet along with its related servers and applications."""
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        # Gather reporting data before the delete; only the count is needed.
        cidr = subnet.cidr
        server_count = Server.query.filter_by(subnet_id=subnet_id).count()

        # Removal of related servers/apps relies on cascade rules configured on
        # the models (not visible here) — TODO confirm.
        db.session.delete(subnet)
        db.session.commit()

        flash(
            f"Subnet {cidr} and {server_count} related servers were deleted successfully",
            "success",
        )
        # These views live on the "ipam" blueprint, not "dashboard".
        return redirect(url_for("ipam.ipam_home"))
    except Exception as e:
        db.session.rollback()
        flash(f"Error deleting subnet: {str(e)}", "danger")
        return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))


@bp.route("/subnet/create-ajax", methods=["POST"])
@login_required
def subnet_create_ajax():
    """Create a subnet via AJAX.

    Expects a JSON object with ``cidr``, ``location`` and optional
    ``auto_scan``. Always answers with a JSON object carrying a ``success``
    flag, plus either the created subnet's fields or an ``error`` message.
    """
    # silent=True makes a missing or malformed JSON body yield None, so the
    # "No data provided" response below is reached.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"success": False, "error": "No data provided"})

    cidr = data.get("cidr")
    location = data.get("location")
    auto_scan = data.get("auto_scan", False)

    if not cidr or not location:
        return jsonify({"success": False, "error": "CIDR and location are required"})

    # Validate CIDR
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError as e:
        return jsonify({"success": False, "error": f"Invalid CIDR: {str(e)}"})

    # NOTE(review): subnet_new sets location_id and user_id; this path sets
    # location and no user_id — confirm against the Subnet model.
    subnet = Subnet(
        cidr=cidr, location=location, auto_scan=auto_scan, active_hosts=json.dumps([])
    )

    try:
        db.session.add(subnet)
        db.session.commit()
        return jsonify(
            {
                "success": True,
                "subnet_id": subnet.id,
                "cidr": subnet.cidr,
                "location": subnet.location,
            }
        )
    except Exception as e:
        db.session.rollback()
        return jsonify({"success": False, "error": str(e)})


@bp.route("/location/<int:location_id>")
@login_required
def location_overview(location_id):
    """View all subnets and servers in one of the current user's locations."""
    # 404 unless the location exists and belongs to the current user.
    location = Location.query.filter_by(
        id=location_id, user_id=current_user.id
    ).first_or_404()

    # Hierarchy: location -> subnets -> servers -> apps -> ports, plus the
    # servers attached to the location without a subnet.
    hierarchy = {
        'locations': {
            location.name: {
                'id': location.id,
                'description': location.description,
                'subnets': [
                    {
                        'id': subnet.id,
                        'cidr': subnet.cidr,
                        'servers': [
                            _serialize_server(server) for server in subnet.servers
                        ],
                    }
                    for subnet in location.subnets
                ],
                'standalone_servers': [
                    _serialize_server(server)
                    for server in location.standalone_servers
                ],
            }
        }
    }

    return render_template(
        "ipam/location_overview.html",
        title=f"{location.name} Overview",
        hierarchy=hierarchy,
        location=location,
    )