This commit is contained in:
pika 2025-03-30 23:42:24 +02:00
parent 3b2f1db4ce
commit 5c16964b76
47 changed files with 2080 additions and 1053 deletions

Binary file not shown.

View file

@ -2,43 +2,44 @@ from flask import Flask, g, redirect, url_for, render_template
import datetime
import os
def create_app(config_name='development'):
app = Flask(__name__,
static_folder='static',
template_folder='templates')
def create_app(config_name="development"):
app = Flask(__name__, static_folder="static", template_folder="templates")
# Load configuration
if config_name == 'production':
app.config.from_object('config.ProductionConfig')
elif config_name == 'testing':
app.config.from_object('config.TestingConfig')
if config_name == "production":
app.config.from_object("config.ProductionConfig")
elif config_name == "testing":
app.config.from_object("config.TestingConfig")
else:
app.config.from_object('config.DevelopmentConfig')
app.config.from_object("config.DevelopmentConfig")
# Initialize extensions
from app.core.extensions import db, migrate, login_manager, bcrypt, limiter, csrf
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
bcrypt.init_app(app)
csrf.init_app(app)
limiter.init_app(app)
# Initialize login manager
from app.core.auth import User
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
login_manager.login_view = 'auth.login'
login_manager.login_message = 'Please log in to access this page.'
login_manager.login_message_category = 'info'
login_manager.login_view = "auth.login"
login_manager.login_message = "Please log in to access this page."
login_manager.login_message_category = "info"
# Register template filters
from app.core.template_filters import bp as filters_bp
app.register_blueprint(filters_bp)
# Create database tables without seeding any data
with app.app_context():
try:
@ -46,31 +47,44 @@ def create_app(config_name='development'):
print("Database tables created successfully")
except Exception as e:
print(f"Error with database setup: {e}")
# Register blueprints
from app.routes.auth import bp as auth_bp
app.register_blueprint(auth_bp)
from app.routes.dashboard import bp as dashboard_bp
app.register_blueprint(dashboard_bp)
from app.routes.ipam import bp as ipam_bp
app.register_blueprint(ipam_bp)
from app.routes.api import bp as api_bp
app.register_blueprint(api_bp)
from app.routes.importexport import bp as importexport_bp
app.register_blueprint(importexport_bp)
# Add static assets blueprint
from app.routes.static import bp as static_bp
app.register_blueprint(static_bp)
# Add error handlers
@app.errorhandler(404)
def page_not_found(e):
return render_template('errors/404.html', title='Page Not Found'), 404
return render_template("errors/404.html", title="Page Not Found"), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template('errors/500.html', title='Server Error'), 500
return render_template("errors/500.html", title="Server Error"), 500
@app.errorhandler(403)
def forbidden(e):
return render_template('errors/403.html', title='Forbidden'), 403
return app
return render_template("errors/403.html", title="Forbidden"), 403
return app

View file

@ -4,11 +4,12 @@ from .extensions import db, bcrypt
from datetime import datetime
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.login_view = "auth.login"
class User(UserMixin, db.Model):
__tablename__ = 'users'
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, nullable=True)
email = db.Column(db.String(120), unique=True, nullable=False)
@ -16,19 +17,20 @@ class User(UserMixin, db.Model):
is_admin = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<User {self.username}>'
return f"<User {self.username}>"
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def get_id(self):
return str(self.id)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
return User.query.get(int(user_id))

View file

@ -10,13 +10,12 @@ from flask_wtf.csrf import CSRFProtect
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.login_message = 'Please log in to access this page.'
login_manager.login_message_category = 'info'
login_manager.login_view = "auth.login"
login_manager.login_message = "Please log in to access this page."
login_manager.login_message_category = "info"
bcrypt = Bcrypt()
csrf = CSRFProtect()
limiter = Limiter(
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
key_func=get_remote_address, default_limits=["200 per day", "50 per hour"]
)

View file

@ -8,42 +8,49 @@ from flask_login import UserMixin
# User model has been moved to app.core.auth
# Import it from there instead if needed: from app.core.auth import User
class Port(db.Model):
__tablename__ = 'ports'
__tablename__ = "ports"
id = db.Column(db.Integer, primary_key=True)
app_id = db.Column(db.Integer, db.ForeignKey('apps.id', ondelete='CASCADE'), nullable=False)
app_id = db.Column(
db.Integer, db.ForeignKey("apps.id", ondelete="CASCADE"), nullable=False
)
port_number = db.Column(db.Integer, nullable=False)
protocol = db.Column(db.String(10), default='TCP') # TCP, UDP, etc.
protocol = db.Column(db.String(10), default="TCP") # TCP, UDP, etc.
description = db.Column(db.String(200))
# Relationship
app = db.relationship('App', back_populates='ports')
app = db.relationship("App", back_populates="ports")
def __repr__(self):
return f'<Port {self.port_number}/{self.protocol}>'
return f"<Port {self.port_number}/{self.protocol}>"
class Server(db.Model):
__tablename__ = 'servers'
__tablename__ = "servers"
id = db.Column(db.Integer, primary_key=True)
hostname = db.Column(db.String(64), nullable=False)
ip_address = db.Column(db.String(39), nullable=False) # IPv4 or IPv6
subnet_id = db.Column(db.Integer, db.ForeignKey('subnets.id'), nullable=False)
subnet_id = db.Column(db.Integer, db.ForeignKey("subnets.id"), nullable=False)
documentation = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
# Relationships
subnet = db.relationship('Subnet', back_populates='servers')
apps = db.relationship('App', back_populates='server', cascade='all, delete-orphan')
subnet = db.relationship("Subnet", back_populates="servers")
apps = db.relationship("App", back_populates="server", cascade="all, delete-orphan")
def __repr__(self):
return f'<Server {self.hostname}>'
return f"<Server {self.hostname}>"
class Subnet(db.Model):
__tablename__ = 'subnets'
__tablename__ = "subnets"
id = db.Column(db.Integer, primary_key=True)
cidr = db.Column(db.String(18), unique=True, nullable=False) # e.g., 192.168.1.0/24
location = db.Column(db.String(64))
@ -51,43 +58,48 @@ class Subnet(db.Model):
last_scanned = db.Column(db.DateTime)
auto_scan = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
# Relationships
servers = db.relationship('Server', back_populates='subnet')
servers = db.relationship("Server", back_populates="subnet")
def __repr__(self):
return f'<Subnet {self.cidr}>'
return f"<Subnet {self.cidr}>"
@property
def used_ips(self):
"""Number of IPs used in this subnet (servers)"""
return len(self.servers)
# Getter and setter for active_hosts as JSON
@property
def active_hosts_list(self):
if not self.active_hosts:
return []
return json.loads(self.active_hosts)
@active_hosts_list.setter
def active_hosts_list(self, hosts):
self.active_hosts = json.dumps(hosts)
class App(db.Model):
__tablename__ = 'apps'
__tablename__ = "apps"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), nullable=False)
server_id = db.Column(db.Integer, db.ForeignKey('servers.id'), nullable=False)
server_id = db.Column(db.Integer, db.ForeignKey("servers.id"), nullable=False)
documentation = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
# Relationships
server = db.relationship('Server', back_populates='apps')
ports = db.relationship('Port', back_populates='app', cascade='all, delete-orphan')
server = db.relationship("Server", back_populates="apps")
ports = db.relationship("Port", back_populates="app", cascade="all, delete-orphan")
def __repr__(self):
return f'<App {self.name}>'
return f"<App {self.name}>"

View file

@ -3,45 +3,42 @@ import markdown as md_package
import re
from flask import Blueprint
bp = Blueprint('filters', __name__)
bp = Blueprint("filters", __name__)
def github_style_admonition(text):
"""Transform GitHub-style alerts (> [!NOTE], etc.) to custom HTML"""
patterns = {
r'> \[!NOTE\](.*?)(?:\n\n|\Z)': '<div class="markdown-alert markdown-alert-note"><p class="markdown-alert-title">Note</p>\\1</div>',
r'> \[!TIP\](.*?)(?:\n\n|\Z)': '<div class="markdown-alert markdown-alert-tip"><p class="markdown-alert-title">Tip</p>\\1</div>',
r'> \[!IMPORTANT\](.*?)(?:\n\n|\Z)': '<div class="markdown-alert markdown-alert-important"><p class="markdown-alert-title">Important</p>\\1</div>',
r'> \[!WARNING\](.*?)(?:\n\n|\Z)': '<div class="markdown-alert markdown-alert-warning"><p class="markdown-alert-title">Warning</p>\\1</div>',
r'> \[!CAUTION\](.*?)(?:\n\n|\Z)': '<div class="markdown-alert markdown-alert-caution"><p class="markdown-alert-title">Caution</p>\\1</div>'
r"> \[!NOTE\](.*?)(?:\n\n|\Z)": '<div class="markdown-alert markdown-alert-note"><p class="markdown-alert-title">Note</p>\\1</div>',
r"> \[!TIP\](.*?)(?:\n\n|\Z)": '<div class="markdown-alert markdown-alert-tip"><p class="markdown-alert-title">Tip</p>\\1</div>',
r"> \[!IMPORTANT\](.*?)(?:\n\n|\Z)": '<div class="markdown-alert markdown-alert-important"><p class="markdown-alert-title">Important</p>\\1</div>',
r"> \[!WARNING\](.*?)(?:\n\n|\Z)": '<div class="markdown-alert markdown-alert-warning"><p class="markdown-alert-title">Warning</p>\\1</div>',
r"> \[!CAUTION\](.*?)(?:\n\n|\Z)": '<div class="markdown-alert markdown-alert-caution"><p class="markdown-alert-title">Caution</p>\\1</div>',
}
for pattern, replacement in patterns.items():
text = re.sub(pattern, replacement, text, flags=re.DOTALL)
return text
@bp.app_template_filter('markdown')
@bp.app_template_filter("markdown")
def markdown_filter(text):
"""Convert markdown text to HTML with support for GitHub-style features"""
if text:
# Pre-process GitHub-style alerts
text = github_style_admonition(text)
# Convert to HTML with regular markdown
html = md_package.markdown(
text,
extensions=[
'tables',
'fenced_code',
'codehilite',
'nl2br'
]
text, extensions=["tables", "fenced_code", "codehilite", "nl2br"]
)
return html
return ""
@bp.app_template_filter('ip_network')
@bp.app_template_filter("ip_network")
def ip_network_filter(cidr):
"""Convert a CIDR string to an IP network object"""
try:
@ -49,7 +46,8 @@ def ip_network_filter(cidr):
except ValueError:
return None
@bp.app_template_filter('ip_address')
@bp.app_template_filter("ip_address")
def ip_address_filter(ip):
"""Convert an IP string to an IP address object"""
try:
@ -57,10 +55,11 @@ def ip_address_filter(ip):
except ValueError:
return None
@bp.app_template_global('get_ip_network')
@bp.app_template_global("get_ip_network")
def get_ip_network(cidr):
"""Global function to get an IP network object from CIDR"""
try:
return ipaddress.ip_network(cidr, strict=False)
except ValueError:
return None
return None

Binary file not shown.

Binary file not shown.

View file

