This commit is contained in:
pika 2025-03-31 13:47:34 +02:00
parent 5a309a0f6d
commit 1ab129b798
5 changed files with 499 additions and 107 deletions

View file

@ -295,25 +295,36 @@ def add_app_port(app_id):
protocol = request.form.get("protocol", "TCP")
description = request.form.get("description", "")
# Validate port data
# Validate port data with server-side conflict check
valid, clean_port, error = validate_port_data(
port_number, protocol, description
port_number, protocol, description, app.server_id, app_id
)
if not valid:
flash(error, "danger")
# If port is in use by another app, provide a direct link
if "already in use by application" in error:
app_name = error.split("'")[1] # Extract app name from error message
conflict_app = App.query.filter_by(name=app_name, server_id=app.server_id).first()
if conflict_app:
edit_url = url_for('dashboard.app_edit', app_id=conflict_app.id)
edit_link = f'<a href="{edit_url}">Edit {app_name}</a>'
flash(f"Would you like to edit the conflicting application? {edit_link}", "info")
return (
redirect(url_for("dashboard.app_view", app_id=app_id))
if not is_ajax
else jsonify({"success": False, "error": error})
), 400
# Check if port already exists
# Check if port already exists for this app
existing_port = Port.query.filter_by(
app_id=app_id, port_number=clean_port
app_id=app_id, port_number=clean_port, protocol=protocol
).first()
if existing_port:
error_msg = f"Port {clean_port} already exists for this application"
error_msg = f"Port {clean_port}/{protocol} already exists for this application"
flash(error_msg, "warning")
return (
redirect(url_for("dashboard.app_view", app_id=app_id))
@ -457,3 +468,84 @@ def get_free_port(server_id):
return jsonify(
{"success": False, "error": "No free ports available in the range 8000-9000"}
)
@bp.route("/validate/app-name", methods=["GET"])
@login_required
def validate_app_name():
"""API endpoint to validate application name in real-time"""
name = request.args.get("name", "").strip()
server_id = request.args.get("server_id")
app_id = request.args.get("app_id")
if not name or not server_id:
return jsonify({"valid": False, "message": "Missing required parameters"})
# Check for existing app with same name
query = App.query.filter(App.name == name, App.server_id == server_id)
if app_id:
query = query.filter(App.id != int(app_id))
existing_app = query.first()
if existing_app:
return jsonify({
"valid": False,
"message": f"Application '{name}' already exists on this server",
"app_id": existing_app.id,
"edit_url": url_for('dashboard.app_edit', app_id=existing_app.id)
})
# Check for similar names
similar_apps = App.query.filter(
App.name.ilike(f"%{name}%"),
App.server_id == server_id
).limit(3).all()
if similar_apps and (not app_id or not any(str(app.id) == app_id for app in similar_apps)):
similar_names = [{"name": app.name, "id": app.id} for app in similar_apps]
return jsonify({
"valid": True,
"warning": "Similar application names found",
"similar_apps": similar_names
})
return jsonify({"valid": True})
@bp.route("/validate/app-port", methods=["GET"])
@login_required
def validate_app_port():
"""API endpoint to validate port in real-time"""
port_number = request.args.get("port_number", "").strip()
protocol = request.args.get("protocol", "TCP")
server_id = request.args.get("server_id")
app_id = request.args.get("app_id")
if not port_number or not server_id:
return jsonify({"valid": False, "message": "Missing required parameters"})
try:
clean_port = int(port_number)
if clean_port < 1 or clean_port > 65535:
return jsonify({"valid": False, "message": "Port must be between 1 and 65535"})
in_use, app_name = is_port_in_use(
clean_port, protocol, server_id,
exclude_app_id=app_id if app_id else None
)
if in_use:
conflict_app = App.query.filter_by(name=app_name, server_id=server_id).first()
return jsonify({
"valid": False,
"message": f"Port {clean_port}/{protocol} is already in use by application '{app_name}'",
"app_id": conflict_app.id if conflict_app else None,
"edit_url": url_for('dashboard.app_edit', app_id=conflict_app.id) if conflict_app else None
})
return jsonify({"valid": True})
except ValueError:
return jsonify({"valid": False, "message": "Invalid port number"})

View file

@ -4,7 +4,7 @@ import markdown
from app.core.models import Server, App, Subnet, Port
from app.core.extensions import db, limiter
from datetime import datetime
from app.utils.app_utils import validate_app_data
from app.utils.app_utils import validate_app_data, is_port_in_use
bp = Blueprint("dashboard", __name__, url_prefix="/dashboard")
@ -223,61 +223,54 @@ def server_delete(server_id):
def app_new(server_id=None):
"""Create a new application"""
servers = Server.query.all()
if request.method == "POST":
# Handle form submission
name = request.form.get("name")
name = request.form.get("name", "").strip()
server_id = request.form.get("server_id")
documentation = request.form.get("documentation", "")
url = request.form.get("url", "") # Get the URL
if not name or not server_id:
flash("Name and server are required", "danger")
return render_template(
"dashboard/app_form.html",
title="New Application",
edit_mode=False,
servers=servers,
selected_server_id=server_id,
)
# Create the app with the URL
app = App(name=name, server_id=server_id, documentation=documentation, url=url)
try:
db.session.add(app)
db.session.commit()
# Process port numbers if any
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i, port_number in enumerate(port_numbers):
if port_number and port_number.isdigit():
port = Port(
app_id=app.id,
port_number=int(port_number),
protocol=protocols[i] if i < len(protocols) else "TCP",
description=descriptions[i] if i < len(descriptions) else "",
)
db.session.add(port)
db.session.commit()
url = request.form.get("url", "")
# Process port data from form
port_data = []
port_numbers = request.form.getlist("port_numbers[]")
protocols = request.form.getlist("protocols[]")
descriptions = request.form.getlist("port_descriptions[]")
for i in range(len(port_numbers)):
if port_numbers[i] and port_numbers[i].strip():
protocol = protocols[i] if i < len(protocols) else "TCP"
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Server-side validation
from app.utils.app_utils import save_app
success, app, error = save_app(name, server_id, documentation, port_data, url=url)
if success:
flash(f"Application '{name}' created successfully", "success")
return redirect(url_for("dashboard.app_view", app_id=app.id))
except Exception as e:
db.session.rollback()
flash(f"Error creating application: {str(e)}", "danger")
# GET request - render the form
else:
flash(error, "danger")
# Check if it's a port conflict and provide direct link
if "already in use by application" in error:
try:
app_name = error.split("'")[1] # Extract app name from error
conflict_app = App.query.filter_by(name=app_name, server_id=server_id).first()
if conflict_app:
edit_url = url_for('dashboard.app_edit', app_id=conflict_app.id)
flash(f'<a href="{edit_url}" class="alert-link">Edit conflicting application</a>', "info")
except:
pass
# GET request or validation failed - render the form
return render_template(
"dashboard/app_form.html",
title="New Application",
edit_mode=False,
servers=servers,
selected_server_id=server_id, # This will pre-select the server
selected_server_id=server_id,
)
@ -324,6 +317,44 @@ def app_edit(app_id):
description = descriptions[i] if i < len(descriptions) else ""
port_data.append((port_numbers[i], protocol, description))
# Check for port conflicts proactively
conflicts = []
for i, (port_number, protocol, _) in enumerate(port_data):
try:
clean_port = int(port_number)
in_use, conflicting_app_name = is_port_in_use(
clean_port, protocol, server_id, exclude_app_id=app_id
)
if in_use:
conflicts.append((clean_port, protocol, conflicting_app_name))
except (ValueError, TypeError):
continue
if conflicts:
# Find the IDs of conflicting apps for linking
conflict_msgs = []
for port, protocol, conflict_app_name in conflicts:
conflict_app = App.query.filter_by(name=conflict_app_name, server_id=server_id).first()
if conflict_app:
edit_url = url_for('dashboard.app_edit', app_id=conflict_app.id)
conflict_msgs.append(
f'Port {port}/{protocol} is in use by <a href="{edit_url}">{conflict_app_name}</a>'
)
else:
conflict_msgs.append(f'Port {port}/{protocol} is in use by {conflict_app_name}')
for msg in conflict_msgs:
flash(msg, "danger")
return render_template(
"dashboard/app_form.html",
title=f"Edit {app.name}",
edit_mode=True,
servers=servers,
app=app
)
# Replace local validation with shared function
valid, error = validate_app_data(name, server_id, existing_app_id=app_id)
@ -338,7 +369,7 @@ def app_edit(app_id):
from app.utils.app_utils import save_app
success, updated_app, error = save_app(
name, server_id, documentation, port_data, app_id
name, server_id, documentation, port_data, app_id, url
)
if success:
@ -346,17 +377,28 @@ def app_edit(app_id):
return redirect(url_for("dashboard.app_view", app_id=app_id))
else:
flash(error, "danger")
# Extract app name from error and provide link if it's a conflict
if "already in use by application" in error:
app_name = error.split("'")[1] # Extract app name from error message
conflict_app = App.query.filter_by(name=app_name, server_id=server_id).first()
if conflict_app:
edit_url = url_for('dashboard.app_edit', app_id=conflict_app.id)
flash(
f'Would you like to edit the conflicting application? '
f'<a href="{edit_url}">Edit {app_name}</a>',
"info"
)
else:
flash(error, "danger")
# For GET requests or failed POSTs
# GET request - display the form
return render_template(
"dashboard/app_form.html",
title=f"Edit Application: {app.name}",
title=f"Edit {app.name}",
edit_mode=True,
app=app,
dashboard_link=url_for("dashboard.dashboard_home"),
servers=servers,
app=app
)

177
app/static/js/validation.js Normal file
View file

@ -0,0 +1,177 @@
// Application Name Validation
function validateAppName() {
const nameField = document.getElementById('app-name');
const serverField = document.getElementById('server-id');
const feedbackElement = document.getElementById('name-feedback');
const submitButton = document.querySelector('button[type="submit"]');
if (!nameField || !serverField) return;
const name = nameField.value.trim();
const serverId = serverField.value;
const appId = document.getElementById('app-id')?.value;
if (!name || !serverId) {
clearFeedback(feedbackElement);
return;
}
// Debounce to avoid too many requests
clearTimeout(nameField.timer);
nameField.timer = setTimeout(() => {
fetch(`/api/validate/app-name?name=${encodeURIComponent(name)}&server_id=${serverId}${appId ? '&app_id=' + appId : ''}`)
.then(response => response.json())
.then(data => {
if (!data.valid) {
showError(feedbackElement, data.message);
if (data.edit_url) {
feedbackElement.innerHTML += ` <a href="${data.edit_url}" class="alert-link">Edit this app instead?</a>`;
}
submitButton.disabled = true;
} else if (data.warning) {
showWarning(feedbackElement, data.warning);
if (data.similar_apps && data.similar_apps.length > 0) {
feedbackElement.innerHTML += '<ul class="mb-0 mt-1">';
data.similar_apps.forEach(app => {
feedbackElement.innerHTML += `<li><a href="/dashboard/app/${app.id}/edit" class="alert-link">${app.name}</a></li>`;
});
feedbackElement.innerHTML += '</ul>';
}
submitButton.disabled = false;
} else {
showSuccess(feedbackElement, "App name is available");
submitButton.disabled = false;
}
})
.catch(error => {
console.error('Validation error:', error);
clearFeedback(feedbackElement);
});
}, 300);
}
// Port Validation
function validatePort(portField, protocolField) {
const serverField = document.getElementById('server-id');
const feedbackElement = portField.nextElementSibling;
const submitButton = document.querySelector('button[type="submit"]');
if (!portField || !serverField || !protocolField) return;
const port = portField.value.trim();
const protocol = protocolField.value;
const serverId = serverField.value;
const appId = document.getElementById('app-id')?.value;
if (!port || !serverId) {
clearFeedback(feedbackElement);
return;
}
// Debounce to avoid too many requests
clearTimeout(portField.timer);
portField.timer = setTimeout(() => {
fetch(`/api/validate/app-port?port_number=${encodeURIComponent(port)}&protocol=${protocol}&server_id=${serverId}${appId ? '&app_id=' + appId : ''}`)
.then(response => response.json())
.then(data => {
if (!data.valid) {
showError(feedbackElement, data.message);
if (data.edit_url) {
feedbackElement.innerHTML += ` <a href="${data.edit_url}" class="alert-link">Edit the conflicting app?</a>`;
}
portField.classList.add('is-invalid');
submitButton.disabled = true;
} else {
clearFeedback(feedbackElement);
portField.classList.remove('is-invalid');
portField.classList.add('is-valid');
submitButton.disabled = false;
}
})
.catch(error => {
console.error('Validation error:', error);
clearFeedback(feedbackElement);
});
}, 300);
}
// Helper functions for feedback display
function showError(element, message) {
if (!element) return;
element.className = 'invalid-feedback d-block';
element.textContent = message;
}
function showWarning(element, message) {
if (!element) return;
element.className = 'text-warning d-block';
element.textContent = message;
}
function showSuccess(element, message) {
if (!element) return;
element.className = 'valid-feedback d-block';
element.textContent = message;
}
function clearFeedback(element) {
if (!element) return;
element.className = '';
element.textContent = '';
}
// Set up event listeners when the document is loaded
document.addEventListener('DOMContentLoaded', function () {
const nameField = document.getElementById('app-name');
const serverField = document.getElementById('server-id');
if (nameField) {
nameField.addEventListener('input', validateAppName);
if (serverField) {
serverField.addEventListener('change', validateAppName);
}
}
// Set up validation for existing port fields
setupPortValidation();
// Make sure new port rows get validation too
const addPortButton = document.getElementById('add-port-btn');
if (addPortButton) {
addPortButton.addEventListener('click', function () {
// Wait a moment for the new row to be added
setTimeout(setupPortValidation, 100);
});
}
});
function setupPortValidation() {
const portFields = document.querySelectorAll('input[name="port_numbers[]"]');
const protocolFields = document.querySelectorAll('select[name="protocols[]"]');
portFields.forEach((field, index) => {
if (!field.hasValidationSetup) {
const protocolField = protocolFields[index];
field.addEventListener('input', function () {
validatePort(field, protocolField);
});
if (protocolField) {
protocolField.addEventListener('change', function () {
validatePort(field, protocolField);
});
}
// Add a div for feedback if it doesn't exist
if (!field.nextElementSibling || !field.nextElementSibling.classList.contains('feedback')) {
const feedback = document.createElement('div');
feedback.className = 'feedback';
field.parentNode.insertBefore(feedback, field.nextSibling);
}
field.hasValidationSetup = true;
}
});
}

View file

@ -43,14 +43,17 @@
{% endwith %}
<form method="POST"
action="{{ url_for('dashboard.app_edit', app_id=app.id) if edit_mode else url_for('dashboard.app_new') }}">
action="{% if edit_mode %}{{ url_for('dashboard.app_edit', app_id=app.id) }}{% else %}{{ url_for('dashboard.app_new') }}{% endif %}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
{% if app %}
<input type="hidden" id="app-id" name="app_id" value="{{ app.id }}">
{% endif %}
<div class="mb-3">
<label class="form-label required">Application Name</label>
<input type="text" class="form-control" name="name" value="{{ app.name if app else '' }}" required
placeholder="Enter application name">
<small class="form-hint">Choose a unique name for this application</small>
<label for="app-name" class="form-label required">Application Name</label>
<input type="text" class="form-control" id="app-name" name="name" required
value="{% if app %}{{ app.name }}{% endif %}">
<div id="name-feedback"></div>
</div>
<div class="mb-3">
@ -68,17 +71,16 @@
</div>
<div class="mb-3">
<label class="form-label required">Server</label>
<select class="form-select" name="server_id" required>
<option value="">Select a server</option>
<label for="server-id" class="form-label required">Server</label>
<select class="form-select" id="server-id" name="server_id" required>
<option value="">Select Server</option>
{% for server in servers %}
<option value="{{ server.id }}" {% if app and server.id==app.server_id %}selected {% elif selected_server_id
and server.id|string==selected_server_id|string %}selected {% endif %}>
<option value="{{ server.id }}" {% if (app and app.server_id==server.id) or (selected_server_id and
selected_server_id|int==server.id|int) %}selected{% endif %}>
{{ server.hostname }} ({{ server.ip_address }})
</option>
{% endfor %}
</select>
<small class="form-hint">Select the server where this application runs</small>
</div>
<div class="mb-3">
@ -305,4 +307,9 @@
}
}
</script>
{% endblock %}
{% block scripts %}
{{ super() }}
<script src="{{ url_for('static', filename='js/validation.js') }}"></script>
{% endblock %}

