Flask-Files/app/routes/files.py
2025-03-22 12:30:45 +01:00

584 lines
No EOL
21 KiB
Python

from flask import render_template, redirect, url_for, flash, request, send_from_directory, abort, jsonify
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models import File, Share
from app.routes import files_bp
import os
from datetime import datetime, timedelta
import uuid
import mimetypes
from config import Config
import shutil
@files_bp.route('/browser')
@files_bp.route('/browser/<int:folder_id>')
@login_required
def browser(folder_id=None):
    """Render the file browser for the root level or for one folder.

    Args:
        folder_id: Optional id of the folder to list. When falsy, the
            items whose ``parent_id`` equals ``folder_id`` are listed
            without loading a folder record.

    Returns:
        The rendered ``files/browser.html`` template.

    Aborts with 404 when no record with that id belongs to the current
    user, and with 400 when the record is not a folder.
    """
    owner_id = current_user.id

    current_folder = None
    if folder_id:
        current_folder = File.query.filter_by(id=folder_id, user_id=owner_id).first_or_404()
        if not current_folder.is_folder:
            abort(400)

    # Walk up the parent chain, then flip it so the trail reads outermost
    # ancestor first and the current folder last.
    breadcrumbs = []
    node = current_folder
    while node:
        breadcrumbs.append(node)
        node = node.parent
    breadcrumbs.reverse()

    # Direct children of the requested level, split by kind, sorted by name.
    children = File.query.filter_by(user_id=owner_id, parent_id=folder_id)
    folders = children.filter_by(is_folder=True).order_by(File.name).all()
    files = children.filter_by(is_folder=False).order_by(File.name).all()

    context = {
        'title': 'File Browser',
        'current_folder': current_folder,
        'breadcrumbs': breadcrumbs,
        'folders': folders,
        'files': files,
    }
    return render_template('files/browser.html', **context)
@files_bp.route('/upload')
@login_required
def upload():
    """Show the upload form, optionally targeting one of the user's folders.

    The destination is taken from the ``folder`` query parameter. When it
    is present and truthy it must identify a folder owned by the current
    user, otherwise the request ends with a 404.

    Returns:
        The rendered ``files/upload.html`` template.
    """
    folder_id = request.args.get('folder', None, type=int)

    if not folder_id:
        parent_folder = None
    else:
        lookup = File.query.filter_by(
            id=folder_id,
            user_id=current_user.id,
            is_folder=True,
        )
        parent_folder = lookup.first_or_404()

    return render_template(
        'files/upload.html',
        title='Upload Files',
        parent_folder=parent_folder,
    )