@ -1,365 +1,439 @@
from flask import Blueprint, jsonify, request, abort
from flask import Blueprint, jsonify, request, abort, current_app, render_template
from flask_login import login_required
from app.core.models import Subnet, Server, App, Port
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import random
import ipaddress
from flask_wtf import CSRFProtect
import markdown
from datetime import datetime
bp = Blueprint('api', __name__, url_prefix='/api')
bp = Blueprint("api", __name__, url_prefix="/api")
csrf = CSRFProtect()
@bp.route('/subnets', methods=['GET'])
@bp.route("/subnets", methods=["GET"])
def get_subnets():
"""Get all subnets grouped by site"""
subnets = Subnet.query.all()
# Group subnets by location (site)
sites = {}
for subnet in subnets:
location = subnet.location
if location not in sites:
sites[location] = []
sites[location].append({
'id': subnet.id,
'cidr': subnet.cidr,
'location': location
})
sites[location].append(
{"id": subnet.id, "cidr": subnet.cidr, "location": location}
)
# Convert to list of site objects
result = [
{
'name': site_name,
'subnets': subnets
} for site_name, subnets in sites.items()
{"name": site_name, "subnets": subnets} for site_name, subnets in sites.items()
]
return jsonify(result)
@bp.route('/subnets/<int:subnet_id>', methods=['GET'])
@bp.route("/subnets/<int:subnet_id>", methods=["GET"])
@login_required
def get_subnet(subnet_id):
"""Get details for a specific subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
servers = []
for server in Server.query.filter_by(subnet_id=subnet_id).all():
servers.append({
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
servers.append(
{
"id": server.id,
"hostname": server.hostname,
"ip_address": server.ip_address,
"created_at": server.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
)
result = {
'id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location,
'used_ips': subnet.used_ips,
'auto_scan': subnet.auto_scan,
'created_at': subnet.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'servers': servers
"id": subnet.id,
"cidr": subnet.cidr,
"location": subnet.location,
"used_ips": subnet.used_ips,
"auto_scan": subnet.auto_scan,
"created_at": subnet.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"servers": servers,
}
return jsonify(result)
@bp.route('/subnets/<int:subnet_id>/scan', methods=['POST'])
@bp.route("/subnets/<int:subnet_id>/scan", methods=["POST"])
@login_required
def api_subnet_scan(subnet_id):
"""Scan a subnet via API"""
subnet = Subnet.query.get_or_404(subnet_id)
try:
results = scan(subnet.cidr, save_results=True)
return jsonify({
'success': True,
'subnet': subnet.cidr,
'hosts_found': len(results),
'results': results
})
return jsonify(
{
"success": True,
"subnet": subnet.cidr,
"hosts_found": len(results),
"results": results,
}
)
except Exception as e:
return jsonify({
'success': False,
'message': f'Error scanning subnet: {str(e)}'
}), 500
return (
jsonify({"success": False, "message": f"Error scanning subnet: {str(e)}"}),
500,
)
@bp.route('/servers', methods=['GET'])
@bp.route("/servers", methods=["GET"])
@login_required
def get_servers():
"""Get all servers"""
servers = Server.query.all()
result = []
for server in servers:
result.append({
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'subnet_id': server.subnet_id,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({'servers': result})
@bp.route('/servers/<int:server_id>', methods=['GET'])
for server in servers:
result.append(
{
"id": server.id,
"hostname": server.hostname,
"ip_address": server.ip_address,
"subnet_id": server.subnet_id,
"created_at": server.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
)
return jsonify({"servers": result})
@bp.route("/servers/<int:server_id>", methods=["GET"])
@login_required
def get_server(server_id):
"""Get a specific server"""
server = Server.query.get_or_404(server_id)
apps = []
for app in server.apps:
ports = []
for port in app.ports:
ports.append({
'id': port.id,
'port_number': port.port_number,
'protocol': port.protocol,
'description': port.description
})
apps.append({
'id': app.id,
'name': app.name,
'ports': ports,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
ports.append(
{
"id": port.id,
"port_number": port.port_number,
"protocol": port.protocol,
"description": port.description,
}
)
apps.append(
{
"id": app.id,
"name": app.name,
"ports": ports,
"created_at": app.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
)
result = {
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'subnet_id': server.subnet_id,
'documentation': server.documentation,
'apps': apps,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
"id": server.id,
"hostname": server.hostname,
"ip_address": server.ip_address,
"subnet_id": server.subnet_id,
"documentation": server.documentation,
"apps": apps,
"created_at": server.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
return jsonify(result)
@bp.route('/apps', methods=['GET'])
@bp.route("/apps", methods=["GET"])
@login_required
def get_apps():
"""Get all applications"""
apps = App.query.all()
result = []
for app in apps:
result.append({
'id': app.id,
'name': app.name,
'server_id': app.server_id,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({'apps': result})
@bp.route('/apps/<int:app_id>', methods=['GET'])
for app in apps:
result.append(
{
"id": app.id,
"name": app.name,
"server_id": app.server_id,
"created_at": app.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
)
return jsonify({"apps": result})
@bp.route("/apps/<int:app_id>", methods=["GET"])
@login_required
def get_app(app_id):
"""Get details for a specific application"""
app = App.query.get_or_404(app_id)
result = {
'id': app.id,
'name': app.name,
'server_id': app.server_id,
'documentation': app.documentation,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'ports': app.ports
"id": app.id,
"name": app.name,
"server_id": app.server_id,
"documentation": app.documentation,
"created_at": app.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"ports": app.ports,
}
return jsonify(result)
@bp.route('/status', methods=['GET'])
def status():
return jsonify({'status': 'OK'})
@bp.route('/markdown-preview', methods=['POST'])
@bp.route("/status", methods=["GET"])
def status():
return jsonify({"status": "OK"})
@bp.route("/markdown-preview", methods=["POST"])
@csrf.exempt # Remove this line in production! Temporary fix for demo purposes
def markdown_preview():
data = request.json
md_content = data.get('markdown', '')
md_content = data.get("markdown", "")
html = markdown.markdown(md_content)
return jsonify({'html': html})
return jsonify({"html": html})
@bp.route('/ports/suggest', methods=['GET'])
@bp.route("/ports/suggest", methods=["GET"])
def suggest_ports():
app_type = request.args.get('type', '').lower()
app_type = request.args.get("type", "").lower()
# Common port suggestions based on app type
suggestions = {
'web': [
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
{'port': 443, 'type': 'tcp', 'desc': 'HTTPS'}
"web": [
{"port": 80, "type": "tcp", "desc": "HTTP"},
{"port": 443, "type": "tcp", "desc": "HTTPS"},
],
'database': [
{'port': 3306, 'type': 'tcp', 'desc': 'MySQL'},
{'port': 5432, 'type': 'tcp', 'desc': 'PostgreSQL'},
{'port': 1521, 'type': 'tcp', 'desc': 'Oracle'}
"database": [
{"port": 3306, "type": "tcp", "desc": "MySQL"},
{"port": 5432, "type": "tcp", "desc": "PostgreSQL"},
{"port": 1521, "type": "tcp", "desc": "Oracle"},
],
'mail': [
{'port': 25, 'type': 'tcp', 'desc': 'SMTP'},
{'port': 143, 'type': 'tcp', 'desc': 'IMAP'},
{'port': 110, 'type': 'tcp', 'desc': 'POP3'}
"mail": [
{"port": 25, "type": "tcp", "desc": "SMTP"},
{"port": 143, "type": "tcp", "desc": "IMAP"},
{"port": 110, "type": "tcp", "desc": "POP3"},
],
"file": [
{"port": 21, "type": "tcp", "desc": "FTP"},
{"port": 22, "type": "tcp", "desc": "SFTP/SSH"},
{"port": 445, "type": "tcp", "desc": "SMB"},
],
'file': [
{'port': 21, 'type': 'tcp', 'desc': 'FTP'},
{'port': 22, 'type': 'tcp', 'desc': 'SFTP/SSH'},
{'port': 445, 'type': 'tcp', 'desc': 'SMB'}
]
}
if app_type in suggestions:
return jsonify(suggestions[app_type])
# Default suggestions
return jsonify([
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
{'port': 22, 'type': 'tcp', 'desc': 'SSH'}
])
@bp.route('/servers/<int:server_id>/suggest_port', methods=['GET'])
# Default suggestions
return jsonify(
[
{"port": 80, "type": "tcp", "desc": "HTTP"},
{"port": 22, "type": "tcp", "desc": "SSH"},
]
)
@bp.route("/servers/<int:server_id>/suggest_port", methods=["GET"])
@login_required
def suggest_port(server_id):
"""Suggest a random unused port for a server"""
server = Server.query.get_or_404(server_id)
# Get all used ports for this server
used_ports = []
for app in server.apps:
for port in app.ports:
used_ports.append(port.port_number)
# Find an unused port in the dynamic/private port range
available_port = None
attempts = 0
while attempts < 50: # Try 50 times to find a random port
# Random port between 10000 and 65535
port = random.randint(10000, 65535)
if port not in used_ports:
available_port = port
break
attempts += 1
if available_port is None:
# If no random port found, find first available in sequence
for port in range(10000, 65536):
if port not in used_ports:
available_port = port
break
return jsonify({'port': available_port})
@bp.route('/apps/<int:app_id>/ports', methods=['GET'])
@login_required
def get_app_ports(app_id):
"""Get all ports for an app"""
app = App.query.get_or_404(app_id)
ports = []
for port in app.ports:
ports.append({
'id': port.id,
'port_number': port.port_number,
'protocol': port.protocol,
'description': port.description
})
return jsonify({'ports': ports})
return jsonify({"port": available_port})
@bp.route('/apps/<int:app_id>/ports', methods=['POST'])
@bp.route("/app/<int:app_id>/add-port", methods=["POST"])
@login_required
def add_app_port(app_id):
"""Add a new port to an app"""
"""Add a port to an application"""
app = App.query.get_or_404(app_id)
data = request.json
if not data or 'port_number' not in data:
return jsonify({'error': 'Missing port number'}), 400
port_number = data.get('port_number')
protocol = data.get('protocol', 'TCP')
description = data.get('description', '')
# Check if port already exists for this app
existing_port = Port.query.filter_by(app_id=app_id, port_number=port_number).first()
if existing_port:
return jsonify({'error': 'Port already exists for this app'}), 400
new_port = Port(
app_id=app_id,
port_number=port_number,
protocol=protocol,
description=description
)
db.session.add(new_port)
db.session.commit()
return jsonify({
'id': new_port.id,
'port_number': new_port.port_number,
'protocol': new_port.protocol,
'description': new_port.description
})
@bp.route('/ports/<int:port_id>', methods=['DELETE'])
# Accept both JSON and form data
if request.is_json:
data = request.json
else:
data = request.form
port_number = data.get("port")
protocol = data.get("protocol", "TCP")
description = data.get("description", "")
if not port_number:
return jsonify({"success": False, "error": "Port number is required"}), 400
try:
port_number = int(port_number)
# Check if port already exists for this app
existing_port = Port.query.filter_by(app_id=app_id, number=port_number).first()
if existing_port:
return (
jsonify(
{
"success": False,
"error": "Port already exists for this application",
}
),
400,
)
# Create new port
port = Port(
number=port_number,
protocol=protocol,
description=description,
app_id=app_id,
)
db.session.add(port)
db.session.commit()
return jsonify(
{
"success": True,
"message": f"Port {port_number} added to {app.name}",
"port": {
"id": port.id,
"number": port.number,
"protocol": port.protocol,
"description": port.description,
},
}
)
except ValueError:
return jsonify({"success": False, "error": "Invalid port number"}), 400
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 500
@bp.route("/app/<int:app_id>/ports", methods=["GET"])
@login_required
def get_app_ports(app_id):
"""Get all ports for an application"""
app = App.query.get_or_404(app_id)
ports = Port.query.filter_by(app_id=app_id).all()
result = {
"app_id": app_id,
"ports": [
{
"id": port.id,
"number": port.number,
"protocol": port.protocol,
"description": port.description,
}
for port in ports
],
}
return jsonify(result)
@bp.route("/port/<int:port_id>/delete", methods=["POST"])
@login_required
def delete_port(port_id):
"""Delete a port"""
port = Port.query.get_or_404(port_id)
db.session.delete(port)
db.session.commit()
return jsonify({'success': True})
# Add CSRF validation
if request.is_json: # For AJAX requests
csrf_token = request.json.get("csrf_token")
if not csrf_token or not csrf.validate_csrf(csrf_token):
return jsonify({"success": False, "error": "CSRF validation failed"}), 403
@bp.route('/subnets/<int:subnet_id>/servers', methods=['GET'])
port = Port.query.get_or_404(port_id)
try:
db.session.delete(port)
db.session.commit()
return jsonify({"success": True, "message": f"Port {port.number} deleted"})
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 500
@bp.route("/subnets/<int:subnet_id>/servers", methods=["GET"])
def get_subnet_servers(subnet_id):
"""Get all servers for a specific subnet"""
servers = Server.query.filter_by(subnet_id=subnet_id).all()
return jsonify([{
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address
} for server in servers])
return jsonify(
[
{
"id": server.id,
"hostname": server.hostname,
"ip_address": server.ip_address,
}
for server in servers
]
)
@bp.route('/server/<int:server_id>/ports', methods=['GET'])
@bp.route("/server/<int:server_id>/ports", methods=["GET"])
@login_required
def get_server_ports(server_id):
"""Get all used ports for a server"""
server = Server.query.get_or_404(server_id)
# Get all ports associated with this server
ports = Port.query.filter_by(server_id=server_id).all()
used_ports = [port.number for port in ports]
return jsonify({
'server_id': server_id,
'used_ports': used_ports
})
@bp.route('/server/<int:server_id>/free-port', methods=['GET'])
return jsonify({"server_id": server_id, "used_ports": used_ports})
@bp.route("/server/<int:server_id>/free-port", methods=["GET"])
@login_required
def get_free_port(server_id):
"""Find a free port for a server"""
server = Server.query.get_or_404(server_id)
# Get all ports associated with this server
used_ports = [port.number for port in Port.query.filter_by(server_id=server_id).all()]
used_ports = [
port.number for port in Port.query.filter_by(server_id=server_id).all()
]
# Find the first free port (starting from 8000)
for port_number in range(8000, 9000):
if port_number not in used_ports:
return jsonify({
'success': True,
'port': port_number
})
return jsonify({
'success': False,
'error': 'No free ports available in the range 8000-9000'
})
return jsonify({"success": True, "port": port_number})
return jsonify(
{"success": False, "error": "No free ports available in the range 8000-9000"}
)

View file

@ -3,75 +3,87 @@ from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from app.core.extensions import db
from app.core.auth import User
import re
from flask_wtf.csrf import CSRFProtect
bp = Blueprint('auth', __name__, url_prefix='/auth')
bp = Blueprint("auth", __name__, url_prefix="/auth")
csrf = CSRFProtect()
@bp.route('/login', methods=['GET', 'POST'])
@bp.route("/login", methods=["GET", "POST"])
def login():
"""User login"""
if current_user.is_authenticated:
return redirect(url_for('dashboard.dashboard_home'))
if request.method == 'POST':
email = request.form.get('email')
password = request.form.get('password')
remember = 'remember' in request.form
user = User.query.filter_by(email=email).first()
if not user or not user.check_password(password):
flash('Invalid email or password', 'danger')
return render_template('auth/login.html', title='Login')
login_user(user, remember=remember)
next_page = request.args.get('next')
if not next_page or not next_page.startswith('/'):
next_page = url_for('dashboard.dashboard_home')
return redirect(next_page)
return render_template('auth/login.html', title='Login')
return redirect(url_for("dashboard.dashboard_home"))
@bp.route('/register', methods=['GET', 'POST'])
if request.method == "POST":
email = request.form.get("email")
password = request.form.get("password")
remember = "remember" in request.form
user = User.query.filter_by(email=email).first()
if not user or not user.check_password(password):
flash("Invalid email or password", "danger")
return render_template("auth/login.html", title="Login")
login_user(user, remember=remember)
next_page = request.args.get("next")
if not next_page or not next_page.startswith("/"):
next_page = url_for("dashboard.dashboard_home")
return redirect(next_page)
return render_template("auth/login.html", title="Login")
@bp.route("/register", methods=["GET", "POST"])
@csrf.exempt # Remove for production! Temporary allow registration without CSRF
def register():
"""User registration"""
if current_user.is_authenticated:
return redirect(url_for('dashboard.dashboard_home'))
if request.method == 'POST':
email = request.form.get('email')
password = request.form.get('password')
# Validation
if not email or not password:
flash('Email and password are required', 'danger')
return render_template('auth/register.html', title='Register')
if User.query.filter_by(email=email).first():
flash('Email already registered', 'danger')
return render_template('auth/register.html', title='Register')
# Create new user
user = User(email=email)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash('Registration successful! You are now logged in.', 'success')
# Auto-login after registration
login_user(user)
return redirect(url_for('dashboard.dashboard_home'))
return render_template('auth/register.html', title='Register')
return redirect(url_for("dashboard.dashboard_home"))
@bp.route('/logout')
if request.method == "POST":
email = request.form.get("email")
username = request.form.get("username")
password = request.form.get("password")
password_confirm = request.form.get("password_confirm")
# Validate form data
error = None
if not email or not username or not password:
error = "All fields are required."
elif not re.match(r"[^@]+@[^@]+\.[^@]+", email):
error = "Please enter a valid email address."
elif password != password_confirm:
error = "Passwords do not match."
elif User.query.filter_by(email=email).first():
error = "Email address already registered."
elif User.query.filter_by(username=username).first():
error = "Username already taken."
if error:
flash(error, "danger")
else:
# Create new user
new_user = User(email=email, username=username)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
flash("Registration successful! You can now log in.", "success")
return redirect(url_for("auth.login"))
return render_template("auth/register.html", title="Register")
@bp.route("/logout")
@login_required
def logout():
"""User logout"""
logout_user()
flash('You have been logged out', 'info')
return redirect(url_for('auth.login'))
flash("You have been logged out", "info")
return redirect(url_for("auth.login"))

View file

@ -4,383 +4,384 @@ import markdown
from app.core.models import Server, App, Subnet, Port
from app.core.extensions import db, limiter
from datetime import datetime
from app.utils.app_utils import validate_app_data
bp = Blueprint('dashboard', __name__, url_prefix='/dashboard')
bp = Blueprint("dashboard", __name__, url_prefix="/dashboard")
@bp.route('/')
@bp.route("/")
@login_required
def dashboard_home():
"""Main dashboard view showing server statistics"""
server_count = Server.query.count()
app_count = App.query.count()
subnet_count = Subnet.query.count()
# Get latest added servers
latest_servers = Server.query.order_by(Server.created_at.desc()).limit(5).all()
# Get subnets with usage stats
subnets = Subnet.query.all()
for subnet in subnets:
subnet.usage_percent = subnet.used_ips / 254 * 100 if subnet.cidr.endswith('/24') else 0
subnet.usage_percent = (
subnet.used_ips / 254 * 100 if subnet.cidr.endswith("/24") else 0
)
return render_template(
'dashboard/index.html',
title='Dashboard',
"dashboard/index.html",
title="Dashboard",
server_count=server_count,
app_count=app_count,
subnet_count=subnet_count,
latest_servers=latest_servers,
subnets=subnets,
now=datetime.now()
now=datetime.now(),
)
@bp.route('/servers')
@bp.route("/servers")
@login_required
def server_list():
"""List all servers"""
servers = Server.query.order_by(Server.hostname).all()
return render_template(
'dashboard/server_list.html',
title='Servers',
"dashboard/server_list.html",
title="Servers",
servers=servers,
now=datetime.now()
now=datetime.now(),
)
@bp.route('/server/<int:server_id>')
@bp.route("/server/<int:server_id>")
@login_required
def server_view(server_id):
"""View server details"""
server = Server.query.get_or_404(server_id)
apps = App.query.filter_by(server_id=server_id).all()
return render_template(
'dashboard/server_view.html',
title=f'Server - {server.hostname}',
"dashboard/server_view.html",
title=f"Server - {server.hostname}",
server=server,
apps=apps,
now=datetime.now()
now=datetime.now(),
)
@bp.route('/server/new', methods=['GET', 'POST'])
@bp.route("/server/new", methods=["GET", "POST"])
@login_required
def server_new():
"""Create a new server"""
subnets = Subnet.query.all()
if request.method == 'POST':
hostname = request.form.get('hostname')
ip_address = request.form.get('ip_address')
subnet_id = request.form.get('subnet_id')
documentation = request.form.get('documentation', '')
if request.method == "POST":
hostname = request.form.get("hostname")
ip_address = request.form.get("ip_address")
subnet_id = request.form.get("subnet_id")
documentation = request.form.get("documentation", "")
# Basic validation
if not hostname or not ip_address or not subnet_id:
flash('Please fill in all required fields', 'danger')
flash("Please fill in all required fields", "danger")
return render_template(
'dashboard/server_form.html',
title='New Server',
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now()
now=datetime.now(),
)
# Check if hostname or IP already exists
if Server.query.filter_by(hostname=hostname).first():
flash('Hostname already exists', 'danger')
flash("Hostname already exists", "danger")
return render_template(
'dashboard/server_form.html',
title='New Server',
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now()
now=datetime.now(),
)
if Server.query.filter_by(ip_address=ip_address).first():
flash('IP address already exists', 'danger')
flash("IP address already exists", "danger")
return render_template(
'dashboard/server_form.html',
title='New Server',
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now()
now=datetime.now(),
)
# Create new server
server = Server(
hostname=hostname,
ip_address=ip_address,
subnet_id=subnet_id,
documentation=documentation
documentation=documentation,
)
db.session.add(server)
db.session.commit()
flash('Server created successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server.id))
flash("Server created successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server.id))
return render_template(
'dashboard/server_form.html',
title='New Server',
"dashboard/server_form.html",
title="New Server",
subnets=subnets,
now=datetime.now()
now=datetime.now(),
)
@bp.route('/server/<int:server_id>/edit', methods=['GET', 'POST'])
@bp.route("/server/<int:server_id>/edit", methods=["GET", "POST"])
@login_required
def server_edit(server_id):
"""Edit an existing server"""
server = Server.query.get_or_404(server_id)
subnets = Subnet.query.all()
if request.method == 'POST':
hostname = request.form.get('hostname')
ip_address = request.form.get('ip_address')
subnet_id = request.form.get('subnet_id')
documentation = request.form.get('documentation', '')
if request.method == "POST":
hostname = request.form.get("hostname")
ip_address = request.form.get("ip_address")
subnet_id = request.form.get("subnet_id")
documentation = request.form.get("documentation", "")
if not hostname or not ip_address or not subnet_id:
flash('All fields are required', 'danger')
flash("All fields are required", "danger")
return render_template(
'dashboard/server_form.html',
title='Edit Server',
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets
subnets=subnets,
)
# Check if hostname changed and already exists
if hostname != server.hostname and Server.query.filter_by(hostname=hostname).first():
flash('Hostname already exists', 'danger')
if (
hostname != server.hostname
and Server.query.filter_by(hostname=hostname).first()
):
flash("Hostname already exists", "danger")
return render_template(
'dashboard/server_form.html',
title='Edit Server',
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets
subnets=subnets,
)
# Check if IP changed and already exists
if ip_address != server.ip_address and Server.query.filter_by(ip_address=ip_address).first():
flash('IP address already exists', 'danger')
if (
ip_address != server.ip_address
and Server.query.filter_by(ip_address=ip_address).first()
):
flash("IP address already exists", "danger")
return render_template(
'dashboard/server_form.html',
title='Edit Server',
"dashboard/server_form.html",
title="Edit Server",
server=server,
subnets=subnets
subnets=subnets,
)
# Update server
server.hostname = hostname
server.ip_address = ip_address
server.subnet_id = subnet_id
server.documentation = documentation
db.session.commit()
flash('Server updated successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server.id))
flash("Server updated successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server.id))
# GET request - show form with current values
return render_template(
'dashboard/server_form.html',
title=f'Edit Server - {server.hostname}',
"dashboard/server_form.html",
title=f"Edit Server - {server.hostname}",
server=server,
subnets=subnets
subnets=subnets,
)
@bp.route('/server/<int:server_id>/delete', methods=['POST'])
@bp.route("/server/<int:server_id>/delete", methods=["POST"])
@login_required
def server_delete(server_id):
"""Delete a server"""
server = Server.query.get_or_404(server_id)
# Delete all apps associated with this server
App.query.filter_by(server_id=server_id).delete()
# Delete the server
db.session.delete(server)
db.session.commit()
flash('Server deleted successfully', 'success')
return redirect(url_for('dashboard.dashboard_home'))
@bp.route('/app/new', methods=['GET', 'POST'])
flash("Server deleted successfully", "success")
return redirect(url_for("dashboard.dashboard_home"))
@bp.route("/app/new", methods=["GET", "POST"])
@login_required
def app_new():
"""Create a new application"""
"""Create a new application with comprehensive error handling"""
# Get all servers for dropdown
servers = Server.query.all()
if request.method == 'POST':
name = request.form.get('name')
server_id = request.form.get('server_id')
documentation = request.form.get('documentation', '')
# Get port data from form
port_numbers = request.form.getlist('port_numbers[]')
protocols = request.form.getlist('protocols[]')
port_descriptions = request.form.getlist('port_descriptions[]')
# Basic validation
if not name or not server_id:
flash('Please fill in all required fields', 'danger')
return render_template(
'dashboard/app_form.html',
title='New Application',
servers=servers
)
# Create new app
app = App(
name=name,
server_id=server_id,
documentation=documentation
)
db.session.add(app)
db.session.flush() # Get the app ID without committing
# Add ports if provided
if not servers:
flash("You need to create a server before adding applications", "warning")
return redirect(url_for("dashboard.server_new"))
if request.method == "POST":
# Get form data
name = request.form.get("name", "").strip()
server_id = request.form.get("server_id")
documentation = request.form.get("documentation", "")
# Process port data from form
port_data = []
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i in range(len(port_numbers)):
if port_numbers[i] and port_numbers[i].strip():
try:
port_num = int(port_numbers[i])
# Get protocol and description, handling index errors
protocol = protocols[i] if i < len(protocols) else 'TCP'
description = port_descriptions[i] if i < len(port_descriptions) else ''
new_port = Port(
app_id=app.id,
port_number=port_num,
protocol=protocol,
description=description
)
db.session.add(new_port)
except (ValueError, IndexError):
continue
db.session.commit()
flash('Application created successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server_id))
protocol = protocols[i] if i < len(protocols) else "TCP"
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Save application
from app.utils.app_utils import save_app
success, app, error = save_app(name, server_id, documentation, port_data)
if success:
flash("Application created successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=app.id))
else:
flash(error, "danger")
# For GET requests or failed POSTs
return render_template(
'dashboard/app_form.html',
title='New Application',
servers=servers
"dashboard/app_form.html",
title="Create New Application",
edit_mode=False,
dashboard_link=url_for("dashboard.dashboard_home"),
servers=servers,
)
@bp.route('/app/<int:app_id>', methods=['GET'])
@bp.route("/app/<int:app_id>", methods=["GET"])
@login_required
def app_view(app_id):
"""View a specific application"""
app = App.query.get_or_404(app_id)
server = Server.query.get(app.server_id)
return render_template(
'dashboard/app_view.html',
title=f'Application - {app.name}',
"dashboard/app_view.html",
title=f"Application - {app.name}",
app=app,
server=server,
now=datetime.now()
now=datetime.now(),
)
@bp.route('/app/<int:app_id>/edit', methods=['GET', 'POST'])
@bp.route("/app/<int:app_id>/edit", methods=["GET", "POST"])
@login_required
def app_edit(app_id):
"""Edit an existing application"""
"""Edit an existing application with comprehensive error handling"""
# Get the application and all servers
app = App.query.get_or_404(app_id)
servers = Server.query.all()
if request.method == 'POST':
name = request.form.get('name')
server_id = request.form.get('server_id')
documentation = request.form.get('documentation', '')
if not name or not server_id:
flash('All required fields must be filled', 'danger')
return render_template(
'dashboard/app_form.html',
title='Edit Application',
app=app,
servers=servers
if request.method == "POST":
# Get form data
name = request.form.get("name", "").strip()
server_id = request.form.get("server_id")
documentation = request.form.get("documentation", "")
# Process port data from form
port_data = []
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i in range(len(port_numbers)):
if port_numbers[i] and port_numbers[i].strip():
protocol = protocols[i] if i < len(protocols) else "TCP"
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Replace local validation with shared function
valid, error = validate_app_data(name, server_id, existing_app_id=app_id)
if valid:
# Update application
from app.utils.app_utils import save_app
success, updated_app, error = save_app(
name, server_id, documentation, port_data, app_id
)
# Check if name changed and already exists on the same server
existing_app = App.query.filter(App.name == name,
App.server_id == server_id,
App.id != app.id).first()
if existing_app:
flash('Application with this name already exists on the selected server', 'danger')
return render_template(
'dashboard/app_form.html',
title='Edit Application',
app=app,
servers=servers
)
# Update application
app.name = name
app.server_id = server_id
app.documentation = documentation
try:
db.session.commit()
flash('Application updated successfully', 'success')
return redirect(url_for('dashboard.app_view', app_id=app.id))
except Exception as e:
db.session.rollback()
flash(f'Error updating application: {str(e)}', 'danger')
if success:
flash("Application updated successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=app_id))
else:
flash(error, "danger")
else:
flash(error, "danger")
# For GET requests or failed POSTs
return render_template(
'dashboard/app_form.html',
title=f'Edit Application - {app.name}',
"dashboard/app_form.html",
title=f"Edit Application: {app.name}",
edit_mode=True,
app=app,
servers=servers
dashboard_link=url_for("dashboard.dashboard_home"),
servers=servers,
)
@bp.route('/app/<int:app_id>/delete', methods=['POST'])
@bp.route("/app/<int:app_id>/delete", methods=["POST"])
@login_required
def app_delete(app_id):
"""Delete an application"""
app = App.query.get_or_404(app_id)
server_id = app.server_id
db.session.delete(app)
db.session.commit()
flash('Application deleted successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server_id))
@bp.route('/settings', methods=['GET', 'POST'])
flash("Application deleted successfully", "success")
return redirect(url_for("dashboard.server_view", server_id=server_id))
@bp.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
"""User settings page"""
if request.method == 'POST':
if request.method == "POST":
# Handle user settings update
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
current_password = request.form.get("current_password")
new_password = request.form.get("new_password")
confirm_password = request.form.get("confirm_password")
# Validate inputs
if not current_password:
flash('Current password is required', 'danger')
return redirect(url_for('dashboard.settings'))
flash("Current password is required", "danger")
return redirect(url_for("dashboard.settings"))
if new_password != confirm_password:
flash('New passwords do not match', 'danger')
return redirect(url_for('dashboard.settings'))
flash("New passwords do not match", "danger")
return redirect(url_for("dashboard.settings"))
# Verify current password
if not current_user.check_password(current_password):
flash('Current password is incorrect', 'danger')
return redirect(url_for('dashboard.settings'))
flash("Current password is incorrect", "danger")
return redirect(url_for("dashboard.settings"))
# Update password
current_user.set_password(new_password)
db.session.commit()
flash('Password updated successfully', 'success')
return redirect(url_for('dashboard.settings'))
return render_template(
'dashboard/settings.html',
title='User Settings'
)
flash("Password updated successfully", "success")
return redirect(url_for("dashboard.settings"))
return render_template("dashboard/settings.html", title="User Settings")

View file

@ -5,100 +5,101 @@ import csv
import io
import datetime
bp = Blueprint('importexport', __name__, url_prefix='/import-export')
bp = Blueprint("importexport", __name__, url_prefix="/import-export")
MODEL_MAP = {
'subnet': Subnet,
'server': Server,
'app': App
}
MODEL_MAP = {"subnet": Subnet, "server": Server, "app": App}
@bp.route('/export/<model_name>', methods=['GET'])
@bp.route("/export/<model_name>", methods=["GET"])
def export_model(model_name):
if model_name not in MODEL_MAP:
return jsonify({'error': 'Invalid model name'}), 400
return jsonify({"error": "Invalid model name"}), 400
model = MODEL_MAP[model_name]
instances = model.query.all()
# Create a CSV file in memory
output = io.StringIO()
writer = csv.writer(output)
# Get column names from model
columns = [column.name for column in model.__table__.columns]
writer.writerow(columns)
# Write data
for instance in instances:
row = [getattr(instance, column) for column in columns]
writer.writerow(row)
# Create response
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{model_name}_{timestamp}.csv"
response = make_response(output.getvalue())
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
response.headers['Content-type'] = 'text/csv'
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-type"] = "text/csv"
return response
@bp.route('/import/<model_name>', methods=['GET', 'POST'])
@bp.route("/import/<model_name>", methods=["GET", "POST"])
def import_model(model_name):
if model_name not in MODEL_MAP:
return jsonify({'error': 'Invalid model name'}), 400
return jsonify({"error": "Invalid model name"}), 400
model = MODEL_MAP[model_name]
if request.method == 'GET':
if request.method == "GET":
# Show import form
return render_template('import_form.html', model_name=model_name)
return render_template("import_form.html", model_name=model_name)
# Process CSV upload
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if not file.filename.endswith('.csv'):
return jsonify({'error': 'File must be CSV format'}), 400
if "file" not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files["file"]
if file.filename == "":
return jsonify({"error": "No selected file"}), 400
if not file.filename.endswith(".csv"):
return jsonify({"error": "File must be CSV format"}), 400
try:
# Read CSV
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_reader = csv.reader(stream)
# Get headers
headers = next(csv_reader)
# Validate required columns
required_columns = [col.name for col in model.__table__.columns
if not col.nullable and col.name != 'id']
required_columns = [
col.name
for col in model.__table__.columns
if not col.nullable and col.name != "id"
]
for col in required_columns:
if col not in headers:
return jsonify({'error': f'Required column {col} missing'}), 400
return jsonify({"error": f"Required column {col} missing"}), 400
# Process rows
imported = 0
for row in csv_reader:
data = dict(zip(headers, row))
# Remove id to create new record
if 'id' in data:
del data['id']
if "id" in data:
del data["id"]
# Create new instance
instance = model(**data)
db.session.add(instance)
imported += 1
db.session.commit()
return jsonify({'success': f'Imported {imported} records successfully'})
return jsonify({"success": f"Imported {imported} records successfully"})
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
return jsonify({"error": str(e)}), 500

View file

@ -1,260 +1,267 @@
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required
from app.core.models import Subnet, Server
from app.core.models import Subnet, Server, App
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import ipaddress
from datetime import datetime
import json
bp = Blueprint('ipam', __name__, url_prefix='/ipam')
bp = Blueprint("ipam", __name__, url_prefix="/ipam")
@bp.route('/')
@bp.route("/")
@login_required
def ipam_home():
"""Main IPAM dashboard"""
subnets = Subnet.query.all()
# Calculate usage for each subnet
for subnet in subnets:
network = ipaddress.ip_network(subnet.cidr, strict=False)
max_hosts = network.num_addresses - 2 if network.prefixlen < 31 else network.num_addresses
max_hosts = (
network.num_addresses - 2
if network.prefixlen < 31
else network.num_addresses
)
used_count = Server.query.filter_by(subnet_id=subnet.id).count()
subnet.usage_percent = (used_count / max_hosts) * 100 if max_hosts > 0 else 0
return render_template(
'ipam/index.html',
title='IPAM Dashboard',
subnets=subnets,
now=datetime.now()
"ipam/index.html", title="IPAM Dashboard", subnets=subnets, now=datetime.now()
)
@bp.route('/subnet/new', methods=['GET', 'POST'])
@bp.route("/subnet/new", methods=["GET", "POST"])
@login_required
def subnet_new():
"""Create a new subnet"""
if request.method == 'POST':
cidr = request.form.get('cidr')
location = request.form.get('location')
auto_scan = request.form.get('auto_scan') == 'on'
if request.method == "POST":
cidr = request.form.get("cidr")
location = request.form.get("location")
auto_scan = request.form.get("auto_scan") == "on"
# Basic validation
if not cidr or not location:
flash('Please fill in all required fields', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet'
)
flash("Please fill in all required fields", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Validate CIDR format
try:
ipaddress.ip_network(cidr, strict=False)
except ValueError:
flash('Invalid CIDR format', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet'
)
flash("Invalid CIDR format", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Check if CIDR already exists
if Subnet.query.filter_by(cidr=cidr).first():
flash('Subnet already exists', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet'
)
flash("Subnet already exists", "danger")
return render_template("ipam/subnet_form.html", title="New Subnet")
# Create new subnet with JSON string for active_hosts, not a Python list
subnet = Subnet(
cidr=cidr,
location=location,
active_hosts=json.dumps([]), # Convert empty list to JSON string
last_scanned=None,
auto_scan=auto_scan
auto_scan=auto_scan,
)
db.session.add(subnet)
db.session.commit()
flash('Subnet created successfully', 'success')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id))
return render_template(
'ipam/subnet_form.html',
title='New Subnet'
)
@bp.route('/subnet/<int:subnet_id>')
flash("Subnet created successfully", "success")
return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))
return render_template("ipam/subnet_form.html", title="New Subnet")
@bp.route("/subnet/<int:subnet_id>")
@login_required
def subnet_view(subnet_id):
"""View a specific subnet"""
"""View a subnet and all its hosts"""
subnet = Subnet.query.get_or_404(subnet_id)
# Get all servers in this subnet
# Get servers in this subnet
servers = Server.query.filter_by(subnet_id=subnet_id).all()
# Parse CIDR for display
# Get applications in this subnet
subnet_apps = []
for server in servers:
apps = App.query.filter_by(server_id=server.id).all()
subnet_apps.extend(apps)
# Calculate usage statistics
network = ipaddress.ip_network(subnet.cidr, strict=False)
subnet_info = {
'network_address': str(network.network_address),
'broadcast_address': str(network.broadcast_address),
'netmask': str(network.netmask),
'num_addresses': network.num_addresses,
'host_range': f"{str(network.network_address + 1)} - {str(network.broadcast_address - 1)}" if network.prefixlen < 31 else subnet.cidr
}
total_ips = network.num_addresses - 2 # Subtract network and broadcast addresses
used_ips = Server.query.filter_by(subnet_id=subnet_id).count()
return render_template(
'ipam/subnet_view.html',
title=subnet.cidr,
"ipam/subnet_view.html",
title=f"Subnet {subnet.cidr}",
subnet=subnet,
subnet_info=subnet_info,
servers=servers,
now=datetime.now()
subnet_apps=subnet_apps,
total_ips=total_ips,
used_ips=used_ips,
)
@bp.route('/subnet/<int:subnet_id>/edit', methods=['GET', 'POST'])
@bp.route("/subnet/<int:subnet_id>/edit", methods=["GET", "POST"])
@login_required
def subnet_edit(subnet_id):
"""Edit a subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
if request.method == 'POST':
cidr = request.form.get('cidr')
location = request.form.get('location')
auto_scan = request.form.get('auto_scan') == 'on'
if request.method == "POST":
cidr = request.form.get("cidr")
location = request.form.get("location")
auto_scan = request.form.get("auto_scan") == "on"
# Validate inputs
if not all([cidr, location]):
flash('All fields are required', 'danger')
return render_template('ipam/subnet_form.html',
title='Edit Subnet',
subnet=subnet,
edit_mode=True)
flash("All fields are required", "danger")
return render_template(
"ipam/subnet_form.html",
title="Edit Subnet",
subnet=subnet,
edit_mode=True,
)
# Validate CIDR format
try:
ipaddress.ip_network(cidr, strict=False)
except ValueError:
flash('Invalid CIDR format', 'danger')
return render_template('ipam/subnet_form.html',
title='Edit Subnet',
subnet=subnet,
edit_mode=True)
flash("Invalid CIDR format", "danger")
return render_template(
"ipam/subnet_form.html",
title="Edit Subnet",
subnet=subnet,
edit_mode=True,
)
# Update subnet
subnet.cidr = cidr
subnet.location = location
subnet.auto_scan = auto_scan
try:
db.session.commit()
flash(f'Subnet {cidr} has been updated', 'success')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id))
flash(f"Subnet {cidr} has been updated", "success")
return redirect(url_for("ipam.subnet_view", subnet_id=subnet.id))
except Exception as e:
db.session.rollback()
flash(f'Error updating subnet: {str(e)}', 'danger')
return render_template('ipam/subnet_form.html',
title='Edit Subnet',
subnet=subnet,
edit_mode=True)
flash(f"Error updating subnet: {str(e)}", "danger")
@bp.route('/subnet/<int:subnet_id>/delete', methods=['POST'])
return render_template(
"ipam/subnet_form.html", title="Edit Subnet", subnet=subnet, edit_mode=True
)
@bp.route("/subnet/<int:subnet_id>/delete", methods=["POST"])
@login_required
def subnet_delete(subnet_id):
"""Delete a subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
# Check if subnet has servers
servers_count = Server.query.filter_by(subnet_id=subnet_id).count()
if servers_count > 0:
flash(f'Cannot delete subnet {subnet.cidr}. It has {servers_count} servers assigned.', 'danger')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id))
flash(
f"Cannot delete subnet {subnet.cidr}. It has {servers_count} servers assigned.",
"danger",
)
return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))
db.session.delete(subnet)
db.session.commit()
flash(f'Subnet {subnet.cidr} has been deleted', 'success')
return redirect(url_for('ipam.ipam_home'))
@bp.route('/subnet/<int:subnet_id>/scan', methods=['POST'])
flash(f"Subnet {subnet.cidr} has been deleted", "success")
return redirect(url_for("ipam.ipam_home"))
@bp.route("/subnet/<int:subnet_id>/scan", methods=["POST"])
@login_required
def subnet_scan(subnet_id):
"""Manually scan a subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
try:
# Call the scan function with manual_trigger=True
scan(subnet, manual_trigger=True)
db.session.commit()
flash(f'Scan completed for subnet {subnet.cidr}', 'success')
flash(f"Scan completed for subnet {subnet.cidr}", "success")
except Exception as e:
db.session.rollback()
flash(f'Error scanning subnet: {str(e)}', 'danger')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id))
flash(f"Error scanning subnet: {str(e)}", "danger")
@bp.route('/subnet/<int:subnet_id>/force-delete', methods=['POST'])
return redirect(url_for("ipam.subnet_view", subnet_id=subnet_id))
@bp.route("/subnet/<int:subnet_id>/force-delete", methods=["POST"])
@login_required
def subnet_force_delete(subnet_id):
"""Force delete a subnet and all its related servers and applications"""
subnet = Subnet.query.get_or_404(subnet_id)
try:
# Get all servers to be deleted for reporting
servers = Server.query.filter_by(subnet_id=subnet_id).all()
server_count = len(servers)
# This will cascade delete all related servers and their applications
db.session.delete(subnet)
db.session.commit()
flash(f'Subnet {subnet.cidr} and {server_count} related servers were deleted successfully', 'success')
return redirect(url_for('dashboard.ipam_home'))
flash(
f"Subnet {subnet.cidr} and {server_count} related servers were deleted successfully",
"success",
)
return redirect(url_for("dashboard.ipam_home"))
except Exception as e:
db.session.rollback()
flash(f'Error deleting subnet: {str(e)}', 'danger')
return redirect(url_for('dashboard.subnet_view', subnet_id=subnet_id))
flash(f"Error deleting subnet: {str(e)}", "danger")
return redirect(url_for("dashboard.subnet_view", subnet_id=subnet_id))
@bp.route('/subnet/create-ajax', methods=['POST'])
@bp.route("/subnet/create-ajax", methods=["POST"])
@login_required
def subnet_create_ajax():
"""Create a subnet via AJAX"""
data = request.json
if not data:
return jsonify({'success': False, 'error': 'No data provided'})
cidr = data.get('cidr')
location = data.get('location')
auto_scan = data.get('auto_scan', False)
return jsonify({"success": False, "error": "No data provided"})
cidr = data.get("cidr")
location = data.get("location")
auto_scan = data.get("auto_scan", False)
if not cidr or not location:
return jsonify({'success': False, 'error': 'CIDR and location are required'})
return jsonify({"success": False, "error": "CIDR and location are required"})
# Validate CIDR
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError as e:
return jsonify({'success': False, 'error': f'Invalid CIDR: {str(e)}'})
return jsonify({"success": False, "error": f"Invalid CIDR: {str(e)}"})
# Create subnet
subnet = Subnet(
cidr=cidr,
location=location,
auto_scan=auto_scan,
active_hosts=json.dumps([])
cidr=cidr, location=location, auto_scan=auto_scan, active_hosts=json.dumps([])
)
try:
db.session.add(subnet)
db.session.commit()
return jsonify({
'success': True,
'subnet_id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location
})
return jsonify(
{
"success": True,
"subnet_id": subnet.id,
"cidr": subnet.cidr,
"location": subnet.location,
}
)
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)})
return jsonify({"success": False, "error": str(e)})

