from flask import Blueprint, request, jsonify, make_response, render_template from app.core.models import Subnet, Server, App from app.core.extensions import db import csv import io import datetime bp = Blueprint("importexport", __name__, url_prefix="/import-export") MODEL_MAP = {"subnet": Subnet, "server": Server, "app": App} @bp.route("/export/", methods=["GET"]) def export_model(model_name): if model_name not in MODEL_MAP: return jsonify({"error": "Invalid model name"}), 400 model = MODEL_MAP[model_name] instances = model.query.all() # Create a CSV file in memory output = io.StringIO() writer = csv.writer(output) # Get column names from model columns = [column.name for column in model.__table__.columns] writer.writerow(columns) # Write data for instance in instances: row = [getattr(instance, column) for column in columns] writer.writerow(row) # Create response timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{model_name}_{timestamp}.csv" response = make_response(output.getvalue()) response.headers["Content-Disposition"] = f"attachment; filename={filename}" response.headers["Content-type"] = "text/csv" return response @bp.route("/import/", methods=["GET", "POST"]) def import_model(model_name): if model_name not in MODEL_MAP: return jsonify({"error": "Invalid model name"}), 400 model = MODEL_MAP[model_name] if request.method == "GET": # Show import form return render_template("import_form.html", model_name=model_name) # Process CSV upload if "file" not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files["file"] if file.filename == "": return jsonify({"error": "No selected file"}), 400 if not file.filename.endswith(".csv"): return jsonify({"error": "File must be CSV format"}), 400 try: # Read CSV stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None) csv_reader = csv.reader(stream) # Get headers headers = next(csv_reader) # Validate required columns required_columns = [ col.name for col in model.__table__.columns if not col.nullable and col.name != "id" ] for col in required_columns: if col not in headers: return jsonify({"error": f"Required column {col} missing"}), 400 # Process rows imported = 0 for row in csv_reader: data = dict(zip(headers, row)) # Remove id to create new record if "id" in data: del data["id"] # Create new instance instance = model(**data) db.session.add(instance) imported += 1 db.session.commit() return jsonify({"success": f"Imported {imported} records successfully"}) except Exception as e: db.session.rollback() return jsonify({"error": str(e)}), 500