Flask-Files/app/routes/files.py
2025-03-23 03:29:05 +01:00

746 lines
No EOL
28 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, send_from_directory, abort, jsonify, send_file, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models import File, Share, Folder
from config import Config
import os
from datetime import datetime, timedelta
import uuid
import mimetypes
import shutil
import json
import time
bp = Blueprint('files', __name__, url_prefix='/files')
@bp.route('/')
@bp.route('/browser')
@bp.route('/browser/<int:folder_id>')
@login_required
def browser(folder_id=None):
    """Display the file browser for the top level or for one folder.

    Args:
        folder_id: ID of the folder to open; a falsy value shows the
            current user's top-level folders and files.

    Query parameters:
        q: optional case-insensitive substring filter applied to names.

    Returns:
        The rendered ``files/browser.html`` page. Requests that carry
        ``X-Requested-With: XMLHttpRequest`` receive only the
        ``files/partials/folder_contents.html`` partial.

    Aborts with 404 when ``folder_id`` does not match a folder owned by the
    current user.
    """
    current_folder = None
    breadcrumbs = []
    if folder_id:
        current_folder = Folder.query.filter_by(
            id=folder_id, user_id=current_user.id
        ).first_or_404()
        # Collect the chain current -> root, then flip it so the trail
        # reads root -> current.
        node = current_folder
        while node:
            breadcrumbs.append(node)
            node = node.parent
        breadcrumbs.reverse()

    parent_id = current_folder.id if current_folder else None
    folders = Folder.query.filter_by(
        parent_id=parent_id, user_id=current_user.id
    ).all()
    files = File.query.filter_by(
        folder_id=parent_id, user_id=current_user.id
    ).all()

    # Optional literal (non-wildcard) substring search on item names.
    query = request.args.get('q', '')
    if query:
        needle = query.lower()
        folders = [f for f in folders if needle in f.name.lower()]
        files = [f for f in files if needle in f.name.lower()]

    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
        # The AJAX branch used to fall through and return None, which Flask
        # rejects as an invalid response. Serve the contents partial instead.
        return render_template('files/partials/folder_contents.html',
                               current_folder=current_folder,
                               folders=folders,
                               files=files)

    return render_template('files/browser.html',
                           current_folder=current_folder,
                           breadcrumbs=breadcrumbs,
                           folders=folders,
                           files=files)
def _escape_like(term, escape='\\'):
    """Escape LIKE wildcards in *term* so that it is matched literally."""
    return (
        term.replace(escape, escape + escape)
        .replace('%', escape + '%')
        .replace('_', escape + '_')
    )