96
app/routes/static.py Normal file
View file

@ -0,0 +1,96 @@
from flask import Blueprint, send_from_directory, current_app
import os
bp = Blueprint("static_assets", __name__)
@bp.route("/static/libs/tabler-icons/tabler-icons.min.css")
def tabler_icons():
"""Serve tabler-icons CSS from node_modules or download if missing"""
icons_path = os.path.join(current_app.static_folder, "libs", "tabler-icons")
# Create directory if it doesn't exist
if not os.path.exists(icons_path):
os.makedirs(icons_path)
css_file = os.path.join(icons_path, "tabler-icons.min.css")
# If file doesn't exist, download from CDN
if not os.path.exists(css_file):
import requests
try:
cdn_url = "https://cdn.jsdelivr.net/npm/@tabler/icons@latest/iconfont/tabler-icons.min.css"
response = requests.get(cdn_url)
if response.status_code == 200:
with open(css_file, "wb") as f:
f.write(response.content)
print(f"Downloaded tabler-icons.min.css from CDN")
else:
print(f"Failed to download tabler-icons CSS: {response.status_code}")
except Exception as e:
print(f"Error downloading tabler-icons CSS: {e}")
return send_from_directory(icons_path, "tabler-icons.min.css")
@bp.route("/static/css/tabler.min.css")
def tabler_css():
"""Serve tabler CSS from static folder or download if missing"""
css_path = os.path.join(current_app.static_folder, "css")
# Create directory if it doesn't exist
if not os.path.exists(css_path):
os.makedirs(css_path)
css_file = os.path.join(css_path, "tabler.min.css")
# If file doesn't exist, download from CDN
if not os.path.exists(css_file):
import requests
try:
cdn_url = "https://cdn.jsdelivr.net/npm/@tabler/core@latest/dist/css/tabler.min.css"
response = requests.get(cdn_url)
if response.status_code == 200:
with open(css_file, "wb") as f:
f.write(response.content)
print(f"Downloaded tabler.min.css from CDN")
else:
print(f"Failed to download tabler CSS: {response.status_code}")
except Exception as e:
print(f"Error downloading tabler CSS: {e}")
return send_from_directory(css_path, "tabler.min.css")
@bp.route("/static/img/favicon.png")
def favicon():
"""Serve favicon from static folder or create a default one if missing"""
img_path = os.path.join(current_app.static_folder, "img")
# Create directory if it doesn't exist
if not os.path.exists(img_path):
os.makedirs(img_path)
favicon_file = os.path.join(img_path, "favicon.png")
# If file doesn't exist, create a simple one
if not os.path.exists(favicon_file):
# Try to download a default favicon
import requests
try:
# Using a simple placeholder favicon
cdn_url = "https://www.google.com/favicon.ico"
response = requests.get(cdn_url)
if response.status_code == 200:
with open(favicon_file, "wb") as f:
f.write(response.content)
print(f"Created default favicon.png")
else:
print(f"Failed to download favicon: {response.status_code}")
except Exception as e:
print(f"Error creating favicon: {e}")
return send_from_directory(img_path, "favicon.png")

