Flask-Files/app/routes/files.py
2025-03-24 20:34:42 +01:00

776 lines
No EOL
29 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, send_from_directory, abort, jsonify, send_file, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models import File, Share, Folder
from config import Config
import os
from datetime import datetime, timedelta
import uuid
import mimetypes
import shutil
import json
import time
# Blueprint that groups the file-browser routes defined below; all of them are
# mounted under the /files URL prefix.
bp = Blueprint('files', __name__, url_prefix='/files')
@bp.route('/')
@bp.route('/browser')
@bp.route('/browser/<int:folder_id>')
@login_required
def browser(folder_id=None):
    """Render the file browser for the root level or for a single folder.

    Args:
        folder_id: Id of the folder to display. When falsy, the listing shows
            the current user's folders and files that have no parent folder.

    Returns:
        The rendered ``files/browser.html`` template, or a redirect to the
        root listing when the folder belongs to another user. A missing
        folder results in a 404 via ``get_or_404``.
    """
    current_folder = Folder.query.get_or_404(folder_id) if folder_id else None
    if current_folder and current_folder.user_id != current_user.id:
        flash('You do not have permission to access this folder', 'error')
        return redirect(url_for('files.browser'))

    # Walk from the current folder up through its parents, then flip the list
    # so the breadcrumb reads from the outermost folder to the current one.
    breadcrumbs = []
    node = current_folder
    while node:
        breadcrumbs.append(node)
        node = Folder.query.get(node.parent_id) if node.parent_id else None
    breadcrumbs.reverse()

    # A container id of None selects the root level (items without a parent).
    container_id = current_folder.id if current_folder else None
    owner_id = current_user.id
    subfolders = Folder.query.filter_by(parent_id=container_id, user_id=owner_id).all()
    files = File.query.filter_by(folder_id=container_id, user_id=owner_id).all()

    return render_template(
        'files/browser.html',
        current_folder=current_folder,
        breadcrumbs=breadcrumbs,
        subfolders=subfolders,
        files=files,
    )
@bp.route('/contents')
@login_required
def folder_contents():
    """Return the contents of a folder for AJAX loading.

    Query parameters:
        folder_id: Folder to list; omitted or falsy means the root level.
        sort: ``name`` (default), ``date`` or ``size``.
        order: ``asc`` (default); any other value sorts descending.
        search: Optional substring matched case-insensitively against names.
        format: ``json`` to receive JSON instead of an HTML partial.

    Returns:
        For requests carrying ``X-Requested-With: XMLHttpRequest``: a JSON
        document or the ``files/partials/folder_contents.html`` partial.
        For any other request: a redirect to the regular browser view.
        Responds with 404 when ``folder_id`` does not name a folder owned by
        the current user.
    """
    folder_id = request.args.get('folder_id', type=int)
    sort_by = request.args.get('sort', 'name')
    sort_order = request.args.get('order', 'asc')
    search_query = request.args.get('search', '')

    current_folder = None
    if folder_id:
        current_folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404()

    # A parent id of None selects the root level.
    parent_id = current_folder.id if current_folder else None
    folders_query = Folder.query.filter_by(user_id=current_user.id, parent_id=parent_id)
    files_query = File.query.filter_by(user_id=current_user.id, folder_id=parent_id)

    if search_query:
        pattern = f'%{search_query}%'
        folders_query = folders_query.filter(Folder.name.ilike(pattern))
        files_query = files_query.filter(File.name.ilike(pattern))

    ascending = sort_order == 'asc'

    def ordered(column):
        """Return the ORDER BY clause for ``column`` in the requested direction."""
        return column.asc() if ascending else column.desc()

    if sort_by == 'name':
        folders_query = folders_query.order_by(ordered(Folder.name))
        files_query = files_query.order_by(ordered(File.name))
    elif sort_by == 'date':
        # 'asc' means oldest first, consistent with how 'asc' is applied to
        # name and size (the direction used to be inverted for dates only).
        folders_query = folders_query.order_by(ordered(Folder.updated_at))
        files_query = files_query.order_by(ordered(File.updated_at))
    elif sort_by == 'size':
        # Only files are ordered by size; folders keep the query's default order.
        files_query = files_query.order_by(ordered(File.size))

    folders = folders_query.all()
    files = files_query.all()

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Not an AJAX call: send the user to the full browser page instead.
        if current_folder:
            return redirect(url_for('files.browser', folder_id=current_folder.id))
        return redirect(url_for('files.browser'))

    if request.args.get('format') == 'json':
        folder_data = [{
            'id': folder.id,
            'name': folder.name,
            'updated_at': folder.updated_at.isoformat(),
            'is_folder': True,
            'item_count': folder.files.count(),
            'url': url_for('files.browser', folder_id=folder.id)
        } for folder in folders]
        file_data = [{
            'id': file.id,
            'name': file.name,
            'size': file.size,
            'formatted_size': format_file_size(file.size),
            'type': file.type,
            'updated_at': file.updated_at.isoformat(),
            'is_folder': False,
            'icon': file.icon_class,
            'url': url_for('files.download', file_id=file.id)
        } for file in files]
        return jsonify({
            'folders': folder_data,
            'files': file_data,
            'total_items': len(folder_data) + len(file_data)
        })

    return render_template('files/partials/folder_contents.html',
                           current_folder=current_folder,
                           folders=folders,
                           files=files)