@bp.route('/contents')
@login_required
def folder_contents():
    """Return the contents of a folder for AJAX loading.

    Query parameters:
        folder_id: folder to list (int); omitted or falsy lists the top level.
        sort: ``name`` (default), ``date`` or ``size``.
        order: ``asc`` (default); any other value selects the opposite order.
        search: optional case-insensitive substring filter on names.
        format: ``json`` for a JSON payload, anything else for the HTML partial.

    Returns:
        For XMLHttpRequest callers, JSON or the rendered
        ``files/partials/folder_contents.html`` partial. Any other caller is
        redirected to the browser view of the same folder.

    Aborts with 404 when ``folder_id`` does not match a folder owned by the
    current user.
    """
    folder_id = request.args.get('folder_id', type=int)
    sort_by = request.args.get('sort', 'name')
    sort_order = request.args.get('order', 'asc')
    search_query = request.args.get('search', '')
    ascending = sort_order == 'asc'

    current_folder = None
    if folder_id:
        current_folder = Folder.query.filter_by(
            id=folder_id, user_id=current_user.id
        ).first_or_404()

    # Restrict both listings to the current user's direct children of the
    # selected folder (or of the top level).
    parent_id = current_folder.id if current_folder else None
    folders_query = Folder.query.filter_by(
        user_id=current_user.id, parent_id=parent_id
    )
    files_query = File.query.filter_by(
        user_id=current_user.id, folder_id=parent_id
    )

    if search_query:
        # Escape % and _ so user input is a literal substring, not a pattern.
        pattern = f'%{_escape_like(search_query)}%'
        folders_query = folders_query.filter(
            Folder.name.ilike(pattern, escape='\\')
        )
        files_query = files_query.filter(
            File.name.ilike(pattern, escape='\\')
        )

    if sort_by == 'name':
        folders_query = folders_query.order_by(
            Folder.name.asc() if ascending else Folder.name.desc()
        )
        files_query = files_query.order_by(
            File.name.asc() if ascending else File.name.desc()
        )
    elif sort_by == 'date':
        # NOTE(review): for dates, order=asc yields newest-first (descending
        # updated_at), the opposite of name/size. Kept as-is because the
        # front end may rely on it - confirm whether this is intentional.
        folders_query = folders_query.order_by(
            Folder.updated_at.desc() if ascending else Folder.updated_at.asc()
        )
        files_query = files_query.order_by(
            File.updated_at.desc() if ascending else File.updated_at.asc()
        )
    elif sort_by == 'size':
        # Only files are ordered by size; the folder query is left unsorted.
        files_query = files_query.order_by(
            File.size.asc() if ascending else File.size.desc()
        )

    folders = folders_query.all()
    files = files_query.all()

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Plain navigation: send the user to the full browser page.
        if current_folder:
            return redirect(url_for('files.browser', folder_id=current_folder.id))
        return redirect(url_for('files.browser'))

    if request.args.get('format') != 'json':
        return render_template('files/partials/folder_contents.html',
                               current_folder=current_folder,
                               folders=folders,
                               files=files)

    folder_data = [{
        'id': folder.id,
        'name': folder.name,
        'updated_at': folder.updated_at.isoformat(),
        'is_folder': True,
        # Counts the folder's files only; subfolders are not included.
        'item_count': folder.files.count(),
        'url': url_for('files.browser', folder_id=folder.id)
    } for folder in folders]
    file_data = [{
        'id': file.id,
        'name': file.name,
        'size': file.size,
        'formatted_size': format_file_size(file.size),
        'type': file.type,
        'updated_at': file.updated_at.isoformat(),
        'is_folder': False,
        'icon': file.icon_class,
        'url': url_for('files.download', file_id=file.id)
    } for file in files]
    return jsonify({
        'folders': folder_data,
        'files': file_data,
        'total_items': len(folder_data) + len(file_data)
    })
@bp.route('/upload', methods=['GET', 'POST'])
@bp.route('/upload/<int:folder_id>', methods=['GET', 'POST'])
@login_required
def upload(folder_id=None):
    """Show the upload form (GET) or store a single uploaded file (POST).

    Args:
        folder_id: optional ID of the folder that receives the file.

    Returns:
        GET: the rendered ``files/upload.html`` form.
        POST: JSON describing the stored file on success. Validation
        failures return a JSON error for XMLHttpRequest callers and a flash
        message plus redirect otherwise. Storage or database failures return
        a JSON error with status 500.

    Aborts with 404 when ``folder_id`` does not match any folder.
    """
    if request.method != 'POST':
        return render_template('files/upload.html', folder_id=folder_id)

    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def reject(message, status, target=None):
        # One place for the "JSON for AJAX, flash + redirect otherwise" rule.
        if is_ajax:
            return jsonify({'error': message}), status
        flash(message, 'error')
        return redirect(target or request.url)

    if 'file' not in request.files:
        return reject('No file part', 400)
    file = request.files['file']
    if file.filename == '':
        return reject('No selected file', 400)

    parent_folder = None
    if folder_id:
        parent_folder = Folder.query.get_or_404(folder_id)
        if parent_folder.user_id != current_user.id:
            return reject('You do not have permission to upload to this folder',
                          403, url_for('files.browser'))

    # NOTE(review): allowed_file is not defined or imported in the visible
    # part of this module - confirm it is in scope at runtime.
    if not (file and allowed_file(file.filename)):
        return reject('File type not allowed', 400)

    filename = secure_filename(file.filename)
    # Store under a random name so uploads can never collide on disk; the
    # original extension is kept.
    _, file_extension = os.path.splitext(filename)
    storage_name = f"{uuid.uuid4()}{file_extension}"
    user_folder = os.path.join(current_app.config['UPLOAD_FOLDER'],
                               str(current_user.id))
    os.makedirs(user_folder, exist_ok=True)
    storage_path = os.path.join(user_folder, storage_name)

    try:
        file.save(storage_path)
        file_size = os.path.getsize(storage_path)
        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = 'application/octet-stream'

        db_file = File(
            name=filename,
            original_name=filename,
            path=storage_path,
            size=file_size,
            type=mime_type,
            user_id=current_user.id,
            folder_id=parent_folder.id if parent_folder else None
        )
        db.session.add(db_file)
        db.session.commit()

        return jsonify({
            'success': True,
            'file': {
                'id': db_file.id,
                'name': db_file.name,
                'size': db_file.size,
                'formatted_size': format_file_size(db_file.size),
                'type': db_file.type,
                'icon': db_file.icon_class
            }
        })
    except Exception as e:
        # Leave the session usable for the rest of the request; a failed
        # flush/commit otherwise keeps it in an invalid state.
        db.session.rollback()
        # Do not leave a stored file without a database record.
        if os.path.exists(storage_path):
            os.remove(storage_path)
        current_app.logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@bp.route('/upload_folder', methods=['POST'])