@files_bp.route('/upload_folder', methods=['POST'])
@login_required
def upload_folder():
    """Receive a folder upload and mirror its structure as ``File`` records.

    Form input:
        folder_id: Optional id of the destination folder; it must be a
            folder owned by the current user (404 otherwise).
        folder[]: Uploaded files whose filenames carry the client-side
            relative path of each file inside the uploaded folder.

    The files are first staged in a per-request directory under
    ``Config.UPLOAD_FOLDER/temp``. The staged tree is then walked: each
    directory level is matched to an existing folder record (same name,
    parent and owner) or created, and each file is copied to a UUID-named
    file in ``Config.UPLOAD_FOLDER`` and recorded in the database.

    Everything is committed in one transaction. On any error the
    transaction is rolled back, files already copied into storage during
    this request are removed again, and the error is flashed to the user.
    The staging directory is always deleted.

    Returns:
        A redirect to the browser view of the destination folder (or of
        the root level when no destination was given).
    """
    folder_id = request.form.get('folder_id', None, type=int)

    parent_folder = None
    if folder_id:
        parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404()

    # Stage the upload in its own directory so concurrent requests cannot mix.
    temp_dir = os.path.join(Config.UPLOAD_FOLDER, 'temp', str(uuid.uuid4()))
    os.makedirs(temp_dir, exist_ok=True)
    # Resolved once; every staged destination is validated against it.
    temp_root = os.path.realpath(temp_dir)
    # Storage files written by this request, removed again if it fails.
    stored_paths = []

    try:
        uploads = request.files.getlist('folder[]')
        if not uploads or all(item.filename == '' for item in uploads):
            flash('No folder selected for upload', 'error')
            # The finally clause below removes the staging directory.
            return redirect(url_for('files.browser', folder_id=folder_id))

        for item in uploads:
            if item.filename == '':
                continue

            # Browsers on Windows may send backslash-separated paths.
            rel_path = item.filename.replace('\\', '/')

            # The path comes from the client: refuse anything that would
            # resolve outside the staging directory ("../", absolute paths).
            destination = os.path.realpath(os.path.join(temp_root, rel_path))
            inside_staging = os.path.commonpath([temp_root, destination]) == temp_root
            if not inside_staging or destination == temp_root:
                raise ValueError('Invalid file path in upload')

            os.makedirs(os.path.dirname(destination), exist_ok=True)
            item.save(destination)

        for root, _dirs, filenames in os.walk(temp_dir):
            rel_root = os.path.relpath(root, temp_dir)
            if rel_root == '.':
                rel_root = ''

            # Resolve (or create) the folder record for this directory by
            # descending from the destination folder one level at a time.
            current_parent_id = folder_id
            if rel_root:
                for part in rel_root.split(os.path.sep):
                    existing_folder = File.query.filter_by(
                        name=part,
                        parent_id=current_parent_id,
                        user_id=current_user.id,
                        is_folder=True
                    ).first()
                    if existing_folder:
                        current_parent_id = existing_folder.id
                    else:
                        new_folder = File(
                            name=part,
                            user_id=current_user.id,
                            parent_id=current_parent_id,
                            is_folder=True
                        )
                        db.session.add(new_folder)
                        db.session.flush()  # Assigns the id needed by children
                        current_parent_id = new_folder.id

            for filename in filenames:
                full_path = os.path.join(root, filename)
                secure_name = secure_filename(filename)

                # Files are stored under a UUID; the display name lives in the DB.
                file_uuid = str(uuid.uuid4())
                storage_path = os.path.join(Config.UPLOAD_FOLDER, file_uuid)

                # Registered before copying so a partial copy is cleaned up too.
                stored_paths.append(storage_path)
                shutil.copy2(full_path, storage_path)

                file_size = os.path.getsize(storage_path)
                mime_type, _ = mimetypes.guess_type(filename)
                if not mime_type:
                    mime_type = 'application/octet-stream'

                db_file = File(
                    name=secure_name,
                    storage_name=file_uuid,
                    mime_type=mime_type,
                    size=file_size,
                    user_id=current_user.id,
                    parent_id=current_parent_id,
                    is_folder=False
                )
                db.session.add(db_file)

        db.session.commit()
        flash('Folder uploaded successfully', 'success')
    except Exception as e:
        db.session.rollback()
        # The records were rolled back, so their storage files would be orphans.
        for path in stored_paths:
            try:
                os.remove(path)
            except OSError:
                pass  # Never created or already gone
        flash(f'Error uploading folder: {str(e)}', 'error')
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

    if parent_folder:
        return redirect(url_for('files.browser', folder_id=parent_folder.id))
    return redirect(url_for('files.browser'))