@bp.route('/upload')
@bp.route('/upload/<int:folder_id>')
@login_required
def upload(folder_id=None):
    """Redirect to the browser view with the ``show_upload`` flag set.

    Args:
        folder_id: Optional folder whose browser page should be opened; when
            falsy the redirect targets the root browser page.
    """
    target_args = {'show_upload': True}
    if folder_id:
        target_args['folder_id'] = folder_id
    return redirect(url_for('files.browser', **target_args))
@bp.route('/upload_folder', methods=['POST'])
@login_required
def upload_folder():
    """Create a folder tree and its files from a folder upload.

    Form fields:
        folder_id: Optional id of the parent folder (must belong to the
            current user, otherwise 404).
        folder_data: JSON object with a ``name`` and a ``files`` list. Each
            entry of ``files`` carries a ``path`` (``/``-separated, relative
            to the uploaded folder) and an ``id`` naming the matching entry
            in ``request.files``.

    Returns:
        JSON ``{'success': True, 'folder': {...}}`` on success,
        a 400 JSON error for missing or malformed ``folder_data``, and
        a 500 JSON error when processing fails. On failure the database
        session is rolled back and files already written to storage are
        removed again.
    """
    folder_id = request.form.get('folder_id', None, type=int)
    parent_folder = None
    if folder_id:
        parent_folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404()

    if 'folder_data' not in request.form:
        return jsonify({'error': 'No folder data provided'}), 400

    # Malformed client input is a client error, not a server error.
    try:
        folder_data = json.loads(request.form['folder_data'])
    except ValueError:
        return jsonify({'error': 'Invalid folder data'}), 400
    if not isinstance(folder_data, dict):
        return jsonify({'error': 'Invalid folder data'}), 400

    saved_paths = []  # files written to storage during this request
    temp_dir = None
    try:
        # secure_filename() can return '' (e.g. for names without any ASCII
        # characters); fall back to a placeholder rather than an empty name.
        raw_name = folder_data.get('name') or 'Unnamed Folder'
        folder_name = secure_filename(str(raw_name)) or 'Unnamed_Folder'

        folder = Folder(
            name=folder_name,
            user_id=current_user.id,
            parent_id=parent_folder.id if parent_folder else None
        )
        db.session.add(folder)
        db.session.flush()  # Get folder.id without committing

        total_size = 0

        # NOTE(review): nothing in this function writes into this scratch
        # directory. It is kept because os.makedirs() also creates the upload
        # root that file.save() below writes into.
        temp_dir = os.path.join(Config.UPLOAD_FOLDER, 'temp', str(uuid.uuid4()))
        os.makedirs(temp_dir, exist_ok=True)

        for file_info in folder_data.get('files', []):
            original_path = file_info.get('path', '')
            if not original_path:
                continue

            path_parts = original_path.split('/')

            # Create (or reuse) the intermediate folders named by the path.
            current_parent_id = folder.id
            for part in path_parts[:-1]:
                part = secure_filename(part)
                if not part:
                    continue
                subfolder = Folder.query.filter_by(
                    name=part,
                    parent_id=current_parent_id,
                    user_id=current_user.id
                ).first()
                if not subfolder:
                    subfolder = Folder(
                        name=part,
                        user_id=current_user.id,
                        parent_id=current_parent_id
                    )
                    db.session.add(subfolder)
                    db.session.flush()
                current_parent_id = subfolder.id

            # Store the file only if its payload is part of this request.
            file_uid = file_info.get('id')
            if file_uid and file_uid in request.files:
                file = request.files[file_uid]
                filename = secure_filename(path_parts[-1]) or 'unnamed_file'

                # Files are stored under a random name; the display name
                # lives only in the database record.
                storage_path = os.path.join(Config.UPLOAD_FOLDER, str(uuid.uuid4()))
                file.save(storage_path)
                saved_paths.append(storage_path)

                file_size = os.path.getsize(storage_path)
                mime_type, _ = mimetypes.guess_type(filename)
                if not mime_type:
                    mime_type = 'application/octet-stream'

                db.session.add(File(
                    name=filename,
                    original_name=filename,
                    path=storage_path,
                    size=file_size,
                    type=mime_type,
                    user_id=current_user.id,
                    folder_id=current_parent_id
                ))
                total_size += file_size

        # Total size of every file stored by this upload, at any depth.
        folder.size = total_size
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        # The records were rolled back, so remove their payloads as well.
        for path in saved_paths:
            try:
                os.remove(path)
            except OSError:
                current_app.logger.warning(f"Could not remove orphaned upload: {path}")
        current_app.logger.error(f"Folder upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500
    finally:
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

    return jsonify({
        'success': True,
        'folder': {
            'id': folder.id,
            'name': folder.name
        }
    })
@bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
    """Send a stored file to the current user as an attachment.

    Args:
        file_id: Id of the file; a 404 is returned unless a file with this id
            is owned by the current user.

    Returns:
        The file as a download named after ``file.name``, or a redirect to
        the containing folder's browser page when the stored payload is
        missing.
    """
    file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404()

    # ``file.folder`` is the folder that CONTAINS the file, so it must not be
    # used to reject the download: doing so made every file that lives inside
    # a folder impossible to download.
    storage_path = file.path
    if not os.path.isfile(storage_path):
        flash('File not found in storage', 'error')
        return redirect(url_for('files.browser', folder_id=file.folder_id))

    return send_file(
        storage_path,
        download_name=file.name,
        as_attachment=True
    )
@bp.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
    """Create a new folder for the current user.

    Input is read from the JSON body (AJAX requests with a JSON content type)
    or from form fields otherwise:
        folder_name: Name of the folder to create (required).
        parent_id: Optional id of the parent folder; empty means root level.

    Returns:
        AJAX requests receive JSON (``success`` plus ``folder`` or ``error``)
        with status 200/400/403/500. Other requests receive a flash message
        and a redirect to the browser page. A parent folder that does not
        exist yields a 404.
    """
    # Imported locally so this handler does not depend on the module's import block.
    from werkzeug.exceptions import HTTPException

    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    # Bound before the try block so the error handler can always use it.
    parent_id = None
    try:
        if is_ajax and request.is_json:
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}
            folder_name = data.get('folder_name')
            raw_parent_id = data.get('parent_id')
        else:
            folder_name = request.form.get('folder_name')
            raw_parent_id = request.form.get('parent_id')

        # Form values arrive as strings; normalise to an int, or None for root.
        if raw_parent_id not in (None, ''):
            try:
                parent_id = int(raw_parent_id) or None
            except (TypeError, ValueError):
                if is_ajax:
                    return jsonify({'success': False, 'error': 'Invalid parent folder'}), 400
                flash('Invalid parent folder', 'error')
                return redirect(url_for('files.browser'))

        if not folder_name:
            if is_ajax:
                return jsonify({'success': False, 'error': 'Folder name is required'}), 400
            flash('Folder name is required', 'error')
            return redirect(url_for('files.browser', folder_id=parent_id))

        if not is_valid_filename(folder_name):
            if is_ajax:
                return jsonify({'success': False, 'error': 'Invalid folder name'}), 400
            flash('Invalid folder name. Please use only letters, numbers, spaces, and common punctuation.', 'error')
            return redirect(url_for('files.browser', folder_id=parent_id))

        if parent_id:
            parent_folder = Folder.query.get_or_404(parent_id)
            if parent_folder.user_id != current_user.id:
                if is_ajax:
                    return jsonify({'success': False, 'error': 'Permission denied'}), 403
                flash('You do not have permission to create a folder here', 'error')
                return redirect(url_for('files.browser'))

        # parent_id is None at root level, which filter_by matches as NULL.
        existing = Folder.query.filter_by(
            name=folder_name,
            parent_id=parent_id,
            user_id=current_user.id
        ).first()
        if existing:
            if is_ajax:
                return jsonify({'success': False, 'error': f'A folder named "{folder_name}" already exists'}), 400
            flash(f'A folder named "{folder_name}" already exists', 'error')
            return redirect(url_for('files.browser', folder_id=parent_id))

        folder = Folder(
            name=folder_name,
            parent_id=parent_id,
            user_id=current_user.id
        )
        db.session.add(folder)
        db.session.commit()

        if is_ajax:
            return jsonify({
                'success': True,
                'folder': {
                    'id': folder.id,
                    'name': folder.name,
                    'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S')
                }
            })
        flash(f'Folder "{folder_name}" created successfully', 'success')
        return redirect(url_for('files.browser', folder_id=parent_id))
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 from get_or_404) through
        # instead of converting them into a 500.
        raise
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error creating folder: {str(e)}")
        if is_ajax:
            return jsonify({'success': False, 'error': f'Error creating folder: {str(e)}'}), 500
        flash(f'Error creating folder: {str(e)}', 'error')
        return redirect(url_for('files.browser', folder_id=parent_id))