View file

@ -3,51 +3,52 @@ from app.core.models import Subnet, Server, App, Port
from app.core.auth import User # Import User from auth module
import json
def seed_database():
"""Add sample data to the database"""
# Create a default subnet if none exists
if Subnet.query.count() == 0:
subnet = Subnet(
cidr='192.168.1.0/24',
location='Office',
cidr="192.168.1.0/24",
location="Office",
auto_scan=True,
active_hosts=json.dumps([])
active_hosts=json.dumps([]),
)
db.session.add(subnet)
# Create a sample server
server = Server(
hostname='server1',
ip_address='192.168.1.10',
hostname="server1",
ip_address="192.168.1.10",
subnet=subnet,
documentation='# Server 1\n\nThis is a sample server.'
documentation="# Server 1\n\nThis is a sample server.",
)
db.session.add(server)
# Create a sample app
app = App(
name='Web App',
server=server,
documentation='# Welcome to Web App\n\nThis is a sample application.'
name="Web App",
server=server,
documentation="# Welcome to Web App\n\nThis is a sample application.",
)
db.session.add(app)
# Add some ports
ports = [
Port(app=app, port_number=80, protocol='TCP', description='HTTP'),
Port(app=app, port_number=443, protocol='TCP', description='HTTPS')
Port(app=app, port_number=80, protocol="TCP", description="HTTP"),
Port(app=app, port_number=443, protocol="TCP", description="HTTPS"),
]
db.session.add_all(ports)
# Create a default user if none exists
if User.query.count() == 0:
admin = User(username='admin', email='admin@example.com', is_admin=True)
admin.set_password('admin')
admin = User(username="admin", email="admin@example.com", is_admin=True)
admin.set_password("admin")
db.session.add(admin)
try:
db.session.commit()
print("Database seeded successfully")
except Exception as e:
db.session.rollback()
print(f"Error seeding database: {e}")
print(f"Error seeding database: {e}")

