kinda working safe point

This commit is contained in:
pika 2025-03-23 03:29:05 +01:00
parent b9a82af12f
commit 6dda02141e
31 changed files with 4302 additions and 2937 deletions

47
app/routes/admin.py Normal file
View file

@ -0,0 +1,47 @@
from flask import Blueprint, render_template, jsonify, redirect, url_for, flash, current_app
from flask_login import login_required, current_user
from ..models import User
from ..migrations import run_migrations
from ..utils.reset_db import reset_database
bp = Blueprint('admin', __name__, url_prefix='/admin')
@bp.route('/')
@login_required
def index():
"""Admin panel home"""
if not current_user.is_admin:
flash('Access denied. Admin privileges required.', 'error')
return redirect(url_for('dashboard.index'))
return render_template('admin/panel.html')
@bp.route('/run-migrations', methods=['POST'])
@login_required
def trigger_migrations():
"""Manually trigger database migrations (admin only)"""
# Check if user is admin
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized. Admin privileges required'}), 403
# Run migrations
with current_app.app_context():
run_migrations()
return jsonify({'success': True, 'message': 'Migrations completed successfully'})
@bp.route('/reset-database', methods=['POST'])
@login_required
def reset_db():
"""Reset the entire database (admin only)"""
# Check if user is admin
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized. Admin privileges required'}), 403
# Reset database
success = reset_database()
if success:
return jsonify({'success': True, 'message': 'Database reset successfully. You will be logged out.'})
else:
return jsonify({'error': 'Failed to reset database. Check logs for details.'}), 500

View file

