603 lines
22 KiB
Python
603 lines
22 KiB
Python
"""
Complete Migration Script with Integrated Backup & Restore
Option 3: Client + Machine Assignment

FLOW:
1. Backup current database
2. Run migration
3. Verify migration success
4. On failure: Auto-restore from backup

Usage: python migrate_with_backup.py

Author: System
Date: 2025-01-25
Version: 2.0.0
"""
|
|
|
|
import sys
|
|
import os
|
|
import shutil
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Make the directory one level above this script importable, so that
# project packages living there can be imported by the code below.
current_dir, _ = os.path.split(os.path.abspath(__file__))
parent_dir, _ = os.path.split(current_dir)
sys.path.insert(0, parent_dir)
|
|
|
|
|
|
class DatabaseBackupManager:
    """Create, restore, list and prune file-level copies of a database file.

    Backups are written to a ``backups`` directory next to the database file
    and are named ``machines_backup_<YYYYmmdd_HHMMSS>.db``.

    NOTE(review): only the single file at ``db_path`` is copied. If the
    database uses SQLite sidecar files (e.g. ``-wal`` / ``-journal``), they
    are not part of the backup -- confirm the journal mode in use.
    """

    def __init__(self, db_path):
        """Remember the database location and derive the backup directory.

        Args:
            db_path: Path (``str`` or ``Path``) of the database file.
        """
        self.db_path = Path(db_path)
        self.backup_dir = self.db_path.parent / 'backups'
        # Most recent backup made by this instance; default restore source.
        self.current_backup = None

    def ensure_backup_dir(self):
        """Create the backup directory (and parents) if it does not exist."""
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_backup(self):
        """Copy the database to a new timestamped file in the backup directory.

        Returns:
            Path of the backup file that was written.

        Raises:
            FileNotFoundError: If the database file does not exist.
            Exception: If copying fails; the original error is chained.
        """
        if not self.db_path.exists():
            raise FileNotFoundError(f"Database not found: {self.db_path}")

        self.ensure_backup_dir()

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = self.backup_dir / f'machines_backup_{timestamp}.db'

        try:
            shutil.copy2(self.db_path, backup_file)
            self.current_backup = backup_file

            size = backup_file.stat().st_size / (1024 * 1024)

            print("✅ Backup created successfully!")
            print(f" File: {backup_file.name}")
            print(f" Size: {size:.2f} MB")
            print(f" Location: {backup_file}")

            return backup_file
        except Exception as e:
            # Same exception type as before; chaining keeps the root cause.
            raise Exception(f"Backup failed: {e}") from e

    def restore_backup(self, backup_file=None):
        """Overwrite the database file with the contents of a backup.

        Before overwriting, the current database (if present) is copied to a
        ``machines_before_restore_<timestamp>.db`` file in the backup
        directory.

        Args:
            backup_file: Backup to restore (``str`` or ``Path``). Defaults to
                the backup most recently created by this instance.

        Returns:
            True when the restore completed.

        Raises:
            ValueError: If no backup file is given and none was created.
            FileNotFoundError: If the backup file does not exist.
            Exception: If copying fails; the original error is chained.
        """
        if backup_file is None:
            backup_file = self.current_backup

        if backup_file is None:
            raise ValueError("No backup file specified")

        # Accept plain string paths as well as Path objects.
        backup_file = Path(backup_file)

        if not backup_file.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_file}")

        try:
            if self.db_path.exists():
                # The backup being restored may live outside backup_dir, so
                # the directory for the safety copy may not exist yet.
                self.ensure_backup_dir()
                stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                safety_backup = self.backup_dir / f'machines_before_restore_{stamp}.db'
                shutil.copy2(self.db_path, safety_backup)
                print(f"✅ Safety backup created: {safety_backup.name}")

            shutil.copy2(backup_file, self.db_path)

            print("✅ Database restored successfully!")
            print(f" Restored from: {backup_file.name}")

            return True
        except Exception as e:
            raise Exception(f"Restore failed: {e}") from e

    def list_backups(self):
        """Return ``machines_backup_*.db`` files, sorted by name descending.

        Because the file names embed a ``YYYYmmdd_HHMMSS`` timestamp, the
        newest backup comes first. The backup directory is created if missing.
        """
        self.ensure_backup_dir()
        return sorted(self.backup_dir.glob('machines_backup_*.db'), reverse=True)

    def cleanup_old_backups(self, keep_count=10):
        """Delete all but the ``keep_count`` newest backups.

        Deletion is best-effort: a file that cannot be removed is reported
        and the remaining files are still processed.

        Args:
            keep_count: Number of newest backups to keep (must be >= 0).

        Raises:
            ValueError: If ``keep_count`` is negative.
        """
        if keep_count < 0:
            # A negative slice start would silently delete the wrong files.
            raise ValueError("keep_count must be non-negative")

        backups = self.list_backups()

        if len(backups) <= keep_count:
            print(f"✅ Cleanup: Keeping all {len(backups)} backups")
            return

        to_delete = backups[keep_count:]

        print(f"🗑️ Cleanup: Keeping {keep_count} most recent backups")
        print(f" Deleting {len(to_delete)} old backups...")

        for backup in to_delete:
            try:
                backup.unlink()
                print(f" ✓ Deleted: {backup.name}")
            except Exception as e:
                print(f" ✗ Failed to delete {backup.name}: {e}")