@bp.route('/rename', methods=['POST'])
@login_required
def rename_item():
    """Rename a file or folder owned by the current user.

    JSON body:
        item_id: Id of the file or folder (required).
        new_name: New name (required, non-blank string).
        item_type: Optional ``'file'`` or ``'folder'``. Files and folders are
            looked up by the same id, so this hint selects which one is
            meant; without it a file with that id takes precedence.

    Returns:
        JSON with ``success``/``message``/``new_name`` on success, otherwise
        JSON with ``error`` and status 400, 404 or 500.
    """
    try:
        # silent=True: a missing or malformed body becomes the 400 below
        # instead of an exception that would be reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        item_id = data.get('item_id')
        new_name = data.get('new_name')
        item_type = data.get('item_type')

        if not item_id or not isinstance(new_name, str) or new_name.strip() == '':
            return jsonify({'error': 'Item ID and new name are required'}), 400

        file = None
        folder = None
        if item_type != 'folder':
            file = File.query.filter_by(id=item_id, user_id=current_user.id).first()
        if item_type != 'file':
            folder = Folder.query.filter_by(id=item_id, user_id=current_user.id).first()

        if file:
            old_path = file.path
            safe_name = secure_filename(new_name)
            if not safe_name:
                # secure_filename() strips everything it considers unsafe and
                # may leave nothing behind.
                return jsonify({'error': 'Invalid file name'}), 400

            existing_file = File.query.filter_by(
                name=safe_name,
                folder_id=file.folder_id,
                user_id=current_user.id
            ).filter(File.id != file.id).first()
            if existing_file:
                return jsonify({'error': 'A file with this name already exists'}), 400

            new_path = os.path.join(os.path.dirname(old_path), safe_name)
            renamed_on_disk = False
            if new_path != old_path and os.path.exists(old_path):
                # os.rename() may silently replace an existing target, which
                # could be the payload of a different file record.
                if os.path.exists(new_path) and not os.path.samefile(old_path, new_path):
                    return jsonify({'error': 'A file with this name already exists'}), 400
                try:
                    os.rename(old_path, new_path)
                    renamed_on_disk = True
                except OSError as e:
                    return jsonify({'error': f'Error renaming file: {str(e)}'}), 500

            file.name = safe_name
            file.path = new_path
            try:
                db.session.commit()
            except Exception:
                # Keep disk and database consistent: undo the rename.
                if renamed_on_disk:
                    try:
                        os.rename(new_path, old_path)
                    except OSError as undo_error:
                        current_app.logger.error(f"Error restoring file name on disk: {str(undo_error)}")
                raise

            return jsonify({
                'success': True,
                'message': 'File renamed successfully',
                'new_name': safe_name
            })
        elif folder:
            # Folders exist only in the database; nothing to rename on disk.
            existing_folder = Folder.query.filter_by(
                name=new_name,
                parent_id=folder.parent_id,
                user_id=current_user.id
            ).filter(Folder.id != folder.id).first()
            if existing_folder:
                return jsonify({'error': 'A folder with this name already exists'}), 400

            folder.name = new_name
            db.session.commit()
            return jsonify({
                'success': True,
                'message': 'Folder renamed successfully',
                'new_name': new_name
            })
        else:
            return jsonify({'error': 'Item not found'}), 404
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error renaming item: {str(e)}")
        return jsonify({'error': str(e)}), 500