@files_bp.route('/create_folder', methods=['GET', 'POST'])
@login_required
def create_folder():
    """Show the create-folder form (GET) or create the folder (POST).

    The parent folder id is read from the ``folder`` query parameter or,
    failing that, from the ``parent_id`` form field. When given, it must
    identify a folder owned by the current user (404 otherwise).

    On POST the ``folder_name`` form field is stripped and passed through
    ``secure_filename``. The request is rejected with a flashed error and
    a redirect back to the same URL when the name is empty, when nothing
    is left of it after sanitising, or when a folder of that name already
    exists in the parent.

    Returns:
        GET: the rendered ``files/create_folder.html`` template.
        POST: JSON ``{'success': True, 'folder_id': ...}`` for requests
        sent with ``X-Requested-With: XMLHttpRequest``, otherwise a
        redirect to the new folder's browser view.
    """
    parent_id = request.args.get('folder', None, type=int) or request.form.get('parent_id', None, type=int)

    # Only the ownership/existence check matters here; the record is not used.
    if parent_id:
        File.query.filter_by(id=parent_id, user_id=current_user.id, is_folder=True).first_or_404()

    if request.method == 'POST':
        raw_name = request.form.get('folder_name', '').strip()
        if not raw_name:
            flash('Folder name cannot be empty', 'error')
            return redirect(request.url)

        # secure_filename can strip a name down to nothing (e.g. "..." or
        # a name without ASCII characters); refuse to store a nameless folder.
        folder_name = secure_filename(raw_name)
        if not folder_name:
            flash('Folder name contains no usable characters', 'error')
            return redirect(request.url)

        existing_folder = File.query.filter_by(
            name=folder_name,
            parent_id=parent_id,
            user_id=current_user.id,
            is_folder=True
        ).first()
        if existing_folder:
            flash(f'A folder named "{folder_name}" already exists', 'error')
            return redirect(request.url)

        new_folder = File(
            name=folder_name,
            user_id=current_user.id,
            parent_id=parent_id,
            is_folder=True
        )
        db.session.add(new_folder)
        db.session.commit()
        flash(f'Folder "{folder_name}" created successfully', 'success')

        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return jsonify({'success': True, 'folder_id': new_folder.id})
        return redirect(url_for('files.browser', folder_id=new_folder.id))

    return render_template('files/create_folder.html',
                           title='Create Folder',
                           parent_id=parent_id)
@files_bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
    """Send one of the current user's files as an attachment.

    Args:
        file_id: Id of the file record. It must belong to the current
            user and must not be a folder (404 otherwise).

    A ``Download`` row holding the file id, user id and remote address is
    committed before the file is sent.

    Returns:
        The stored file from ``Config.UPLOAD_FOLDER``, offered to the
        client under the record's display name.
    """
    # NOTE(review): Download is not among the names imported at the top of
    # this file, so the original code raised NameError here. It is presumably
    # defined in app.models next to File and Share - confirm.
    from app.models import Download

    file = File.query.filter_by(id=file_id, user_id=current_user.id, is_folder=False).first_or_404()

    download_record = Download(
        file_id=file.id,
        user_id=current_user.id,
        ip_address=request.remote_addr
    )
    db.session.add(download_record)
    db.session.commit()

    # Flask 2.0 renamed attachment_filename to download_name and 2.2
    # removed the old keyword.
    return send_from_directory(
        Config.UPLOAD_FOLDER,
        file.storage_name,
        as_attachment=True,
        download_name=file.name
    )
@files_bp.route('/delete/<int:file_id>', methods=['POST'])
@login_required
def delete(file_id):
    """Delete one of the current user's files or folders.

    Args:
        file_id: Id of the record to delete; it must belong to the
            current user (404 otherwise).

    A folder is removed together with its contents through
    ``delete_folder_recursive``. For a plain file the stored file is
    removed from ``Config.UPLOAD_FOLDER`` (OS errors are ignored) and the
    record is deleted. The change is committed before responding.

    Returns:
        JSON ``{'success': True}`` for requests sent with
        ``X-Requested-With: XMLHttpRequest``, otherwise a redirect to the
        browser view of the deleted item's parent.
    """
    file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404()
    # Remembered up front: it is the redirect target once the record is gone.
    parent_id = file.parent_id

    if not file.is_folder:
        stored_file = os.path.join(Config.UPLOAD_FOLDER, file.storage_name)
        try:
            os.remove(stored_file)
        except OSError:
            pass  # Best effort: a missing or unremovable file does not block the delete
        db.session.delete(file)
    else:
        delete_folder_recursive(file)

    db.session.commit()

    item_kind = "Folder" if file.is_folder else "File"
    flash(f'{item_kind} deleted successfully', 'success')

    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    if wants_json:
        return jsonify({'success': True})
    return redirect(url_for('files.browser', folder_id=parent_id))