@login_required
def upload_folder():
    """Create a folder tree from an uploaded folder manifest and its files.

    Form fields:
        folder_id: optional ID (int) of the parent folder.
        folder_data: JSON object with ``name`` and ``files``; each entry of
            ``files`` carries a ``path`` (``/``-separated, relative to the
            uploaded folder) and an ``id`` naming the multipart file part
            that holds the content.

    Returns:
        JSON with the new folder's id and name on success; a JSON error with
        status 400 for missing or malformed ``folder_data`` and 500 for any
        other failure.

    Aborts with 404 when ``folder_id`` does not match a folder owned by the
    current user.
    """
    folder_id = request.form.get('folder_id', None, type=int)
    parent_folder = None
    if folder_id:
        parent_folder = Folder.query.filter_by(
            id=folder_id, user_id=current_user.id
        ).first_or_404()

    if 'folder_data' not in request.form:
        return jsonify({'error': 'No folder data provided'}), 400

    # Malformed input is a client error, not a server error.
    try:
        folder_data = json.loads(request.form['folder_data'])
    except (ValueError, TypeError):
        return jsonify({'error': 'Invalid folder data'}), 400
    if not isinstance(folder_data, dict):
        return jsonify({'error': 'Invalid folder data'}), 400

    try:
        # secure_filename may strip a name down to ''; fall back to the
        # default rather than creating a nameless folder.
        folder_name = (secure_filename(str(folder_data.get('name') or ''))
                       or secure_filename('Unnamed Folder'))

        folder = Folder(
            name=folder_name,
            user_id=current_user.id,
            parent_id=parent_folder.id if parent_folder else None
        )
        db.session.add(folder)
        db.session.flush()  # Get folder.id without committing

        total_size = 0
        saved_paths = []  # files written so far, removed again on failure

        # This directory is not used for the stored files below, but creating
        # it also ensures Config.UPLOAD_FOLDER itself exists.
        temp_dir = os.path.join(Config.UPLOAD_FOLDER, 'temp', str(uuid.uuid4()))
        os.makedirs(temp_dir, exist_ok=True)
        try:
            for file_info in folder_data.get('files', []):
                original_path = file_info.get('path', '')
                if not original_path:
                    continue
                path_parts = original_path.split('/')

                # Walk (and create where missing) the intermediate folders.
                current_parent_id = folder.id
                for part in path_parts[:-1]:
                    part = secure_filename(part)
                    if not part:
                        continue
                    subfolder = Folder.query.filter_by(
                        name=part,
                        parent_id=current_parent_id,
                        user_id=current_user.id
                    ).first()
                    if not subfolder:
                        subfolder = Folder(
                            name=part,
                            user_id=current_user.id,
                            parent_id=current_parent_id
                        )
                        db.session.add(subfolder)
                        db.session.flush()
                    current_parent_id = subfolder.id

                # Manifest entries without a matching file part only
                # contribute their folders.
                file_uid = file_info.get('id')
                if not (file_uid and file_uid in request.files):
                    continue
                file = request.files[file_uid]
                filename = secure_filename(path_parts[-1])

                storage_path = os.path.join(Config.UPLOAD_FOLDER,
                                            str(uuid.uuid4()))
                file.save(storage_path)
                saved_paths.append(storage_path)

                file_size = os.path.getsize(storage_path)
                mime_type, _ = mimetypes.guess_type(filename)
                if not mime_type:
                    mime_type = 'application/octet-stream'

                db.session.add(File(
                    name=filename,
                    original_name=filename,
                    path=storage_path,
                    size=file_size,
                    type=mime_type,
                    user_id=current_user.id,
                    folder_id=current_parent_id
                ))
                total_size += file_size

            # Total bytes of every file stored by this upload.
            folder.size = total_size
            db.session.commit()

            return jsonify({
                'success': True,
                'folder': {
                    'id': folder.id,
                    'name': folder.name
                }
            })
        except Exception as e:
            db.session.rollback()
            # The database rows are gone after the rollback, so remove the
            # files that were already written to avoid orphans on disk.
            for path in saved_paths:
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except OSError as cleanup_error:
                    current_app.logger.error(
                        f"Folder upload cleanup error: {str(cleanup_error)}")
            current_app.logger.error(f"Folder upload error: {str(e)}")
            return jsonify({'error': str(e)}), 500
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
    except Exception as e:
        # Covers failures before the inner block (e.g. the first flush);
        # discard the pending folder so the session stays usable.
        db.session.rollback()
        current_app.logger.error(f"Folder upload parsing error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
    """Send one of the current user's files as an attachment.

    Args:
        file_id: ID of the file record to download.

    Returns:
        The file as an attachment named after ``file.name``, or a redirect to
        the containing folder (with a flash message) when the stored file is
        missing on disk.

    Aborts with 404 when ``file_id`` does not match a file owned by the
    current user.
    """
    file = File.query.filter_by(
        id=file_id, user_id=current_user.id
    ).first_or_404()

    # A previous guard here tested ``file.folder`` - the file's *parent*
    # folder - and therefore refused to serve every file stored inside a
    # folder. File records are never folders, so no such guard is needed.

    storage_path = file.path
    if not storage_path or not os.path.isfile(storage_path):
        flash('File not found in storage', 'error')
        return redirect(url_for('files.browser', folder_id=file.folder_id))

    return send_file(
        storage_path,
        download_name=file.name,
        as_attachment=True
    )
@bp.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
    """Create a new folder for the current user.

    Form fields:
        name: folder name (required; surrounding whitespace is stripped).
        parent_id: optional ID of the parent folder, which must belong to
            the current user.

    Returns:
        A redirect to the parent folder's browser view (or the top level),
        with a flash message describing the outcome.
    """
    name = (request.form.get('name') or '').strip()
    raw_parent_id = request.form.get('parent_id')

    if not name:
        flash('Folder name is required', 'danger')
        return redirect(url_for('files.browser'))

    parent_id = None
    if raw_parent_id:
        # parent_id comes straight from the client: require an integer that
        # names one of the caller's own folders, otherwise a user could
        # attach folders beneath somebody else's tree.
        try:
            parent_id = int(raw_parent_id)
        except ValueError:
            parent_id = None
        parent = None
        if parent_id is not None:
            parent = Folder.query.filter_by(
                id=parent_id, user_id=current_user.id
            ).first()
        if parent is None:
            flash('Parent folder not found', 'danger')
            return redirect(url_for('files.browser'))

    folder = Folder(
        name=name,
        user_id=current_user.id,
        parent_id=parent_id
    )
    try:
        db.session.add(folder)
        db.session.commit()
        flash(f'Folder "{name}" created successfully', 'success')
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error creating folder: {str(e)}")
        flash('Error creating folder', 'danger')

    if parent_id:
        return redirect(url_for('files.browser', folder_id=parent_id))
    return redirect(url_for('files.browser'))
@bp.route('/rename', methods=['POST'])
@login_required
def rename_item():
    """Rename one of the current user's files or folders.

    JSON body:
        item_id: ID of the file or folder.
        new_name: the new name (non-empty string).
        item_type: optional, ``'file'`` or ``'folder'``. File and folder IDs
            can coincide; without this key a matching file takes precedence.

    Returns:
        JSON with ``success``, ``message`` and ``new_name`` on success, or a
        JSON error with status 400 (bad input or duplicate name), 404 (no
        such item) or 500 (unexpected failure).
    """
    try:
        # silent=True: a missing or malformed JSON body becomes None (and a
        # 400 below) instead of an exception reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        item_id = data.get('item_id')
        new_name = data.get('new_name')
        item_type = data.get('item_type')

        if not item_id or not isinstance(new_name, str) or not new_name.strip():
            return jsonify({'error': 'Item ID and new name are required'}), 400
        new_name = new_name.strip()

        file = None
        if item_type != 'folder':
            file = File.query.filter_by(
                id=item_id, user_id=current_user.id
            ).first()
        folder = None
        if item_type != 'file':
            folder = Folder.query.filter_by(
                id=item_id, user_id=current_user.id
            ).first()

        if file:
            safe_name = secure_filename(new_name)
            if not safe_name:
                # secure_filename strips unsupported characters and may
                # leave nothing usable.
                return jsonify({'error': 'Invalid file name'}), 400

            existing_file = File.query.filter_by(
                name=safe_name,
                folder_id=file.folder_id,
                user_id=current_user.id
            ).filter(File.id != file.id).first()
            if existing_file:
                return jsonify({'error': 'A file with this name already exists'}), 400

            # Only the display name changes. The stored file keeps its path:
            # renaming it on disk to the display name could silently
            # overwrite another stored file of the same name, and downloads
            # already take their name from ``file.name``.
            file.name = safe_name
            db.session.commit()
            return jsonify({
                'success': True,
                'message': 'File renamed successfully',
                'new_name': safe_name
            })

        if folder:
            # Folders exist only in the database.
            existing_folder = Folder.query.filter_by(
                name=new_name,
                parent_id=folder.parent_id,
                user_id=current_user.id
            ).filter(Folder.id != folder.id).first()
            if existing_folder:
                return jsonify({'error': 'A folder with this name already exists'}), 400

            folder.name = new_name
            db.session.commit()
            return jsonify({
                'success': True,
                'message': 'Folder renamed successfully',
                'new_name': new_name
            })

        return jsonify({'error': 'Item not found'}), 404
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error renaming item: {str(e)}")
        return jsonify({'error': str(e)}), 500
@bp.route('/delete/<item_id>', methods=['POST'])
@login_required
def delete_item(item_id):
    """Delete one of the current user's files or folders.

    Args:
        item_id: ID of the file or folder, taken from the URL.

    Request values:
        item_type: optional, ``'file'`` or ``'folder'``. File and folder IDs
            can coincide; without it a matching file takes precedence.

    Returns:
        JSON with ``success`` and ``message`` on success, or a JSON error
        with status 400 (invalid ID), 404 (no such item) or 500 (unexpected
        failure).
    """
    try:
        # The route accepts any string, so reject non-numeric IDs up front
        # instead of passing them to the database.
        if not item_id or item_id == 'null' or not item_id.isdigit():
            return jsonify({'error': 'Invalid item ID'}), 400
        numeric_id = int(item_id)
        item_type = request.values.get('item_type')

        file = None
        if item_type != 'folder':
            file = File.query.filter_by(
                id=numeric_id, user_id=current_user.id
            ).first()
        folder = None
        if item_type != 'file':
            folder = Folder.query.filter_by(
                id=numeric_id, user_id=current_user.id
            ).first()

        if file:
            storage_path = file.path
            db.session.delete(file)
            db.session.commit()
            # Remove the stored file only once the row is really gone, so a
            # failed commit cannot leave a record pointing at nothing.
            try:
                if storage_path and os.path.exists(storage_path):
                    os.remove(storage_path)
            except OSError as e:
                current_app.logger.error(f"Error deleting file from disk: {str(e)}")
            return jsonify({'success': True, 'message': 'File deleted successfully'})

        if folder:
            has_files = File.query.filter_by(folder_id=folder.id).first() is not None
            has_subfolders = Folder.query.filter_by(parent_id=folder.id).first() is not None
            if has_files or has_subfolders:
                delete_folder_recursive(folder.id)
                # delete_folder_recursive only stages the deletions; without
                # this commit the rows would survive the request while the
                # files on disk were already removed.
                db.session.commit()
                return jsonify({'success': True, 'message': 'Folder and contents deleted successfully'})

            db.session.delete(folder)
            db.session.commit()
            return jsonify({'success': True, 'message': 'Folder deleted successfully'})

        return jsonify({'error': 'Item not found'}), 404
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error deleting item: {str(e)}")
        return jsonify({'error': str(e)}), 500
def delete_folder_recursive(folder_id):
    """Stage the deletion of a folder, its files and all of its subfolders.

    Stored files are removed from disk immediately; the database rows are
    only marked for deletion on ``db.session``. This function does not
    commit, so the caller has to.

    Args:
        folder_id: ID of the folder to delete. No ownership check is done
            here.
    """
    for file in File.query.filter_by(folder_id=folder_id).all():
        try:
            if file.path and os.path.exists(file.path):
                os.remove(file.path)
        except OSError as e:
            # Best effort: keep going, but leave a trace instead of hiding
            # the failure.
            current_app.logger.error(f"Error deleting file from disk: {str(e)}")
        db.session.delete(file)

    # Children first, so every subfolder is staged before its parent.
    for subfolder in Folder.query.filter_by(parent_id=folder_id).all():
        delete_folder_recursive(subfolder.id)

    folder = Folder.query.get(folder_id)
    if folder:
        db.session.delete(folder)
@bp.route('/upload_xhr', methods=['POST'])
@login_required
def upload_xhr():
    """Store one or more files sent by XHR, recreating folder structure.

    Form fields:
        parent_folder_id: optional ID of the target folder, which must
            belong to the current user.
        files[]: the uploaded files. A filename containing ``/`` is treated
            as a relative path whose leading parts are folders to find or
            create beneath the target folder.

    Returns:
        JSON with ``success``, ``message`` and a ``files`` list (id, name,
        size, type per stored file), or a JSON error with status 400 (bad
        input) or 500 (unexpected failure).
    """
    saved_paths = []  # files written so far, removed again on failure
    try:
        parent_folder_id = request.form.get('parent_folder_id')
        parent_folder = None
        if parent_folder_id:
            parent_folder = Folder.query.get(parent_folder_id)
            if not parent_folder or parent_folder.user_id != current_user.id:
                return jsonify({'error': 'Invalid parent folder'}), 400

        if 'files[]' not in request.files:
            return jsonify({'error': 'No files in request'}), 400
        files = request.files.getlist('files[]')
        if not files or files[0].filename == '':
            return jsonify({'error': 'No files selected'}), 400

        file_dir = os.path.join(current_app.config['UPLOAD_FOLDER'],
                                str(current_user.id))
        os.makedirs(file_dir, exist_ok=True)

        new_records = []
        for file in files:
            raw_name = file.filename or ''
            current_parent = parent_folder
            if '/' in raw_name:
                # A file inside an uploaded folder: the last part is the
                # file name, everything before it is the folder chain.
                *folder_names, filename = raw_name.split('/')
                for folder_name in folder_names:
                    if not folder_name:
                        continue
                    parent_id = current_parent.id if current_parent else None
                    existing_folder = Folder.query.filter_by(
                        name=folder_name,
                        parent_id=parent_id,
                        user_id=current_user.id
                    ).first()
                    if existing_folder:
                        current_parent = existing_folder
                    else:
                        new_folder = Folder(
                            name=folder_name,
                            parent_id=parent_id,
                            user_id=current_user.id
                        )
                        db.session.add(new_folder)
                        db.session.flush()  # Get ID without committing
                        current_parent = new_folder
            else:
                filename = raw_name

            if not filename:
                continue  # Skip files with empty names
            secure_name = secure_filename(filename)
            if not secure_name:
                continue  # Nothing usable left after sanitising

            target_folder_id = current_parent.id if current_parent else None
            existing_file = File.query.filter_by(
                name=secure_name,
                folder_id=target_folder_id,
                user_id=current_user.id
            ).first()
            if existing_file:
                # Keep both: make the display name unique with a timestamp.
                name_parts = secure_name.rsplit('.', 1)
                if len(name_parts) > 1:
                    secure_name = f"{name_parts[0]}_{int(time.time())}.{name_parts[1]}"
                else:
                    secure_name = f"{secure_name}_{int(time.time())}"

            # Store under a random name. Using the display name would let
            # equally named files from different folders overwrite each
            # other, since they all share the user's storage directory.
            _, extension = os.path.splitext(secure_name)
            file_path = os.path.join(file_dir, f"{uuid.uuid4()}{extension}")
            file.save(file_path)
            saved_paths.append(file_path)

            new_file = File(
                name=secure_name,
                original_name=filename,
                path=file_path,
                size=os.path.getsize(file_path),
                type=file.content_type or 'application/octet-stream',
                folder_id=target_folder_id,
                user_id=current_user.id
            )
            db.session.add(new_file)
            new_records.append(new_file)

        db.session.commit()

        # IDs exist only after the commit. Read them from the objects we
        # kept: db.session.new is empty at this point and not indexable.
        uploaded_files = [{
            'id': record.id,
            'name': record.name,
            'size': record.size,
            'type': record.type
        } for record in new_records]

        return jsonify({
            'success': True,
            'message': f'Successfully uploaded {len(uploaded_files)} files',
            'files': uploaded_files
        })
    except Exception as e:
        current_app.logger.error(f"Upload error: {str(e)}")
        db.session.rollback()
        # The rows are rolled back, so drop the files already written.
        for path in saved_paths:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError as cleanup_error:
                current_app.logger.error(
                    f"Upload cleanup error: {str(cleanup_error)}")
        return jsonify({'error': str(e)}), 500
# Import the helper functions from __init__.py
from app import get_file_icon, format_file_size