@ -1,4 +1,4 @@
from flask import render_template, redirect, url_for, flash, request, current_app, jsonify, session
from flask import Blueprint, render_template, redirect, url_for, flash, request, current_app, jsonify, session
from flask_login import login_user, logout_user, login_required, current_user
from urllib.parse import urlparse
from app import db
@ -8,6 +8,13 @@ from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Length, EqualTo, ValidationError
from werkzeug.exceptions import BadRequest
from werkzeug.security import generate_password_hash
import logging
# Setup logging
logger = logging.getLogger(__name__)
bp = Blueprint('auth', __name__)
# Login form
class LoginForm(FlaskForm):
@ -28,51 +35,77 @@ class RegistrationForm(FlaskForm):
if user is not None:
raise ValidationError('Please use a different username.')
@auth_bp.route('/login', methods=['GET', 'POST'])
@bp.route('/login', methods=['GET', 'POST'])
def login():
"""User login page"""
if current_user.is_authenticated:
return redirect(url_for('dashboard.index'))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data):
if user is None or not user.verify_password(form.password.data):
flash('Invalid username or password', 'error')
return redirect(url_for('auth.login'))
login_user(user, remember=form.remember_me.data)
# Redirect to requested page or dashboard
next_page = request.args.get('next')
if not next_page or urlparse(next_page).netloc != '':
next_page = url_for('dashboard.index')
flash('Login successful!', 'success')
return redirect(next_page)
return render_template('auth/login.html', title='Sign In', form=form)
return render_template('auth/login.html', form=form)
@auth_bp.route('/logout')
@bp.route('/logout')
@login_required
def logout():
"""User logout"""
logout_user()
flash('You have been logged out', 'info')
flash('You have been logged out.', 'info')
return redirect(url_for('auth.login'))
@auth_bp.route('/register', methods=['GET', 'POST'])
@bp.route('/register', methods=['GET', 'POST'])
def register():
"""User registration page"""
if current_user.is_authenticated:
return redirect(url_for('dashboard.index'))
form = RegistrationForm()
if form.validate_on_submit():
user = User(username=form.username.data)
user.set_password(form.password.data)
# Create new user
user = User(
username=form.username.data,
)
# Try both ways to set password
try:
# First try with set_password method
logger.info("Trying to set password with set_password method")
if hasattr(user, 'set_password'):
user.set_password(form.password.data)
else:
# Fall back to property setter
logger.info("set_password not found, using password property instead")
user.password = form.password.data
except Exception as e:
logger.error(f"Error setting password: {e}")
# Ensure we set the password somehow
user.password_hash = generate_password_hash(form.password.data)
logger.info("Set password_hash directly")
# Save to database
db.session.add(user)
db.session.commit()
logger.info(f"User {user.username} registered successfully")
flash('Registration successful! You can now log in.', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', title='Register', form=form)
return render_template('auth/register.html', form=form)
@auth_bp.route('/update_profile', methods=['POST'])
@login_required
@ -136,12 +169,8 @@ def update_preferences():
flash('Preferences updated successfully', 'success')
return redirect(url_for('auth.profile'))
@auth_bp.route('/profile')
@bp.route('/profile')
@login_required
def profile():
# Get theme preference from session or default to system
theme_preference = session.get('theme_preference', 'system')
return render_template('auth/profile.html',
title='User Profile',
theme_preference=theme_preference)
"""User profile page"""
return render_template('auth/profile.html', user=current_user)

View file

@ -1,59 +1,46 @@
from flask import Blueprint, render_template
from flask import Blueprint, render_template, redirect, url_for
from flask_login import login_required, current_user
from datetime import datetime, timedelta
from app.models import File, Share, Download
from ..models import File, Folder
import os
dashboard_bp = Blueprint('dashboard', __name__)
# Create blueprint with the name expected by __init__.py
bp = Blueprint('dashboard', __name__)
@dashboard_bp.route('/')
@bp.route('/')
@bp.route('/index')
@login_required
def index():
# Get some stats for the dashboard
total_files = File.query.filter_by(user_id=current_user.id, is_folder=False).count()
total_folders = File.query.filter_by(user_id=current_user.id, is_folder=True).count()
"""Dashboard index page"""
# Count user's files and folders
file_count = File.query.filter_by(user_id=current_user.id).count()
folder_count = Folder.query.filter_by(user_id=current_user.id).count()
# Recent files for quick access
recent_files = File.query.filter_by(user_id=current_user.id, is_folder=False)\
.order_by(File.updated_at.desc())\
.limit(8).all()
# Get storage usage
storage_used = sum(file.size for file in File.query.filter_by(user_id=current_user.id).all())
# Root folders for quick navigation
root_folders = File.query.filter_by(user_id=current_user.id, is_folder=True, parent_id=None)\
.order_by(File.name)\
.limit(8).all()
# Format size for display
if storage_used < 1024:
storage_used_formatted = f"{storage_used} bytes"
elif storage_used < 1024 * 1024:
storage_used_formatted = f"{storage_used / 1024:.2f} KB"
elif storage_used < 1024 * 1024 * 1024:
storage_used_formatted = f"{storage_used / (1024 * 1024):.2f} MB"
else:
storage_used_formatted = f"{storage_used / (1024 * 1024 * 1024):.2f} GB"
# Count active shares (if Share model exists)
active_shares = 0
recent_activities = 0
# Get recent files
recent_files = File.query.filter_by(user_id=current_user.id).order_by(File.created_at.desc()).limit(5).all()
# Check if Share and Download models exist/are imported
try:
# Count active shares
active_shares = Share.query.filter_by(user_id=current_user.id).filter(
(Share.expires_at > datetime.now()) | (Share.expires_at.is_(None))
).count()
# Recent activities count (downloads, shares, etc.)
recent_activities = Download.query.join(Share)\
.filter(Share.user_id == current_user.id)\
.filter(Download.timestamp > (datetime.now() - timedelta(days=7)))\
.count()
except:
# Models not ready yet, using default values
pass
# Create stats object that the template is expecting
stats = {
'file_count': file_count,
'folder_count': folder_count,
'storage_used': storage_used_formatted
}
return render_template('dashboard.html',
title='Dashboard',
total_files=total_files,
total_folders=total_folders,
recent_files=recent_files,
root_folders=root_folders,
active_shares=active_shares,
recent_activities=recent_activities,
now=datetime.now(),
file_icon=get_file_icon,
format_size=format_file_size)
return render_template('dashboard/index.html',
stats=stats, # Pass as stats object
recent_files=recent_files)
def get_file_icon(mime_type, filename):
"""Return Font Awesome icon class based on file type"""

View file

@ -2,7 +2,7 @@ from flask import Blueprint, render_template, redirect, url_for, flash, request,
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models import File, Share
from app.models import File, Share, Folder
from config import Config
import os
from datetime import datetime, timedelta
@ -10,12 +10,13 @@ import uuid
import mimetypes
import shutil
import json
import time
files_bp = Blueprint('files', __name__, url_prefix='/files')
bp = Blueprint('files', __name__, url_prefix='/files')
@files_bp.route('/')
@files_bp.route('/browser')
@files_bp.route('/browser/<int:folder_id>')
@bp.route('/')
@bp.route('/browser')
@bp.route('/browser/<int:folder_id>')
@login_required
def browser(folder_id=None):
"""Display file browser interface"""
@ -23,7 +24,7 @@ def browser(folder_id=None):
breadcrumbs = []
if folder_id:
current_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
current_folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404()
# Generate breadcrumbs
breadcrumbs = []
@ -36,22 +37,30 @@ def browser(folder_id=None):
# For initial load - only get folders and files if it's not an AJAX request
if not request.headers.get('X-Requested-With') == 'XMLHttpRequest':
if current_folder:
folders = File.query.filter_by(parent_id=current_folder.id, user_id=current_user.id, is_folder=True).all()
files = File.query.filter_by(parent_id=current_folder.id, user_id=current_user.id, is_folder=False).all()
folders = Folder.query.filter_by(parent_id=current_folder.id, user_id=current_user.id).all()
files = File.query.filter_by(folder_id=current_folder.id, user_id=current_user.id).all()
else:
folders = File.query.filter_by(parent_id=None, user_id=current_user.id, is_folder=True).all()
files = File.query.filter_by(parent_id=None, user_id=current_user.id, is_folder=False).all()
folders = Folder.query.filter_by(parent_id=None, user_id=current_user.id).all()
files = File.query.filter_by(folder_id=None, user_id=current_user.id).all()
return render_template('files/browser.html',
# Get the search query if provided
query = request.args.get('q', '')
if query:
# Filter folders and files by name containing the query
folders = [f for f in folders if query.lower() in f.name.lower()]
files = [f for f in files if query.lower() in f.name.lower()]
return render_template('files/browser.html',
current_folder=current_folder,
breadcrumbs=breadcrumbs,
folders=folders,
folders=folders,
files=files)
else:
# If it's an AJAX request, return JSON
return jsonify({'error': 'Use the /contents endpoint for AJAX requests'})
# For AJAX request, return just the folder contents
# Implement this if needed
pass
@files_bp.route('/contents')
@bp.route('/contents')
@login_required
def folder_contents():
"""Returns the HTML for folder contents (used for AJAX loading)"""
@ -63,31 +72,31 @@ def folder_contents():
# Get the current folder if a folder_id is provided
current_folder = None
if folder_id:
current_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
current_folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404()
# Base query for folders and files
folders_query = File.query.filter_by(user_id=current_user.id, is_folder=True)
files_query = File.query.filter_by(user_id=current_user.id, is_folder=False)
folders_query = Folder.query.filter_by(user_id=current_user.id)
files_query = File.query.filter_by(user_id=current_user.id)
# Filter by parent folder
if current_folder:
folders_query = folders_query.filter_by(parent_id=current_folder.id)
files_query = files_query.filter_by(parent_id=current_folder.id)
files_query = files_query.filter_by(folder_id=current_folder.id)
else:
folders_query = folders_query.filter_by(parent_id=None)
files_query = files_query.filter_by(parent_id=None)
files_query = files_query.filter_by(folder_id=None)
# Apply search if provided
if search_query:
folders_query = folders_query.filter(File.name.ilike(f'%{search_query}%'))
folders_query = folders_query.filter(Folder.name.ilike(f'%{search_query}%'))
files_query = files_query.filter(File.name.ilike(f'%{search_query}%'))
# Apply sorting
if sort_by == 'name':
folders_query = folders_query.order_by(File.name.asc() if sort_order == 'asc' else File.name.desc())
folders_query = folders_query.order_by(Folder.name.asc() if sort_order == 'asc' else Folder.name.desc())
files_query = files_query.order_by(File.name.asc() if sort_order == 'asc' else File.name.desc())
elif sort_by == 'date':
folders_query = folders_query.order_by(File.updated_at.desc() if sort_order == 'asc' else File.updated_at.asc())
folders_query = folders_query.order_by(Folder.updated_at.desc() if sort_order == 'asc' else Folder.updated_at.asc())
files_query = files_query.order_by(File.updated_at.desc() if sort_order == 'asc' else File.updated_at.asc())
elif sort_by == 'size':
# Folders always come first, then sort files by size
@ -107,7 +116,7 @@ def folder_contents():
'name': folder.name,
'updated_at': folder.updated_at.isoformat(),
'is_folder': True,
'item_count': folder.children.count(),
'item_count': folder.files.count(),
'url': url_for('files.browser', folder_id=folder.id)
} for folder in folders]
@ -116,10 +125,10 @@ def folder_contents():
'name': file.name,
'size': file.size,
'formatted_size': format_file_size(file.size),
'mime_type': file.mime_type,
'type': file.type,
'updated_at': file.updated_at.isoformat(),
'is_folder': False,
'icon': get_file_icon(file.mime_type, file.name),
'icon': file.icon_class,
'url': url_for('files.download', file_id=file.id)
} for file in files]
@ -141,34 +150,53 @@ def folder_contents():
else:
return redirect(url_for('files.browser'))
@files_bp.route('/upload', methods=['GET', 'POST'])
@files_bp.route('/upload/<int:folder_id>', methods=['GET', 'POST'])
@bp.route('/upload', methods=['GET', 'POST'])
@bp.route('/upload/<int:folder_id>', methods=['GET', 'POST'])
@login_required
def upload(folder_id=None):
"""Page for uploading files"""
parent_folder = None
if folder_id:
parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
if request.method == 'POST':
# Handle XHR upload
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# Check if file was included
if 'file' not in request.files:
# Handle file upload
if 'file' not in request.files:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
# Check if the file was actually selected
if file.filename == '':
flash('No file part', 'error')
return redirect(request.url)
file = request.files['file']
# Validate filename
if file.filename == '':
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'No selected file'}), 400
# Validate and save file
flash('No selected file', 'error')
return redirect(request.url)
# Get the parent folder
parent_folder = None
if folder_id:
parent_folder = Folder.query.get_or_404(folder_id)
# Check if user has permission
if parent_folder.user_id != current_user.id:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'You do not have permission to upload to this folder'}), 403
flash('You do not have permission to upload to this folder', 'error')
return redirect(url_for('files.browser'))
# Process the file
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
# Generate UUID for storage
# Generate a unique filename
file_uuid = str(uuid.uuid4())
storage_path = os.path.join(Config.UPLOAD_FOLDER, file_uuid)
_, file_extension = os.path.splitext(filename)
storage_name = f"{file_uuid}{file_extension}"
# Create storage path
upload_folder = current_app.config['UPLOAD_FOLDER']
user_folder = os.path.join(upload_folder, str(current_user.id))
os.makedirs(user_folder, exist_ok=True)
storage_path = os.path.join(user_folder, storage_name)
try:
# Save file to storage location
@ -183,12 +211,12 @@ def upload(folder_id=None):
# Create file record
db_file = File(
name=filename,
storage_name=file_uuid,
mime_type=mime_type,
original_name=filename,
path=storage_path,
size=file_size,
type=mime_type,
user_id=current_user.id,
parent_id=parent_folder.id if parent_folder else None,
is_folder=False
folder_id=parent_folder.id if parent_folder else None
)
db.session.add(db_file)
db.session.commit()
@ -201,8 +229,8 @@ def upload(folder_id=None):
'name': db_file.name,
'size': db_file.size,
'formatted_size': format_file_size(db_file.size),
'mime_type': db_file.mime_type,
'icon': get_file_icon(db_file.mime_type, db_file.name)
'type': db_file.type,
'icon': db_file.icon_class
}
})
@ -215,19 +243,15 @@ def upload(folder_id=None):
return jsonify({'error': str(e)}), 500
else:
# Regular form POST (non-XHR) - redirect to browser
flash('Please use the browser interface to upload files', 'info')
if parent_folder:
return redirect(url_for('files.browser', folder_id=parent_folder.id))
else:
return redirect(url_for('files.browser'))
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'File type not allowed'}), 400
flash('File type not allowed', 'error')
return redirect(request.url)
# GET request - show upload page
return render_template('files/upload.html',
parent_folder=parent_folder,
title="Upload Files")
# GET request - show upload form
return render_template('files/upload.html', folder_id=folder_id)
@files_bp.route('/upload_folder', methods=['POST'])
@bp.route('/upload_folder', methods=['POST'])
@login_required
def upload_folder():
"""Handle folder upload - this processes ZIP files uploaded as folders"""
@ -235,7 +259,7 @@ def upload_folder():
parent_folder = None
if folder_id:
parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
parent_folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404()
# Check if folder data was provided
if 'folder_data' not in request.form:
@ -246,13 +270,10 @@ def upload_folder():
folder_name = secure_filename(folder_data.get('name', 'Unnamed Folder'))
# Create folder record
folder = File(
folder = Folder(
name=folder_name,
is_folder=True,
user_id=current_user.id,
parent_id=parent_folder.id if parent_folder else None,
size=0,
mime_type=None
parent_id=parent_folder.id if parent_folder else None
)
db.session.add(folder)
db.session.flush() # Get folder.id without committing
@ -286,22 +307,18 @@ def upload_folder():
continue
# Check if folder already exists
subfolder = File.query.filter_by(
subfolder = Folder.query.filter_by(
name=part,
parent_id=current_parent_id,
user_id=current_user.id,
is_folder=True
user_id=current_user.id
).first()
if not subfolder:
# Create new subfolder
subfolder = File(
subfolder = Folder(
name=part,
is_folder=True,
user_id=current_user.id,
parent_id=current_parent_id,
size=0,
mime_type=None
parent_id=current_parent_id
)
db.session.add(subfolder)
db.session.flush()
@ -330,12 +347,12 @@ def upload_folder():
# Create file record
db_file = File(
name=filename,
storage_name=file_uuid,
mime_type=mime_type,
original_name=filename,
path=storage_path,
size=file_size,
type=mime_type,
user_id=current_user.id,
parent_id=current_parent_id,
is_folder=False
folder_id=current_parent_id
)
db.session.add(db_file)
file_records.append(db_file)
@ -368,23 +385,23 @@ def upload_folder():
current_app.logger.error(f"Folder upload parsing error: {str(e)}")
return jsonify({'error': str(e)}), 500
@files_bp.route('/download/<int:file_id>')
@bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
"""Download a file"""
file = File.query.filter_by(id=file_id, user_id=current_user.id, is_folder=False).first_or_404()
file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404()
# Can't download folders directly
if file.is_folder:
if file.folder:
flash('Cannot download folders directly. Please use the ZIP option.', 'warning')
return redirect(url_for('files.browser', folder_id=file.id))
return redirect(url_for('files.browser', folder_id=file.folder.id))
# Check if file exists in storage
storage_path = os.path.join(Config.UPLOAD_FOLDER, file.storage_name)
storage_path = file.path
if not os.path.exists(storage_path):
flash('File not found in storage', 'error')
return redirect(url_for('files.browser', folder_id=file.parent_id))
return redirect(url_for('files.browser', folder_id=file.folder_id))
# Return the file
return send_file(
@ -393,131 +410,336 @@ def download(file_id):
as_attachment=True
)
@files_bp.route('/create_folder', methods=['POST'])
@bp.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
"""Create a new folder"""
parent_id = request.form.get('parent_id', type=int)
folder_name = request.form.get('name', '').strip()
name = request.form.get('name')
parent_id = request.form.get('parent_id')
if not folder_name:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'Folder name is required'}), 400
else:
flash('Folder name is required', 'error')
return redirect(url_for('files.browser', folder_id=parent_id))
if not name:
flash('Folder name is required', 'danger')
return redirect(url_for('files.browser'))
# Sanitize folder name
folder_name = secure_filename(folder_name)
# Check if folder already exists
parent = None
if parent_id:
parent = File.query.filter_by(id=parent_id, user_id=current_user.id, is_folder=True).first_or_404()
existing = File.query.filter_by(
name=folder_name,
parent_id=parent_id,
user_id=current_user.id,
is_folder=True
).first()
else:
existing = File.query.filter_by(
name=folder_name,
parent_id=None,
user_id=current_user.id,
is_folder=True
).first()
if existing:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'A folder with this name already exists'}), 400
else:
flash('A folder with this name already exists', 'error')
return redirect(url_for('files.browser', folder_id=parent_id))
# Create new folder
new_folder = File(
name=folder_name,
is_folder=True,
# Create folder
folder = Folder(
name=name,
user_id=current_user.id,
parent_id=parent_id
parent_id=parent_id if parent_id else None
)
db.session.add(new_folder)
db.session.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'success': True,
'folder': {
'id': new_folder.id,
'name': new_folder.name,
'url': url_for('files.browser', folder_id=new_folder.id)
}
})
else:
flash('Folder created successfully', 'success')
return redirect(url_for('files.browser', folder_id=parent_id))
@files_bp.route('/rename/<int:item_id>', methods=['POST'])
@login_required
def rename(item_id):
"""Rename a file or folder"""
item = File.query.filter_by(id=item_id, user_id=current_user.id).first_or_404()
new_name = request.form.get('name', '').strip()
if not new_name:
return jsonify({'error': 'Name is required'}), 400
# Sanitize name
new_name = secure_filename(new_name)
# Check if a file/folder with this name already exists in the same location
existing = File.query.filter(
File.name == new_name,
File.parent_id == item.parent_id,
File.user_id == current_user.id,
File.is_folder == item.is_folder,
File.id != item.id
).first()
if existing:
return jsonify({'error': 'An item with this name already exists'}), 400
# Update name
item.name = new_name
db.session.commit()
return jsonify({
'success': True,
'item': {
'id': item.id,
'name': item.name
}
})
@files_bp.route('/delete/<int:item_id>', methods=['POST'])
@login_required
def delete(item_id):
"""Delete a file or folder"""
item = File.query.filter_by(id=item_id, user_id=current_user.id).first_or_404()
try:
# If it's a file, delete the actual file from storage
if not item.is_folder and item.storage_name:
storage_path = os.path.join(Config.UPLOAD_FOLDER, item.storage_name)
if os.path.exists(storage_path):
os.remove(storage_path)
# Delete the database record (this will cascade delete any children due to the model relationship)
db.session.delete(item)
db.session.add(folder)
db.session.commit()
return jsonify({'success': True})
flash(f'Folder "{name}" created successfully', 'success')
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Delete error: {str(e)}")
current_app.logger.error(f"Error creating folder: {str(e)}")
flash('Error creating folder', 'danger')
# Redirect to appropriate location
if parent_id:
return redirect(url_for('files.browser', folder_id=parent_id))
return redirect(url_for('files.browser'))
@bp.route('/rename', methods=['POST'])
@login_required
def rename_item():
"""Rename a file or folder"""
try:
# Get JSON data
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
item_id = data.get('item_id')
new_name = data.get('new_name')
# Validation
if not item_id or not new_name or new_name.strip() == '':
return jsonify({'error': 'Item ID and new name are required'}), 400
# Determine if it's a file or folder
file = File.query.filter_by(id=item_id, user_id=current_user.id).first()
folder = Folder.query.filter_by(id=item_id, user_id=current_user.id).first()
if file:
# For files, we need to handle the file system and database
old_path = file.path
file_dir = os.path.dirname(old_path)
# Create safe name
safe_name = secure_filename(new_name)
# Check for duplicates
existing_file = File.query.filter_by(
name=safe_name,
folder_id=file.folder_id,
user_id=current_user.id
).filter(File.id != file.id).first()
if existing_file:
return jsonify({'error': 'A file with this name already exists'}), 400
# Update file path
new_path = os.path.join(file_dir, safe_name)
# Rename file on disk
try:
if os.path.exists(old_path):
os.rename(old_path, new_path)
except OSError as e:
return jsonify({'error': f'Error renaming file: {str(e)}'}), 500
# Update database
file.name = safe_name
file.path = new_path
db.session.commit()
return jsonify({
'success': True,
'message': 'File renamed successfully',
'new_name': safe_name
})
elif folder:
# For folders, we just update the database
# Check for duplicates
existing_folder = Folder.query.filter_by(
name=new_name,
parent_id=folder.parent_id,
user_id=current_user.id
).filter(Folder.id != folder.id).first()
if existing_folder:
return jsonify({'error': 'A folder with this name already exists'}), 400
# Update folder name
folder.name = new_name
db.session.commit()
return jsonify({
'success': True,
'message': 'Folder renamed successfully',
'new_name': new_name
})
else:
return jsonify({'error': 'Item not found'}), 404
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Error renaming item: {str(e)}")
return jsonify({'error': str(e)}), 500
@bp.route('/delete/<item_id>', methods=['POST'])
@login_required
def delete_item(item_id):
"""Delete a file or folder"""
try:
# Check if item exists
if not item_id or item_id == 'null':
return jsonify({'error': 'Invalid item ID'}), 400
# Determine if it's a file or folder
file = File.query.filter_by(id=item_id, user_id=current_user.id).first()
folder = Folder.query.filter_by(id=item_id, user_id=current_user.id).first()
if file:
# Delete file from storage
try:
if os.path.exists(file.path):
os.remove(file.path)
except OSError as e:
current_app.logger.error(f"Error deleting file from disk: {str(e)}")
# Delete from database
db.session.delete(file)
db.session.commit()
return jsonify({'success': True, 'message': 'File deleted successfully'})
elif folder:
# Check if folder has contents
has_files = File.query.filter_by(folder_id=folder.id).first() is not None
has_subfolders = Folder.query.filter_by(parent_id=folder.id).first() is not None
if has_files or has_subfolders:
# Delete recursively
delete_folder_recursive(folder.id)
return jsonify({'success': True, 'message': 'Folder and contents deleted successfully'})
else:
# Empty folder, simple delete
db.session.delete(folder)
db.session.commit()
return jsonify({'success': True, 'message': 'Folder deleted successfully'})
else:
return jsonify({'error': 'Item not found'}), 404
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Error deleting item: {str(e)}")
return jsonify({'error': str(e)}), 500
def delete_folder_recursive(folder_id):
"""Recursively delete a folder and its contents"""
# Delete all files in the folder
files = File.query.filter_by(folder_id=folder_id).all()
for file in files:
try:
if os.path.exists(file.path):
os.remove(file.path)
except OSError:
pass
db.session.delete(file)
# Recursively delete subfolders
subfolders = Folder.query.filter_by(parent_id=folder_id).all()
for subfolder in subfolders:
delete_folder_recursive(subfolder.id)
# Delete the folder itself
folder = Folder.query.get(folder_id)
if folder:
db.session.delete(folder)
@bp.route('/upload_xhr', methods=['POST'])
@login_required
def upload_xhr():
"""Handle XHR file uploads with improved error handling"""
try:
# Get parent folder ID if provided
parent_folder_id = request.form.get('parent_folder_id')
parent_folder = None
if parent_folder_id:
parent_folder = Folder.query.get(parent_folder_id)
if not parent_folder or parent_folder.user_id != current_user.id:
return jsonify({'error': 'Invalid parent folder'}), 400
# Check if files were uploaded
if 'files[]' not in request.files:
return jsonify({'error': 'No files in request'}), 400
files = request.files.getlist('files[]')
if not files or len(files) == 0 or files[0].filename == '':
return jsonify({'error': 'No files selected'}), 400
# Process files
uploaded_files = []
for file in files:
# Handle folder uploads by parsing the path
path_parts = []
if '/' in file.filename:
# This is a file in a folder structure
path_parts = file.filename.split('/')
filename = path_parts[-1] # Last part is the actual filename
# Create folder structure
current_parent = parent_folder
for i, folder_name in enumerate(path_parts[:-1]):
if not folder_name: # Skip empty folder names
continue
# Check if folder already exists
existing_folder = Folder.query.filter_by(
name=folder_name,
parent_id=current_parent.id if current_parent else None,
user_id=current_user.id
).first()
if existing_folder:
current_parent = existing_folder
else:
# Create new folder
new_folder = Folder(
name=folder_name,
parent_id=current_parent.id if current_parent else None,
user_id=current_user.id
)
db.session.add(new_folder)
db.session.flush() # Get ID without committing
current_parent = new_folder
else:
# Regular file upload
filename = file.filename
current_parent = parent_folder
# Save the file
if not filename:
continue # Skip files with empty names
secure_name = secure_filename(filename)
# Check for duplicates
existing_file = File.query.filter_by(
name=secure_name,
folder_id=current_parent.id if current_parent else None,
user_id=current_user.id
).first()
if existing_file:
# Append timestamp to avoid overwrite
name_parts = secure_name.rsplit('.', 1)
if len(name_parts) > 1:
secure_name = f"{name_parts[0]}_{int(time.time())}.{name_parts[1]}"
else:
secure_name = f"{secure_name}_{int(time.time())}"
# Create file path
file_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], str(current_user.id))
# Ensure directory exists
os.makedirs(file_dir, exist_ok=True)
file_path = os.path.join(file_dir, secure_name)
# Save file to disk
file.save(file_path)
# Create file record in database
file_size = os.path.getsize(file_path)
file_type = file.content_type or 'application/octet-stream'
new_file = File(
name=secure_name,
original_name=filename,
path=file_path,
size=file_size,
type=file_type,
folder_id=current_parent.id if current_parent else None,
user_id=current_user.id
)
db.session.add(new_file)
uploaded_files.append({
'id': None, # Will be set after commit
'name': secure_name,
'size': file_size,
'type': file_type
})
# Commit all changes
db.session.commit()
# Update file IDs for response
for i, file_data in enumerate(uploaded_files):
if i < len(db.session.new):
file_data['id'] = db.session.new[i].id
return jsonify({
'success': True,
'message': f'Successfully uploaded {len(uploaded_files)} files',
'files': uploaded_files
})
except Exception as e:
# Log the error for debugging
current_app.logger.error(f"Upload error: {str(e)}")
db.session.rollback()
return jsonify({'error': str(e)}), 500
# Import the helper functions from __init__.py