def delete_folder_recursive(folder):
    """Mark a folder and everything beneath it for deletion.

    Child folders are handled by recursion. For child files the stored
    file is removed from ``Config.UPLOAD_FOLDER`` (OS errors are ignored)
    and the record is deleted from the session. The folder's own record
    is deleted last. Nothing is committed here; that is left to the
    caller.

    Args:
        folder: The folder record whose subtree should be removed.
    """
    for entry in File.query.filter_by(parent_id=folder.id).all():
        if entry.is_folder:
            # The recursive call also deletes the sub-folder's own record.
            delete_folder_recursive(entry)
            continue

        stored_file = os.path.join(Config.UPLOAD_FOLDER, entry.storage_name)
        try:
            os.remove(stored_file)
        except OSError:
            pass  # Best effort: the file may already be gone
        db.session.delete(entry)

    db.session.delete(folder)
@files_bp.route('/rename/<int:file_id>', methods=['POST'])
@login_required
def rename(file_id):
    """Rename one of the current user's files or folders.

    Args:
        file_id: Id of the record to rename; it must belong to the
            current user (404 otherwise).

    The ``new_name`` form field is stripped and passed through
    ``secure_filename``. The rename is refused with a flashed error when
    the name is empty, when nothing is left of it after sanitising, or
    when another item of the same kind in the same parent already uses it.

    Returns:
        JSON ``{'success': True}`` for successful requests sent with
        ``X-Requested-With: XMLHttpRequest``; in every other case a
        redirect to the browser view of the item's parent.
    """
    file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404()
    back_to_parent = url_for('files.browser', folder_id=file.parent_id)

    raw_name = request.form.get('new_name', '').strip()
    if not raw_name:
        flash('Name cannot be empty', 'error')
        return redirect(back_to_parent)

    # secure_filename can strip a name down to nothing (e.g. "..." or a
    # name without ASCII characters); refuse to store an empty name.
    new_name = secure_filename(raw_name)
    if not new_name:
        flash('Name contains no usable characters', 'error')
        return redirect(back_to_parent)

    # Names only need to be unique among siblings of the same kind.
    existing = File.query.filter_by(
        name=new_name,
        parent_id=file.parent_id,
        user_id=current_user.id,
        is_folder=file.is_folder
    ).first()
    if existing and existing.id != file.id:
        flash(f'A {"folder" if file.is_folder else "file"} with this name already exists', 'error')
        return redirect(back_to_parent)

    file.name = new_name
    db.session.commit()
    flash(f'{"Folder" if file.is_folder else "File"} renamed successfully', 'success')

    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
        return jsonify({'success': True})
    return redirect(back_to_parent)
@files_bp.route('/share/<int:file_id>', methods=['GET', 'POST'])
@login_required
def share(file_id):
    """Show the share form (GET) or create a public share link (POST).

    Args:
        file_id: Id of the record to share; it must belong to the
            current user (404 otherwise).

    On POST the optional ``expires`` form field gives the link lifetime
    in days. A missing or zero value creates a link without expiry. A
    negative value, or one too large to be represented as a date, is
    rejected with a flashed error and a redirect back to the share form.

    Returns:
        GET: the rendered ``files/share.html`` template.
        POST: the rendered ``files/share_success.html`` template, which
        receives the new share record and its external URL.
    """
    file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404()

    if request.method == 'POST':
        expires_days = request.form.get('expires', type=int)
        expires_at = None
        if expires_days:
            try:
                if expires_days < 0:
                    # A negative lifetime would create a link that is already expired.
                    raise ValueError('negative expiration')
                # Raises OverflowError when the result is outside the date range.
                expires_at = datetime.utcnow() + timedelta(days=expires_days)
            except (ValueError, OverflowError):
                flash('Invalid expiration period', 'error')
                return redirect(url_for('files.share', file_id=file.id))

        share_token = str(uuid.uuid4())

        share_record = Share(
            file_id=file.id,
            user_id=current_user.id,
            token=share_token,
            expires_at=expires_at
        )
        db.session.add(share_record)
        db.session.commit()

        # _external=True yields an absolute URL that can be handed to others.
        share_url = url_for('files.public_share', token=share_token, _external=True)
        flash('Share link created successfully', 'success')
        return render_template('files/share_success.html',
                               title='Share Link',
                               file=file,
                               share=share_record,
                               share_url=share_url)

    return render_template('files/share.html',
                           title='Share File',
                           file=file)
