"""IPAM (IP address management) views: subnet CRUD, scanning and overviews."""

import ipaddress
import json
from datetime import datetime

from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required

from app.core.models import Subnet, Server, App
from app.core.extensions import db
from app.scripts.ip_scanner import scan

bp = Blueprint("ipam", __name__, url_prefix="/ipam")


def _usable_host_count(cidr):
    """Return the number of assignable addresses in the network *cidr*.

    Networks with a prefix shorter than /31 lose two addresses (network and
    broadcast); /31 and longer prefixes keep every address, so the result is
    never zero or negative for tiny subnets.

    Raises:
        ValueError: if *cidr* is not a valid network specification.
    """
    network = ipaddress.ip_network(cidr, strict=False)
    if network.prefixlen < 31:
        return network.num_addresses - 2
    return network.num_addresses


def _is_valid_cidr(cidr):
    """Return True when *cidr* parses as an IP network (host bits allowed)."""
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        return False
    return True


def _render_new_form():
    """Render the empty subnet creation form."""
    return render_template("ipam/subnet_form.html", title="New Subnet")


def _render_edit_form(subnet):
    """Render the subnet form pre-filled with *subnet* in edit mode."""
    return render_template(
        "ipam/subnet_form.html",
        title="Edit Subnet",
        subnet=subnet,
        edit_mode=True,
    )


@bp.route("/")
@login_required
def ipam_home():
    """Main IPAM dashboard.

    Lists every subnet and attaches a transient ``usage_percent`` attribute
    (servers assigned / usable hosts * 100) to each one for the template.
    """
    subnets = Subnet.query.all()

    # Calculate usage for each subnet
    for subnet in subnets:
        max_hosts = _usable_host_count(subnet.cidr)
        used_count = Server.query.filter_by(subnet_id=subnet.id).count()
        subnet.usage_percent = (used_count / max_hosts) * 100 if max_hosts > 0 else 0

    return render_template(
        "ipam/index.html",
        title="IPAM Dashboard",
        subnets=subnets,
        now=datetime.now(),
    )


@bp.route("/subnet/new", methods=["GET", "POST"])
@login_required
def subnet_new():
    """Create a new subnet.

    GET renders the form. POST validates the submitted ``cidr`` and
    ``location`` fields, rejects duplicates, stores the subnet and redirects
    to its detail page; on any validation error the form is re-rendered with
    a flashed message.
    """
    if request.method != "POST":
        return _render_new_form()

    cidr = request.form.get("cidr")
    location = request.form.get("location")
    auto_scan = request.form.get("auto_scan") == "on"

    # Basic validation
    if not cidr or not location:
        flash("Please fill in all required fields", "danger")
        return _render_new_form()

    # Validate CIDR format
    if not _is_valid_cidr(cidr):
        flash("Invalid CIDR format", "danger")
        return _render_new_form()

    # Check if CIDR already exists
    if Subnet.query.filter_by(cidr=cidr).first():
        flash("Subnet already exists", "danger")
        return _render_new_form()

    # active_hosts is stored as a JSON string, not a Python list
    subnet = Subnet(
        cidr=cidr,
        location=location,
        active_hosts=json.dumps([]),
        last_scanned=None,
        auto_scan=auto_scan,
    )
    db.session.add(subnet)
    db.session.commit()

    flash("Subnet created successfully", "success")
    return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))


@bp.route("/subnet/<int:subnet_id>")
@login_required
def subnet_view(subnet_id):
    """View a subnet together with its servers, their apps and usage figures.

    Responds with 404 when no subnet has the given id.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    # Get servers in this subnet
    servers = Server.query.filter_by(subnet_id=subnet_id).all()

    # Get applications hosted on those servers
    subnet_apps = []
    for server in servers:
        subnet_apps.extend(App.query.filter_by(server_id=server.id).all())

    # Usage statistics; the helper keeps /31 and /32 networks from reporting
    # zero or negative capacity.
    total_ips = _usable_host_count(subnet.cidr)
    used_ips = len(servers)

    return render_template(
        "ipam/subnet_view.html",
        title=f"Subnet {subnet.cidr}",
        subnet=subnet,
        servers=servers,
        subnet_apps=subnet_apps,
        total_ips=total_ips,
        used_ips=used_ips,
    )


@bp.route("/subnet/<int:subnet_id>/edit", methods=["GET", "POST"])
@login_required
def subnet_edit(subnet_id):
    """Edit a subnet.

    GET renders the pre-filled form. POST validates the input, refuses a CIDR
    already used by a different subnet, saves the change and redirects to the
    detail page. Validation or database errors re-render the form with a
    flashed message. Responds with 404 when the subnet does not exist.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    if request.method != "POST":
        return _render_edit_form(subnet)

    cidr = request.form.get("cidr")
    location = request.form.get("location")
    auto_scan = request.form.get("auto_scan") == "on"

    # Validate inputs
    if not cidr or not location:
        flash("All fields are required", "danger")
        return _render_edit_form(subnet)

    # Validate CIDR format
    if not _is_valid_cidr(cidr):
        flash("Invalid CIDR format", "danger")
        return _render_edit_form(subnet)

    # Same uniqueness rule as subnet creation, ignoring the subnet being edited
    duplicate = Subnet.query.filter(
        Subnet.cidr == cidr, Subnet.id != subnet.id
    ).first()
    if duplicate:
        flash("Subnet already exists", "danger")
        return _render_edit_form(subnet)

    # Update subnet
    subnet.cidr = cidr
    subnet.location = location
    subnet.auto_scan = auto_scan

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error updating subnet: {str(e)}", "danger")
        return _render_edit_form(subnet)

    flash(f"Subnet {cidr} has been updated", "success")
    return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))


