from flask import render_template, redirect, url_for, flash, request, send_from_directory, abort, jsonify from flask_login import login_required, current_user from werkzeug.utils import secure_filename from app import db from app.models import File, Share from app.routes import files_bp import os from datetime import datetime, timedelta import uuid import mimetypes from config import Config import shutil @files_bp.route('/browser') @files_bp.route('/browser/') @login_required def browser(folder_id=None): # Get current folder current_folder = None if folder_id: current_folder = File.query.filter_by(id=folder_id, user_id=current_user.id).first_or_404() if not current_folder.is_folder: abort(400) # Generate breadcrumb navigation breadcrumbs = [] if current_folder: temp_folder = current_folder while temp_folder: breadcrumbs.append(temp_folder) temp_folder = temp_folder.parent breadcrumbs.reverse() # Get files and folders query = File.query.filter_by(user_id=current_user.id, parent_id=folder_id) folders = query.filter_by(is_folder=True).order_by(File.name).all() files = query.filter_by(is_folder=False).order_by(File.name).all() return render_template('files/browser.html', title='File Browser', current_folder=current_folder, breadcrumbs=breadcrumbs, folders=folders, files=files) @files_bp.route('/upload') @login_required def upload(): folder_id = request.args.get('folder', None, type=int) # Get parent folder parent_folder = None if folder_id: parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404() return render_template('files/upload.html', title='Upload Files', parent_folder=parent_folder) @files_bp.route('/upload_folder', methods=['POST']) @login_required def upload_folder(): folder_id = request.form.get('folder_id', None, type=int) # Get parent folder parent_folder = None if folder_id: parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first_or_404() # Create temporary directory for uploaded files temp_dir = os.path.join(Config.UPLOAD_FOLDER, 'temp', str(uuid.uuid4())) os.makedirs(temp_dir, exist_ok=True) try: # Process uploaded files (with relative paths) files = request.files.getlist('folder[]') if not files or all(file.filename == '' for file in files): flash('No folder selected for upload', 'error') shutil.rmtree(temp_dir, ignore_errors=True) return redirect(url_for('files.browser', folder_id=folder_id)) # Save files to temp directory with their relative paths for file in files: if file.filename == '': continue # Get the relative path within the folder rel_path = file.filename if '\\' in rel_path: # Windows paths rel_path = rel_path.replace('\\', '/') # Create directories for the path dir_path = os.path.join(temp_dir, os.path.dirname(rel_path)) os.makedirs(dir_path, exist_ok=True) # Save the file file.save(os.path.join(temp_dir, rel_path)) # Process the folder structure base_folder_name = os.path.basename(os.path.normpath(temp_dir)) for root, dirs, files in os.walk(temp_dir): rel_root = os.path.relpath(root, temp_dir) # Skip the root directory itself if rel_root == '.': rel_root = '' # Find or create parent folder current_parent_id = folder_id if rel_root: path_parts = rel_root.split(os.path.sep) for part in path_parts: existing_folder = File.query.filter_by( name=part, parent_id=current_parent_id, user_id=current_user.id, is_folder=True ).first() if existing_folder: current_parent_id = existing_folder.id else: new_folder = File( name=part, user_id=current_user.id, parent_id=current_parent_id, is_folder=True ) db.session.add(new_folder) db.session.flush() # To get the ID current_parent_id = new_folder.id # Create file records for files in current directory for filename in files: full_path = os.path.join(root, filename) secure_name = secure_filename(filename) # Generate UUID for storage file_uuid = str(uuid.uuid4()) storage_path = os.path.join(Config.UPLOAD_FOLDER, file_uuid) # Copy file to storage location shutil.copy2(full_path, storage_path) # Get file info file_size = os.path.getsize(storage_path) mime_type, _ = mimetypes.guess_type(filename) if not mime_type: mime_type = 'application/octet-stream' # Create file record db_file = File( name=secure_name, storage_name=file_uuid, mime_type=mime_type, size=file_size, user_id=current_user.id, parent_id=current_parent_id, is_folder=False ) db.session.add(db_file) db.session.commit() flash('Folder uploaded successfully', 'success') except Exception as e: db.session.rollback() flash(f'Error uploading folder: {str(e)}', 'error') finally: # Clean up temp directory shutil.rmtree(temp_dir, ignore_errors=True) # Redirect back to the folder if parent_folder: return redirect(url_for('files.browser', folder_id=parent_folder.id)) else: return redirect(url_for('files.browser')) @files_bp.route('/create_folder', methods=['GET', 'POST']) @login_required def create_folder(): parent_id = request.args.get('folder', None, type=int) or request.form.get('parent_id', None, type=int) # Verify parent folder if specified if parent_id: parent = File.query.filter_by(id=parent_id, user_id=current_user.id, is_folder=True).first_or_404() if request.method == 'POST': folder_name = request.form.get('folder_name', '').strip() if not folder_name: flash('Folder name cannot be empty', 'error') return redirect(request.url) # Secure the folder name folder_name = secure_filename(folder_name) # Check if folder already exists existing_folder = File.query.filter_by( name=folder_name, parent_id=parent_id, user_id=current_user.id, is_folder=True ).first() if existing_folder: flash(f'A folder named "{folder_name}" already exists', 'error') return redirect(request.url) # Create new folder new_folder = File( name=folder_name, user_id=current_user.id, parent_id=parent_id, is_folder=True ) db.session.add(new_folder) db.session.commit() flash(f'Folder "{folder_name}" created successfully', 'success') # Handle AJAX requests if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'success': True, 'folder_id': new_folder.id}) # Redirect to the new folder return redirect(url_for('files.browser', folder_id=new_folder.id)) # For GET, show the create folder form return render_template('files/create_folder.html', title='Create Folder', parent_id=parent_id) @files_bp.route('/download/') @login_required def download(file_id): file = File.query.filter_by(id=file_id, user_id=current_user.id, is_folder=False).first_or_404() # Record the download download = Download( file_id=file.id, user_id=current_user.id, ip_address=request.remote_addr ) db.session.add(download) db.session.commit() return send_from_directory( Config.UPLOAD_FOLDER, file.storage_name, as_attachment=True, attachment_filename=file.name ) @files_bp.route('/delete/', methods=['POST']) @login_required def delete(file_id): file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404() parent_id = file.parent_id # If it's a folder, delete all children recursively if file.is_folder: delete_folder_recursive(file) else: # Delete the actual file try: os.remove(os.path.join(Config.UPLOAD_FOLDER, file.storage_name)) except (FileNotFoundError, OSError): pass # File already gone, continue # Delete the database record db.session.delete(file) db.session.commit() flash(f'{"Folder" if file.is_folder else "File"} deleted successfully', 'success') # Handle AJAX requests if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'success': True}) return redirect(url_for('files.browser', folder_id=parent_id)) def delete_folder_recursive(folder): """Recursively delete a folder and all its contents""" # First get all child items children = File.query.filter_by(parent_id=folder.id).all() for child in children: if child.is_folder: delete_folder_recursive(child) else: # Delete the actual file try: os.remove(os.path.join(Config.UPLOAD_FOLDER, child.storage_name)) except (FileNotFoundError, OSError): pass # File already gone, continue # Delete the database record db.session.delete(child) # Finally delete the folder itself db.session.delete(folder) @files_bp.route('/rename/', methods=['POST']) @login_required def rename(file_id): file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404() new_name = request.form.get('new_name', '').strip() if not new_name: flash('Name cannot be empty', 'error') return redirect(url_for('files.browser', folder_id=file.parent_id)) # Secure the new name new_name = secure_filename(new_name) # Check if a file/folder with this name already exists existing = File.query.filter_by( name=new_name, parent_id=file.parent_id, user_id=current_user.id, is_folder=file.is_folder ).first() if existing and existing.id != file.id: flash(f'A {"folder" if file.is_folder else "file"} with this name already exists', 'error') return redirect(url_for('files.browser', folder_id=file.parent_id)) # Update the name file.name = new_name db.session.commit() flash(f'{"Folder" if file.is_folder else "File"} renamed successfully', 'success') # Handle AJAX requests if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'success': True}) return redirect(url_for('files.browser', folder_id=file.parent_id)) @files_bp.route('/share/', methods=['GET', 'POST']) @login_required def share(file_id): file = File.query.filter_by(id=file_id, user_id=current_user.id).first_or_404() if request.method == 'POST': # Generate share link expires_days = request.form.get('expires', type=int) expires_at = None if expires_days: expires_at = datetime.utcnow() + timedelta(days=expires_days) # Create unique share token share_token = str(uuid.uuid4()) # Save share in database share = Share( file_id=file.id, user_id=current_user.id, token=share_token, expires_at=expires_at ) db.session.add(share) db.session.commit() # Generate the share URL share_url = url_for('files.public_share', token=share_token, _external=True) flash('Share link created successfully', 'success') return render_template('files/share_success.html', title='Share Link', file=file, share=share, share_url=share_url) return render_template('files/share.html', title='Share File', file=file) @files_bp.route('/public/') def public_share(token): # Find the share by token share = Share.query.filter_by(token=token).first_or_404() # Check if share has expired if share.expires_at and share.expires_at < datetime.utcnow(): return render_template('files/share_expired.html', title='Share Expired') # Get the file details file = File.query.get_or_404(share.file_id) # Record the download download = Download( file_id=file.id, share_id=share.id, ip_address=request.remote_addr ) db.session.add(download) db.session.commit() # If it's a viewable file type, show a preview if file.mime_type and ( file.mime_type.startswith('image/') or file.mime_type == 'application/pdf' or file.mime_type.startswith('text/') or file.mime_type in ['application/javascript', 'application/json'] ): return render_template('files/preview.html', title=file.name, file=file, share=share, download_url=url_for('files.public_download', token=token)) # Otherwise, redirect to download return redirect(url_for('files.public_download', token=token)) @files_bp.route('/public/download/') def public_download(token): # Find the share by token share = Share.query.filter_by(token=token).first_or_404() # Check if share has expired if share.expires_at and share.expires_at < datetime.utcnow(): return render_template('files/share_expired.html', title='Share Expired') # Get the file details file = File.query.get_or_404(share.file_id) # Send the file return send_from_directory( Config.UPLOAD_FOLDER, file.storage_name, as_attachment=True, attachment_filename=file.name ) @files_bp.route('/upload_xhr', methods=['POST']) @login_required def upload_xhr(): """Handle AJAX file uploads with progress tracking""" if 'files[]' not in request.files: return jsonify({'success': False, 'error': 'No files found in the request'}) files = request.files.getlist('files[]') folder_id = request.form.get('folder_id', None, type=int) is_folder = request.form.get('is_folder') == '1' paths = request.form.getlist('paths[]') # Check if any files were selected if not files or all(f.filename == '' for f in files): return jsonify({'success': False, 'error': 'No files selected for upload'}) # Check folder exists if folder_id is provided parent_folder = None if folder_id: parent_folder = File.query.filter_by(id=folder_id, user_id=current_user.id, is_folder=True).first() if not parent_folder: return jsonify({'success': False, 'error': 'Parent folder not found'}) # Process uploads successful = 0 failed = 0 errors = [] # If this is a folder upload, we need to create the folder structure folder_map = {} # Maps path to folder ID for i, file in enumerate(files): try: if file.filename == '': continue # Get the relative path for folder uploads relative_path = paths[i] if is_folder and i < len(paths) else None # Handle folder structure if needed current_parent_id = folder_id if is_folder and relative_path: # Split path into directory components path_parts = os.path.dirname(relative_path).split('/') if path_parts and path_parts[0]: # Skip empty path (files at root) # Create each folder in the path if needed current_path = "" for part in path_parts: if not part: # Skip empty parts continue current_path = os.path.join(current_path, part) if current_path else part # Check if we've already created this folder if current_path in folder_map: current_parent_id = folder_map[current_path] continue # Check if folder already exists folder_name = secure_filename(part) existing_folder = File.query.filter_by( name=folder_name, parent_id=current_parent_id, user_id=current_user.id, is_folder=True ).first() if existing_folder: current_parent_id = existing_folder.id folder_map[current_path] = existing_folder.id else: # Create new folder new_folder = File( name=folder_name, parent_id=current_parent_id, user_id=current_user.id, is_folder=True ) db.session.add(new_folder) db.session.flush() # Get the ID without committing current_parent_id = new_folder.id folder_map[current_path] = new_folder.id # Now handle the actual file filename = os.path.basename(relative_path) if relative_path else file.filename filename = secure_filename(filename) # Check if file already exists existing_file = File.query.filter_by( name=filename, parent_id=current_parent_id, user_id=current_user.id, is_folder=False ).first() if existing_file: # Create a unique name by adding timestamp name_parts = os.path.splitext(filename) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{name_parts[0]}_{timestamp}{name_parts[1]}" # Generate a unique storage name storage_name = f"{str(uuid.uuid4())}{os.path.splitext(filename)[1]}" # Save the file file_path = os.path.join(Config.UPLOAD_FOLDER, storage_name) file.save(file_path) # Get file size and mime type file_size = os.path.getsize(file_path) mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream' # Create file entry in database db_file = File( name=filename, storage_name=storage_name, mime_type=mime_type, size=file_size, user_id=current_user.id, parent_id=current_parent_id, is_folder=False ) db.session.add(db_file) successful += 1 except Exception as e: failed += 1 errors.append(f"{file.filename}: {str(e)}") # Commit all database changes if successful > 0: db.session.commit() result = { 'success': True if successful > 0 else False, 'message': f"Successfully uploaded {successful} files, {failed} failed.", 'successful': successful, 'failed': failed, 'errors': errors } return jsonify(result)