Flask-Files/app/routes/files.py
2025-03-23 01:31:21 +01:00

524 lines
No EOL
20 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, send_from_directory, abort, jsonify, send_file, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models import File, Share
from config import Config
import os
from datetime import datetime, timedelta
import uuid
import mimetypes
import shutil
import json
files_bp = Blueprint('files', __name__, url_prefix='/files')
@files_bp.route('/')
@files_bp.route('/browser')
@files_bp.route('/browser/<int:folder_id>')
@login_required
def browser(folder_id=None):
"""Display file browser interface"""
current_folder = None
breadcrumbs = []
if folder_id:
current_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
# Generate breadcrumbs
breadcrumbs = []
parent = current_folder
while parent:
breadcrumbs.append(parent)
parent = parent.parent
breadcrumbs.reverse()
# For initial load - only get folders and files if it's not an AJAX request
if not request.headers.get('X-Requested-With') == 'XMLHttpRequest':
if current_folder:
folders = File.query.filter_by(parent_id=current_folder.id, user_id=current_user.id, is_folder=True).all()
files = File.query.filter_by(parent_id=current_folder.id, user_id=current_user.id, is_folder=False).all()
else:
folders = File.query.filter_by(parent_id=None, user_id=current_user.id, is_folder=True).all()
files = File.query.filter_by(parent_id=None, user_id=current_user.id, is_folder=False).all()
return render_template('files/browser.html',
current_folder=current_folder,
breadcrumbs=breadcrumbs,
folders=folders,
files=files)
else:
# If it's an AJAX request, return JSON
return jsonify({'error': 'Use the /contents endpoint for AJAX requests'})
@files_bp.route('/contents')
@login_required
def folder_contents():
"""Returns the HTML for folder contents (used for AJAX loading)"""
folder_id = request.args.get('folder_id', type=int)
sort_by = request.args.get('sort', 'name')
sort_order = request.args.get('order', 'asc')
search_query = request.args.get('search', '')
# Get the current folder if a folder_id is provided
current_folder = None
if folder_id:
current_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
# Base query for folders and files
folders_query = File.query.filter_by(user_id=current_user.id, is_folder=True)
files_query = File.query.filter_by(user_id=current_user.id, is_folder=False)
# Filter by parent folder
if current_folder:
folders_query = folders_query.filter_by(parent_id=current_folder.id)
files_query = files_query.filter_by(parent_id=current_folder.id)
else:
folders_query = folders_query.filter_by(parent_id=None)
files_query = files_query.filter_by(parent_id=None)
# Apply search if provided
if search_query:
folders_query = folders_query.filter(File.name.ilike(f'%{search_query}%'))
files_query = files_query.filter(File.name.ilike(f'%{search_query}%'))
# Apply sorting
if sort_by == 'name':
folders_query = folders_query.order_by(File.name.asc() if sort_order == 'asc' else File.name.desc())
files_query = files_query.order_by(File.name.asc() if sort_order == 'asc' else File.name.desc())
elif sort_by == 'date':
folders_query = folders_query.order_by(File.updated_at.desc() if sort_order == 'asc' else File.updated_at.asc())
files_query = files_query.order_by(File.updated_at.desc() if sort_order == 'asc' else File.updated_at.asc())
elif sort_by == 'size':
# Folders always come first, then sort files by size
files_query = files_query.order_by(File.size.asc() if sort_order == 'asc' else File.size.desc())
# Get the results
folders = folders_query.all()
files = files_query.all()
# Different format of response based on requested format
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# If AJAX request, check if JSON or HTML is requested
if request.args.get('format') == 'json':
# Return JSON response with folder and file data
folder_data = [{
'id': folder.id,
'name': folder.name,
'updated_at': folder.updated_at.isoformat(),
'is_folder': True,
'item_count': folder.children.count(),
'url': url_for('files.browser', folder_id=folder.id)
} for folder in folders]
file_data = [{
'id': file.id,
'name': file.name,
'size': file.size,
'formatted_size': format_file_size(file.size),
'mime_type': file.mime_type,
'updated_at': file.updated_at.isoformat(),
'is_folder': False,
'icon': get_file_icon(file.mime_type, file.name),
'url': url_for('files.download', file_id=file.id)
} for file in files]
return jsonify({
'folders': folder_data,
'files': file_data,
'total_items': len(folder_data) + len(file_data)
})
else:
# Return HTML partial
return render_template('files/partials/folder_contents.html',
current_folder=current_folder,
folders=folders,
files=files)
else:
# If not AJAX, redirect to browser view
if current_folder:
return redirect(url_for('files.browser', folder_id=current_folder.id))
else:
return redirect(url_for('files.browser'))
@files_bp.route('/upload', methods=['GET', 'POST'])
@files_bp.route('/upload/<int:folder_id>', methods=['GET', 'POST'])
@login_required
def upload(folder_id=None):
"""Page for uploading files"""
parent_folder = None
if folder_id:
parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
if request.method == 'POST':
# Handle XHR upload
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# Check if file was included
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
# Check if the file was actually selected
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
# Validate and save file
filename = secure_filename(file.filename)
# Generate UUID for storage
file_uuid = str(uuid.uuid4())
storage_path = os.path.join(Config.UPLOAD_FOLDER, file_uuid)
try:
# Save file to storage location
file.save(storage_path)
# Get file info
file_size = os.path.getsize(storage_path)
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
# Create file record
db_file = File(
name=filename,
storage_name=file_uuid,
mime_type=mime_type,
size=file_size,
user_id=current_user.id,
parent_id=parent_folder.id if parent_folder else None,
is_folder=False
)
db.session.add(db_file)
db.session.commit()
# Return success response with file info
return jsonify({
'success': True,
'file': {
'id': db_file.id,
'name': db_file.name,
'size': db_file.size,
'formatted_size': format_file_size(db_file.size),
'mime_type': db_file.mime_type,
'icon': get_file_icon(db_file.mime_type, db_file.name)
}
})
except Exception as e:
# Make sure to remove the file if there was an error
if os.path.exists(storage_path):
os.remove(storage_path)
current_app.logger.error(f"Upload error: {str(e)}")
return jsonify({'error': str(e)}), 500
else:
# Regular form POST (non-XHR) - redirect to browser
flash('Please use the browser interface to upload files', 'info')
if parent_folder:
return redirect(url_for('files.browser', folder_id=parent_folder.id))
else:
return redirect(url_for('files.browser'))
# GET request - show upload page
return render_template('files/upload.html',
parent_folder=parent_folder,
title="Upload Files")
@files_bp.route('/upload_folder', methods=['POST'])
@login_required
def upload_folder():
"""Handle folder upload - this processes ZIP files uploaded as folders"""
folder_id = request.form.get('folder_id', None, type=int)
parent_folder = None
if folder_id:
parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()
# Check if folder data was provided
if 'folder_data' not in request.form:
return jsonify({'error': 'No folder data provided'}), 400
try:
folder_data = json.loads(request.form['folder_data'])
folder_name = secure_filename(folder_data.get('name', 'Unnamed Folder'))
# Create folder record
folder = File(
name=folder_name,
is_folder=True,
user_id=current_user.id,
parent_id=parent_folder.id if parent_folder else None,
size=0,
mime_type=None
)
db.session.add(folder)
db.session.flush() # Get folder.id without committing
# Process files
file_records = []
total_size = 0
# Create temp dir for uploaded files
temp_dir = os.path.join(Config.UPLOAD_FOLDER, 'temp', str(uuid.uuid4()))
os.makedirs(temp_dir, exist_ok=True)
try:
for file_info in folder_data.get('files', []):
original_path = file_info.get('path', '')
# Skip files without a valid path
if not original_path:
continue
# Get relative path within the folder
path_parts = original_path.split('/')
# Create intermediate folders as needed
current_parent_id = folder.id
for i, part in enumerate(path_parts[:-1]):
part = secure_filename(part)
# Skip empty parts
if not part:
continue
# Check if folder already exists
subfolder = File.query.filter_by(
name=part,
parent_id=current_parent_id,
user_id=current_user.id,
is_folder=True
).first()
if not subfolder:
# Create new subfolder
subfolder = File(
name=part,
is_folder=True,
user_id=current_user.id,
parent_id=current_parent_id,
size=0,
mime_type=None
)
db.session.add(subfolder)
db.session.flush()
current_parent_id = subfolder.id
# Process the file if it exists in the request
file_uid = file_info.get('id')
if file_uid and file_uid in request.files:
file = request.files[file_uid]
filename = secure_filename(path_parts[-1])
# Generate UUID for storage
file_uuid = str(uuid.uuid4())
storage_path = os.path.join(Config.UPLOAD_FOLDER, file_uuid)
# Save file
file.save(storage_path)
# Get file info
file_size = os.path.getsize(storage_path)
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
# Create file record
db_file = File(
name=filename,
storage_name=file_uuid,
mime_type=mime_type,
size=file_size,
user_id=current_user.id,
parent_id=current_parent_id,
is_folder=False
)
db.session.add(db_file)
file_records.append(db_file)
total_size += file_size
# Update folder size (sum of all files in the folder and subfolders)
folder.size = total_size
# Commit all changes
db.session.commit()
# Return success with folder info
return jsonify({
'success': True,
'folder': {
'id': folder.id,
'name': folder.name
}
})
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Folder upload error: {str(e)}")
return jsonify({'error': str(e)}), 500
finally:
# Clean up temp directory
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
current_app.logger.error(f"Folder upload parsing error: {str(e)}")
return jsonify({'error': str(e)}), 500
@files_bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
"""Download a file"""
file = File.query.filter_by(id=file_id, user_id=current_user.id, is_folder=False).first_or_404()
# Can't download folders directly
if file.is_folder:
flash('Cannot download folders directly. Please use the ZIP option.', 'warning')
return redirect(url_for('files.browser', folder_id=file.id))
# Check if file exists in storage
storage_path = os.path.join(Config.UPLOAD_FOLDER, file.storage_name)
if not os.path.exists(storage_path):
flash('File not found in storage', 'error')
return redirect(url_for('files.browser', folder_id=file.parent_id))
# Return the file
return send_file(
storage_path,
download_name=file.name,
as_attachment=True
)
@files_bp.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
"""Create a new folder"""
parent_id = request.form.get('parent_id', type=int)
folder_name = request.form.get('name', '').strip()
if not folder_name:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'Folder name is required'}), 400
else:
flash('Folder name is required', 'error')
return redirect(url_for('files.browser', folder_id=parent_id))
# Sanitize folder name
folder_name = secure_filename(folder_name)
# Check if folder already exists
parent = None
if parent_id:
parent = File.query.filter_by(id=parent_id, user_id=current_user.id, is_folder=True).first_or_404()
existing = File.query.filter_by(
name=folder_name,
parent_id=parent_id,
user_id=current_user.id,
is_folder=True
).first()
else:
existing = File.query.filter_by(
name=folder_name,
parent_id=None,
user_id=current_user.id,
is_folder=True
).first()
if existing:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'error': 'A folder with this name already exists'}), 400
else:
flash('A folder with this name already exists', 'error')
return redirect(url_for('files.browser', folder_id=parent_id))
# Create new folder
new_folder = File(
name=folder_name,
is_folder=True,
user_id=current_user.id,
parent_id=parent_id
)
db.session.add(new_folder)
db.session.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'success': True,
'folder': {
'id': new_folder.id,
'name': new_folder.name,
'url': url_for('files.browser', folder_id=new_folder.id)
}
})
else:
flash('Folder created successfully', 'success')
return redirect(url_for('files.browser', folder_id=parent_id))
@files_bp.route('/rename/<int:item_id>', methods=['POST'])
@login_required
def rename(item_id):
"""Rename a file or folder"""
item = File.query.filter_by(id=item_id, user_id=current_user.id).first_or_404()
new_name = request.form.get('name', '').strip()
if not new_name:
return jsonify({'error': 'Name is required'}), 400
# Sanitize name
new_name = secure_filename(new_name)
# Check if a file/folder with this name already exists in the same location
existing = File.query.filter(
File.name == new_name,
File.parent_id == item.parent_id,
File.user_id == current_user.id,
File.is_folder == item.is_folder,
File.id != item.id
).first()
if existing:
return jsonify({'error': 'An item with this name already exists'}), 400
# Update name
item.name = new_name
db.session.commit()
return jsonify({
'success': True,
'item': {
'id': item.id,
'name': item.name
}
})
@files_bp.route('/delete/<int:item_id>', methods=['POST'])
@login_required
def delete(item_id):
"""Delete a file or folder"""
item = File.query.filter_by(id=item_id, user_id=current_user.id).first_or_404()
try:
# If it's a file, delete the actual file from storage
if not item.is_folder and item.storage_name:
storage_path = os.path.join(Config.UPLOAD_FOLDER, item.storage_name)
if os.path.exists(storage_path):
os.remove(storage_path)
# Delete the database record (this will cascade delete any children due to the model relationship)
db.session.delete(item)
db.session.commit()
return jsonify({'success': True})
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Delete error: {str(e)}")
return jsonify({'error': str(e)}), 500
# Import the helper functions from __init__.py
from app import get_file_icon, format_file_size