@bp.route('/delete/<item_id>', methods=['POST'])
@login_required
def delete_item(item_id):
    """Delete a file, or a folder together with everything inside it.

    Args:
        item_id: Id of the file or folder, taken from the URL.

    An optional ``item_type`` (``'file'`` or ``'folder'``) may be sent as a
    query/form value or in a JSON body. Files and folders are looked up by
    the same id, so this hint selects which one is meant; without it a file
    with that id takes precedence.

    Returns:
        JSON with ``success``/``message`` on success, otherwise JSON with
        ``error`` and status 400, 404 or 500.
    """
    try:
        if not item_id or item_id == 'null':
            return jsonify({'error': 'Invalid item ID'}), 400

        item_type = request.values.get('item_type')
        if item_type is None:
            payload = request.get_json(silent=True)
            if isinstance(payload, dict):
                item_type = payload.get('item_type')

        file = None
        folder = None
        if item_type != 'folder':
            file = File.query.filter_by(id=item_id, user_id=current_user.id).first()
        if item_type != 'file':
            folder = Folder.query.filter_by(id=item_id, user_id=current_user.id).first()

        if file:
            stored_path = file.path
            db.session.delete(file)
            db.session.commit()
            # Remove the payload only after the record is gone, so a failed
            # commit never leaves a record pointing at a deleted file.
            try:
                if os.path.exists(stored_path):
                    os.remove(stored_path)
            except OSError as e:
                current_app.logger.error(f"Error deleting file from disk: {str(e)}")
            return jsonify({'success': True, 'message': 'File deleted successfully'})
        elif folder:
            has_files = File.query.filter_by(folder_id=folder.id).first() is not None
            has_subfolders = Folder.query.filter_by(parent_id=folder.id).first() is not None
            if has_files or has_subfolders:
                delete_folder_recursive(folder.id)
                # delete_folder_recursive() only marks rows for deletion; the
                # deletions are lost unless they are committed here.
                db.session.commit()
                return jsonify({'success': True, 'message': 'Folder and contents deleted successfully'})
            db.session.delete(folder)
            db.session.commit()
            return jsonify({'success': True, 'message': 'Folder deleted successfully'})
        else:
            return jsonify({'error': 'Item not found'}), 404
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error deleting item: {str(e)}")
        return jsonify({'error': str(e)}), 500
