365 lines
No EOL
10 KiB
Python
365 lines
No EOL
10 KiB
Python
from flask import Blueprint, jsonify, request, abort
|
|
from flask_login import login_required
|
|
from app.core.models import Subnet, Server, App, Port
|
|
from app.core.extensions import db
|
|
from app.scripts.ip_scanner import scan
|
|
import random
|
|
import ipaddress
|
|
|
|
bp = Blueprint('api', __name__, url_prefix='/api')
|
|
|
|
@bp.route('/subnets', methods=['GET'])
|
|
def get_subnets():
|
|
"""Get all subnets grouped by site"""
|
|
subnets = Subnet.query.all()
|
|
|
|
# Group subnets by location (site)
|
|
sites = {}
|
|
for subnet in subnets:
|
|
location = subnet.location
|
|
if location not in sites:
|
|
sites[location] = []
|
|
|
|
sites[location].append({
|
|
'id': subnet.id,
|
|
'cidr': subnet.cidr,
|
|
'location': location
|
|
})
|
|
|
|
# Convert to list of site objects
|
|
result = [
|
|
{
|
|
'name': site_name,
|
|
'subnets': subnets
|
|
} for site_name, subnets in sites.items()
|
|
]
|
|
|
|
return jsonify(result)
|
|
|
|
@bp.route('/subnets/<int:subnet_id>', methods=['GET'])
|
|
@login_required
|
|
def get_subnet(subnet_id):
|
|
"""Get details for a specific subnet"""
|
|
subnet = Subnet.query.get_or_404(subnet_id)
|
|
|
|
servers = []
|
|
for server in Server.query.filter_by(subnet_id=subnet_id).all():
|
|
servers.append({
|
|
'id': server.id,
|
|
'hostname': server.hostname,
|
|
'ip_address': server.ip_address,
|
|
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
})
|
|
|
|
result = {
|
|
'id': subnet.id,
|
|
'cidr': subnet.cidr,
|
|
'location': subnet.location,
|
|
'used_ips': subnet.used_ips,
|
|
'auto_scan': subnet.auto_scan,
|
|
'created_at': subnet.created_at.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'servers': servers
|
|
}
|
|
|
|
return jsonify(result)
|
|
|
|
@bp.route('/subnets/<int:subnet_id>/scan', methods=['POST'])
|
|
@login_required
|
|
def api_subnet_scan(subnet_id):
|
|
"""Scan a subnet via API"""
|
|
subnet = Subnet.query.get_or_404(subnet_id)
|
|
|
|
try:
|
|
results = scan(subnet.cidr, save_results=True)
|
|
return jsonify({
|
|
'success': True,
|
|
'subnet': subnet.cidr,
|
|
'hosts_found': len(results),
|
|
'results': results
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f'Error scanning subnet: {str(e)}'
|
|
}), 500
|
|
|
|
@bp.route('/servers', methods=['GET'])
|
|
@login_required
|
|
def get_servers():
|
|
"""Get all servers"""
|
|
servers = Server.query.all()
|
|
result = []
|
|
|
|
for server in servers:
|
|
result.append({
|
|
'id': server.id,
|
|
'hostname': server.hostname,
|
|
'ip_address': server.ip_address,
|
|
'subnet_id': server.subnet_id,
|
|
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
})
|
|
|
|
return jsonify({'servers': result})
|
|
|
|
@bp.route('/servers/<int:server_id>', methods=['GET'])
|
|
@login_required
|
|
def get_server(server_id):
|
|
"""Get a specific server"""
|
|
server = Server.query.get_or_404(server_id)
|
|
|
|
apps = []
|
|
for app in server.apps:
|
|
ports = []
|
|
for port in app.ports:
|
|
ports.append({
|
|
'id': port.id,
|
|
'port_number': port.port_number,
|
|
'protocol': port.protocol,
|
|
'description': port.description
|
|
})
|
|
|
|
apps.append({
|
|
'id': app.id,
|
|
'name': app.name,
|
|
'ports': ports,
|
|
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
})
|
|
|
|
result = {
|
|
'id': server.id,
|
|
'hostname': server.hostname,
|
|
'ip_address': server.ip_address,
|
|
'subnet_id': server.subnet_id,
|
|
'documentation': server.documentation,
|
|
'apps': apps,
|
|
'created_at': server.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
}
|
|
|
|
return jsonify(result)
|
|
|
|
@bp.route('/apps', methods=['GET'])
|
|
@login_required
|
|
def get_apps():
|
|
"""Get all applications"""
|
|
apps = App.query.all()
|
|
result = []
|
|
|
|
for app in apps:
|
|
result.append({
|
|
'id': app.id,
|
|
'name': app.name,
|
|
'server_id': app.server_id,
|
|
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
})
|
|
|
|
return jsonify({'apps': result})
|
|
|
|
@bp.route('/apps/<int:app_id>', methods=['GET'])
|
|
@login_required
|
|
def get_app(app_id):
|
|
"""Get details for a specific application"""
|
|
app = App.query.get_or_404(app_id)
|
|
|
|
result = {
|
|
'id': app.id,
|
|
'name': app.name,
|
|
'server_id': app.server_id,
|
|
'documentation': app.documentation,
|
|
'created_at': app.created_at.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'ports': app.ports
|
|
}
|
|
|
|
return jsonify(result)
|
|
|
|
@bp.route('/status', methods=['GET'])
|
|
def status():
|
|
return jsonify({'status': 'OK'})
|
|
|
|
@bp.route('/markdown-preview', methods=['POST'])
|
|
def markdown_preview():
|
|
data = request.json
|
|
md_content = data.get('markdown', '')
|
|
html = markdown.markdown(md_content)
|
|
return jsonify({'html': html})
|
|
|
|
@bp.route('/ports/suggest', methods=['GET'])
|
|
def suggest_ports():
|
|
app_type = request.args.get('type', '').lower()
|
|
|
|
# Common port suggestions based on app type
|
|
suggestions = {
|
|
'web': [
|
|
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
|
|
{'port': 443, 'type': 'tcp', 'desc': 'HTTPS'}
|
|
],
|
|
'database': [
|
|
{'port': 3306, 'type': 'tcp', 'desc': 'MySQL'},
|
|
{'port': 5432, 'type': 'tcp', 'desc': 'PostgreSQL'},
|
|
{'port': 1521, 'type': 'tcp', 'desc': 'Oracle'}
|
|
],
|
|
'mail': [
|
|
{'port': 25, 'type': 'tcp', 'desc': 'SMTP'},
|
|
{'port': 143, 'type': 'tcp', 'desc': 'IMAP'},
|
|
{'port': 110, 'type': 'tcp', 'desc': 'POP3'}
|
|
],
|
|
'file': [
|
|
{'port': 21, 'type': 'tcp', 'desc': 'FTP'},
|
|
{'port': 22, 'type': 'tcp', 'desc': 'SFTP/SSH'},
|
|
{'port': 445, 'type': 'tcp', 'desc': 'SMB'}
|
|
]
|
|
}
|
|
|
|
if app_type in suggestions:
|
|
return jsonify(suggestions[app_type])
|
|
|
|
# Default suggestions
|
|
return jsonify([
|
|
{'port': 80, 'type': 'tcp', 'desc': 'HTTP'},
|
|
{'port': 22, 'type': 'tcp', 'desc': 'SSH'}
|
|
])
|
|
|
|
@bp.route('/servers/<int:server_id>/suggest_port', methods=['GET'])
|
|
@login_required
|
|
def suggest_port(server_id):
|
|
"""Suggest a random unused port for a server"""
|
|
server = Server.query.get_or_404(server_id)
|
|
|
|
# Get all used ports for this server
|
|
used_ports = []
|
|
for app in server.apps:
|
|
for port in app.ports:
|
|
used_ports.append(port.port_number)
|
|
|
|
# Find an unused port in the dynamic/private port range
|
|
available_port = None
|
|
attempts = 0
|
|
|
|
while attempts < 50: # Try 50 times to find a random port
|
|
# Random port between 10000 and 65535
|
|
port = random.randint(10000, 65535)
|
|
|
|
if port not in used_ports:
|
|
available_port = port
|
|
break
|
|
|
|
attempts += 1
|
|
|
|
if available_port is None:
|
|
# If no random port found, find first available in sequence
|
|
for port in range(10000, 65536):
|
|
if port not in used_ports:
|
|
available_port = port
|
|
break
|
|
|
|
return jsonify({'port': available_port})
|
|
|
|
@bp.route('/apps/<int:app_id>/ports', methods=['GET'])
|
|
@login_required
|
|
def get_app_ports(app_id):
|
|
"""Get all ports for an app"""
|
|
app = App.query.get_or_404(app_id)
|
|
|
|
ports = []
|
|
for port in app.ports:
|
|
ports.append({
|
|
'id': port.id,
|
|
'port_number': port.port_number,
|
|
'protocol': port.protocol,
|
|
'description': port.description
|
|
})
|
|
|
|
return jsonify({'ports': ports})
|
|
|
|
@bp.route('/apps/<int:app_id>/ports', methods=['POST'])
|
|
@login_required
|
|
def add_app_port(app_id):
|
|
"""Add a new port to an app"""
|
|
app = App.query.get_or_404(app_id)
|
|
|
|
data = request.json
|
|
if not data or 'port_number' not in data:
|
|
return jsonify({'error': 'Missing port number'}), 400
|
|
|
|
port_number = data.get('port_number')
|
|
protocol = data.get('protocol', 'TCP')
|
|
description = data.get('description', '')
|
|
|
|
# Check if port already exists for this app
|
|
existing_port = Port.query.filter_by(app_id=app_id, port_number=port_number).first()
|
|
if existing_port:
|
|
return jsonify({'error': 'Port already exists for this app'}), 400
|
|
|
|
new_port = Port(
|
|
app_id=app_id,
|
|
port_number=port_number,
|
|
protocol=protocol,
|
|
description=description
|
|
)
|
|
|
|
db.session.add(new_port)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'id': new_port.id,
|
|
'port_number': new_port.port_number,
|
|
'protocol': new_port.protocol,
|
|
'description': new_port.description
|
|
})
|
|
|
|
@bp.route('/ports/<int:port_id>', methods=['DELETE'])
|
|
@login_required
|
|
def delete_port(port_id):
|
|
"""Delete a port"""
|
|
port = Port.query.get_or_404(port_id)
|
|
|
|
db.session.delete(port)
|
|
db.session.commit()
|
|
|
|
return jsonify({'success': True})
|
|
|
|
@bp.route('/subnets/<int:subnet_id>/servers', methods=['GET'])
|
|
def get_subnet_servers(subnet_id):
|
|
"""Get all servers for a specific subnet"""
|
|
servers = Server.query.filter_by(subnet_id=subnet_id).all()
|
|
return jsonify([{
|
|
'id': server.id,
|
|
'hostname': server.hostname,
|
|
'ip_address': server.ip_address
|
|
} for server in servers])
|
|
|
|
@bp.route('/server/<int:server_id>/ports', methods=['GET'])
|
|
@login_required
|
|
def get_server_ports(server_id):
|
|
"""Get all used ports for a server"""
|
|
server = Server.query.get_or_404(server_id)
|
|
|
|
# Get all ports associated with this server
|
|
ports = Port.query.filter_by(server_id=server_id).all()
|
|
used_ports = [port.number for port in ports]
|
|
|
|
return jsonify({
|
|
'server_id': server_id,
|
|
'used_ports': used_ports
|
|
})
|
|
|
|
@bp.route('/server/<int:server_id>/free-port', methods=['GET'])
|
|
@login_required
|
|
def get_free_port(server_id):
|
|
"""Find a free port for a server"""
|
|
server = Server.query.get_or_404(server_id)
|
|
|
|
# Get all ports associated with this server
|
|
used_ports = [port.number for port in Port.query.filter_by(server_id=server_id).all()]
|
|
|
|
# Find the first free port (starting from 8000)
|
|
for port_number in range(8000, 9000):
|
|
if port_number not in used_ports:
|
|
return jsonify({
|
|
'success': True,
|
|
'port': port_number
|
|
})
|
|
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'No free ports available in the range 8000-9000'
|
|
}) |