batman (working version kinda)

This commit is contained in:
pika 2025-03-30 19:20:13 +02:00
commit 6dd38036e7
65 changed files with 3950 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

199
app/routes/api.py Normal file
View file

@ -0,0 +1,199 @@
from flask import Blueprint, jsonify, request, abort
from flask_login import login_required
from app.core.models import Subnet, Server, App
from app.core.extensions import db
from app.scripts.ip_scanner import scan
bp = Blueprint('api', __name__, url_prefix='/api')
@bp.route('/subnets', methods=['GET'])
@login_required
def get_subnets():
"""Get all subnets"""
subnets = Subnet.query.all()
result = []
for subnet in subnets:
result.append({
'id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location,
'used_ips': subnet.used_ips,
'auto_scan': subnet.auto_scan,
'created_at': subnet.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({'subnets': result})
@bp.route('/subnets/<int:subnet_id>', methods=['GET'])
@login_required
def get_subnet(subnet_id):
"""Get details for a specific subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
servers = []
for server in Server.query.filter_by(subnet_id=subnet_id).all():
servers.append({
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
result = {
'id': subnet.id,
'cidr': subnet.cidr,
'location': subnet.location,
'used_ips': subnet.used_ips,
'auto_scan': subnet.auto_scan,
'created_at': subnet.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'servers': servers
}
return jsonify(result)
@bp.route('/subnets/<int:subnet_id>/scan', methods=['POST'])
@login_required
def api_subnet_scan(subnet_id):
"""Scan a subnet via API"""
subnet = Subnet.query.get_or_404(subnet_id)
try:
results = scan(subnet.cidr, save_results=True)
return jsonify({
'success': True,
'subnet': subnet.cidr,
'hosts_found': len(results),
'results': results
})
except Exception as e:
return jsonify({
'success': False,
'message': f'Error scanning subnet: {str(e)}'
}), 500
@bp.route('/servers', methods=['GET'])
@login_required
def get_servers():
"""Get all servers"""
servers = Server.query.all()
result = []
for server in servers:
result.append({
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'subnet_id': server.subnet_id,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({'servers': result})
@bp.route('/servers/<int:server_id>', methods=['GET'])
@login_required
def get_server(server_id):
"""Get details for a specific server"""
server = Server.query.get_or_404(server_id)
apps = []
for app in App.query.filter_by(server_id=server_id).all():
apps.append({
'id': app.id,
'name': app.name,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
result = {
'id': server.id,
'hostname': server.hostname,
'ip_address': server.ip_address,
'subnet_id': server.subnet_id,
'documentation': server.documentation,
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'ports': server.ports,
'apps': apps
}
return jsonify(result)
@bp.route('/apps', methods=['GET'])
@login_required
def get_apps():
"""Get all applications"""
apps = App.query.all()
result = []
for app in apps:
result.append({
'id': app.id,
'name': app.name,
'server_id': app.server_id,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({'apps': result})
@bp.route('/apps/<int:app_id>', methods=['GET'])
@login_required
def get_app(app_id):
"""Get details for a specific application"""
app = App.query.get_or_404(app_id)
result = {
'id': app.id,
'name': app.name,
'server_id': app.server_id,
'documentation': app.documentation,
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'ports': app.ports
}
return jsonify(result)
@bp.route('/status', methods=['GET'])
def status():
return jsonify({'status': 'OK'})
@bp.route('/markdown-preview', methods=['POST'])
def markdown_preview():
data = request.json
md_content = data.get('markdown', '')
html = markdown.markdown(md_content)
return jsonify({'html': html})
@bp.route('/ports/suggest', methods=['GET'])
def suggest_ports():
app_type = request.args.get('type', '').lower()
# Common port suggestions based on app type
suggestions = {
'web': [
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
{'port': 443, 'type': 'tcp', 'desc': 'HTTPS'}
],
'database': [
{'port': 3306, 'type': 'tcp', 'desc': 'MySQL'},
{'port': 5432, 'type': 'tcp', 'desc': 'PostgreSQL'},
{'port': 1521, 'type': 'tcp', 'desc': 'Oracle'}
],
'mail': [
{'port': 25, 'type': 'tcp', 'desc': 'SMTP'},
{'port': 143, 'type': 'tcp', 'desc': 'IMAP'},
{'port': 110, 'type': 'tcp', 'desc': 'POP3'}
],
'file': [
{'port': 21, 'type': 'tcp', 'desc': 'FTP'},
{'port': 22, 'type': 'tcp', 'desc': 'SFTP/SSH'},
{'port': 445, 'type': 'tcp', 'desc': 'SMB'}
]
}
if app_type in suggestions:
return jsonify(suggestions[app_type])
# Default suggestions
return jsonify([
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
{'port': 22, 'type': 'tcp', 'desc': 'SSH'}
])

76
app/routes/auth.py Normal file
View file

@ -0,0 +1,76 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from app.core.extensions import db
from app.core.auth import User
bp = Blueprint('auth', __name__, url_prefix='/auth')
@bp.route('/login', methods=['GET', 'POST'])
def login():
# If already logged in, redirect to dashboard
if current_user.is_authenticated:
return redirect(url_for('dashboard.dashboard_home'))
if request.method == 'POST':
email = request.form.get('email')
password = request.form.get('password')
remember = 'remember' in request.form
user = User.query.filter_by(email=email).first()
if user and user.check_password(password):
login_user(user, remember=remember)
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect(url_for('dashboard.dashboard_home'))
flash('Invalid email or password', 'danger')
return render_template('auth/login.html', title='Login')
@bp.route('/register', methods=['GET', 'POST'])
def register():
# If already logged in, redirect to dashboard
if current_user.is_authenticated:
return redirect(url_for('dashboard.dashboard_home'))
if request.method == 'POST':
email = request.form.get('email')
password = request.form.get('password')
password_confirm = request.form.get('password_confirm')
# Check if email already exists
existing_user = User.query.filter_by(email=email).first()
if existing_user:
flash('Email already registered', 'danger')
return render_template('auth/register.html', title='Register')
# Check if passwords match
if password != password_confirm:
flash('Passwords do not match', 'danger')
return render_template('auth/register.html', title='Register')
# Create new user
user = User(email=email)
user.set_password(password)
# Make first user an admin
if User.query.count() == 0:
user.is_admin = True
db.session.add(user)
db.session.commit()
flash('Registration successful! Please log in.', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', title='Register')
@bp.route('/logout')
@login_required
def logout():
logout_user()
flash('You have been logged out', 'info')
return redirect(url_for('auth.login'))

303
app/routes/dashboard.py Normal file
View file

@ -0,0 +1,303 @@
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required, current_user
import markdown
from app.core.models import Server, App, Subnet
from app.core.extensions import db, limiter
from datetime import datetime
bp = Blueprint('dashboard', __name__, url_prefix='/dashboard')
@bp.route('/')
@login_required
def dashboard_home():
"""Main dashboard view showing server statistics"""
server_count = Server.query.count()
app_count = App.query.count()
subnet_count = Subnet.query.count()
# Get latest added servers
latest_servers = Server.query.order_by(Server.created_at.desc()).limit(5).all()
# Get subnets with usage stats
subnets = Subnet.query.all()
for subnet in subnets:
subnet.usage_percent = subnet.used_ips / 254 * 100 if subnet.cidr.endswith('/24') else 0
return render_template(
'dashboard/index.html',
title='Dashboard',
server_count=server_count,
app_count=app_count,
subnet_count=subnet_count,
latest_servers=latest_servers,
subnets=subnets,
now=datetime.now()
)
@bp.route('/servers')
@login_required
def server_list():
"""List all servers"""
servers = Server.query.order_by(Server.hostname).all()
return render_template(
'dashboard/server_list.html',
title='Servers',
servers=servers,
now=datetime.now()
)
@bp.route('/server/<int:server_id>')
@login_required
def server_view(server_id):
"""View server details"""
server = Server.query.get_or_404(server_id)
apps = App.query.filter_by(server_id=server_id).all()
return render_template(
'dashboard/server_view.html',
title=f'Server - {server.hostname}',
server=server,
apps=apps,
markdown=markdown.markdown,
now=datetime.now()
)
@bp.route('/server/new', methods=['GET', 'POST'])
@login_required
def server_new():
"""Create a new server"""
subnets = Subnet.query.all()
if request.method == 'POST':
hostname = request.form.get('hostname')
ip_address = request.form.get('ip_address')
subnet_id = request.form.get('subnet_id')
documentation = request.form.get('documentation', '')
# Basic validation
if not hostname or not ip_address or not subnet_id:
flash('Please fill in all required fields', 'danger')
return render_template(
'dashboard/server_form.html',
title='New Server',
subnets=subnets,
now=datetime.now()
)
# Check if hostname or IP already exists
if Server.query.filter_by(hostname=hostname).first():
flash('Hostname already exists', 'danger')
return render_template(
'dashboard/server_form.html',
title='New Server',
subnets=subnets,
now=datetime.now()
)
if Server.query.filter_by(ip_address=ip_address).first():
flash('IP address already exists', 'danger')
return render_template(
'dashboard/server_form.html',
title='New Server',
subnets=subnets,
now=datetime.now()
)
# Create new server
server = Server(
hostname=hostname,
ip_address=ip_address,
subnet_id=subnet_id,
documentation=documentation
)
db.session.add(server)
db.session.commit()
flash('Server created successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server.id))
return render_template(
'dashboard/server_form.html',
title='New Server',
subnets=subnets,
now=datetime.now()
)
@bp.route('/server/<int:server_id>/edit', methods=['GET', 'POST'])
@login_required
def server_edit(server_id):
"""Edit an existing server"""
server = Server.query.get_or_404(server_id)
if request.method == 'POST':
hostname = request.form.get('hostname')
ip_address = request.form.get('ip_address')
subnet_id = request.form.get('subnet_id')
if not hostname or not ip_address or not subnet_id:
flash('All fields are required', 'danger')
return redirect(url_for('dashboard.server_edit', server_id=server_id))
# Check if hostname changed and already exists
if hostname != server.hostname and Server.query.filter_by(hostname=hostname).first():
flash('Hostname already exists', 'danger')
return redirect(url_for('dashboard.server_edit', server_id=server_id))
# Check if IP changed and already exists
if ip_address != server.ip_address and Server.query.filter_by(ip_address=ip_address).first():
flash('IP address already exists', 'danger')
return redirect(url_for('dashboard.server_edit', server_id=server_id))
# Update server
server.hostname = hostname
server.ip_address = ip_address
server.subnet_id = subnet_id
db.session.commit()
flash('Server updated successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server.id))
# GET request - show form with current values
subnets = Subnet.query.all()
return render_template(
'dashboard/server_edit.html',
title=f'Edit Server - {server.hostname}',
server=server,
subnets=subnets
)
@bp.route('/server/<int:server_id>/delete', methods=['POST'])
@login_required
def server_delete(server_id):
"""Delete a server"""
server = Server.query.get_or_404(server_id)
# Delete all apps associated with this server
App.query.filter_by(server_id=server_id).delete()
# Delete the server
db.session.delete(server)
db.session.commit()
flash('Server deleted successfully', 'success')
return redirect(url_for('dashboard.dashboard_home'))
@bp.route('/app/new', methods=['GET', 'POST'])
@login_required
def app_new():
"""Create a new application"""
servers = Server.query.all()
if request.method == 'POST':
name = request.form.get('name')
server_id = request.form.get('server_id')
documentation = request.form.get('documentation', '')
# Basic validation
if not name or not server_id:
flash('Please fill in all required fields', 'danger')
return render_template(
'dashboard/app_form.html',
title='New Application',
servers=servers,
now=datetime.now()
)
app = App(
name=name,
server_id=server_id,
documentation=documentation
)
db.session.add(app)
db.session.commit()
flash('Application created successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server_id))
return render_template(
'dashboard/app_form.html',
title='New Application',
servers=servers,
now=datetime.now()
)
@bp.route('/app/<int:app_id>', methods=['GET'])
@login_required
def app_view(app_id):
"""View a specific application"""
app = App.query.get_or_404(app_id)
server = Server.query.get(app.server_id)
return render_template(
'dashboard/app_view.html',
title=f'Application - {app.name}',
app=app,
server=server,
markdown=markdown.markdown,
now=datetime.now()
)
@bp.route('/app/<int:app_id>/edit', methods=['GET', 'POST'])
@login_required
def app_edit(app_id):
"""Edit an existing application"""
app = App.query.get_or_404(app_id)
if request.method == 'POST':
name = request.form.get('name')
server_id = request.form.get('server_id')
documentation = request.form.get('documentation', '')
# Process ports
ports = []
port_numbers = request.form.getlist('port[]')
port_types = request.form.getlist('port_type[]')
port_descs = request.form.getlist('port_desc[]')
for i in range(len(port_numbers)):
if port_numbers[i]:
port = {
'port': int(port_numbers[i]),
'type': port_types[i] if i < len(port_types) else 'tcp',
'desc': port_descs[i] if i < len(port_descs) else '',
'status': 'open'
}
ports.append(port)
# Update app
app.name = name
app.server_id = server_id
app.documentation = documentation
app.ports = ports
db.session.commit()
flash('Application updated successfully', 'success')
return redirect(url_for('dashboard.app_view', app_id=app.id))
# GET request - show form with current values
servers = Server.query.all()
return render_template(
'dashboard/app_edit.html',
title=f'Edit Application - {app.name}',
app=app,
servers=servers,
use_editor=True
)
@bp.route('/app/<int:app_id>/delete', methods=['POST'])
@login_required
def app_delete(app_id):
"""Delete an application"""
app = App.query.get_or_404(app_id)
server_id = app.server_id
db.session.delete(app)
db.session.commit()
flash('Application deleted successfully', 'success')
return redirect(url_for('dashboard.server_view', server_id=server_id))

104
app/routes/importexport.py Normal file
View file

@ -0,0 +1,104 @@
from flask import Blueprint, request, jsonify, make_response, render_template
from app.core.models import Subnet, Server, App
from app.core.extensions import db
import csv
import io
import datetime
bp = Blueprint('importexport', __name__, url_prefix='/import-export')
MODEL_MAP = {
'subnet': Subnet,
'server': Server,
'app': App
}
@bp.route('/export/<model_name>', methods=['GET'])
def export_model(model_name):
if model_name not in MODEL_MAP:
return jsonify({'error': 'Invalid model name'}), 400
model = MODEL_MAP[model_name]
instances = model.query.all()
# Create a CSV file in memory
output = io.StringIO()
writer = csv.writer(output)
# Get column names from model
columns = [column.name for column in model.__table__.columns]
writer.writerow(columns)
# Write data
for instance in instances:
row = [getattr(instance, column) for column in columns]
writer.writerow(row)
# Create response
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{model_name}_{timestamp}.csv"
response = make_response(output.getvalue())
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
response.headers['Content-type'] = 'text/csv'
return response
@bp.route('/import/<model_name>', methods=['GET', 'POST'])
def import_model(model_name):
if model_name not in MODEL_MAP:
return jsonify({'error': 'Invalid model name'}), 400
model = MODEL_MAP[model_name]
if request.method == 'GET':
# Show import form
return render_template('import_form.html', model_name=model_name)
# Process CSV upload
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if not file.filename.endswith('.csv'):
return jsonify({'error': 'File must be CSV format'}), 400
try:
# Read CSV
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_reader = csv.reader(stream)
# Get headers
headers = next(csv_reader)
# Validate required columns
required_columns = [col.name for col in model.__table__.columns
if not col.nullable and col.name != 'id']
for col in required_columns:
if col not in headers:
return jsonify({'error': f'Required column {col} missing'}), 400
# Process rows
imported = 0
for row in csv_reader:
data = dict(zip(headers, row))
# Remove id to create new record
if 'id' in data:
del data['id']
# Create new instance
instance = model(**data)
db.session.add(instance)
imported += 1
db.session.commit()
return jsonify({'success': f'Imported {imported} records successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500

145
app/routes/ipam.py Normal file
View file

@ -0,0 +1,145 @@
from flask import Blueprint, render_template, redirect, url_for, request, flash, jsonify
from flask_login import login_required
from app.core.models import Subnet, Server
from app.core.extensions import db
from app.scripts.ip_scanner import scan
import ipaddress
from datetime import datetime
bp = Blueprint('ipam', __name__, url_prefix='/ipam')
@bp.route('/')
@login_required
def ipam_home():
"""Main IPAM dashboard"""
subnets = Subnet.query.all()
# Calculate usage for each subnet
for subnet in subnets:
subnet.usage_percent = subnet.used_ips / 254 * 100 if subnet.cidr.endswith('/24') else 0
return render_template(
'ipam/index.html',
title='IPAM Dashboard',
subnets=subnets,
now=datetime.now()
)
@bp.route('/subnet/new', methods=['GET', 'POST'])
@login_required
def subnet_new():
"""Create a new subnet"""
if request.method == 'POST':
cidr = request.form.get('cidr')
location = request.form.get('location')
auto_scan = 'auto_scan' in request.form
# Basic validation
if not cidr or not location:
flash('Please fill in all required fields', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet',
now=datetime.now()
)
# Check if valid CIDR
try:
ipaddress.ip_network(cidr)
except ValueError:
flash('Invalid CIDR notation', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet',
now=datetime.now()
)
# Check if subnet already exists
if Subnet.query.filter_by(cidr=cidr).first():
flash('Subnet already exists', 'danger')
return render_template(
'ipam/subnet_form.html',
title='New Subnet',
now=datetime.now()
)
subnet = Subnet(
cidr=cidr,
location=location,
auto_scan=auto_scan
)
db.session.add(subnet)
db.session.commit()
flash('Subnet created successfully', 'success')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet.id))
return render_template(
'ipam/subnet_form.html',
title='New Subnet',
now=datetime.now()
)
@bp.route('/subnet/<int:subnet_id>')
@login_required
def subnet_view(subnet_id):
"""View subnet details"""
subnet = Subnet.query.get_or_404(subnet_id)
servers = Server.query.filter_by(subnet_id=subnet_id).all()
# Get network info
network = ipaddress.ip_network(subnet.cidr)
total_ips = network.num_addresses - 2 # Excluding network and broadcast addresses
used_ips = len(servers)
usage_percent = (used_ips / total_ips) * 100 if total_ips > 0 else 0
return render_template(
'ipam/subnet_view.html',
title=f'Subnet - {subnet.cidr}',
subnet=subnet,
servers=servers,
total_ips=total_ips,
used_ips=used_ips,
usage_percent=usage_percent,
now=datetime.now()
)
@bp.route('/subnet/<int:subnet_id>/scan')
@login_required
def subnet_scan(subnet_id):
"""Scan a subnet for active hosts"""
subnet = Subnet.query.get_or_404(subnet_id)
try:
results = scan(subnet.cidr, save_results=True)
flash(f'Scan completed for subnet {subnet.cidr}. Found {len(results)} active hosts.', 'success')
except Exception as e:
flash(f'Error scanning subnet: {e}', 'danger')
return redirect(url_for('ipam.subnet_view', subnet_id=subnet_id))
@bp.route('/subnet/<int:subnet_id>/visualize')
@login_required
def subnet_visualize(subnet_id):
"""Visualize IP usage in a subnet"""
subnet = Subnet.query.get_or_404(subnet_id)
servers = Server.query.filter_by(subnet_id=subnet_id).all()
# Create a dictionary of used IPs
used_ips = {server.ip_address: server.hostname for server in servers}
# Get network info
network = ipaddress.ip_network(subnet.cidr)
total_ips = network.num_addresses - 2 # Excluding network and broadcast addresses
used_ip_count = len(servers)
return render_template(
'ipam/subnet_visualization.html',
title=f'Subnet Visualization - {subnet.cidr}',
subnet=subnet,
network=network,
used_ips=used_ips,
total_ips=total_ips,
used_ip_count=used_ip_count,
now=datetime.now()
)