def delete_folder_recursive(folder_id):
    """Mark a folder, its files and all nested subfolders for deletion.

    Stored payloads are removed from disk immediately; database rows are only
    marked for deletion in the session. This function does not commit, so the
    caller must commit (or roll back) afterwards.

    Args:
        folder_id: Id of the folder to delete. No ownership check is done
            here.
    """
    for file in File.query.filter_by(folder_id=folder_id).all():
        try:
            if os.path.exists(file.path):
                os.remove(file.path)
        except OSError as e:
            # Best effort: keep deleting, but leave a trace of the payload
            # that could not be removed instead of failing silently.
            current_app.logger.warning(f"Error deleting file from disk: {str(e)}")
        db.session.delete(file)

    for subfolder in Folder.query.filter_by(parent_id=folder_id).all():
        delete_folder_recursive(subfolder.id)

    folder = Folder.query.get(folder_id)
    if folder:
        db.session.delete(folder)
@bp.route('/upload_xhr', methods=['POST'])
@login_required
def upload_xhr():
    """Store files uploaded via XHR, recreating folder paths where present.

    Form fields:
        parent_folder_id: Optional id of the destination folder; it must
            belong to the current user.
        files[]: One or more uploaded files. A filename containing ``/`` is
            treated as a path: its leading parts are created (or reused) as
            folders below the destination.

    Returns:
        JSON with ``success``, ``message`` and ``files`` (id, name, size,
        type per stored file), or JSON with ``error`` and status 400/500.
        On failure the session is rolled back and payloads written during
        the request are removed again.
    """
    saved_paths = []   # payloads written to disk during this request
    committed = False  # once True, saved payloads belong to real records
    try:
        parent_folder_id = request.form.get('parent_folder_id')
        parent_folder = None
        if parent_folder_id:
            parent_folder = Folder.query.get(parent_folder_id)
            if not parent_folder or parent_folder.user_id != current_user.id:
                return jsonify({'error': 'Invalid parent folder'}), 400

        if 'files[]' not in request.files:
            return jsonify({'error': 'No files in request'}), 400
        files = request.files.getlist('files[]')
        if not files or files[0].filename == '':
            return jsonify({'error': 'No files selected'}), 400

        uploaded_files = []
        new_records = []
        for file in files:
            raw_name = file.filename or ''
            current_parent = parent_folder

            if '/' in raw_name:
                # Path upload: every part but the last names a folder.
                *folder_names, filename = raw_name.split('/')
                for folder_name in folder_names:
                    if not folder_name:
                        continue
                    parent_ref = current_parent.id if current_parent else None
                    next_folder = Folder.query.filter_by(
                        name=folder_name,
                        parent_id=parent_ref,
                        user_id=current_user.id
                    ).first()
                    if next_folder is None:
                        next_folder = Folder(
                            name=folder_name,
                            parent_id=parent_ref,
                            user_id=current_user.id
                        )
                        db.session.add(next_folder)
                        db.session.flush()  # Get ID without committing
                    current_parent = next_folder
            else:
                filename = raw_name

            if not filename:
                continue  # Skip files with empty names

            # secure_filename() may strip the name down to nothing.
            secure_name = secure_filename(filename) or 'unnamed_file'
            target_folder_id = current_parent.id if current_parent else None

            existing_file = File.query.filter_by(
                name=secure_name,
                folder_id=target_folder_id,
                user_id=current_user.id
            ).first()
            if existing_file:
                # Append timestamp to avoid a duplicate name in the folder
                name_parts = secure_name.rsplit('.', 1)
                if len(name_parts) > 1:
                    secure_name = f"{name_parts[0]}_{int(time.time())}.{name_parts[1]}"
                else:
                    secure_name = f"{secure_name}_{int(time.time())}"

            file_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], str(current_user.id))
            os.makedirs(file_dir, exist_ok=True)
            file_path = os.path.join(file_dir, secure_name)
            if os.path.exists(file_path):
                # All of a user's payloads share one directory, so the same
                # display name in a different folder would overwrite an
                # existing payload. Store under a unique name instead.
                file_path = os.path.join(file_dir, f"{uuid.uuid4().hex}_{secure_name}")

            file.save(file_path)
            saved_paths.append(file_path)

            file_size = os.path.getsize(file_path)
            file_type = file.content_type or 'application/octet-stream'
            new_file = File(
                name=secure_name,
                original_name=filename,
                path=file_path,
                size=file_size,
                type=file_type,
                folder_id=target_folder_id,
                user_id=current_user.id
            )
            db.session.add(new_file)
            new_records.append(new_file)
            uploaded_files.append({
                'id': None,  # Filled in after commit
                'name': secure_name,
                'size': file_size,
                'type': file_type
            })

        db.session.commit()
        committed = True

        # Primary keys are available on the records once they are committed.
        for file_data, record in zip(uploaded_files, new_records):
            file_data['id'] = record.id

        return jsonify({
            'success': True,
            'message': f'Successfully uploaded {len(uploaded_files)} files',
            'files': uploaded_files
        })
    except Exception as e:
        current_app.logger.error(f"Upload error: {str(e)}")
        db.session.rollback()
        if not committed:
            # The records were rolled back; remove their payloads as well.
            for path in saved_paths:
                try:
                    os.remove(path)
                except OSError:
                    current_app.logger.warning(f"Could not remove orphaned upload: {path}")
        return jsonify({'error': str(e)}), 500
