""" User model for authentication and authorization. """ import bcrypt from datetime import datetime, timedelta from typing import Optional, List import uuid from flask_login import UserMixin from app.extensions import db, login_manager class User(UserMixin, db.Model): """User model for authentication and authorization.""" __tablename__ = "users" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, nullable=False, index=True) email = db.Column(db.String(120), unique=True, nullable=False, index=True) password_hash = db.Column(db.String(128), nullable=False) active = db.Column(db.Boolean, default=True, nullable=False) is_admin = db.Column(db.Boolean, default=False, nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) last_login = db.Column(db.DateTime) # For account lockout/security failed_login_count = db.Column(db.Integer, default=0) locked_until = db.Column(db.DateTime) # Password reset reset_token = db.Column(db.String(64), unique=True, index=True) reset_token_expires_at = db.Column(db.DateTime) # Relationships networks = db.relationship("Network", back_populates="owner", cascade="all, delete-orphan") def set_password(self, password: str) -> None: """Hash and set the user's password.""" # Generate a salt and hash the password self.password_hash = bcrypt.hashpw( password.encode("utf-8"), bcrypt.gensalt() ).decode("utf-8") def check_password(self, password: str) -> bool: """Verify the provided password against the stored hash.""" return bcrypt.checkpw( password.encode("utf-8"), self.password_hash.encode("utf-8") ) def generate_reset_token(self, expires_in: int = 3600) -> str: """Generate a password reset token that expires after the specified time.""" self.reset_token = str(uuid.uuid4()) self.reset_token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in) db.session.commit() return self.reset_token def clear_reset_token(self) -> None: """Clear the reset token after it's been used.""" self.reset_token = None self.reset_token_expires_at = None db.session.commit() def is_reset_token_valid(self, token: str) -> bool: """Check if the provided reset token is valid and not expired.""" return ( self.reset_token == token and self.reset_token_expires_at is not None and self.reset_token_expires_at > datetime.utcnow() ) def record_login(self, success: bool) -> None: """Record a login attempt.""" if success: self.last_login = datetime.utcnow() self.failed_login_count = 0 self.locked_until = None else: self.failed_login_count += 1 if self.failed_login_count >= 5: # Threshold from config self.locked_until = datetime.utcnow() + timedelta(minutes=30) # Duration from config db.session.commit() def is_account_locked(self) -> bool: """Check if the account is locked due to too many failed login attempts.""" if self.locked_until is None: return False if datetime.utcnow() > self.locked_until: self.failed_login_count = 0 self.locked_until = None db.session.commit() return False return True def __repr__(self) -> str: return f"" @login_manager.user_loader def load_user(user_id: str) -> Optional[User]: """Load a user from the database by ID.""" return User.query.get(int(user_id))