Flask-Files/app/__init__.py
2025-03-23 03:53:45 +01:00

281 lines
11 KiB
Python

from flask import Flask, current_app, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from config import Config
import os
from datetime import datetime
import sqlite3
import logging
from . import context_processors, filters
from flask_migrate import Migrate
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Initialize extensions
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.login_message_category = 'info'
def initialize_database(app):
"""Create and initialize database tables if they don't exist"""
with app.app_context():
app.logger.info("Initializing database...")
try:
# Check if tables exist before creating them
from sqlalchemy import inspect
inspector = inspect(db.engine)
existing_tables = inspector.get_table_names()
# Only create tables that don't exist
if not existing_tables:
app.logger.info("Creating database tables...")
db.create_all()
# Check if we need to add the Share and Download models
inspector = db.inspect(db.engine)
if not inspector.has_table('share'):
app.logger.info("Creating Share table...")
# Import models to ensure they're registered with SQLAlchemy
from app.models import Share
if not inspector.has_table('share'):
# Create the Share table
Share.__table__.create(db.engine)
if not inspector.has_table('download'):
app.logger.info("Creating Download table...")
from app.models import Download
if not inspector.has_table('download'):
# Create the Download table
Download.__table__.create(db.engine)
# Check for existing users - create admin if none
from app.models import User
if User.query.count() == 0:
app.logger.info("No users found, creating default admin user...")
admin = User(username='admin', email='admin@example.com')
admin.set_password('adminpassword')
db.session.add(admin)
db.session.commit()
app.logger.info("Default admin user created")
app.logger.info("Database initialization complete")
else:
app.logger.info(f"Database already initialized with tables: {existing_tables}")
# Check for missing tables
from app.models import User, File, Folder, Download
required_tables = ['users', 'files', 'folders', 'downloads']
missing_tables = [table for table in required_tables if table not in existing_tables]
if missing_tables:
app.logger.info(f"Creating missing tables: {missing_tables}")
# Create only the missing tables
db.create_all()
except Exception as e:
app.logger.error(f"Error initializing database: {str(e)}")
# Don't raise the exception to prevent app startup failure
# But log it for debugging purposes
def run_migrations(app):
"""Apply any necessary database migrations automatically"""
with app.app_context():
try:
app.logger.info("Running database migrations...")
# Get database path
db_path = app.config['SQLALCHEMY_DATABASE_URI'].replace('sqlite:///', '')
# Check if we're using SQLite and if the database file exists
if db_path.startswith('/'): # Absolute path
if not os.path.exists(db_path):
app.logger.info(f"Database file does not exist: {db_path}")
return
# Use SQLAlchemy to check and add missing columns
inspector = db.inspect(db.engine)
# Check for 'file' table columns
if inspector.has_table('file'):
columns = [col['name'] for col in inspector.get_columns('file')]
# Add storage_name column if it doesn't exist
if 'storage_name' not in columns:
app.logger.info("Adding storage_name column to file table")
if db.engine.name == 'sqlite':
# For SQLite, use direct SQL as it doesn't support ALTER TABLE ADD COLUMN with default
db.session.execute('ALTER TABLE file ADD COLUMN storage_name TEXT')
# Update existing records
db.session.execute('UPDATE file SET storage_name = uuid() WHERE storage_name IS NULL AND is_folder = 0')
db.session.commit()
# Check for user table columns
if inspector.has_table('user'):
columns = [col['name'] for col in inspector.get_columns('user')]
# Add last_login column if it doesn't exist
if 'last_login' not in columns:
app.logger.info("Adding last_login column to user table")
if db.engine.name == 'sqlite':
db.session.execute('ALTER TABLE user ADD COLUMN last_login DATETIME')
db.session.commit()
app.logger.info("Database migrations complete")
except Exception as e:
app.logger.error(f"Error during database migration: {str(e)}")
# Log error but don't crash the app
def get_file_icon(mime_type, filename):
"""Return Font Awesome icon class based on file type"""
if mime_type:
if mime_type.startswith('image/'):
return 'fa-file-image'
elif mime_type.startswith('video/'):
return 'fa-file-video'
elif mime_type.startswith('audio/'):
return 'fa-file-audio'
elif mime_type.startswith('text/'):
return 'fa-file-alt'
elif mime_type.startswith('application/pdf'):
return 'fa-file-pdf'
elif 'spreadsheet' in mime_type or 'excel' in mime_type:
return 'fa-file-excel'
elif 'presentation' in mime_type or 'powerpoint' in mime_type:
return 'fa-file-powerpoint'
elif 'document' in mime_type or 'word' in mime_type:
return 'fa-file-word'
# Check by extension
ext = os.path.splitext(filename)[1].lower()[1:]
if ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp']:
return 'fa-file-image'
elif ext in ['mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv']:
return 'fa-file-video'
elif ext in ['mp3', 'wav', 'ogg', 'flac', 'm4a']:
return 'fa-file-audio'
elif ext in ['doc', 'docx', 'odt']:
return 'fa-file-word'
elif ext in ['xls', 'xlsx', 'ods', 'csv']:
return 'fa-file-excel'
elif ext in ['ppt', 'pptx', 'odp']:
return 'fa-file-powerpoint'
elif ext == 'pdf':
return 'fa-file-pdf'
elif ext in ['zip', 'rar', '7z', 'tar', 'gz']:
return 'fa-file-archive'
elif ext in ['txt', 'rtf', 'md']:
return 'fa-file-alt'
elif ext in ['html', 'css', 'js', 'py', 'java', 'php', 'c', 'cpp', 'json', 'xml']:
return 'fa-file-code'
return 'fa-file'
def format_file_size(size):
"""Format file size in bytes to human-readable format"""
if not size:
return "0 B"
size_names = ("B", "KB", "MB", "GB", "TB")
i = 0
while size >= 1024 and i < len(size_names) - 1:
size /= 1024
i += 1
return f"{size:.1f} {size_names[i]}"
def create_app(config_class=Config):
app = Flask(__name__)
# Configure app
if config_class:
app.config.from_object(config_class)
else:
# Use default configuration
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-change-in-production')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///app.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../uploads')
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100 MB max upload
# Configure logging
if not app.debug:
# Set up file handler
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = logging.FileHandler('logs/flask_files.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
# Set log level
app.logger.setLevel(logging.INFO)
app.logger.info('Flask Files startup')
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
# Initialize the upload folder
upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
if not os.path.isabs(upload_folder):
# If it's a relative path, make it relative to the app instance folder
upload_folder = os.path.join(app.instance_path, upload_folder)
app.config['UPLOAD_FOLDER'] = upload_folder
os.makedirs(upload_folder, exist_ok=True)
app.logger.info(f"Upload folder initialized at: {upload_folder}")
# Auto initialize database and run migrations on startup
with app.app_context():
initialize_database(app)
run_migrations(app)
# Register blueprints
from app.routes.auth import bp as auth_bp
from app.routes.files import bp as files_bp
from app.routes.dashboard import bp as dashboard_bp
from app.routes.admin import bp as admin_bp
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(files_bp, url_prefix='/files')
app.register_blueprint(dashboard_bp)
app.register_blueprint(admin_bp, url_prefix='/admin')
# Register context processors and filters
context_processors.init_app(app)
filters.init_app(app)
# Add context processor for template variables
@app.context_processor
def inject_global_variables():
return {
'current_year': datetime.now().year,
'format_file_size': format_file_size,
'file_icon': get_file_icon,
'app_version': '1.0.0',
}
# Handle 404 errors
@app.errorhandler(404)
def not_found_error(error):
return render_template('errors/404.html'), 404
# Handle 500 errors
@app.errorhandler(500)
def internal_error(error):
db.session.rollback() # Rollback any failed database transactions
return render_template('errors/500.html'), 500
logger.info("Flask Files startup")
return app
# Import must come after create_app to avoid circular imports
from app import models