@bp.route('/move', methods=['POST'])
@login_required
def move_item():
    """Move a file or folder into another folder.

    JSON body:
        item_id: Id of the file or folder to move.
        item_type: ``'file'`` or ``'folder'``.
        target_folder_id: Id of the destination folder.

    Returns:
        JSON ``{'success': True}`` on success, otherwise JSON with
        ``success: False`` and ``error`` (status 400, 403 or 500). Unknown
        ids yield a 404 via ``get_or_404``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or malformed body: handled by the parameter check below.
        data = {}

    item_id = data.get('item_id')
    item_type = data.get('item_type')
    target_folder_id = data.get('target_folder_id')
    if not item_id or not item_type or not target_folder_id:
        return jsonify({'success': False, 'error': 'Missing parameters'}), 400
    if item_type not in ('file', 'folder'):
        # Previously an unknown type fell through and reported success
        # without moving anything.
        return jsonify({'success': False, 'error': 'Invalid item type'}), 400

    target_folder = Folder.query.get_or_404(target_folder_id)
    if target_folder.user_id != current_user.id:
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    if item_type == 'file':
        file = File.query.get_or_404(item_id)
        if file.user_id != current_user.id:
            return jsonify({'success': False, 'error': 'Permission denied'}), 403
        # File ids and folder ids are unrelated, so no id comparison is
        # needed here: a file can never be moved "into itself".
        file.folder_id = target_folder.id
    else:
        folder = Folder.query.get_or_404(item_id)
        if folder.user_id != current_user.id:
            return jsonify({'success': False, 'error': 'Permission denied'}), 403
        if folder.id == target_folder.id:
            return jsonify({'success': False, 'error': 'Cannot move a folder to itself'}), 400
        # Moving a folder below one of its own descendants would detach the
        # subtree into a cycle.
        if is_descendant(target_folder, folder):
            return jsonify({'success': False, 'error': 'Cannot move a folder to its descendant'}), 400
        folder.parent_id = target_folder.id

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error moving item: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
    return jsonify({'success': True})
def is_descendant(folder, potential_ancestor):
    """Return True if ``folder`` lies somewhere below ``potential_ancestor``.

    The parent chain of ``folder`` is followed upwards via ``parent_id``
    until the ancestor is found or the chain ends.

    Args:
        folder: Object with ``parent_id``; the chain is walked from here.
        potential_ancestor: Object whose ``id`` is searched for in the chain.

    Returns:
        True when one of ``folder``'s ancestors has ``potential_ancestor.id``;
        False otherwise, including when the chain is broken (a parent id
        that no longer resolves) or loops back on itself.
    """
    visited = set()
    current = folder
    # ``current`` is None when a parent_id points at a row that is gone.
    while current is not None and current.parent_id is not None:
        if current.parent_id == potential_ancestor.id:
            return True
        if current.parent_id in visited:
            # Corrupt data: the chain is cyclic, so stop instead of looping
            # forever.
            return False
        visited.add(current.parent_id)
        current = Folder.query.get(current.parent_id)
    return False
# Import the helper functions from __init__.py
from app import get_file_icon, format_file_size