homedocs/app/routes/ipam.py
2025-03-30 19:57:41 +02:00

195 lines
No EOL
6.6 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required
from app.core.models import Subnet, Server
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import ipaddress
from datetime import datetime
import json
# Blueprint that groups the IPAM routes; every route registered on it is
# served under the '/ipam' URL prefix.
bp = Blueprint('ipam', __name__, url_prefix='/ipam')
@bp.route('/')
@login_required
def ipam_home():
    """Render the IPAM dashboard.

    Loads every subnet and attaches a transient ``usage_percent`` attribute
    (assigned servers relative to the subnet's host capacity) before handing
    the list to the ``ipam/index.html`` template.
    """
    all_subnets = Subnet.query.all()

    for entry in all_subnets:
        net = ipaddress.ip_network(entry.cidr, strict=False)

        # Prefixes shorter than /31 give up two addresses (presumably the
        # network and broadcast addresses); /31 and longer keep all of them.
        if net.prefixlen < 31:
            capacity = net.num_addresses - 2
        else:
            capacity = net.num_addresses

        assigned = Server.query.filter_by(subnet_id=entry.id).count()

        if capacity > 0:
            entry.usage_percent = (assigned / capacity) * 100
        else:
            entry.usage_percent = 0

    context = {
        'title': 'IPAM Dashboard',
        'subnets': all_subnets,
        'now': datetime.now(),
    }
    return render_template('ipam/index.html', **context)
@bp.route('/subnet/new', methods=['GET', 'POST'])
@login_required
def subnet_new():
    """Create a new subnet.

    GET renders the empty subnet form. POST validates the submitted ``cidr``
    and ``location`` fields, stores the subnet and redirects to its detail
    page. Any validation or database failure flashes a message and re-renders
    the form.

    The CIDR is stored in canonical network form (host bits cleared, explicit
    prefix length) so that equivalent spellings such as ``10.0.0.5/24`` and
    ``10.0.0.0/24`` are recognised as the same subnet by the duplicate check.
    """
    def render_form():
        # Single place for the form response shared by every exit path.
        return render_template('ipam/subnet_form.html', title='New Subnet')

    if request.method != 'POST':
        return render_form()

    # Strip surrounding whitespace so blank-looking input counts as missing.
    cidr = (request.form.get('cidr') or '').strip()
    location = (request.form.get('location') or '').strip()
    auto_scan = request.form.get('auto_scan') == 'on'

    # Basic validation
    if not cidr or not location:
        flash('Please fill in all required fields', 'danger')
        return render_form()

    # Validate the CIDR and normalise it to its canonical string form.
    try:
        cidr = str(ipaddress.ip_network(cidr, strict=False))
    except ValueError:
        flash('Invalid CIDR format', 'danger')
        return render_form()

    # Check if CIDR already exists
    if Subnet.query.filter_by(cidr=cidr).first():
        flash('Subnet already exists', 'danger')
        return render_form()

    # active_hosts is stored as a JSON string, not a Python list.
    subnet = Subnet(
        cidr=cidr,
        location=location,
        active_hosts=json.dumps([]),
        last_scanned=None,
        auto_scan=auto_scan,
    )
    db.session.add(subnet)

    # Roll back on failure so the session stays usable for later requests.
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'Error creating subnet: {str(e)}', 'danger')
        return render_form()

    flash('Subnet created successfully', 'success')
    return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id))
@bp.route('/subnet/<int:subnet_id>')
@login_required
def subnet_view(subnet_id):
    """Show the detail page for one subnet.

    Responds with 404 when no subnet has the given id. Otherwise renders
    ``ipam/subnet_view.html`` with the subnet, its servers and a dictionary of
    address facts derived from the subnet's CIDR.
    """
    subnet = Subnet.query.get_or_404(subnet_id)
    members = Server.query.filter_by(subnet_id=subnet_id).all()

    net = ipaddress.ip_network(subnet.cidr, strict=False)

    # Prefixes shorter than /31 show first and last usable host; longer
    # prefixes fall back to showing the stored CIDR itself.
    if net.prefixlen < 31:
        first_host = net.network_address + 1
        last_host = net.broadcast_address - 1
        host_range = " - ".join((str(first_host), str(last_host)))
    else:
        host_range = subnet.cidr

    subnet_info = {
        'network_address': str(net.network_address),
        'broadcast_address': str(net.broadcast_address),
        'netmask': str(net.netmask),
        'num_addresses': net.num_addresses,
        'host_range': host_range,
    }

    return render_template(
        'ipam/subnet_view.html',
        title=subnet.cidr,
        subnet=subnet,
        subnet_info=subnet_info,
        servers=members,
        now=datetime.now(),
    )