|
|
|
|
|
|
class MigrationError(Exception):
    """Signals that a migration step or its verification did not succeed."""
|
|
|
|
|
|
class Option3MigrationWithBackup:
    """Run the "Option 3" schema migration guarded by a file-level backup.

    The migration adds a self-referencing ``users.assigned_to`` column and a
    ``refiller_machines`` junction table. A backup of the SQLite file is taken
    first; if any later step raises, the backup is copied back.
    """

    def __init__(self):
        """Set up empty state; call :meth:`initialize` before migrating."""
        self.app = None
        self.db = None
        self.text = None
        self.db_path = None
        self.inspector = None
        # Names of the changes actually applied during this run.
        self.changes_made = []
        self.backup_manager = None
        self.backup_file = None

    def print_header(self, title):
        """Print ``title`` framed by two 70-character ``=`` rules."""
        print("\n" + "=" * 70)
        print(f" {title}")
        print("=" * 70)

    def print_step(self, step_num, description):
        """Print a numbered step heading followed by a ``-`` rule."""
        print(f"\n[Step {step_num}] {description}")
        print("-" * 70)

    def initialize(self):
        """Create the Flask app and locate the SQLite database file.

        The file path is taken from the engine URL rather than by string
        surgery on the configured URI, so relative paths resolved by
        Flask-SQLAlchemy, driver suffixes and query strings are handled.

        Returns:
            True on success; False (after printing the reason) on failure.
        """
        try:
            from app import create_app, db
            from sqlalchemy import inspect, text

            self.app = create_app()
            self.db = db
            self.text = text

            with self.app.app_context():
                url = db.engine.url
                if url.get_backend_name() != 'sqlite':
                    raise ValueError("This script only supports SQLite databases")
                if not url.database or url.database == ':memory:':
                    # There is no file to back up for an in-memory database.
                    raise ValueError(
                        "This script requires a file-based SQLite database"
                    )

                self.db_path = Path(url.database)
                self.backup_manager = DatabaseBackupManager(self.db_path)
                self.inspector = inspect(db.engine)

            return True

        except ImportError as e:
            print(f"❌ Error importing modules: {e}")
            print("Make sure you're running this from the backend directory")
            return False
        except Exception as e:
            print(f"❌ Initialization failed: {e}")
            return False

    def create_backup(self):
        """Back up the database file (step 1).

        Returns:
            True if the backup was written, False otherwise.
        """
        self.print_step(1, "Creating Database Backup")

        try:
            self.backup_file = self.backup_manager.create_backup()
            return True
        except Exception as e:
            print(f"❌ Backup failed: {e}")
            return False

    def check_prerequisites(self):
        """Ensure the ``users`` and ``machines`` tables exist (step 2).

        Raises:
            MigrationError: If a required table is missing.
        """
        self.print_step(2, "Checking Prerequisites")

        with self.app.app_context():
            tables = self.inspector.get_table_names()

            required_tables = ['users', 'machines']
            missing_tables = [t for t in required_tables if t not in tables]

            if missing_tables:
                raise MigrationError(
                    f"Required tables missing: {', '.join(missing_tables)}\n"
                    "Please ensure your database is properly initialized."
                )

            print("✅ All required tables exist")
            print(f"✅ Found tables: {', '.join(tables)}")

    def add_assigned_to_column(self):
        """Add ``users.assigned_to`` with its foreign key (step 3).

        Returns:
            True if the column was added, False if it already existed.

        Raises:
            MigrationError: If the statement fails (the session is rolled
                back first).
        """
        self.print_step(3, "Adding assigned_to Column to Users Table")

        with self.app.app_context():
            columns = [col['name'] for col in self.inspector.get_columns('users')]

            if 'assigned_to' in columns:
                print("⚠️ assigned_to column already exists - skipping")
                return False

            try:
                print("Creating assigned_to column...")

                # SQLite has no "ALTER TABLE ... ADD CONSTRAINT", so the
                # foreign key is declared inline with the new column. The
                # column defaults to NULL, which SQLite requires for an added
                # REFERENCES column when foreign keys are enforced.
                self.db.session.execute(self.text("""
                    ALTER TABLE users
                    ADD COLUMN assigned_to INTEGER
                        REFERENCES users(id)
                        ON DELETE SET NULL
                """))

                self.db.session.commit()

                print("✅ assigned_to column created successfully")
                print("✅ Foreign key constraint added")

                self.changes_made.append('assigned_to_column')
                return True

            except Exception as e:
                self.db.session.rollback()
                raise MigrationError(
                    f"Failed to add assigned_to column: {str(e)}"
                ) from e

    def create_refiller_machines_table(self):
        """Create the ``refiller_machines`` junction table (step 4).

        Returns:
            True if the table was created, False if it already existed.

        Raises:
            MigrationError: If the statement fails (the session is rolled
                back first).
        """
        self.print_step(4, "Creating refiller_machines Junction Table")

        with self.app.app_context():
            tables = self.inspector.get_table_names()

            if 'refiller_machines' in tables:
                print("⚠️ refiller_machines table already exists - skipping")
                return False

            try:
                print("Creating refiller_machines table...")

                self.db.session.execute(self.text("""
                    CREATE TABLE refiller_machines (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        refiller_id INTEGER NOT NULL,
                        machine_id VARCHAR(10) NOT NULL,
                        assigned_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                        assigned_by INTEGER NULL,

                        CONSTRAINT fk_refiller_machines_refiller
                            FOREIGN KEY (refiller_id)
                            REFERENCES users(id)
                            ON DELETE CASCADE,

                        CONSTRAINT fk_refiller_machines_machine
                            FOREIGN KEY (machine_id)
                            REFERENCES machines(machine_id)
                            ON DELETE CASCADE,

                        CONSTRAINT fk_refiller_machines_assigner
                            FOREIGN KEY (assigned_by)
                            REFERENCES users(id)
                            ON DELETE SET NULL,

                        CONSTRAINT unique_refiller_machine
                            UNIQUE (refiller_id, machine_id)
                    )
                """))

                self.db.session.commit()

                print("✅ refiller_machines table created successfully")

                self.changes_made.append('refiller_machines_table')
                return True

            except Exception as e:
                self.db.session.rollback()
                raise MigrationError(
                    f"Failed to create refiller_machines table: {str(e)}"
                ) from e

    def verify_migration(self):
        """Check that the new column and table are present (step 5).

        Raises:
            MigrationError: If the column, the table, or one of the table's
                expected columns is missing.
        """
        self.print_step(5, "Verifying Migration")

        with self.app.app_context():
            # Build a fresh inspector so the schema is re-read after the
            # changes instead of reusing the one created in initialize().
            from sqlalchemy import inspect
            self.inspector = inspect(self.db.engine)

            user_columns = [col['name'] for col in self.inspector.get_columns('users')]
            if 'assigned_to' not in user_columns:
                raise MigrationError("Verification failed: assigned_to column not found")
            print("✅ users.assigned_to column exists")

            tables = self.inspector.get_table_names()
            if 'refiller_machines' not in tables:
                raise MigrationError("Verification failed: refiller_machines table not found")
            print("✅ refiller_machines table exists")

            rm_columns = [
                col['name'] for col in self.inspector.get_columns('refiller_machines')
            ]
            expected_columns = ['id', 'refiller_id', 'machine_id', 'assigned_at', 'assigned_by']

            for col in expected_columns:
                if col not in rm_columns:
                    raise MigrationError(f"Verification failed: Column '{col}' not found")
                print(f" ✅ Column '{col}' exists")

    def _release_connections(self):
        """Close the session and pooled connections before the file is replaced."""
        if self.app is None or self.db is None:
            return
        try:
            with self.app.app_context():
                self.db.session.remove()
                self.db.engine.dispose()
        except Exception as e:
            # Best effort: still attempt the restore if this fails.
            print(f"⚠️ Could not release database connections: {e}")

    def restore_on_failure(self):
        """Copy the backup taken in step 1 back over the database file.

        Errors are reported rather than raised so the caller can finish its
        own failure handling.

        Returns:
            True if the database was restored, False if there was no backup
            or the restore failed.
        """
        self.print_header("🔄 RESTORING FROM BACKUP")

        try:
            if self.backup_file:
                self._release_connections()
                self.backup_manager.restore_backup(self.backup_file)
                print("✅ Database restored successfully from backup")
                print(" Your data is safe!")
                return True

            print("⚠️ No backup file available for restore")
            return False
        except Exception as e:
            print(f"❌ Restore failed: {e}")
            print(" Please manually restore from backup")
            return False

    def print_summary(self):
        """Print applied changes, backup location, prune old backups, next steps."""
        self.print_header("✅ MIGRATION COMPLETED SUCCESSFULLY")

        if not self.changes_made:
            print("\n⚠️ No changes were made - all structures already exist")
        else:
            print(f"\n✅ Successfully applied {len(self.changes_made)} change(s):")
            for i, change in enumerate(self.changes_made, 1):
                print(f" {i}. {change}")

        if self.backup_file:
            print("\n📦 Backup Information:")
            print(f" Location: {self.backup_file}")
            print(" You can restore this backup if needed using:")
            print(" python migrate_with_backup.py restore")

        print()
        self.backup_manager.cleanup_old_backups(keep_count=10)

        print("\n" + "=" * 70)
        print("NEXT STEPS")
        print("=" * 70)
        print("\n1. Update Backend Models (app/models/models.py)")
        print("2. Update Backend Services (app/services/services.py)")
        print("3. Update Backend Routes (app/routes/routes.py)")
        print("4. Update Frontend (user and machine modules)")
        print("5. Restart Backend: python app.py")
        print("6. Test the implementation")
        print("\n" + "=" * 70 + "\n")

    def run_migration(self):
        """Run backup, prerequisite check, both schema changes and verification.

        Returns:
            True if every step succeeded. False if the backup could not be
            created (nothing is changed) or if a later step failed, in which
            case a restore from the backup is attempted.
        """
        try:
            self.print_header("🚀 Option 3 Migration with Backup")

            if not self.create_backup():
                print("❌ Cannot proceed without backup")
                return False

            self.check_prerequisites()
            self.add_assigned_to_column()
            self.create_refiller_machines_table()
            self.verify_migration()
            self.print_summary()

            return True

        except MigrationError as e:
            print(f"\n❌ Migration Error: {str(e)}")
            self.restore_on_failure()
            return False

        except Exception as e:
            print(f"\n❌ Unexpected Error: {str(e)}")
            import traceback
            traceback.print_exc()
            self.restore_on_failure()
            return False