View file

@ -22,51 +22,122 @@ def validate_app_data(name, server_id, existing_app_id=None):
if query.first():
return False, f"Application '{name}' already exists on this server"
# Find similar app names on this server for suggestion
similar_apps = App.query.filter(
App.name.ilike(f"%{name}%"),
App.server_id == server_id
).limit(3).all()
if similar_apps and (not existing_app_id or not any(app.id == existing_app_id for app in similar_apps)):
similar_names = ", ".join([f"'{app.name}'" for app in similar_apps])
return False, f"Similar application names found on this server: {similar_names}"
return True, None
def validate_port_data(port_number, protocol, description=None):
def is_port_in_use(port_number, protocol, server_id, exclude_app_id=None):
"""
Check if a port+protocol combination is already in use by any application on the server
Args:
port_number: The port number to check
protocol: The protocol (TCP, UDP, etc.)
server_id: The server ID
exclude_app_id: Optional app ID to exclude from the check (for edit operations)
Returns:
Tuple (bool, app_name): Is port in use and by which app
"""
from sqlalchemy import and_
# Join App and Port models to find ports used by apps on this server
query = db.session.query(Port, App).join(App).filter(
Port.port_number == port_number,
Port.protocol == protocol,
App.server_id == server_id
)
# Exclude the current app if editing
if exclude_app_id:
query = query.filter(App.id != exclude_app_id)
result = query.first()
if result:
return True, result.App.name
return False, None
def validate_port_data(port_number, protocol, description, server_id=None, app_id=None):
"""
Validate port data
Returns tuple (valid, clean_port_number, error_message)
Returns tuple (valid, clean_port, error_message)
"""
# Clean and validate port number
# Existing validation
if not port_number:
return False, None, "Port number is required"
try:
port = int(port_number)
if port < 1 or port > 65535:
return False, None, "Port must be between 1 and 65535"
except (ValueError, TypeError):
return False, None, "Invalid port number"
clean_port = int(port_number)
except ValueError:
return False, None, "Port number must be a valid integer"
# Validate protocol
valid_protocols = ["TCP", "UDP", "SCTP", "OTHER"]
if protocol not in valid_protocols:
return False, None, f"Protocol must be one of: {', '.join(valid_protocols)}"
if clean_port < 1 or clean_port > 65535:
return False, None, "Port number must be between 1 and 65535"
return True, port, None
# Add server-side check for conflicts if server_id is provided
if server_id:
in_use, app_name = is_port_in_use(clean_port, protocol, server_id, app_id)
if in_use:
return False, clean_port, f"Port {clean_port}/{protocol} is already in use by application '{app_name}'"
return True, clean_port, None
def process_app_ports(app_id, port_data):
def process_app_ports(app_id, port_data, server_id=None):
"""
Process port data for an application
port_data should be a list of tuples (port_number, protocol, description)
Returns (success, error_message)
"""
# Get the app's server_id if not provided
if not server_id and app_id:
app = App.query.get(app_id)
if app:
server_id = app.server_id
# Track the port+protocol combinations we've seen to avoid duplicates
seen_ports = set()
try:
for port_number, protocol, description in port_data:
# Skip empty port entries
if not port_number or not port_number.strip():
continue
port_key = f"{port_number}/{protocol}"
# Check for duplicates within this form submission
if port_key in seen_ports:
return False, f"Duplicate port: {port_key} is specified multiple times"
seen_ports.add(port_key)
# Validate the port data
valid, clean_port, error = validate_port_data(
port_number, protocol, description
port_number, protocol, description, server_id, app_id
)
if not valid:
continue # Skip invalid ports
return False, error
# Check if port already exists
# Check if port already exists for this app
existing_port = Port.query.filter_by(
app_id=app_id, port_number=clean_port
app_id=app_id, port_number=clean_port, protocol=protocol
).first()
if existing_port:
# Update existing port
existing_port.protocol = protocol
existing_port.description = description
else:
# Create new port
@ -84,46 +155,49 @@ def process_app_ports(app_id, port_data):
return False, str(e)
def save_app(name, server_id, documentation, port_data=None, existing_app_id=None):
def save_app(name, server_id, documentation, port_data, app_id=None, url=None):
"""
Save or update an application and its ports
Returns tuple (success, app_object_or_none, error_message)
Save or update an application
Returns (success, app, error_message)
"""
try:
# Validate input data
valid, error = validate_app_data(name, server_id, existing_app_id)
# Validate application data
valid, error = validate_app_data(name, server_id, app_id)
if not valid:
return False, None, error
# Create or update app
if existing_app_id:
if app_id:
# Update existing app
app = App.query.get(existing_app_id)
app = App.query.get(app_id)
if not app:
return False, None, f"Application with ID {existing_app_id} not found"
return False, None, "Application not found"
app.name = name
app.server_id = server_id
app.documentation = documentation
app.url = url
else:
# Create new app
app = App(name=name, server_id=server_id, documentation=documentation)
app = App(
name=name,
server_id=server_id,
documentation=documentation,
url=url
)
db.session.add(app)
db.session.flush() # Get the app ID without committing
# Flush to get the app ID if new
db.session.flush()
# Remove all existing ports if updating
if app_id:
Port.query.filter_by(app_id=app_id).delete()
# Process ports if provided
if port_data:
success, error = process_app_ports(app.id, port_data)
if not success:
db.session.rollback()
return False, None, f"Error processing ports: {error}"
# Process and save ports
port_success, port_error = process_app_ports(app.id, port_data, server_id)
if not port_success:
db.session.rollback()
return False, None, port_error
# Commit all changes
db.session.commit()
return True, app, None
except Exception as e:
db.session.rollback()
return False, None, str(e)