@files_bp.route('/public/<string:token>')
def public_share(token):
    """Landing page of a public share link (no login required).

    Args:
        token: The share token from the URL; 404 when no share uses it.

    An expired share renders ``files/share_expired.html``. Otherwise a
    ``Download`` row holding the file id, share id and remote address is
    committed. Images, PDFs, text, JavaScript and JSON files are shown
    through ``files/preview.html``; every other type is redirected
    straight to the public download route.

    Returns:
        A rendered template or a redirect, as described above.
    """
    # NOTE(review): Download is not among the names imported at the top of
    # this file, so the original code raised NameError here. It is presumably
    # defined in app.models next to File and Share - confirm.
    from app.models import Download

    share = Share.query.filter_by(token=token).first_or_404()

    if share.expires_at and share.expires_at < datetime.utcnow():
        return render_template('files/share_expired.html',
                               title='Share Expired')

    file = File.query.get_or_404(share.file_id)

    download_record = Download(
        file_id=file.id,
        share_id=share.id,
        ip_address=request.remote_addr
    )
    db.session.add(download_record)
    db.session.commit()

    download_url = url_for('files.public_download', token=token)

    # MIME types that get an in-page preview instead of a direct download.
    previewable = bool(file.mime_type) and (
        file.mime_type.startswith(('image/', 'text/'))
        or file.mime_type in ('application/pdf', 'application/javascript', 'application/json')
    )
    if previewable:
        return render_template('files/preview.html',
                               title=file.name,
                               file=file,
                               share=share,
                               download_url=download_url)

    return redirect(download_url)
@files_bp.route('/public/download/<string:token>')
def public_download(token):
    """Send the file behind a public share link (no login required).

    Args:
        token: The share token from the URL; 404 when no share uses it.

    An expired share renders ``files/share_expired.html``. A share that
    points at a folder ends with a 404.

    Returns:
        The stored file from ``Config.UPLOAD_FOLDER``, offered to the
        client under the record's display name.
    """
    share = Share.query.filter_by(token=token).first_or_404()

    if share.expires_at and share.expires_at < datetime.utcnow():
        return render_template('files/share_expired.html',
                               title='Share Expired')

    file = File.query.get_or_404(share.file_id)

    # The share route accepts folders, but folder records in this file are
    # created without a storage_name, so there is nothing to send.
    # NOTE(review): presumably storage_name is empty for folders - confirm.
    if file.is_folder:
        abort(404)

    # Flask 2.0 renamed attachment_filename to download_name and 2.2
    # removed the old keyword.
    return send_from_directory(
        Config.UPLOAD_FOLDER,
        file.storage_name,
        as_attachment=True,
        download_name=file.name
    )
def _ensure_folder_chain(directory, root_parent_id, folder_map):
    """Return the id of the folder for ``directory``, creating missing levels.

    Args:
        directory: Slash-separated directory path relative to the upload
            destination.
        root_parent_id: Id of the destination folder (``None`` for root).
        folder_map: Per-request cache mapping sanitised paths to folder
            ids; updated in place.

    New folders are added to the session and flushed, not committed.
    """
    parent_id = root_parent_id
    current_path = ''
    for part in directory.split('/'):
        folder_name = secure_filename(part)
        # Empty segments and segments that sanitise to nothing ("..", ".")
        # must not become nameless folders.
        if not folder_name:
            continue

        current_path = f'{current_path}/{folder_name}' if current_path else folder_name
        if current_path in folder_map:
            parent_id = folder_map[current_path]
            continue

        folder = File.query.filter_by(
            name=folder_name,
            parent_id=parent_id,
            user_id=current_user.id,
            is_folder=True
        ).first()
        if folder is None:
            folder = File(
                name=folder_name,
                parent_id=parent_id,
                user_id=current_user.id,
                is_folder=True
            )
            db.session.add(folder)
            db.session.flush()  # Assigns the id without committing

        parent_id = folder.id
        folder_map[current_path] = parent_id
    return parent_id