@bp.route("/subnet/<int:subnet_id>/delete", methods=["POST"])
@login_required
def subnet_delete(subnet_id):
    """Delete a subnet, refusing when servers are still assigned to it.

    Responds with 404 when the subnet does not exist.
    """
    subnet = Subnet.query.get_or_404(subnet_id)
    # Read the label now; the instance should not be touched after deletion.
    cidr = subnet.cidr

    # Check if subnet has servers
    servers_count = Server.query.filter_by(subnet_id=subnet_id).count()
    if servers_count > 0:
        flash(
            f"Cannot delete subnet {cidr}. It has {servers_count} servers assigned.",
            "danger",
        )
        return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))

    db.session.delete(subnet)
    db.session.commit()

    flash(f"Subnet {cidr} has been deleted", "success")
    return redirect(url_for("ipam.ipam_home"))


@bp.route("/subnet/<int:subnet_id>/scan", methods=["POST"])
@login_required
def subnet_scan(subnet_id):
    """Manually scan a subnet and return to its detail page.

    Any exception raised by the scan or the commit is rolled back and
    reported via a flashed message. Responds with 404 when the subnet does
    not exist.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    try:
        # Call the scan function with manual_trigger=True
        scan(subnet, manual_trigger=True)
        db.session.commit()
        flash(f"Scan completed for subnet {subnet.cidr}", "success")
    except Exception as e:
        db.session.rollback()
        flash(f"Error scanning subnet: {str(e)}", "danger")

    return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))


@bp.route("/subnet/<int:subnet_id>/force-delete", methods=["POST"])
@login_required
def subnet_force_delete(subnet_id):
    """Force delete a subnet even when servers are assigned to it.

    On success redirects to the IPAM dashboard; on failure the transaction is
    rolled back and the user is sent back to the subnet page. Responds with
    404 when the subnet does not exist.
    """
    subnet = Subnet.query.get_or_404(subnet_id)
    # Read the label now; the instance should not be touched after deletion.
    cidr = subnet.cidr

    try:
        # Number of servers affected, for the confirmation message
        server_count = Server.query.filter_by(subnet_id=subnet_id).count()

        # NOTE(review): removal of the related servers and applications relies
        # on cascade rules on the models, which are not visible here - confirm.
        db.session.delete(subnet)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error deleting subnet: {str(e)}", "danger")
        return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))

    flash(
        f"Subnet {cidr} and {server_count} related servers were deleted successfully",
        "success",
    )
    return redirect(url_for("ipam.ipam_home"))


@bp.route("/subnet/create-ajax", methods=["POST"])
@login_required
def subnet_create_ajax():
    """Create a subnet via AJAX.

    Expects a JSON object with ``cidr`` and ``location`` and an optional
    ``auto_scan``. Always answers with a JSON object carrying ``success``;
    failures add ``error``, successes add ``subnet_id``, ``cidr`` and
    ``location``.
    """
    # silent=True lets a missing or malformed JSON body reach the error
    # response below instead of aborting the request.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"success": False, "error": "No data provided"})

    cidr = data.get("cidr")
    location = data.get("location")
    auto_scan = data.get("auto_scan", False)

    if not cidr or not location:
        return jsonify({"success": False, "error": "CIDR and location are required"})

    # Validate CIDR
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError as e:
        return jsonify({"success": False, "error": f"Invalid CIDR: {str(e)}"})

    # Same uniqueness rule as the form-based creation view
    if Subnet.query.filter_by(cidr=cidr).first():
        return jsonify({"success": False, "error": "Subnet already exists"})

    # Create subnet
    subnet = Subnet(
        cidr=cidr,
        location=location,
        auto_scan=auto_scan,
        active_hosts=json.dumps([]),
    )

    try:
        db.session.add(subnet)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({"success": False, "error": str(e)})

    return jsonify(
        {
            "success": True,
            "subnet_id": subnet.id,
            "cidr": subnet.cidr,
            "location": subnet.location,
        }
    )


@bp.route("/location/<location>")
@login_required
def location_overview(location):
    """View all subnets, servers and apps in a specific location.

    Builds a nested ``hierarchy`` dict (subnets -> servers -> apps) for the
    template. ``standalone_servers`` is always passed as an empty list.
    """
    subnets = Subnet.query.filter_by(location=location).all()

    hierarchy = {"subnets": [], "standalone_servers": []}

    for subnet in subnets:
        subnet_data = {
            "id": subnet.id,
            "cidr": subnet.cidr,
            "location": subnet.location,
            "servers": [],
        }

        # Only add description if it exists as an attribute
        if hasattr(subnet, "description"):
            subnet_data["description"] = subnet.description

        for server in subnet.servers:
            server_data = {
                "id": server.id,
                "hostname": server.hostname,
                "ip_address": server.ip_address,
                "apps": [
                    {"id": app.id, "name": app.name, "ports": app.ports}
                    for app in server.apps
                ],
            }
            subnet_data["servers"].append(server_data)

        hierarchy["subnets"].append(subnet_data)

    return render_template(
        "dashboard/overview.html",
        title=f"{location} Overview",
        hierarchy=hierarchy,
        location=location,
    )