View file

@ -11,10 +11,11 @@ import concurrent.futures
from datetime import datetime
import platform
def scan(subnet, manual_trigger=False):
"""
Scan a subnet for active hosts
Args:
subnet: The subnet object to scan
manual_trigger: If False, only scan if the subnet hasn't been scanned recently
@ -22,50 +23,56 @@ def scan(subnet, manual_trigger=False):
# Skip if not auto scan and not manually triggered
if not subnet.auto_scan and not manual_trigger:
return False
active_hosts = []
try:
# Parse the CIDR notation
network = ipaddress.ip_network(subnet.cidr, strict=False)
# For each address in this network, ping it
for ip in network.hosts():
if ping(str(ip)):
active_hosts.append(str(ip))
# Update subnet with scan results
subnet.active_hosts = json.dumps(active_hosts)
subnet.last_scanned = datetime.utcnow()
return True
except Exception as e:
print(f"Error scanning subnet {subnet.cidr}: {str(e)}")
return False
def scan_worker(ip_list, results, index):
"""Worker function for threading"""
for ip in ip_list:
if ping(ip):
hostname = get_hostname(ip)
results[index].append({
'ip': str(ip),
'hostname': hostname if hostname else str(ip),
'status': 'up'
})
results[index].append(
{
"ip": str(ip),
"hostname": hostname if hostname else str(ip),
"status": "up",
}
)
def ping(host):
"""
Returns True if host responds to a ping request
"""
# Ping parameters based on OS
param = '-n' if platform.system().lower() == 'windows' else '-c'
param = "-n" if platform.system().lower() == "windows" else "-c"
# Build the command
command = ['ping', param, '1', '-w', '1', host]
command = ["ping", param, "1", "-w", "1", host]
try:
# Run the command and capture output
output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2)
output = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2
)
# Return True if ping was successful
return output.returncode == 0
except subprocess.TimeoutExpired:
@ -73,6 +80,7 @@ def ping(host):
except Exception:
return False
def get_hostname(ip):
"""Try to get the hostname for an IP address"""
try:
@ -81,43 +89,53 @@ def get_hostname(ip):
except (socket.herror, socket.gaierror):
return None
def is_host_active_ping(ip):
"""Simple ICMP ping test (platform dependent)"""
import platform
import subprocess
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', '-w', '1', ip]
param = "-n" if platform.system().lower() == "windows" else "-c"
command = ["ping", param, "1", "-w", "1", ip]
try:
return subprocess.call(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0
return (
subprocess.call(
command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
== 0
)
except:
return False
def save_scan_results(cidr, results):
"""Save scan results to the database"""
from flask import current_app
# Need to be in application context
if not hasattr(current_app, 'app_context'):
if not hasattr(current_app, "app_context"):
print("Not in Flask application context, cannot save results")
return
try:
# Find subnet by CIDR
subnet = Subnet.query.filter_by(cidr=cidr).first()
if not subnet:
print(f"Subnet {cidr} not found in database")
return
# Get existing servers in this subnet
existing_servers = {server.ip_address: server for server in Server.query.filter_by(subnet_id=subnet.id).all()}
existing_servers = {
server.ip_address: server
for server in Server.query.filter_by(subnet_id=subnet.id).all()
}
# Process scan results
for host in results:
ip = host['ip']
hostname = host['hostname']
ip = host["ip"]
hostname = host["hostname"]
# Check if server already exists
if ip in existing_servers:
# Update hostname if it was previously unknown
@ -130,37 +148,34 @@ def save_scan_results(cidr, results):
hostname=hostname,
ip_address=ip,
subnet_id=subnet.id,
documentation=f"# {hostname}\n\nAutomatically discovered by network scan on {time.strftime('%Y-%m-%d %H:%M:%S')}"
documentation=f"# {hostname}\n\nAutomatically discovered by network scan on {time.strftime('%Y-%m-%d %H:%M:%S')}",
)
db.session.add(server)
db.session.commit()
print(f"Saved scan results for {cidr}")
except Exception as e:
db.session.rollback()
print(f"Error saving scan results: {e}")
def schedule_subnet_scans():
"""Schedule automatic scans for subnets marked as auto_scan"""
from flask import current_app
with current_app.app_context():
try:
# Find all subnets with auto_scan enabled
subnets = Subnet.query.filter_by(auto_scan=True).all()
for subnet in subnets:
# Start a thread for each subnet
thread = threading.Thread(
target=scan,
args=(subnet,),
daemon=True
)
thread = threading.Thread(target=scan, args=(subnet,), daemon=True)
thread.start()
# Sleep briefly to avoid overloading
time.sleep(1)
except Exception as e:
print(f"Error scheduling subnet scans: {e}")
print(f"Error scheduling subnet scans: {e}")

9
app/static/css/tabler.min.css vendored Normal file

File diff suppressed because one or more lines are too long

165
app/static/css/theme.css Normal file
View file

@ -0,0 +1,165 @@
/* Custom theme with unique color palette */
:root {
/* Base Colors - Teal and Purple Theme */
--primary-color: #3a86ff;
--secondary-color: #8338ec;
--accent-color: #ff006e;
--success-color: #06d6a0;
--info-color: #0dcaf0;
--warning-color: #ffbe0b;
--danger-color: #ef476f;
/* Light Mode */
--bg-color: #f8fafc;
--card-bg: #ffffff;
--text-color: #1a1d23;
--text-muted: #64748b;
--border-color: rgba(226, 232, 240, 0.8);
--hover-bg: rgba(226, 232, 240, 0.4);
}
[data-bs-theme="dark"] {
/* Dark Mode */
--bg-color: #0f172a;
--card-bg: #1e293b;
--text-color: #e2e8f0;
--text-muted: #94a3b8;
--border-color: rgba(51, 65, 85, 0.7);
--hover-bg: rgba(51, 65, 85, 0.4);
}
/* Override Bootstrap classes */
.btn-primary {
background-color: var(--primary-color);
border-color: var(--primary-color);
}
.btn-primary:hover {
background-color: #2563eb;
border-color: #2563eb;
}
.btn-secondary {
background-color: var(--secondary-color);
border-color: var(--secondary-color);
}
.btn-outline-primary {
color: var(--primary-color);
border-color: var(--primary-color);
}
.text-primary {
color: var(--primary-color) !important;
}
.bg-primary {
background-color: var(--primary-color) !important;
}
/* Card styling */
.card {
border-radius: 10px;
overflow: hidden;
transition: box-shadow 0.2s ease;
border: 1px solid var(--border-color);
background-color: var(--card-bg);
}
.card:hover {
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.08);
}
.card-header {
background-color: transparent;
border-bottom: 1px solid var(--border-color);
padding: 1rem 1.25rem;
}
.card-title {
margin-bottom: 0;
color: var(--text-color);
}
/* Table styling */
.table {
--bs-table-hover-bg: var(--hover-bg);
}
/* Custom scrollbar */
::-webkit-scrollbar {
width: 8px;
height: 8px;
}
::-webkit-scrollbar-track {
background: transparent;
}
::-webkit-scrollbar-thumb {
background: rgba(100, 116, 139, 0.5);
border-radius: 4px;
}
::-webkit-scrollbar-thumb:hover {
background: rgba(100, 116, 139, 0.7);
}
/* Improved form controls */
.form-control,
.form-select {
border-radius: 8px;
border: 1px solid var(--border-color);
padding: 0.5rem 0.75rem;
background-color: var(--card-bg);
color: var(--text-color);
}
.form-control:focus,
.form-select:focus {
border-color: var(--primary-color);
box-shadow: 0 0 0 0.2rem rgba(58, 134, 255, 0.25);
}
/* Sidebar enhancements */
.sidebar {
background: linear-gradient(to bottom, #1a1d23, #0f172a);
box-shadow: 0 0 20px rgba(0, 0, 0, 0.1);
}
[data-bs-theme="light"] .sidebar {
background: linear-gradient(to bottom, #f1f5f9, #e2e8f0);
}
.sidebar-item:hover {
background-color: rgba(255, 255, 255, 0.1);
}
[data-bs-theme="light"] .sidebar-item:hover {
background-color: rgba(0, 0, 0, 0.05);
}
/* Animation for page transitions */
.page-transition {
animation: fadeIn 0.3s ease-in-out;
}
@keyframes fadeIn {
from {
opacity: 0;
transform: translateY(10px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
/* Pill badges */
.badge {
border-radius: 50rem;
padding: 0.35em 0.65em;
font-weight: 500;
font-size: 0.75em;
}

BIN
app/static/img/favicon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.3 KiB

View file

@ -1,3 +1,7 @@
/**
* DITTO Application JavaScript
* Modern ES6+ syntax with proper error handling
*/
document.addEventListener('DOMContentLoaded', () => {
console.log('App script loaded.');
@ -76,8 +80,121 @@ document.addEventListener('DOMContentLoaded', () => {
// Initialize notifications
initNotifications();
});
// Initialize Bootstrap components
initBootstrapComponents();
// Setup sidebar toggle functionality
setupSidebar();
// Add form validation
setupFormValidation();
});
/**
* Initialize Bootstrap components
*/
function initBootstrapComponents() {
// Initialize all tooltips
const tooltipTriggerList = document.querySelectorAll('[data-bs-toggle="tooltip"]');
tooltipTriggerList.forEach(el => {
try {
new bootstrap.Tooltip(el);
} catch (e) {
console.warn('Error initializing tooltip:', e);
}
});
// Initialize all popovers
const popoverTriggerList = document.querySelectorAll('[data-bs-toggle="popover"]');
popoverTriggerList.forEach(el => {
try {
new bootstrap.Popover(el);
} catch (e) {
console.warn('Error initializing popover:', e);
}
});
}
/**
* Setup sidebar toggle functionality
*/
function setupSidebar() {
const sidebarToggler = document.querySelector('.sidebar-toggler');
const sidebar = document.querySelector('.sidebar');
if (sidebarToggler && sidebar) {
sidebarToggler.addEventListener('click', () => {
sidebar.classList.toggle('show');
});
// Close sidebar when clicking outside on mobile
document.addEventListener('click', (event) => {
const isClickInside = sidebar.contains(event.target) ||
sidebarToggler.contains(event.target);
if (!isClickInside && sidebar.classList.contains('show') && window.innerWidth < 992) {
sidebar.classList.remove('show');
}
});
}
}
/**
* Setup form validation
*/
function setupFormValidation() {
// Add custom validation for forms
const forms = document.querySelectorAll('.needs-validation');
forms.forEach(form => {
form.addEventListener('submit', event => {
if (!form.checkValidity()) {
event.preventDefault();
event.stopPropagation();
}
form.classList.add('was-validated');
}, false);
});
}
/**
* Safe query selector with error handling
* @param {string} selector - CSS selector
* @param {Element} parent - Parent element (optional)
* @returns {Element|null} - The selected element or null
*/
function $(selector, parent = document) {
try {
return parent.querySelector(selector);
} catch (e) {
console.warn(`Error selecting "${selector}":`, e);
return null;
}
}
/**
* Format date for display
* @param {string|Date} date - Date to format
* @returns {string} - Formatted date string
*/
function formatDate(date) {
try {
const d = new Date(date);
return d.toLocaleDateString(undefined, {
year: 'numeric',
month: 'short',
day: 'numeric',
hour: '2-digit',
minute: '2-digit'
});
} catch (e) {
console.warn('Error formatting date:', e);
return String(date);
}
}
function initTiptapEditor(element) {
// Load required Tiptap scripts
const editorContainer = document.getElementById('editor-container');

File diff suppressed because one or more lines are too long

View file

@ -1,11 +1,14 @@
{% extends "layout.html" %}
{% block content %}
<div class="container-narrow py-4">
<div class="card card-md">
<div class="card-body">
<h2 class="card-title text-center mb-4">Create New Account</h2>
<div class="container-tight py-4">
<div class="text-center mb-4">
<h1>Create an account</h1>
</div>
<form class="card card-md" method="POST" action="{{ url_for('auth.register') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="card-body">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
@ -17,33 +20,37 @@
{% endif %}
{% endwith %}
<form method="POST" action="{{ url_for('auth.register') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="mb-3">
<label class="form-label">Email address</label>
<input type="email" class="form-control" name="email" placeholder="your@email.com" required
autocomplete="username">
</div>
<div class="mb-3">
<label class="form-label">Password</label>
<input type="password" class="form-control" name="password" placeholder="Password" required
autocomplete="new-password">
</div>
<div class="form-footer">
<button type="submit" class="btn btn-primary w-100">Create Account</button>
</div>
</form>
</div>
<div class="hr-text">or</div>
<div class="card-body">
<div class="row">
<div class="col">
<a href="{{ url_for('auth.login') }}" class="btn w-100">
Login with existing account
</a>
<div class="mb-3">
<label class="form-label">Email address</label>
<input type="email" name="email" class="form-control" placeholder="your@email.com" required>
</div>
<div class="mb-3">
<label class="form-label">Username</label>
<input type="text" name="username" class="form-control" placeholder="Username" required>
</div>
<div class="mb-3">
<label class="form-label">Password</label>
<input type="password" name="password" class="form-control" placeholder="Password" required>
</div>
<div class="mb-3">
<label class="form-label">Confirm Password</label>
<input type="password" name="password_confirm" class="form-control" placeholder="Confirm password" required>
<div class="form-text text-muted">
Make sure to use a strong, unique password
</div>
</div>
<div class="form-footer">
<button type="submit" class="btn btn-primary w-100">Create account</button>
</div>
</div>
</form>
<div class="text-center text-muted mt-3">
Already have an account? <a href="{{ url_for('auth.login') }}" tabindex="-1">Sign in</a>
</div>
</div>
{% endblock %}

View file

@ -6,8 +6,25 @@
<div class="row align-items-center">
<div class="col">
<h2 class="page-title">
{% if app %}Edit Application{% else %}Add New Application{% endif %}
{{ title }}
</h2>
<div class="text-muted mt-1">
{% if edit_mode %}Edit{% else %}Create{% endif %} application details and configure ports
</div>
</div>
<div class="col-auto ms-auto">
<div class="btn-list">
<a href="{{ dashboard_link }}" class="btn btn-outline-primary">
<span class="ti ti-dashboard"></span>
Dashboard
</a>
{% if edit_mode %}
<a href="{{ url_for('dashboard.app_view', app_id=app.id) }}" class="btn btn-outline-secondary">
<span class="ti ti-eye"></span>
View Application
</a>
{% endif %}
</div>
</div>
</div>
</div>
@ -26,86 +43,148 @@
{% endwith %}
<form method="POST"
action="{% if app %}{{ url_for('dashboard.app_edit', app_id=app.id) }}{% else %}{{ url_for('dashboard.app_new') }}{% endif %}">
action="{{ url_for('dashboard.app_edit', app_id=app.id) if edit_mode else url_for('dashboard.app_new') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="mb-3">
<label class="form-label required">Application Name</label>
<input type="text" class="form-control" name="name" required value="{% if app %}{{ app.name }}{% endif %}">
<input type="text" class="form-control" name="name" value="{{ app.name if app else '' }}" required
placeholder="Enter application name">
<small class="form-hint">Choose a unique name for this application</small>
</div>
<div class="mb-3">
<label class="form-label required">Server</label>
<select class="form-select" name="server_id" required>
<option value="">Select a server</option>
{% for server in servers %}
<option value="{{ server.id }}" {% if app and app.server_id==server.id %}selected {% elif server_id and
server.id|string==server_id|string %}selected{% endif %}>
<option value="{{ server.id }}" {% if app and server.id==app.server_id %}selected{% endif %}>
{{ server.hostname }} ({{ server.ip_address }})
</option>
{% endfor %}
</select>
<small class="form-hint">Select the server where this application runs</small>
</div>
<div class="mb-3">
<label class="form-label">Documentation</label>
<textarea class="form-control" name="documentation"
rows="10">{% if app %}{{ app.documentation }}{% endif %}</textarea>
<div class="form-text">Markdown is supported</div>
<ul class="nav nav-tabs mb-2" role="tablist">
<li class="nav-item" role="presentation">
<a href="#markdown-edit" class="nav-link active" data-bs-toggle="tab" role="tab">Edit</a>
</li>
<li class="nav-item" role="presentation">
<a href="#markdown-preview" class="nav-link" data-bs-toggle="tab" role="tab" id="preview-tab">Preview</a>
</li>
</ul>
<div class="tab-content">
<div class="tab-pane active" id="markdown-edit" role="tabpanel">
<textarea class="form-control" name="documentation" id="documentation" rows="6"
placeholder="Document your application using Markdown...">{{ app.documentation if app else '' }}</textarea>
<small class="form-hint">
Markdown formatting is supported. Include details about what this application does, contact info, etc.
</small>
</div>
<div class="tab-pane" id="markdown-preview" role="tabpanel">
<div class="markdown-content border rounded p-3" style="min-height: 12rem;">
<div id="preview-content">Preview will be shown here...</div>
</div>
</div>
</div>
</div>
<div class="hr-text">Port Configuration</div>
<div class="mb-3">
<div class="d-flex justify-content-between align-items-center mb-2">
<label class="form-label mb-0">Application Ports</label>
<div class="btn-group">
<button type="button" class="btn btn-sm btn-outline-primary" id="add-port-btn">
<span class="ti ti-plus"></span> Add Port
</button>
<button type="button" class="btn btn-sm btn-outline-secondary" id="random-port-btn"
title="Generate random available port">
<span class="ti ti-dice"></span> Random Port
</button>
</div>
</div>
<div class="table-responsive">
<table class="table table-vcenter card-table" id="ports-table">
<thead>
<tr>
<th style="width: 20%">Port Number</th>
<th style="width: 20%">Protocol</th>
<th style="width: 50%">Description</th>
<th style="width: 10%">Actions</th>
</tr>
</thead>
<tbody>
{% if app and app.ports %}
{% for port in app.ports %}
<tr data-port-id="{{ port.id }}">
<td>
<input type="number" name="port_numbers[]" class="form-control" min="1" max="65535"
value="{{ port.port_number }}" required>
</td>
<td>
<select name="protocols[]" class="form-select">
<option value="TCP" {% if port.protocol=='TCP' %}selected{% endif %}>TCP</option>
<option value="UDP" {% if port.protocol=='UDP' %}selected{% endif %}>UDP</option>
<option value="SCTP" {% if port.protocol=='SCTP' %}selected{% endif %}>SCTP</option>
<option value="OTHER" {% if port.protocol=='OTHER' %}selected{% endif %}>OTHER</option>
</select>
</td>
<td>
<input type="text" name="port_descriptions[]" class="form-control" value="{{ port.description }}"
placeholder="Description">
</td>
<td>
<button type="button" class="btn btn-sm btn-ghost-danger" onclick="removePortRow(this)">
<span class="ti ti-trash"></span>
</button>
</td>
</tr>
{% endfor %}
{% endif %}
<!-- New rows will be added here dynamically -->
</tbody>
</table>
</div>
<small class="form-hint">Configure the network ports used by this application</small>
</div>
<div class="form-footer">
<button type="submit" class="btn btn-primary">Save</button>
{% if app %}
<button type="submit" class="btn btn-primary">Save Application</button>
{% if edit_mode %}
<a href="{{ url_for('dashboard.app_view', app_id=app.id) }}" class="btn btn-outline-secondary ms-2">Cancel</a>
{% elif server_id %}
<a href="{{ url_for('dashboard.server_view', server_id=server_id) }}"
class="btn btn-outline-secondary ms-2">Cancel</a>
{% else %}
<a href="{{ url_for('dashboard.dashboard_home') }}" class="btn btn-outline-secondary ms-2">Cancel</a>
<a href="{{ dashboard_link }}" class="btn btn-outline-secondary ms-2">Cancel</a>
{% endif %}
</div>
</form>
</div>
</div>
</div>
{% endblock %}
{% block extra_js %}
<script>
let portRowIndex = 1;
{% if app and app.id %}
const appId = {{ app.id | tojson }};
{% else %}
const appId = null;
{% endif %}
function addPortRow() {
const tbody = document.querySelector('#ports-table tbody');
const tr = document.createElement('tr');
tr.classList.add('port-row');
tr.innerHTML = `
<td>
<input type="number" name="port_number_${portRowIndex}" class="form-control"
min="1" max="65535" placeholder="Port number">
</td>
<td>
<select name="protocol_${portRowIndex}" class="form-select">
<option value="TCP">TCP</option>
<option value="UDP">UDP</option>
<option value="SCTP">SCTP</option>
<option value="OTHER">OTHER</option>
</select>
</td>
<td>
<input type="text" name="description_${portRowIndex}" class="form-control"
placeholder="Description">
</td>
<td>
<button type="button" class="btn btn-sm btn-ghost-danger" onclick="removePortRow(this)">
<span class="ti ti-trash"></span>
</button>
</td>
`;
tbody.appendChild(tr);
portRowIndex++;
}
// Setup markdown preview
setupMarkdownPreview();
function removePortRow(button) {
const row = button.closest('tr');
row.remove();
}
// Setup port management
setupPortHandlers();
</script>
{% endblock %}
<script>
document.body.addEventListener('htmx:configRequest', (event) => {
event.detail.headers['X-CSRFToken'] = "{{ csrf_token() }}";
});
</script>
{% endblock %}

View file

@ -155,6 +155,69 @@
</div>
</div>
</div>
<!-- Applications in this Subnet Section -->
<div class="row mt-3">
<div class="col-12">
<div class="card glass-card">
<div class="card-header">
<h3 class="card-title">Applications in this Subnet</h3>
</div>
<div class="card-body">
{% if subnet_apps %}
<div class="app-grid">
{% for app in subnet_apps %}
<div class="app-card">
<div class="app-card-header">
<div class="app-card-title">
<h4>{{ app.name }}</h4>
<span class="text-muted small">on {{ app.server.hostname }} ({{ app.server.ip_address }})</span>
</div>
<div class="app-card-actions">
<a href="{{ url_for('dashboard.app_view', app_id=app.id) }}" class="btn btn-sm btn-outline-primary">
View
</a>
</div>
</div>
<div class="app-card-body markdown-content">
{% if app.documentation %}
{{ app.documentation|markdown|truncate(300, true) }}
<a href="{{ url_for('dashboard.app_view', app_id=app.id) }}" class="text-primary">Read more</a>
{% else %}
<p class="text-muted">No documentation available for this application.</p>
{% endif %}
</div>
</div>
{% endfor %}
</div>
{% else %}
<div class="empty">
<div class="empty-icon">
<span class="ti ti-app-window"></span>
</div>
<p class="empty-title">No applications found</p>
<p class="empty-subtitle text-muted">
No applications are running on servers in this subnet.
</p>
</div>
{% endif %}
</div>
</div>
</div>
</div>
<!-- Update the usage progress bar section -->
<div class="mb-3">
<div class="form-label">Usage</div>
<div class="progress mb-2">
<div class="progress-bar" style="width: {{ (used_ips / total_ips * 100) if total_ips > 0 else 0 }}%"
role="progressbar"></div>
</div>
<div class="d-flex justify-content-between">
<span>{{ used_ips }} used</span>
<span>{{ total_ips }} total</span>
</div>
</div>
</div>
<!-- Delete Confirmation Modal -->
@ -185,4 +248,65 @@
</div>
</div>
</div>
<style>
.glass-card {
background-color: rgba(255, 255, 255, 0.03);
backdrop-filter: blur(10px);
border: 1px solid rgba(255, 255, 255, 0.1);
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
[data-bs-theme="light"] .glass-card {
background-color: rgba(255, 255, 255, 0.7);
border: 1px solid rgba(0, 0, 0, 0.08);
}
.app-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(350px, 1fr));
gap: 1.5rem;
}
.app-card {
border-radius: 10px;
overflow: hidden;
transition: transform 0.2s, box-shadow 0.2s;
background-color: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(255, 255, 255, 0.1);
}
[data-bs-theme="light"] .app-card {
background-color: rgba(255, 255, 255, 0.9);
border: 1px solid rgba(0, 0, 0, 0.08);
}
.app-card:hover {
transform: translateY(-3px);
box-shadow: 0 10px 20px rgba(0, 0, 0, 0.1);
}
.app-card-header {
padding: 1rem;
display: flex;
justify-content: space-between;
align-items: flex-start;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
}
[data-bs-theme="light"] .app-card-header {
border-bottom: 1px solid rgba(0, 0, 0, 0.08);
}
.app-card-title h4 {
margin: 0 0 0.25rem 0;
font-size: 1.25rem;
}
.app-card-body {
padding: 1rem;
max-height: 200px;
overflow: hidden;
}
</style>
{% endblock %}

View file

@ -15,10 +15,13 @@
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap" rel="stylesheet">
<!-- Custom CSS -->
<link rel="stylesheet" href="{{ url_for('static', filename='css/app.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='css/tabler.min.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='libs/tabler-icons/tabler-icons.min.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='css/tabler.min.css') }}"
onerror="this.onerror=null;this.href='https://cdn.jsdelivr.net/npm/@tabler/core@latest/dist/css/tabler.min.css';">
<link rel="stylesheet" href="{{ url_for('static', filename='libs/tabler-icons/tabler-icons.min.css') }}"
onerror="this.onerror=null;this.href='https://cdn.jsdelivr.net/npm/@tabler/icons@latest/iconfont/tabler-icons.min.css';">
<link rel="stylesheet" href="{{ url_for('static', filename='css/custom.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='css/markdown.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='css/theme.css') }}">
<!-- Favicon -->
<link rel="icon" type="image/png" href="{{ url_for('static', filename='img/favicon.png') }}">
{% block styles %}{% endblock %}
@ -267,11 +270,13 @@
<li><a class="dropdown-item" href="{{ url_for('auth.logout') }}">Logout</a></li>
</ul>
</div>
<div class="nav-item ms-2">
<button id="theme-toggle" class="btn btn-icon" aria-label="Toggle theme">
<span class="ti ti-moon dark-icon d-none"></span>
<span class="ti ti-sun light-icon"></span>
</button>
<div class="navbar-nav flex-row order-md-last">
<div class="nav-item me-2">
<button id="theme-toggle" class="btn btn-icon" aria-label="Toggle theme">
<span class="ti ti-moon dark-icon d-none"></span>
<span class="ti ti-sun light-icon"></span>
</button>
</div>
</div>
</div>
</div>
@ -529,6 +534,73 @@
});
});
</script>
<script>
document.addEventListener('DOMContentLoaded', function () {
// Add transition class to main content
const mainContent = document.querySelector('.page-body');
if (mainContent) {
mainContent.classList.add('page-transition');
}
// Theme toggle enhancement
const themeToggle = document.getElementById('theme-toggle');
if (themeToggle) {
themeToggle.addEventListener('click', function () {
const currentTheme = document.documentElement.getAttribute('data-bs-theme');
const newTheme = currentTheme === 'dark' ? 'light' : 'dark';
document.documentElement.setAttribute('data-bs-theme', newTheme);
localStorage.setItem('theme', newTheme);
// Update toggle icon
const darkIcon = document.querySelector('.dark-icon');
const lightIcon = document.querySelector('.light-icon');
if (darkIcon && lightIcon) {
if (newTheme === 'dark') {
darkIcon.classList.remove('d-none');
lightIcon.classList.add('d-none');
} else {
darkIcon.classList.add('d-none');
lightIcon.classList.remove('d-none');
}
}
// Show theme change notification
showNotification('success', `${newTheme.charAt(0).toUpperCase() + newTheme.slice(1)} mode activated`);
});
}
});
// Notification function
function showNotification(type, message) {
const notificationArea = document.getElementById('notification-area');
if (!notificationArea) return;
const notification = document.createElement('div');
notification.className = `alert alert-${type} alert-dismissible fade show`;
notification.style.animation = 'fadeIn 0.3s ease-out';
notification.innerHTML = `
${message}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
`;
notificationArea.appendChild(notification);
// Auto dismiss after 3 seconds
setTimeout(() => {
notification.style.animation = 'fadeOut 0.3s ease-in';
setTimeout(() => {
notification.remove();
}, 300);
}, 3000);
}
@keyframes fadeOut {
from { opacity: 1; }
to { opacity: 0; }
}
</script>
{% block scripts %}{% endblock %}
</body>

Binary file not shown.

129
app/utils/app_utils.py Normal file
View file

@ -0,0 +1,129 @@
from app.core.models import App, Port, Server
from app.core.extensions import db
from flask import flash
import re
def validate_app_data(name, server_id, existing_app_id=None):
"""
Validate application data
Returns tuple (valid, error_message)
"""
if not name or not server_id:
return False, "Please fill in all required fields"
# Check if app exists with same name on same server
query = App.query.filter(App.name == name, App.server_id == server_id)
# If editing, exclude the current app
if existing_app_id:
query = query.filter(App.id != existing_app_id)
if query.first():
return False, f"Application '{name}' already exists on this server"
return True, None
def validate_port_data(port_number, protocol, description=None):
"""
Validate port data
Returns tuple (valid, clean_port_number, error_message)
"""
# Clean and validate port number
try:
port = int(port_number)
if port < 1 or port > 65535:
return False, None, "Port must be between 1 and 65535"
except (ValueError, TypeError):
return False, None, "Invalid port number"
# Validate protocol
valid_protocols = ["TCP", "UDP", "SCTP", "OTHER"]
if protocol not in valid_protocols:
return False, None, f"Protocol must be one of: {', '.join(valid_protocols)}"
return True, port, None
def process_app_ports(app_id, port_data):
"""
Process port data for an application
port_data should be a list of tuples (port_number, protocol, description)
Returns (success, error_message)
"""
try:
for port_number, protocol, description in port_data:
valid, clean_port, error = validate_port_data(
port_number, protocol, description
)
if not valid:
continue # Skip invalid ports
# Check if port already exists
existing_port = Port.query.filter_by(
app_id=app_id, port_number=clean_port
).first()
if existing_port:
# Update existing port
existing_port.protocol = protocol
existing_port.description = description
else:
# Create new port
new_port = Port(
app_id=app_id,
port_number=clean_port,
protocol=protocol,
description=description,
)
db.session.add(new_port)
return True, None
except Exception as e:
db.session.rollback()
return False, str(e)
def save_app(name, server_id, documentation, port_data=None, existing_app_id=None):
"""
Save or update an application and its ports
Returns tuple (success, app_object_or_none, error_message)
"""
try:
# Validate input data
valid, error = validate_app_data(name, server_id, existing_app_id)
if not valid:
return False, None, error
# Create or update app
if existing_app_id:
# Update existing app
app = App.query.get(existing_app_id)
if not app:
return False, None, f"Application with ID {existing_app_id} not found"
app.name = name
app.server_id = server_id
app.documentation = documentation
else:
# Create new app
app = App(name=name, server_id=server_id, documentation=documentation)
db.session.add(app)
# Flush to get the app ID if new
db.session.flush()
# Process ports if provided
if port_data:
success, error = process_app_ports(app.id, port_data)
if not success:
db.session.rollback()
return False, None, f"Error processing ports: {error}"
# Commit all changes
db.session.commit()
return True, app, None
except Exception as e:
db.session.rollback()
return False, None, str(e)

View file

@ -1,28 +1,36 @@
import os
class Config:
"""Base config."""
SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-key-placeholder')
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-key-placeholder")
SQLALCHEMY_TRACK_MODIFICATIONS = False
WTF_CSRF_ENABLED = True
SESSION_COOKIE_SECURE = False # Set to True in production with HTTPS
class DevelopmentConfig(Config):
"""Development config."""
DEBUG = True
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', 'sqlite:///app.db')
SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", "sqlite:///app.db")
SQLALCHEMY_ECHO = True
class ProductionConfig(Config):
"""Production config."""
DEBUG = False
TESTING = False
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', 'sqlite:///app.db')
SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", "sqlite:///app.db")
SESSION_COOKIE_SECURE = True
REMEMBER_COOKIE_SECURE = True
class TestingConfig(Config):
"""Testing config."""
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
WTF_CSRF_ENABLED = False
SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
WTF_CSRF_ENABLED = False

Binary file not shown.

Binary file not shown.

145
run.py
View file

@ -15,92 +15,100 @@ import json
current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, current_dir)
def create_basic_app():
"""Create a Flask app without database dependencies"""
app = Flask(__name__,
template_folder=os.path.join(current_dir, 'app', 'templates'),
static_folder=os.path.join(current_dir, 'app', 'static'))
app = Flask(
__name__,
template_folder=os.path.join(current_dir, "app", "templates"),
static_folder=os.path.join(current_dir, "app", "static"),
)
# Basic configuration
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-placeholder')
app.config['DEBUG'] = True
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-key-placeholder")
app.config["DEBUG"] = True
# Register basic routes
register_routes(app)
# Add a fallback index route if no routes match
@app.route('/')
@app.route("/")
def index():
return "Your Network Management Flask Application is running! Navigate to /dashboard to see content."
return app
def register_routes(app):
"""Register blueprints without database dependencies"""
routes_dir = os.path.join(current_dir, 'app', 'routes')
routes_dir = os.path.join(current_dir, "app", "routes")
# Check if routes directory exists
if not os.path.isdir(routes_dir):
print(f"Warning: Routes directory {routes_dir} not found")
return
# Try to register API blueprint which is simplest
try:
from app.routes.api import bp as api_bp
app.register_blueprint(api_bp)
print("Registered API blueprint")
except Exception as e:
print(f"Could not register API blueprint: {e}")
# Try to register other blueprints with basic error handling
try:
from app.routes.dashboard import bp as dashboard_bp
app.register_blueprint(dashboard_bp)
print("Registered dashboard blueprint")
except ImportError as e:
print(f"Could not import dashboard blueprint: {e}")
try:
from app.routes.ipam import bp as ipam_bp
app.register_blueprint(ipam_bp)
print("Registered IPAM blueprint")
except ImportError as e:
print(f"Could not import IPAM blueprint: {e}")
# Create a development application instance
print("Starting Flask app with SQLite database...")
app = create_app('development')
app = create_app("development")
@app.shell_context_processor
def make_shell_context():
return {
'db': db,
'User': User,
'Server': Server,
'Subnet': Subnet,
'App': App,
'Port': Port
"db": db,
"User": User,
"Server": Server,
"Subnet": Subnet,
"App": App,
"Port": Port,
}
def init_db():
"""Initialize database tables"""
with app.app_context():
db.create_all()
def create_admin_user():
"""Create an admin user if no users exist"""
with app.app_context():
if User.query.count() == 0:
admin = User(
username='admin',
email='admin@example.com',
is_admin=True
)
admin.set_password('admin')
admin = User(username="admin", email="admin@example.com", is_admin=True)
admin.set_password("admin")
db.session.add(admin)
db.session.commit()
print("Created admin user: admin@example.com (password: admin)")
# Update seed_data to use consistent structures
def seed_data():
"""Add some sample data to the database"""
@ -108,42 +116,75 @@ def seed_data():
# Only seed if the database is empty
if Subnet.query.count() == 0:
# Create sample subnets
subnet1 = Subnet(cidr='192.168.1.0/24', location='Office', active_hosts=json.dumps([]))
subnet2 = Subnet(cidr='10.0.0.0/24', location='Datacenter', active_hosts=json.dumps([]))
subnet1 = Subnet(
cidr="192.168.1.0/24", location="Office", active_hosts=json.dumps([])
)
subnet2 = Subnet(
cidr="10.0.0.0/24", location="Datacenter", active_hosts=json.dumps([])
)
db.session.add_all([subnet1, subnet2])
db.session.commit()
# Create sample servers
server1 = Server(hostname='web-server', ip_address='192.168.1.10', subnet=subnet1)
server2 = Server(hostname='db-server', ip_address='192.168.1.11', subnet=subnet1)
server3 = Server(hostname='app-server', ip_address='10.0.0.5', subnet=subnet2)
server1 = Server(
hostname="web-server", ip_address="192.168.1.10", subnet=subnet1
)
server2 = Server(
hostname="db-server", ip_address="192.168.1.11", subnet=subnet1
)
server3 = Server(
hostname="app-server", ip_address="10.0.0.5", subnet=subnet2
)
db.session.add_all([server1, server2, server3])
db.session.commit()
# Create sample apps
app1 = App(name='Website', server=server1, documentation='# Company Website\nRunning on Nginx/PHP')
app2 = App(name='PostgreSQL', server=server2, documentation='# Database Server\nPostgreSQL 15')
app3 = App(name='API Service', server=server3, documentation='# REST API\nNode.js service')
app1 = App(
name="Website",
server=server1,
documentation="# Company Website\nRunning on Nginx/PHP",
)
app2 = App(
name="PostgreSQL",
server=server2,
documentation="# Database Server\nPostgreSQL 15",
)
app3 = App(
name="API Service",
server=server3,
documentation="# REST API\nNode.js service",
)
db.session.add_all([app1, app2, app3])
db.session.commit()
# Create sample ports
port1 = Port(app=app1, port_number=80, protocol='TCP', description='HTTP')
port2 = Port(app=app1, port_number=443, protocol='TCP', description='HTTPS')
port3 = Port(app=app2, port_number=5432, protocol='TCP', description='PostgreSQL')
port4 = Port(app=app3, port_number=3000, protocol='TCP', description='Node.js API')
port1 = Port(app=app1, port_number=80, protocol="TCP", description="HTTP")
port2 = Port(app=app1, port_number=443, protocol="TCP", description="HTTPS")
port3 = Port(
app=app2, port_number=5432, protocol="TCP", description="PostgreSQL"
)
port4 = Port(
app=app3, port_number=3000, protocol="TCP", description="Node.js API"
)
db.session.add_all([port1, port2, port3, port4])
db.session.commit()
print("Sample data has been added to the database")
if __name__ == '__main__':
if __name__ == "__main__":
# Create the app first
app = create_app()
# Initialize database if needed
if not os.path.exists('app.db') and 'sqlite' in app.config['SQLALCHEMY_DATABASE_URI']:
if (
not os.path.exists("app.db")
and "sqlite" in app.config["SQLALCHEMY_DATABASE_URI"]
):
print("Database not found, initializing...")
try:
init_db()
@ -153,10 +194,10 @@ if __name__ == '__main__':
except Exception as e:
print(f"Error initializing database: {e}")
sys.exit(1)
# Run the application
try:
app.run(debug=True, port=5000)
except Exception as e:
print(f"Error starting Flask app: {e}")
sys.exit(1)
sys.exit(1)

View file

@ -9,79 +9,102 @@ import re
from flask import Flask
# Add the parent directory to sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
def find_all_routes():
"""Find all route definitions in Python files"""
routes = []
route_pattern = re.compile(r'@\w+\.route\([\'"]([^\'"]+)[\'"]')
for root, _, files in os.walk('app'):
for root, _, files in os.walk("app"):
for file in files:
if file.endswith('.py'):
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
content = f.read()
matches = route_pattern.findall(content)
for match in matches:
routes.append(match)
return routes
def find_template_references():
"""Find all url_for calls in template files"""
references = []
url_for_pattern = re.compile(r'url_for\([\'"]([^\'"]+)[\'"]')
for root, _, files in os.walk('app/templates'):
# Add patterns for direct links and HTMX references
direct_href_pattern = re.compile(r'href=[\'"]([^\'"]+)[\'"]')
htmx_pattern = re.compile(r'hx-(get|post|put|delete)=[\'"]([^\'"]+)[\'"]')
# Template files scanning
for root, _, files in os.walk("app/templates"):
for file in files:
if file.endswith('.html'):
if file.endswith(".html"):
file_path = os.path.join(root, file)
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
content = f.read()
# Find url_for references
matches = url_for_pattern.findall(content)
for match in matches:
references.append(match)
# Also check for direct route references in hrefs that aren't url_for
href_matches = direct_href_pattern.findall(content)
for href in href_matches:
if href.startswith("/") and not href.startswith("//"):
references.append(href)
# Check HTMX references
htmx_matches = htmx_pattern.findall(content)
for _, url in htmx_matches:
if url.startswith("/") and not url.startswith("//"):
references.append(url)
# Also check Python files for url_for calls
for root, _, files in os.walk('app'):
for root, _, files in os.walk("app"):
for file in files:
if file.endswith('.py'):
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
content = f.read()
matches = url_for_pattern.findall(content)
for match in matches:
references.append(match)
return references
def check_unused_routes():
"""Find routes that are not referenced by url_for"""
from app import create_app
app = create_app()
# Get all route endpoints from the app
all_endpoints = set()
for rule in app.url_map.iter_rules():
all_endpoints.add(rule.endpoint)
# Get all url_for references
all_references = set(find_template_references())
# Find unused endpoints
unused_endpoints = all_endpoints - all_references
if unused_endpoints:
print("The following routes are defined but not referenced in templates or code:")
print(
"The following routes are defined but not referenced in templates or code:"
)
for endpoint in sorted(unused_endpoints):
# Skip static routes, error handlers, etc.
if endpoint.startswith('static') or endpoint == 'static':
if endpoint.startswith("static") or endpoint == "static":
continue
print(f" - {endpoint}")
# Find the URL for this endpoint
for rule in app.url_map.iter_rules():
if rule.endpoint == endpoint:
@ -90,5 +113,6 @@ def check_unused_routes():
else:
print("All routes are referenced in templates or code. Good job!")
if __name__ == "__main__":
check_unused_routes()
check_unused_routes()

View file

@ -8,18 +8,19 @@ import os
import shutil
import argparse
def cleanup(directory, verbose=False):
"""Clean up cache files and database files"""
cleaned_dirs = 0
cleaned_files = 0
# Files to clean
file_patterns = ['.pyc', '.pyo', '.~', '.swp', '.swo']
db_patterns = ['.db', '.sqlite', '.sqlite3', '-journal']
file_patterns = [".pyc", ".pyo", ".~", ".swp", ".swo"]
db_patterns = [".db", ".sqlite", ".sqlite3", "-journal"]
# Directories to clean
dir_patterns = ['__pycache__', '.pytest_cache', '.coverage', 'htmlcov']
dir_patterns = ["__pycache__", ".pytest_cache", ".coverage", "htmlcov"]
# Clean main directory
for root, dirs, files in os.walk(directory):
# Clean directories
@ -31,7 +32,7 @@ def cleanup(directory, verbose=False):
shutil.rmtree(dir_path)
cleaned_dirs += 1
dirs.remove(dir_name)
# Clean files
for file in files:
if any(file.endswith(pattern) for pattern in file_patterns + db_patterns):
@ -40,9 +41,9 @@ def cleanup(directory, verbose=False):
print(f"Removing file: {file_path}")
os.remove(file_path)
cleaned_files += 1
# Clean instance directory
instance_dir = os.path.join(directory, 'instance')
instance_dir = os.path.join(directory, "instance")
if os.path.exists(instance_dir):
for file in os.listdir(instance_dir):
if any(file.endswith(pattern) for pattern in db_patterns):
@ -51,13 +52,25 @@ def cleanup(directory, verbose=False):
print(f"Removing database file: {file_path}")
os.remove(file_path)
cleaned_files += 1
print(f"Cleanup completed! Removed {cleaned_dirs} directories and {cleaned_files} files.")
print(
f"Cleanup completed! Removed {cleaned_dirs} directories and {cleaned_files} files."
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Clean up Flask application cache and database files")
parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed output")
parser.add_argument("-d", "--directory", default=".", help="Directory to clean (default: current directory)")
parser = argparse.ArgumentParser(
description="Clean up Flask application cache and database files"
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="Show detailed output"
)
parser.add_argument(
"-d",
"--directory",
default=".",
help="Directory to clean (default: current directory)",
)
args = parser.parse_args()
cleanup(args.directory, args.verbose)
cleanup(args.directory, args.verbose)

View file

@ -10,59 +10,61 @@ from flask import Flask
from werkzeug.security import generate_password_hash
# Add the parent directory to sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from app import create_app
from app.core.extensions import db
from app.core.auth import User
def create_admin_user(email=None, password=None):
"""Create an admin user in the database"""
app = create_app()
with app.app_context():
# Check if users already exist
if User.query.count() > 0:
print("Users already exist in the database.")
choice = input("Do you want to create another admin user? (y/n): ")
if choice.lower() != 'y':
if choice.lower() != "y":
print("Operation cancelled.")
return
# Prompt for email if not provided
if not email:
email = input("Enter admin email: ")
# Check if user with this email already exists
existing_user = User.query.filter_by(email=email).first()
if existing_user:
print(f"User with email {email} already exists!")
return
# Prompt for password if not provided
if not password:
password = getpass.getpass("Enter admin password: ")
confirm_password = getpass.getpass("Confirm password: ")
if password != confirm_password:
print("Passwords do not match!")
return
# Create the admin user
admin = User(email=email, is_admin=True)
admin.set_password(password)
db.session.add(admin)
db.session.commit()
print(f"Admin user created successfully: {email}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Create an admin user")
parser.add_argument("--email", help="Admin user email")
parser.add_argument("--password", help="Admin user password")
args = parser.parse_args()
create_admin_user(args.email, args.password)
create_admin_user(args.email, args.password)

View file

@ -5,9 +5,9 @@ from app import create_app
# os.environ['DATABASE_URL'] = 'your_production_database_url'
# Create a production application
app = create_app('production')
app = create_app("production")
if __name__ == '__main__':
if __name__ == "__main__":
# This is only used for development
# In production, a WSGI server would import this file
app.run(host='0.0.0.0', port=5000)
app.run(host="0.0.0.0", port=5000)