""" Complete Migration Script with Integrated Backup & Restore Option 3: Client + Machine Assignment FLOW: 1. Backup current database 2. Run migration 3. Verify migration success 4. On failure: Auto-restore from backup Usage: python migrate_with_backup.py Author: System Date: 2025-01-25 Version: 2.0.0 """ import sys import os import shutil from datetime import datetime from pathlib import Path # Add parent directory to path current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) class DatabaseBackupManager: """Handles database backup and restore operations""" def __init__(self, db_path): self.db_path = Path(db_path) self.backup_dir = self.db_path.parent / 'backups' self.current_backup = None def ensure_backup_dir(self): """Create backup directory if it doesn't exist""" self.backup_dir.mkdir(parents=True, exist_ok=True) def create_backup(self): """Create timestamped backup of database""" if not self.db_path.exists(): raise FileNotFoundError(f"Database not found: {self.db_path}") self.ensure_backup_dir() timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_file = self.backup_dir / f'machines_backup_{timestamp}.db' try: # Create backup shutil.copy2(self.db_path, backup_file) self.current_backup = backup_file # Get file size size = backup_file.stat().st_size / (1024 * 1024) print(f"āœ… Backup created successfully!") print(f" File: {backup_file.name}") print(f" Size: {size:.2f} MB") print(f" Location: {backup_file}") return backup_file except Exception as e: raise Exception(f"Backup failed: {e}") def restore_backup(self, backup_file=None): """Restore database from backup""" if backup_file is None: backup_file = self.current_backup if backup_file is None: raise ValueError("No backup file specified") if not backup_file.exists(): raise FileNotFoundError(f"Backup file not found: {backup_file}") try: # Create safety backup of current database if self.db_path.exists(): safety_backup = self.backup_dir / f'machines_before_restore_{datetime.now().strftime("%Y%m%d_%H%M%S")}.db' shutil.copy2(self.db_path, safety_backup) print(f"āœ… Safety backup created: {safety_backup.name}") # Restore from backup shutil.copy2(backup_file, self.db_path) print(f"āœ… Database restored successfully!") print(f" Restored from: {backup_file.name}") return True except Exception as e: raise Exception(f"Restore failed: {e}") def list_backups(self): """List all available backups""" self.ensure_backup_dir() backups = sorted(self.backup_dir.glob('machines_backup_*.db'), reverse=True) return backups def cleanup_old_backups(self, keep_count=10): """Keep only the most recent N backups""" backups = self.list_backups() if len(backups) <= keep_count: print(f"āœ… Cleanup: Keeping all {len(backups)} backups") return to_delete = backups[keep_count:] print(f"šŸ—‘ļø Cleanup: Keeping {keep_count} most recent backups") print(f" Deleting {len(to_delete)} old backups...") for backup in to_delete: try: backup.unlink() print(f" āœ“ Deleted: {backup.name}") except Exception as e: print(f" āœ— Failed to delete {backup.name}: {e}") class MigrationError(Exception): """Custom exception for migration errors""" pass class Option3MigrationWithBackup: """Complete migration with integrated backup and restore""" def __init__(self): self.app = None self.inspector = None self.changes_made = [] self.backup_manager = None self.backup_file = None def print_header(self, title): """Print formatted header""" print("\n" + "=" * 70) print(f" {title}") print("=" * 70) def print_step(self, step_num, description): """Print step information""" print(f"\n[Step {step_num}] {description}") print("-" * 70) def initialize(self): """Initialize Flask app and database""" try: from app import create_app, db from sqlalchemy import inspect, text self.app = create_app() self.db = db self.text = text # Get database path with self.app.app_context(): db_uri = self.app.config['SQLALCHEMY_DATABASE_URI'] if db_uri.startswith('sqlite:///'): db_path = db_uri.replace('sqlite:///', '') self.db_path = Path(db_path) self.backup_manager = DatabaseBackupManager(self.db_path) else: raise ValueError("This script only supports SQLite databases") self.inspector = inspect(db.engine) return True except ImportError as e: print(f"āŒ Error importing modules: {e}") print("Make sure you're running this from the backend directory") return False except Exception as e: print(f"āŒ Initialization failed: {e}") return False def create_backup(self): """Create database backup""" self.print_step(1, "Creating Database Backup") try: self.backup_file = self.backup_manager.create_backup() return True except Exception as e: print(f"āŒ Backup failed: {e}") return False def check_prerequisites(self): """Check if all required tables exist""" self.print_step(2, "Checking Prerequisites") with self.app.app_context(): tables = self.inspector.get_table_names() required_tables = ['users', 'machines'] missing_tables = [t for t in required_tables if t not in tables] if missing_tables: raise MigrationError( f"Required tables missing: {', '.join(missing_tables)}\n" "Please ensure your database is properly initialized." ) print("āœ… All required tables exist") print(f"āœ… Found tables: {', '.join(tables)}") def add_assigned_to_column(self): """Add assigned_to column to users table if not exists""" self.print_step(3, "Adding assigned_to Column to Users Table") with self.app.app_context(): columns = [col['name'] for col in self.inspector.get_columns('users')] if 'assigned_to' in columns: print("āš ļø assigned_to column already exists - skipping") return False try: print("Creating assigned_to column...") # Add column self.db.session.execute(self.text(""" ALTER TABLE users ADD COLUMN assigned_to INTEGER NULL """)) # Add foreign key constraint self.db.session.execute(self.text(""" ALTER TABLE users ADD CONSTRAINT fk_users_assigned_to FOREIGN KEY (assigned_to) REFERENCES users(id) ON DELETE SET NULL """)) self.db.session.commit() print("āœ… assigned_to column created successfully") print("āœ… Foreign key constraint added") self.changes_made.append('assigned_to_column') return True except Exception as e: self.db.session.rollback() raise MigrationError(f"Failed to add assigned_to column: {str(e)}") def create_refiller_machines_table(self): """Create refiller_machines junction table if not exists""" self.print_step(4, "Creating refiller_machines Junction Table") with self.app.app_context(): tables = self.inspector.get_table_names() if 'refiller_machines' in tables: print("āš ļø refiller_machines table already exists - skipping") return False try: print("Creating refiller_machines table...") self.db.session.execute(self.text(""" CREATE TABLE refiller_machines ( id INTEGER PRIMARY KEY AUTOINCREMENT, refiller_id INTEGER NOT NULL, machine_id VARCHAR(10) NOT NULL, assigned_at DATETIME DEFAULT CURRENT_TIMESTAMP, assigned_by INTEGER NULL, CONSTRAINT fk_refiller_machines_refiller FOREIGN KEY (refiller_id) REFERENCES users(id) ON DELETE CASCADE, CONSTRAINT fk_refiller_machines_machine FOREIGN KEY (machine_id) REFERENCES machines(machine_id) ON DELETE CASCADE, CONSTRAINT fk_refiller_machines_assigner FOREIGN KEY (assigned_by) REFERENCES users(id) ON DELETE SET NULL, CONSTRAINT unique_refiller_machine UNIQUE (refiller_id, machine_id) ) """)) self.db.session.commit() print("āœ… refiller_machines table created successfully") self.changes_made.append('refiller_machines_table') return True except Exception as e: self.db.session.rollback() raise MigrationError(f"Failed to create refiller_machines table: {str(e)}") def verify_migration(self): """Verify all changes were applied correctly""" self.print_step(5, "Verifying Migration") with self.app.app_context(): # Refresh inspector from sqlalchemy import inspect self.inspector = inspect(self.db.engine) # Check assigned_to column user_columns = [col['name'] for col in self.inspector.get_columns('users')] if 'assigned_to' in user_columns: print("āœ… users.assigned_to column exists") else: raise MigrationError("Verification failed: assigned_to column not found") # Check refiller_machines table tables = self.inspector.get_table_names() if 'refiller_machines' in tables: print("āœ… refiller_machines table exists") # Verify columns rm_columns = [col['name'] for col in self.inspector.get_columns('refiller_machines')] expected_columns = ['id', 'refiller_id', 'machine_id', 'assigned_at', 'assigned_by'] for col in expected_columns: if col in rm_columns: print(f" āœ… Column '{col}' exists") else: raise MigrationError(f"Verification failed: Column '{col}' not found") else: raise MigrationError("Verification failed: refiller_machines table not found") def restore_on_failure(self): """Restore database from backup on migration failure""" self.print_header("šŸ”„ RESTORING FROM BACKUP") try: if self.backup_file: self.backup_manager.restore_backup(self.backup_file) print("āœ… Database restored successfully from backup") print(" Your data is safe!") else: print("āš ļø No backup file available for restore") except Exception as e: print(f"āŒ Restore failed: {e}") print(" Please manually restore from backup") def print_summary(self): """Print migration summary""" self.print_header("āœ… MIGRATION COMPLETED SUCCESSFULLY") if not self.changes_made: print("\nāš ļø No changes were made - all structures already exist") else: print(f"\nāœ… Successfully applied {len(self.changes_made)} change(s):") for i, change in enumerate(self.changes_made, 1): print(f" {i}. {change}") # Show backup information if self.backup_file: print(f"\nšŸ“¦ Backup Information:") print(f" Location: {self.backup_file}") print(f" You can restore this backup if needed using:") print(f" python migrate_with_backup.py restore") # Cleanup old backups print() self.backup_manager.cleanup_old_backups(keep_count=10) print("\n" + "=" * 70) print("NEXT STEPS") print("=" * 70) print("\n1. Update Backend Models (app/models/models.py)") print("2. Update Backend Services (app/services/services.py)") print("3. Update Backend Routes (app/routes/routes.py)") print("4. Update Frontend (user and machine modules)") print("5. Restart Backend: python app.py") print("6. Test the implementation") print("\n" + "=" * 70 + "\n") def run_migration(self): """Run the complete migration process""" try: self.print_header("šŸš€ Option 3 Migration with Backup") # Step 1: Create backup if not self.create_backup(): print("āŒ Cannot proceed without backup") return False # Step 2: Check prerequisites self.check_prerequisites() # Step 3: Add assigned_to column self.add_assigned_to_column() # Step 4: Create refiller_machines table self.create_refiller_machines_table() # Step 5: Verify migration self.verify_migration() # Step 6: Print summary self.print_summary() return True except MigrationError as e: print(f"\nāŒ Migration Error: {str(e)}") self.restore_on_failure() return False except Exception as e: print(f"\nāŒ Unexpected Error: {str(e)}") import traceback traceback.print_exc() self.restore_on_failure() return False def list_backups(): """List all available backups""" print("\n" + "=" * 70) print(" AVAILABLE BACKUPS") print("=" * 70) # Find database path try: from app import create_app app = create_app() with app.app_context(): db_uri = app.config['SQLALCHEMY_DATABASE_URI'] if db_uri.startswith('sqlite:///'): db_path = db_uri.replace('sqlite:///', '') backup_manager = DatabaseBackupManager(Path(db_path)) backups = backup_manager.list_backups() if not backups: print("\nšŸ“‚ No backups found") return print(f"\nšŸ“‚ Found {len(backups)} backup(s):") print("=" * 70) for i, backup in enumerate(backups, 1): size = backup.stat().st_size / (1024 * 1024) mtime = datetime.fromtimestamp(backup.stat().st_mtime) print(f"{i:2d}. {backup.name}") print(f" Date: {mtime.strftime('%Y-%m-%d %H:%M:%S')}") print(f" Size: {size:.2f} MB") print() print("=" * 70 + "\n") else: print("āŒ Only SQLite databases are supported") except Exception as e: print(f"āŒ Error: {e}") def restore_from_backup(): """Interactive restore from backup""" print("\n" + "=" * 70) print(" RESTORE FROM BACKUP") print("=" * 70) try: from app import create_app app = create_app() with app.app_context(): db_uri = app.config['SQLALCHEMY_DATABASE_URI'] if db_uri.startswith('sqlite:///'): db_path = db_uri.replace('sqlite:///', '') backup_manager = DatabaseBackupManager(Path(db_path)) backups = backup_manager.list_backups() if not backups: print("\nāŒ No backups found") return # List backups print(f"\nšŸ“‚ Available backups:") print("=" * 70) for i, backup in enumerate(backups, 1): size = backup.stat().st_size / (1024 * 1024) mtime = datetime.fromtimestamp(backup.stat().st_mtime) print(f"{i:2d}. {backup.name}") print(f" Date: {mtime.strftime('%Y-%m-%d %H:%M:%S')}") print(f" Size: {size:.2f} MB") print() print("=" * 70) # Get user choice choice = input("\nEnter backup number to restore (or 'q' to quit): ").strip() if choice.lower() == 'q': print("āŒ Restore cancelled") return try: index = int(choice) - 1 if index < 0 or index >= len(backups): print("āŒ Invalid backup number") return backup_file = backups[index] # Confirm print(f"\nāš ļø WARNING: This will replace your current database!") print(f" Backup: {backup_file.name}") confirm = input("\nType 'yes' to confirm restore: ").strip().lower() if confirm != 'yes': print("āŒ Restore cancelled") return # Restore backup_manager.restore_backup(backup_file) print("\nāš ļø Remember to restart your Flask server!") except ValueError: print("āŒ Invalid input") except Exception as e: print(f"āŒ Restore failed: {e}") else: print("āŒ Only SQLite databases are supported") except Exception as e: print(f"āŒ Error: {e}") def main(): """Main entry point""" print("\n" + "=" * 70) print(" VENDING MACHINE MANAGEMENT SYSTEM") print(" Database Migration with Backup & Restore") print(" Version: 2.0.0") print("=" * 70) if len(sys.argv) > 1: command = sys.argv[1].lower() if command == 'list': list_backups() return elif command == 'restore': restore_from_backup() return elif command == 'migrate': pass # Continue to migration else: print(f"\nāŒ Unknown command: {command}") print("\nUsage:") print(" python migrate_with_backup.py # Run migration") print(" python migrate_with_backup.py migrate # Run migration") print(" python migrate_with_backup.py list # List backups") print(" python migrate_with_backup.py restore # Restore from backup") return # Run migration migration = Option3MigrationWithBackup() if not migration.initialize(): print("\nāŒ Failed to initialize. Please check your setup.") sys.exit(1) print("\nThis migration will:") print(" 1. āœ… Create backup of current database") print(" 2. āœ… Add users.assigned_to column") print(" 3. āœ… Create refiller_machines table") print(" 4. āœ… Verify all changes") print(" 5. āœ… Auto-restore on failure") response = input("\nProceed with migration? (yes/no): ").strip().lower() if response != 'yes': print("\nāŒ Migration cancelled") sys.exit(0) success = migration.run_migration() if success: print("\nāœ… Migration completed successfully!") sys.exit(0) else: print("\nāŒ Migration failed! Database has been restored from backup.") sys.exit(1) if __name__ == '__main__': main()