@files_bp.route('/upload_xhr', methods=['POST'])
@login_required
def upload_xhr():
    """Handle AJAX file uploads, optionally recreating a folder structure.

    Form input:
        files[]: The uploaded files.
        folder_id: Optional id of the destination folder; it must be a
            folder owned by the current user.
        is_folder: ``'1'`` when the upload is a folder upload.
        paths[]: For folder uploads, the relative path of each file, in
            the same order as ``files[]``.

    Each file is processed independently, so one failure does not stop
    the others. A file whose name already exists in its target folder
    gets a timestamp suffix. Files are stored in ``Config.UPLOAD_FOLDER``
    under a UUID plus the original extension. All records are committed
    together at the end. If that commit fails, every file counts as
    failed and the files written to storage are removed again.

    Returns:
        JSON. Early validation failures return
        ``{'success': False, 'error': ...}``. Otherwise the keys are
        ``success``, ``message``, ``successful``, ``failed`` and
        ``errors``.
    """
    if 'files[]' not in request.files:
        return jsonify({'success': False, 'error': 'No files found in the request'})

    files = request.files.getlist('files[]')
    folder_id = request.form.get('folder_id', None, type=int)
    is_folder = request.form.get('is_folder') == '1'
    paths = request.form.getlist('paths[]')

    if not files or all(f.filename == '' for f in files):
        return jsonify({'success': False, 'error': 'No files selected for upload'})

    if folder_id:
        parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first()
        if not parent_folder:
            return jsonify({'success': False, 'error': 'Parent folder not found'})

    successful = 0
    failed = 0
    errors = []
    folder_map = {}  # Sanitised relative path -> folder id, shared by all files
    stored_paths = []  # Storage files belonging to successfully processed uploads

    for i, file in enumerate(files):
        file_path = None
        try:
            if file.filename == '':
                continue

            # paths[] is only meaningful for folder uploads and may be shorter
            # than files[].
            relative_path = paths[i] if is_folder and i < len(paths) else None

            current_parent_id = folder_id
            if relative_path:
                # Browsers on Windows may send backslash-separated paths.
                directory, _, leaf = relative_path.replace('\\', '/').rpartition('/')
                if directory:
                    current_parent_id = _ensure_folder_chain(directory, folder_id, folder_map)
            else:
                leaf = file.filename

            filename = secure_filename(leaf)
            if not filename:
                # Nothing usable is left of the name; do not create a nameless record.
                raise ValueError('Invalid file name')

            existing_file = File.query.filter_by(
                name=filename,
                parent_id=current_parent_id,
                user_id=current_user.id,
                is_folder=False
            ).first()
            if existing_file:
                # Keep both files: disambiguate the new one with a timestamp.
                stem, extension = os.path.splitext(filename)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"{stem}_{timestamp}{extension}"

            storage_name = f"{str(uuid.uuid4())}{os.path.splitext(filename)[1]}"
            file_path = os.path.join(Config.UPLOAD_FOLDER, storage_name)
            file.save(file_path)

            file_size = os.path.getsize(file_path)
            mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'

            db_file = File(
                name=filename,
                storage_name=storage_name,
                mime_type=mime_type,
                size=file_size,
                user_id=current_user.id,
                parent_id=current_parent_id,
                is_folder=False
            )
            db.session.add(db_file)
            stored_paths.append(file_path)
            successful += 1
        except Exception as e:
            failed += 1
            errors.append(f"{file.filename}: {str(e)}")
            # No record was added for this file, so anything already written
            # to storage for it would be an orphan.
            if file_path is not None:
                try:
                    os.remove(file_path)
                except OSError:
                    pass  # Never written or already gone

    if successful > 0:
        try:
            db.session.commit()
        except Exception as e:
            # Report the failure as JSON instead of letting it escape as a 500.
            db.session.rollback()
            for path in stored_paths:
                try:
                    os.remove(path)
                except OSError:
                    pass
            failed += successful
            successful = 0
            errors.append(f"Database error: {str(e)}")
    else:
        # Discard folders that were flushed for files which then failed.
        db.session.rollback()

    result = {
        'success': successful > 0,
        'message': f"Successfully uploaded {successful} files, {failed} failed.",
        'successful': successful,
        'failed': failed,
        'errors': errors
    }
    return jsonify(result)