This commit is contained in:
pika 2025-04-03 16:58:01 +02:00
parent 2b36992be1
commit 25087d055c
16 changed files with 1394 additions and 816 deletions

View file

@ -8,8 +8,8 @@ from flask import (
redirect,
url_for,
)
from flask_login import login_required
from app.core.models import Subnet, Server, App, Port
from flask_login import login_required, current_user
from app.core.models import Subnet, Server, App, Port, Location
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import random
@ -20,33 +20,51 @@ from datetime import datetime
from flask import flash
from app.utils.app_utils import is_port_in_use, validate_port_data
from difflib import SequenceMatcher
import json
bp = Blueprint("api", __name__, url_prefix="/api")
csrf = CSRFProtect()
@bp.route("/subnets", methods=["GET"])
@csrf.exempt
@login_required
def get_subnets():
"""Get all subnets grouped by site"""
subnets = Subnet.query.all()
# Group subnets by location (site)
sites = {}
for subnet in subnets:
location = subnet.location
if location not in sites:
sites[location] = []
sites[location].append(
{"id": subnet.id, "cidr": subnet.cidr, "location": location}
)
# Convert to list of site objects
result = [
{"name": site_name, "subnets": subnets} for site_name, subnets in sites.items()
]
return jsonify(result)
try:
subnets = Subnet.query.filter_by(user_id=current_user.id).all()
# Group subnets by location (site)
sites = {}
for subnet in subnets:
location = subnet.location_ref # Make sure this attribute matches your model relationship
if not location:
location_name = "Unassigned"
location_id = None
else:
location_name = location.name
location_id = location.id
if location_id not in sites:
sites[location_id] = {
"name": location_name,
"id": location_id,
"subnets": []
}
sites[location_id]["subnets"].append({
"id": subnet.id,
"cidr": subnet.cidr,
"location_id": location_id
})
# Convert to list of site objects
result = list(sites.values())
return jsonify(result)
except Exception as e:
print(f"Error loading subnets: {e}") # Add debugging
return jsonify({"error": str(e)}), 500
@bp.route("/subnets/<int:subnet_id>", methods=["GET"])
@ -288,40 +306,60 @@ def add_app_port(app_id):
"""Add a port to an application"""
app = App.query.get_or_404(app_id)
# Check if the user owns this app
if app.user_id != current_user.id:
flash("Unauthorized access", "danger")
return redirect(url_for("dashboard.app_view", app_id=app_id))
# Get port details from the form
port_number = request.form.get("port_number")
protocol = request.form.get("protocol", "TCP")
description = request.form.get("description", "")
# Validate the port
valid, clean_port, error = validate_port_data ( # validate_port_data(
valid, clean_port, error = validate_port_data(
port_number,
protocol,
description,
app.server_id,
app_id
descriptions=description,
server_id=app.server_id,
exclude_app_id=app_id,
protocol=protocol
)
if not valid:
flash(error, "danger")
return redirect(url_for("dashboard.app_view", app_id=app_id))
# Create the new port
try:
new_port = Port(
# Check if the port already exists for this app
existing_port = Port.query.filter_by(
app_id=app_id,
port_number=clean_port,
protocol=protocol,
description=description
)
db.session.add(new_port)
protocol=protocol
).first()
if existing_port:
# Update the existing port description
existing_port.description = description
flash(f"Port {clean_port}/{protocol} updated", "success")
else:
# Create a new port
port = Port(
port_number=clean_port,
protocol=protocol,
description=description,
app_id=app_id
)
db.session.add(port)
flash(f"Port {clean_port}/{protocol} added successfully", "success")
db.session.commit()
flash(f"Port {clean_port}/{protocol} added successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=app_id))
except Exception as e:
db.session.rollback()
flash(f"Error adding port: {str(e)}", "danger")
return redirect(url_for("dashboard.app_view", app_id=app_id))
return redirect(url_for("dashboard.app_view", app_id=app_id))
@bp.route("/app/<int:app_id>/ports", methods=["GET"])
@ -337,7 +375,7 @@ def get_app_ports(app_id):
"ports": [
{
"id": port.id,
"number": port.number,
"port_number": port.port_number,
"protocol": port.protocol,
"description": port.description,
}
@ -396,7 +434,7 @@ def get_server_ports(server_id):
# Get all ports associated with this server
ports = Port.query.filter_by(server_id=server_id).all()
used_ports = [port.number for port in ports]
used_ports = [port.port_number for port in ports]
return jsonify({"server_id": server_id, "used_ports": used_ports})
@ -409,7 +447,7 @@ def get_free_port(server_id):
# Get all ports associated with this server
used_ports = [
port.number for port in Port.query.filter_by(server_id=server_id).all()
port.port_number for port in Port.query.filter_by(server_id=server_id).all()
]
# Find the first free port (starting from 8000)
@ -524,3 +562,65 @@ def validate_app_port():
except ValueError:
return jsonify({"valid": False, "message": "Invalid port number"})
@bp.route("/locations", methods=["POST"])
@login_required
def create_location():
"""API endpoint to create a new location"""
data = request.json
if not data or not data.get('name'):
return jsonify({'error': 'Location name is required'}), 400
try:
location = Location(
name=data.get('name'),
description=data.get('description', ''),
user_id=current_user.id
)
db.session.add(location)
db.session.commit()
return jsonify({
'id': location.id,
'name': location.name,
'description': location.description
}), 201
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
@bp.route("/subnets", methods=["POST"])
@login_required
def create_subnet():
"""API endpoint to create a new subnet"""
data = request.json
if not data or not data.get('cidr') or not data.get('location_id'):
return jsonify({'error': 'CIDR and location_id are required'}), 400
try:
subnet = Subnet(
cidr=data.get('cidr'),
location_id=data.get('location_id'),
user_id=current_user.id,
auto_scan=data.get('auto_scan', False),
active_hosts=json.dumps([])
)
db.session.add(subnet)
db.session.commit()
return jsonify({
'id': subnet.id,
'cidr': subnet.cidr,
'location_id': subnet.location_id
}), 201
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500

View file

@ -1,10 +1,10 @@
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify, abort
from flask_login import login_required, current_user
import markdown
from app.core.models import Server, App, Subnet, Port
from app.core.models import Server, App, Subnet, Port, Location
from app.core.extensions import db, limiter
from datetime import datetime
from app.utils.app_utils import validate_app_data, is_port_in_use
from app.utils.app_utils import validate_app_data, is_port_in_use, validate_port_data
bp = Blueprint("dashboard", __name__, url_prefix="/dashboard")
@ -77,130 +77,121 @@ def server_view(server_id):
@login_required
def server_new():
"""Create a new server"""
subnets = Subnet.query.all()
# Get all subnets and locations for the current user
subnets = Subnet.query.filter_by(user_id=current_user.id).all()
locations = Location.query.filter_by(user_id=current_user.id).all()
if request.method == "POST":
hostname = request.form.get("hostname")
ip_address = request.form.get("ip_address")
subnet_id = request.form.get("subnet_id")
documentation = request.form.get("documentation", "")
# Basic validation
if not hostname or not ip_address or not subnet_id:
flash("Please fill in all required fields", "danger")
subnet_id = request.form.get("subnet_id") or None
location_id = request.form.get("location_id") or None
description = request.form.get("description")
# Validate inputs
if not hostname or not ip_address:
flash("Hostname and IP address are required", "danger")
return render_template(
"dashboard/server_form.html",
title="New Server",
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now(),
locations=locations
)
# Check if hostname or IP already exists
if Server.query.filter_by(hostname=hostname).first():
flash("Hostname already exists", "danger")
# If no subnet is selected, location is required
if not subnet_id and not location_id:
flash("Either a subnet or a location is required", "danger")
return render_template(
"dashboard/server_form.html",
title="New Server",
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now(),
locations=locations
)
if Server.query.filter_by(ip_address=ip_address).first():
flash("IP address already exists", "danger")
return render_template(
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now(),
)
# Create new server
server = Server(
hostname=hostname,
ip_address=ip_address,
subnet_id=subnet_id,
documentation=documentation,
location_id=location_id,
description=description,
user_id=current_user.id
)
db.session.add(server)
db.session.commit()
flash("Server created successfully", "success")
flash(f"Server {hostname} created successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server.id))
return render_template(
"dashboard/server_form.html",
title="New Server",
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now(),
locations=locations
)
@bp.route("/server/<int:server_id>/edit", methods=["GET", "POST"])
@login_required
def server_edit(server_id):
"""Edit an existing server"""
"""Edit a server"""
server = Server.query.get_or_404(server_id)
subnets = Subnet.query.all()
# Get all unique locations for datalist
subnet_locations = db.session.query(Subnet.location).distinct().all()
server_locations = db.session.query(Server.location).filter(Server.location != None).distinct().all()
locations = sorted(set([loc[0] for loc in subnet_locations + server_locations if loc[0]]))
if request.method == "POST":
hostname = request.form.get("hostname")
ip_address = request.form.get("ip_address")
subnet_id = request.form.get("subnet_id")
documentation = request.form.get("documentation", "")
if not hostname or not ip_address or not subnet_id:
flash("All fields are required", "danger")
subnet_id = request.form.get("subnet_id") or None
location = request.form.get("location")
# Validate inputs
if not hostname or not ip_address:
flash("Hostname and IP address are required", "danger")
return render_template(
"dashboard/server_form.html",
title="Edit Server",
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets,
locations=locations,
edit_mode=True
)
# Check if hostname changed and already exists
if (
hostname != server.hostname
and Server.query.filter_by(hostname=hostname).first()
):
flash("Hostname already exists", "danger")
# If no subnet is selected, location is required
if not subnet_id and not location:
flash("Location is required for servers without a subnet", "danger")
return render_template(
"dashboard/server_form.html",
title="Edit Server",
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets,
locations=locations,
edit_mode=True
)
# Check if IP changed and already exists
if (
ip_address != server.ip_address
and Server.query.filter_by(ip_address=ip_address).first()
):
flash("IP address already exists", "danger")
return render_template(
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets,
)
# Update server
server.hostname = hostname
server.ip_address = ip_address
server.subnet_id = subnet_id
server.documentation = documentation
server.location = location if not subnet_id else None
db.session.commit()
flash("Server updated successfully", "success")
flash(f"Server {hostname} updated successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server.id))
# GET request - show form with current values
return render_template(
"dashboard/server_form.html",
title=f"Edit Server - {server.hostname}",
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets,
locations=locations,
edit_mode=True
)
@ -221,120 +212,79 @@ def server_delete(server_id):
return redirect(url_for("dashboard.dashboard_home"))
@bp.route("/app/new", methods=["GET", "POST"])
@bp.route("/app/new", defaults={'server_id': None}, methods=["GET", "POST"])
@bp.route("/app/new/<int:server_id>", methods=["GET", "POST"])
@login_required
def app_new(server_id=None):
"""Create a new application"""
servers = Server.query.all()
"""Create a new application for a server"""
# Get all servers for the dropdown
servers = Server.query.filter_by(user_id=current_user.id).all()
# If server_id is provided, validate it
selected_server = None
if server_id:
selected_server = Server.query.get_or_404(server_id)
if selected_server.user_id != current_user.id:
abort(403)
if request.method == "POST":
name = request.form.get("name", "").strip()
server_id = request.form.get("server_id")
documentation = request.form.get("documentation", "")
name = request.form.get("name")
url = request.form.get("url", "")
documentation = request.form.get("documentation", "")
form_server_id = request.form.get("server_id")
# Validate application name
if not name:
flash("Application name is required", "danger")
# Validate inputs
if not name or not form_server_id:
flash("Application name and server are required", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=False,
"dashboard/app_form.html",
title="New Application",
servers=servers,
selected_server_id=server_id
selected_server_id=form_server_id or server_id,
edit_mode=False,
app={} # Empty app object for the template
)
# Check for duplicate application names on the same server
existing_app = App.query.filter_by(name=name, server_id=server_id).first()
if existing_app:
flash(f"An application with the name '{name}' already exists on this server", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=False,
servers=servers,
selected_server_id=server_id
)
# Verify the selected server belongs to the user
server = Server.query.get_or_404(form_server_id)
if server.user_id != current_user.id:
abort(403)
# Process port data from form
port_data = []
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i in range(len(port_numbers)):
if port_numbers[i] and port_numbers[i].strip():
protocol = protocols[i] if i < len(protocols) else "TCP"
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Check for port conflicts proactively
conflicts = []
seen_ports = set() # To track ports already seen in this submission
for i, (port_number, protocol, _) in enumerate(port_data):
try:
clean_port = int(port_number)
# Check if this port has already been seen in this submission
port_key = f"{clean_port}/{protocol}"
if port_key in seen_ports:
conflicts.append((clean_port, protocol, "Duplicate port in submission"))
continue
seen_ports.add(port_key)
# Check if the port is in use by another application
in_use, conflicting_app_name = is_port_in_use(
clean_port, protocol, server_id
)
if in_use:
conflicts.append((clean_port, protocol, f"Port {clean_port}/{protocol} is already in use by application '{conflicting_app_name}'"))
except (ValueError, TypeError):
continue
if conflicts:
for conflict in conflicts:
flash(f"Conflict: {conflict[0]}/{conflict[1]} - {conflict[2]}", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=False,
servers=servers,
selected_server_id=server_id
)
# Create the application
try:
new_app = App(
# Create new application
app = App(
name=name,
server_id=server_id,
documentation=documentation,
url=url
server_id=form_server_id,
user_id=current_user.id, # Set user_id explicitly
documentation=documentation or "",
url=url or ""
)
db.session.add(new_app)
db.session.flush() # Get the app ID without committing
# Add ports
for port_number, protocol, description in port_data:
new_port = Port(
app_id=new_app.id,
port_number=int(port_number),
protocol=protocol,
description=description
)
db.session.add(new_port)
db.session.add(app)
db.session.commit()
flash(f"Application '{name}' created successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=new_app.id))
flash(f"Application {name} created successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=form_server_id))
except Exception as e:
db.session.rollback()
flash(f"Error creating application: {str(e)}", "danger")
return render_template(
"dashboard/app_form.html",
title="New Application",
servers=servers,
selected_server_id=form_server_id or server_id,
edit_mode=False,
app={} # Empty app object for the template
)
# GET request or validation failed - render the form
# GET request - show the form
return render_template(
"dashboard/app_form.html",
edit_mode=False,
"dashboard/app_form.html",
title="New Application",
servers=servers,
selected_server_id=server_id
selected_server_id=server_id,
edit_mode=False,
app={} # Empty app object for the template
)
@ -357,118 +307,71 @@ def app_view(app_id):
@bp.route("/app/<int:app_id>/edit", methods=["GET", "POST"])
@login_required
def app_edit(app_id):
"""Edit an existing application with comprehensive error handling"""
"""Edit an existing application"""
app = App.query.get_or_404(app_id)
servers = Server.query.all()
# Check that the current user owns this app
if app.user_id != current_user.id:
abort(403)
# Get all servers for the dropdown
servers = Server.query.filter_by(user_id=current_user.id).all()
if request.method == "POST":
# Get form data
name = request.form.get("name", "").strip()
server_id = request.form.get("server_id")
name = request.form.get("name")
url = request.form.get("url", "")
documentation = request.form.get("documentation", "")
url = request.form.get("url", "") # Get the URL
# Validate application name
if not name:
flash("Application name is required", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=True,
app=app,
servers=servers
)
server_id = request.form.get("server_id")
port_data = request.form.getlist("port")
port_descriptions = request.form.getlist("port_description")
# Check for duplicate application names on the same server (excluding this app)
existing_app = App.query.filter(
App.name == name,
App.server_id == server_id,
App.id != app_id
).first()
# Validate required fields
if not name or not server_id:
flash("Application name and server are required", "danger")
return render_template("dashboard/app_form.html", app=app, servers=servers, edit_mode=True)
if existing_app:
flash(f"An application with the name '{name}' already exists on this server", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=True,
app=app,
servers=servers
)
# Process port data from form
port_data = []
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i in range(len(port_numbers)):
if port_numbers[i] and port_numbers[i].strip():
protocol = protocols[i] if i < len(protocols) else "TCP"
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Check for port conflicts proactively
conflicts = []
seen_ports = set() # To track ports already seen in this submission
for i, (port_number, protocol, _) in enumerate(port_data):
try:
clean_port = int(port_number)
# Check if this port has already been seen in this submission
port_key = f"{clean_port}/{protocol}"
if port_key in seen_ports:
conflicts.append((clean_port, protocol, "Duplicate port in submission"))
# Validate port data - exclude current app's ports from conflict check
port_error = validate_port_data(port_data, port_descriptions, server_id, app_id)
if port_error:
flash(port_error, "danger")
return render_template("dashboard/app_form.html", app=app, servers=servers, edit_mode=True)
try:
# Update app details
app.name = name
app.server_id = server_id
app.documentation = documentation
app.url = url
app.updated_at = datetime.utcnow()
# Update ports
# First, remove all existing ports
Port.query.filter_by(app_id=app.id).delete()
# Then add the new ports
for i, port_number in enumerate(port_data):
if not port_number: # Skip empty port entries
continue
seen_ports.add(port_key)
# Check if the port is in use by another application
in_use, conflicting_app_name = is_port_in_use(
clean_port, protocol, server_id, exclude_app_id=app_id
description = port_descriptions[i] if i < len(port_descriptions) else ""
port = Port(
port_number=port_number,
description=description,
app_id=app.id
)
if in_use:
conflicts.append((clean_port, protocol, f"Port {clean_port}/{protocol} is already in use by application '{conflicting_app_name}'"))
except (ValueError, TypeError):
continue
if conflicts:
for conflict in conflicts:
flash(f"Conflict: {conflict[0]}/{conflict[1]} - {conflict[2]}", "danger")
return render_template(
"dashboard/app_form.html",
edit_mode=True,
app=app,
servers=servers
)
# Update application details
app.name = name
app.server_id = server_id
app.documentation = documentation
app.url = url
# Only delete existing ports if new port data is provided
if port_data:
# Remove existing ports and add new ones
Port.query.filter_by(app_id=app_id).delete()
for port_number, protocol, description in port_data:
new_port = Port(
app_id=app_id,
port_number=int(port_number),
protocol=protocol,
description=description
)
db.session.add(new_port)
db.session.commit()
flash("Application updated successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=app_id))
return render_template(
"dashboard/app_form.html",
edit_mode=True,
app=app,
servers=servers
)
db.session.add(port)
db.session.commit()
flash(f"Application {name} updated successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server_id))
except Exception as e:
db.session.rollback()
flash(f"Error updating application: {str(e)}", "danger")
return render_template("dashboard/app_form.html", app=app, servers=servers, edit_mode=True)
# GET request - show the form
return render_template("dashboard/app_form.html", app=app, servers=servers, edit_mode=True)
@bp.route("/app/<int:app_id>/delete", methods=["POST"])
@ -534,73 +437,73 @@ def settings():
@bp.route("/overview")
@login_required
def overview():
"""Hierarchical overview of subnets, servers, and applications"""
# Get all subnets with their servers
subnets = Subnet.query.all()
"""Display an overview of the infrastructure"""
# Get all servers, subnets, apps for the current user
servers = Server.query.filter_by(user_id=current_user.id).all()
subnets = Subnet.query.filter_by(user_id=current_user.id).all()
locations = Location.query.filter_by(user_id=current_user.id).all()
# Get servers without subnets
standalone_servers = Server.query.filter(Server.subnet_id.is_(None)).all()
# Create location lookup dictionary for quick access
location_lookup = {loc.id: loc.name for loc in locations}
# Create a hierarchical structure
hierarchy = {
'subnets': [],
'standalone_servers': []
}
# Count servers by subnet
servers_by_subnet = {}
unassigned_servers = []
# Organize subnets and their servers
# Process servers
for server in servers:
if server.subnet_id:
if server.subnet_id not in servers_by_subnet:
servers_by_subnet[server.subnet_id] = []
servers_by_subnet[server.subnet_id].append(server)
else:
unassigned_servers.append(server)
# Prepare subnet data for the chart
subnet_data = []
for subnet in subnets:
subnet_data = {
# Get the location name using the location_id
location_name = location_lookup.get(subnet.location_id, 'Unassigned')
subnet_data.append({
'id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location,
'servers': []
}
# Only add description if it exists as an attribute
if hasattr(subnet, 'description'):
subnet_data['description'] = subnet.description
for server in subnet.servers:
server_data = {
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'apps': []
}
for app in server.apps:
app_data = {
'id': app.id,
'name': app.name,
'ports': app.ports
}
server_data['apps'].append(app_data)
subnet_data['servers'].append(server_data)
hierarchy['subnets'].append(subnet_data)
'location': location_name,
'server_count': len(servers_by_subnet.get(subnet.id, [])),
'servers': servers_by_subnet.get(subnet.id, [])
})
# Organize standalone servers
for server in standalone_servers:
server_data = {
# Count apps by server
servers_with_app_count = []
for server in servers:
app_count = len(server.apps) # Adjust this if needed based on your relationship
servers_with_app_count.append({
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'apps': []
}
for app in server.apps:
app_data = {
'id': app.id,
'name': app.name,
'ports': app.ports
}
server_data['apps'].append(app_data)
hierarchy['standalone_servers'].append(server_data)
'app_count': app_count
})
# Sort by app count descending
servers_with_app_count.sort(key=lambda x: x['app_count'], reverse=True)
return render_template(
"dashboard/overview.html",
title="Infrastructure Overview",
hierarchy=hierarchy
subnet_count=len(subnets),
server_count=len(servers),
subnet_data=subnet_data,
unassigned_servers=unassigned_servers,
servers_with_app_count=servers_with_app_count[:5] # Top 5 servers
)
@bp.route("/apps")
@login_required
def app_list():
"""List all applications"""
apps = App.query.order_by(App.name).all()
return render_template(
"dashboard/app_list.html",
title="Applications",
apps=apps
)

View file

@ -1,6 +1,6 @@
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required
from app.core.models import Subnet, Server, App
from flask_login import login_required, current_user
from app.core.models import Subnet, Server, App, Location
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import ipaddress
@ -36,32 +36,24 @@ def ipam_home():
@login_required
def subnet_new():
"""Create a new subnet"""
# Get all locations for the dropdown
locations = Location.query.filter_by(user_id=current_user.id).all()
if request.method == "POST":
cidr = request.form.get("cidr")
location = request.form.get("location")
location_id = request.form.get("location_id")
auto_scan = request.form.get("auto_scan") == "on"
# Basic validation
if not cidr or not location:
flash("Please fill in all required fields", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Validate CIDR format
try:
ipaddress.ip_network(cidr, strict=False)
except ValueError:
flash("Invalid CIDR format", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Check if CIDR already exists
if Subnet.query.filter_by(cidr=cidr).first():
flash("Subnet already exists", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Create new subnet with JSON string for active_hosts, not a Python list
if not cidr or not location_id:
flash("CIDR notation and location are required", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet", locations=locations)
# Create new subnet
subnet = Subnet(
cidr=cidr,
location=location,
location_id=location_id,
user_id=current_user.id,
active_hosts=json.dumps([]), # Convert empty list to JSON string
last_scanned=None,
auto_scan=auto_scan,
@ -73,7 +65,7 @@ def subnet_new():
flash("Subnet created successfully", "success")
return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))
return render_template("ipam/subnet_form.html", title="New Subnet")
return render_template("ipam/subnet_form.html", title="New Subnet", locations=locations)
@bp.route("/subnet/<int:subnet_id>")
@ -267,43 +259,39 @@ def subnet_create_ajax():
return jsonify({"success": False, "error": str(e)})
@bp.route("/location/<location>")
@bp.route("/location/<int:location_id>")
@login_required
def location_overview(location):
def location_overview(location_id):
"""View all subnets and servers in a specific location"""
# Get all subnets in this location
subnets = Subnet.query.filter_by(location=location).all()
# Get servers in these subnets
servers = []
for subnet in subnets:
subnet_servers = Server.query.filter_by(subnet_id=subnet.id).all()
servers.extend(subnet_servers)
# Get the location (ensure it belongs to the current user)
location = Location.query.filter_by(id=location_id, user_id=current_user.id).first_or_404()
# Create a hierarchical structure
hierarchy = {
'subnets': [],
'standalone_servers': []
'locations': {
location.name: {
'id': location.id,
'description': location.description,
'subnets': [],
'standalone_servers': []
}
}
}
# Organize subnets and their servers
for subnet in subnets:
for subnet in location.subnets:
subnet_data = {
'id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location,
'servers': []
}
# Only add description if it exists as an attribute
if hasattr(subnet, 'description'):
subnet_data['description'] = subnet.description
for server in subnet.servers:
server_data = {
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'description': server.description,
'apps': []
}
@ -311,17 +299,59 @@ def location_overview(location):
app_data = {
'id': app.id,
'name': app.name,
'ports': app.ports
'url': app.url,
'ports': []
}
for port in app.ports:
port_data = {
'id': port.id,
'number': port.port_number,
'protocol': port.protocol,
'description': port.description
}
app_data['ports'].append(port_data)
server_data['apps'].append(app_data)
subnet_data['servers'].append(server_data)
hierarchy['subnets'].append(subnet_data)
hierarchy['locations'][location.name]['subnets'].append(subnet_data)
# Add standalone servers
for server in location.standalone_servers:
server_data = {
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'description': server.description,
'apps': []
}
for app in server.apps:
app_data = {
'id': app.id,
'name': app.name,
'url': app.url,
'ports': []
}
for port in app.ports:
port_data = {
'id': port.id,
'number': port.port_number,
'protocol': port.protocol,
'description': port.description
}
app_data['ports'].append(port_data)
server_data['apps'].append(app_data)
hierarchy['locations'][location.name]['standalone_servers'].append(server_data)
return render_template(
"dashboard/overview.html",
title=f"{location} Overview",
"ipam/location_overview.html",
title=f"{location.name} Overview",
hierarchy=hierarchy,
location=location
)