from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify from flask_login import login_required from app.core.models import Subnet, Server from app.core.extensions import db from app.scripts.ip_scanner import scan import ipaddress from datetime import datetime import json bp = Blueprint('ipam', __name__, url_prefix='/ipam') @bp.route('/') @login_required def ipam_home(): """Main IPAM dashboard""" subnets = Subnet.query.all() # Calculate usage for each subnet for subnet in subnets: network = ipaddress.ip_network(subnet.cidr, strict=False) max_hosts = network.num_addresses - 2 if network.prefixlen < 31 else network.num_addresses used_count = Server.query.filter_by(subnet_id=subnet.id).count() subnet.usage_percent = (used_count / max_hosts) * 100 if max_hosts > 0 else 0 return render_template( 'ipam/index.html', title='IPAM Dashboard', subnets=subnets, now=datetime.now() ) @bp.route('/subnet/new', methods=['GET', 'POST']) @login_required def subnet_new(): """Create a new subnet""" if request.method == 'POST': cidr = request.form.get('cidr') location = request.form.get('location') auto_scan = request.form.get('auto_scan') == 'on' # Basic validation if not cidr or not location: flash('Please fill in all required fields', 'danger') return render_template( 'ipam/subnet_form.html', title='New Subnet' ) # Validate CIDR format try: ipaddress.ip_network(cidr, strict=False) except ValueError: flash('Invalid CIDR format', 'danger') return render_template( 'ipam/subnet_form.html', title='New Subnet' ) # Check if CIDR already exists if Subnet.query.filter_by(cidr=cidr).first(): flash('Subnet already exists', 'danger') return render_template( 'ipam/subnet_form.html', title='New Subnet' ) # Create new subnet with JSON string for active_hosts, not a Python list subnet = Subnet( cidr=cidr, location=location, active_hosts=json.dumps([]), # Convert empty list to JSON string last_scanned=None, auto_scan=auto_scan ) db.session.add(subnet) db.session.commit() flash('Subnet created successfully', 'success') return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id)) return render_template( 'ipam/subnet_form.html', title='New Subnet' ) @bp.route('/subnet/') @login_required def subnet_view(subnet_id): """View a specific subnet""" subnet = Subnet.query.get_or_404(subnet_id) # Get all servers in this subnet servers = Server.query.filter_by(subnet_id=subnet_id).all() # Parse CIDR for display network = ipaddress.ip_network(subnet.cidr, strict=False) subnet_info = { 'network_address': str(network.network_address), 'broadcast_address': str(network.broadcast_address), 'netmask': str(network.netmask), 'num_addresses': network.num_addresses, 'host_range': f"{str(network.network_address + 1)} - {str(network.broadcast_address - 1)}" if network.prefixlen < 31 else subnet.cidr } return render_template( 'ipam/subnet_view.html', title=subnet.cidr, subnet=subnet, subnet_info=subnet_info, servers=servers, now=datetime.now() ) @bp.route('/subnet//edit', methods=['GET', 'POST']) @login_required def subnet_edit(subnet_id): """Edit a subnet""" subnet = Subnet.query.get_or_404(subnet_id) if request.method == 'POST': cidr = request.form.get('cidr') location = request.form.get('location') auto_scan = request.form.get('auto_scan') == 'on' # Validate inputs if not all([cidr, location]): flash('All fields are required', 'danger') return render_template('ipam/subnet_form.html', title='Edit Subnet', subnet=subnet, edit_mode=True) # Validate CIDR format try: ipaddress.ip_network(cidr, strict=False) except ValueError: flash('Invalid CIDR format', 'danger') return render_template('ipam/subnet_form.html', title='Edit Subnet', subnet=subnet, edit_mode=True) # Update subnet subnet.cidr = cidr subnet.location = location subnet.auto_scan = auto_scan try: db.session.commit() flash(f'Subnet {cidr} has been updated', 'success') return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id)) except Exception as e: db.session.rollback() flash(f'Error updating subnet: {str(e)}', 'danger') return render_template('ipam/subnet_form.html', title='Edit Subnet', subnet=subnet, edit_mode=True) @bp.route('/subnet//delete', methods=['POST']) @login_required def subnet_delete(subnet_id): """Delete a subnet""" subnet = Subnet.query.get_or_404(subnet_id) # Check if subnet has servers servers_count = Server.query.filter_by(subnet_id=subnet_id).count() if servers_count > 0: flash(f'Cannot delete subnet {subnet.cidr}. It has {servers_count} servers assigned.', 'danger') return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id)) db.session.delete(subnet) db.session.commit() flash(f'Subnet {subnet.cidr} has been deleted', 'success') return redirect(url_for('ipam.ipam_home')) @bp.route('/subnet//scan', methods=['POST']) @login_required def subnet_scan(subnet_id): """Manually scan a subnet""" subnet = Subnet.query.get_or_404(subnet_id) try: # Call the scan function with manual_trigger=True scan(subnet, manual_trigger=True) db.session.commit() flash(f'Scan completed for subnet {subnet.cidr}', 'success') except Exception as e: db.session.rollback() flash(f'Error scanning subnet: {str(e)}', 'danger') return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id)) @bp.route('/subnet//force-delete', methods=['POST']) @login_required def subnet_force_delete(subnet_id): """Force delete a subnet and all its related servers and applications""" subnet = Subnet.query.get_or_404(subnet_id) try: # Get all servers to be deleted for reporting servers = Server.query.filter_by(subnet_id=subnet_id).all() server_count = len(servers) # This will cascade delete all related servers and their applications db.session.delete(subnet) db.session.commit() flash(f'Subnet {subnet.cidr} and {server_count} related servers were deleted successfully', 'success') return redirect(url_for('dashboard.ipam_home')) except Exception as e: db.session.rollback() flash(f'Error deleting subnet: {str(e)}', 'danger') return redirect(url_for('dashboard.subnet_view', subnet_id=subnet_id)) @bp.route('/subnet/create-ajax', methods=['POST']) @login_required def subnet_create_ajax(): """Create a subnet via AJAX""" data = request.json if not data: return jsonify({'success': False, 'error': 'No data provided'}) cidr = data.get('cidr') location = data.get('location') auto_scan = data.get('auto_scan', False) if not cidr or not location: return jsonify({'success': False, 'error': 'CIDR and location are required'}) # Validate CIDR try: network = ipaddress.ip_network(cidr, strict=False) except ValueError as e: return jsonify({'success': False, 'error': f'Invalid CIDR: {str(e)}'}) # Create subnet subnet = Subnet( cidr=cidr, location=location, auto_scan=auto_scan, active_hosts=json.dumps([]) ) try: db.session.add(subnet) db.session.commit() return jsonify({ 'success': True, 'subnet_id': subnet.id, 'cidr': subnet.cidr, 'location': subnet.location }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'error': str(e)})