@bp.route('/subnet/<int:subnet_id>/edit', methods=['GET', 'POST'])
@login_required
def subnet_edit(subnet_id):
    """Edit an existing subnet.

    Responds with 404 when no subnet has the given id. GET renders the form
    pre-filled with the subnet. POST validates the submitted ``cidr`` and
    ``location`` fields, updates the subnet and redirects to its detail page.
    Any validation or database failure flashes a message and re-renders the
    form.

    The CIDR is normalised to canonical network form and rejected when a
    different subnet already uses it, mirroring the check done on creation.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    def render_form():
        # Single place for the form response shared by every exit path.
        return render_template(
            'ipam/subnet_form.html',
            title='Edit Subnet',
            subnet=subnet,
            edit_mode=True,
        )

    if request.method != 'POST':
        return render_form()

    # Strip surrounding whitespace so blank-looking input counts as missing.
    cidr = (request.form.get('cidr') or '').strip()
    location = (request.form.get('location') or '').strip()
    auto_scan = request.form.get('auto_scan') == 'on'

    # Validate inputs
    if not all([cidr, location]):
        flash('All fields are required', 'danger')
        return render_form()

    # Validate the CIDR and normalise it to its canonical string form.
    try:
        cidr = str(ipaddress.ip_network(cidr, strict=False))
    except ValueError:
        flash('Invalid CIDR format', 'danger')
        return render_form()

    # Refuse a CIDR that belongs to another subnet. This runs before the
    # instance is modified so the query cannot flush a half-applied edit.
    clash = Subnet.query.filter(
        Subnet.cidr == cidr,
        Subnet.id != subnet.id,
    ).first()
    if clash:
        flash('Subnet already exists', 'danger')
        return render_form()

    # Update subnet
    subnet.cidr = cidr
    subnet.location = location
    subnet.auto_scan = auto_scan

    # Roll back on failure so the session stays usable for later requests.
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'Error updating subnet: {str(e)}', 'danger')
        return render_form()

    flash(f'Subnet {cidr} has been updated', 'success')
    return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id))
@bp.route('/subnet/<int:subnet_id>/delete', methods=['POST'])
@login_required
def subnet_delete(subnet_id):
    """Delete a subnet that has no servers assigned.

    Responds with 404 when no subnet has the given id. When servers still
    reference the subnet, or the database commit fails, an error is flashed
    and the user is sent back to the subnet's detail page. On success the
    user is redirected to the IPAM dashboard.
    """
    subnet = Subnet.query.get_or_404(subnet_id)

    # Check if subnet has servers
    servers_count = Server.query.filter_by(subnet_id=subnet_id).count()
    if servers_count > 0:
        flash(f'Cannot delete subnet {subnet.cidr}. It has {servers_count} servers assigned.', 'danger')
        return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id))

    # Read the CIDR before deleting so the success message does not depend
    # on the state of the ORM instance after the commit.
    cidr = subnet.cidr
    db.session.delete(subnet)

    # Roll back on failure so the session stays usable for later requests.
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'Error deleting subnet: {str(e)}', 'danger')
        return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id))

    flash(f'Subnet {cidr} has been deleted', 'success')
    return redirect(url_for('ipam.ipam_home'))
@bp.route('/subnet/<int:subnet_id>/scan', methods=['POST'])
@login_required
def subnet_scan(subnet_id):
    """Run an on-demand scan of one subnet.

    Responds with 404 when no subnet has the given id. Calls ``scan`` with
    ``manual_trigger=True`` and commits the session; on any exception the
    session is rolled back and the error text is flashed. In both cases the
    user is redirected to the subnet's detail page.
    """
    target = Subnet.query.get_or_404(subnet_id)
    redirect_args = {'subnet_id': subnet_id}

    try:
        scan(target, manual_trigger=True)
        db.session.commit()
        flash(f'Scan completed for subnet {target.cidr}', 'success')
    except Exception as exc:
        db.session.rollback()
        flash(f'Error scanning subnet: {exc!s}', 'danger')

    return redirect(url_for('ipam.subnet_view', **redirect_args))