|
|
|
|
|
|
def list_backups():
    """Print every available database backup with its date and size.

    The database location is read from the application's engine URL, so the
    backup directory matches the file the application actually uses. Any
    error (including a failure to import the application) is printed rather
    than raised.
    """
    print("\n" + "=" * 70)
    print(" AVAILABLE BACKUPS")
    print("=" * 70)

    try:
        from app import create_app, db

        app = create_app()
        with app.app_context():
            url = db.engine.url
            # Backups are plain file copies, so a file-based SQLite database
            # is required.
            if url.get_backend_name() != 'sqlite' or not url.database \
                    or url.database == ':memory:':
                print("❌ Only SQLite databases are supported")
                return

            backup_manager = DatabaseBackupManager(Path(url.database))
            backups = backup_manager.list_backups()

            if not backups:
                print("\n📂 No backups found")
                return

            print(f"\n📂 Found {len(backups)} backup(s):")
            print("=" * 70)

            for i, backup in enumerate(backups, 1):
                info = backup.stat()  # one stat call serves both fields
                size = info.st_size / (1024 * 1024)
                mtime = datetime.fromtimestamp(info.st_mtime)

                print(f"{i:2d}. {backup.name}")
                print(f" Date: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f" Size: {size:.2f} MB")
                print()

            print("=" * 70 + "\n")
    except Exception as e:
        print(f"❌ Error: {e}")
|
|
|
|
|
|
def restore_from_backup():
    """Interactively pick a backup and restore the database from it.

    Lists the available backups, asks for a backup number (``q`` cancels),
    asks for a ``yes`` confirmation and then overwrites the database file.
    The database location is read from the application's engine URL. Errors
    are printed rather than raised.
    """
    print("\n" + "=" * 70)
    print(" RESTORE FROM BACKUP")
    print("=" * 70)

    try:
        from app import create_app, db

        app = create_app()
        with app.app_context():
            url = db.engine.url
            # Backups are plain file copies, so a file-based SQLite database
            # is required.
            if url.get_backend_name() != 'sqlite' or not url.database \
                    or url.database == ':memory:':
                print("❌ Only SQLite databases are supported")
                return

            backup_manager = DatabaseBackupManager(Path(url.database))
            backups = backup_manager.list_backups()

            if not backups:
                print("\n❌ No backups found")
                return

            print("\n📂 Available backups:")
            print("=" * 70)

            for i, backup in enumerate(backups, 1):
                info = backup.stat()
                size = info.st_size / (1024 * 1024)
                mtime = datetime.fromtimestamp(info.st_mtime)

                print(f"{i:2d}. {backup.name}")
                print(f" Date: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f" Size: {size:.2f} MB")
                print()

            print("=" * 70)

            choice = input("\nEnter backup number to restore (or 'q' to quit): ").strip()

            if choice.lower() == 'q':
                print("❌ Restore cancelled")
                return

            # Only the number parsing is guarded by ValueError, so an error
            # raised by the restore itself is never reported as bad input.
            try:
                index = int(choice) - 1
            except ValueError:
                print("❌ Invalid input")
                return

            if index < 0 or index >= len(backups):
                print("❌ Invalid backup number")
                return

            backup_file = backups[index]

            print("\n⚠️ WARNING: This will replace your current database!")
            print(f" Backup: {backup_file.name}")
            confirm = input("\nType 'yes' to confirm restore: ").strip().lower()

            if confirm != 'yes':
                print("❌ Restore cancelled")
                return

            try:
                # Release this process's connections before the file is
                # overwritten.
                db.session.remove()
                db.engine.dispose()
                backup_manager.restore_backup(backup_file)
                print("\n⚠️ Remember to restart your Flask server!")
            except Exception as e:
                print(f"❌ Restore failed: {e}")
    except Exception as e:
        print(f"❌ Error: {e}")
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Commands (first argument, case-insensitive):
        (none) / ``migrate``: run the migration after a ``yes`` confirmation.
        ``list``: print the available backups.
        ``restore``: interactively restore a backup.

    Any other command prints usage and returns. The migration path ends the
    process via ``sys.exit``: 0 on success or user cancellation, 1 on
    initialization or migration failure.
    """
    print("\n" + "=" * 70)
    print(" VENDING MACHINE MANAGEMENT SYSTEM")
    print(" Database Migration with Backup & Restore")
    print(" Version: 2.0.0")
    print("=" * 70)

    if len(sys.argv) > 1:
        command = sys.argv[1].lower()

        if command == 'list':
            list_backups()
            return
        if command == 'restore':
            restore_from_backup()
            return
        if command != 'migrate':
            print(f"\n❌ Unknown command: {command}")
            print("\nUsage:")
            print(" python migrate_with_backup.py # Run migration")
            print(" python migrate_with_backup.py migrate # Run migration")
            print(" python migrate_with_backup.py list # List backups")
            print(" python migrate_with_backup.py restore # Restore from backup")
            return

    migration = Option3MigrationWithBackup()

    if not migration.initialize():
        print("\n❌ Failed to initialize. Please check your setup.")
        sys.exit(1)

    print("\nThis migration will:")
    print(" 1. ✅ Create backup of current database")
    print(" 2. ✅ Add users.assigned_to column")
    print(" 3. ✅ Create refiller_machines table")
    print(" 4. ✅ Verify all changes")
    print(" 5. ✅ Auto-restore on failure")

    response = input("\nProceed with migration? (yes/no): ").strip().lower()

    if response != 'yes':
        print("\n❌ Migration cancelled")
        sys.exit(0)

    if migration.run_migration():
        print("\n✅ Migration completed successfully!")
        sys.exit(0)

    # Do not claim the database was restored: the restore may have failed,
    # or never run at all (e.g. when the backup could not be created).
    print("\n❌ Migration failed! See the messages above for the backup/restore status.")
    sys.exit(1)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()