import uuid import datetime import datetime import re import os import json import serial import time import threading from typing import Optional, Dict, Any import smtplib from email.mime.text import MIMEText from app import db from app.models.models import Machine, User, Product, VendingSlot, Transaction, Role from flask import current_app import secrets from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() class SerialCommunicationService: def __init__(self): self.serial_port: Optional[serial.Serial] = None self.is_connected = False self.response_buffer = "" self.last_response = None self.response_lock = threading.Lock() def connect_to_machine(self, port: str = '/dev/ttyUSB0', baudrate: int = 9600) -> bool: """ Connect to vending machine via serial port Common ports: - Linux: /dev/ttyUSB0, /dev/ttyACM0 - Windows: COM3, COM4, etc. - macOS: /dev/cu.usbserial-* """ try: self.serial_port = serial.Serial( port=port, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1 # 1 second timeout for reads ) # Test connection with a status command time.sleep(2) # Allow time for Arduino to reset self.serial_port.write(b'STATUS\n') time.sleep(1) if self.serial_port.in_waiting > 0: response = self.serial_port.readline().decode('utf-8').strip() print(f"Machine connected successfully. Status: {response}") self.is_connected = True return True else: print("No response from machine") self.serial_port.close() return False except Exception as e: print(f"Failed to connect to machine: {e}") self.is_connected = False return False def send_dispense_command(self, slot_id: str, timeout: int = 10) -> Dict[str, Any]: """ Send dispense command and wait for drop sensor response """ if not self.is_connected or not self.serial_port: return { 'success': False, 'error': 'Not connected to machine', 'response': 'NO_CONNECTION' } try: # Clear any pending data self.serial_port.reset_input_buffer() # Send dispense command command = f"DISPENSE:{slot_id}\n" self.serial_port.write(command.encode('utf-8')) print(f"Sent command: {command.strip()}") # Wait for response with timeout start_time = time.time() response = "" while time.time() - start_time < timeout: if self.serial_port.in_waiting > 0: line = self.serial_port.readline().decode('utf-8').strip() print(f"Received: {line}") if line in ['PRODUCT_DISPENSED', 'DISPENSE_FAILED', 'SLOT_EMPTY', 'MOTOR_ERROR']: response = line break time.sleep(0.1) # Small delay to prevent busy waiting if not response: return { 'success': False, 'error': 'Timeout waiting for drop sensor', 'response': 'TIMEOUT' } success = response == 'PRODUCT_DISPENSED' return { 'success': success, 'response': response, 'slot_id': slot_id, 'message': self._get_response_message(response, slot_id) } except Exception as e: print(f"Error during dispensing: {e}") return { 'success': False, 'error': str(e), 'response': 'COMMUNICATION_ERROR' } def _get_response_message(self, response: str, slot_id: str) -> str: """Get user-friendly message for response codes""" messages = { 'PRODUCT_DISPENSED': f'Product successfully dispensed from slot {slot_id}', 'DISPENSE_FAILED': f'Dispensing failed - no product detected from slot {slot_id}', 'SLOT_EMPTY': f'Slot {slot_id} is empty', 'MOTOR_ERROR': f'Motor malfunction in slot {slot_id}', 'TIMEOUT': f'Timeout - no response from slot {slot_id}', 'COMMUNICATION_ERROR': 'Communication error with machine' } return messages.get(response, f'Unknown response: {response}') def check_machine_status(self) -> Dict[str, Any]: """Check overall machine status""" if not self.is_connected or not self.serial_port: return {'status': 'disconnected', 'message': 'Not connected to machine'} try: self.serial_port.reset_input_buffer() self.serial_port.write(b'STATUS\n') time.sleep(1) if self.serial_port.in_waiting > 0: response = self.serial_port.readline().decode('utf-8').strip() return { 'status': 'connected', 'response': response, 'message': 'Machine is responding' } else: return { 'status': 'no_response', 'message': 'Machine not responding' } except Exception as e: return { 'status': 'error', 'error': str(e), 'message': 'Communication error' } def reset_machine(self) -> Dict[str, Any]: """Send reset command to clear any error states""" if not self.is_connected or not self.serial_port: return {'success': False, 'error': 'Not connected to machine'} try: self.serial_port.write(b'RESET\n') time.sleep(2) response = self.serial_port.readline().decode('utf-8').strip() return { 'success': True, 'response': response, 'message': 'Reset command sent' } except Exception as e: return {'success': False, 'error': str(e)} def disconnect(self): """Disconnect from serial port""" if self.serial_port and self.serial_port.is_open: self.serial_port.close() self.is_connected = False print("Disconnected from vending machine") # Global instance serial_service = SerialCommunicationService() # Machine Service (UNCHANGED) class MachineService: @staticmethod def generate_id(prefix): return f"{prefix}{uuid.uuid4().hex[:8].upper()}" @staticmethod def generate_unique_password(): while True: password = secrets.token_hex(4).upper() # e.g., "A1B2C3D4" if not Machine.query.filter_by(password=password).first(): return password @staticmethod def validate_machine_data(data): required_fields = ['machine_model', 'machine_type', 'client_name', 'branch_name', 'operation_status', 'connection_status'] for field in required_fields: if field not in data: raise ValueError(f"Missing required field: {field}") status_mapping = { 'connected': 'connected', 'disconnected': 'disconnected', 'maintenance':'maintenance', 'active': 'active', 'inactive': 'inactive' } if data['operation_status'] not in status_mapping: raise ValueError(f"Invalid operation status. Allowed statuses: {list(status_mapping.keys())}") data['operation_status'] = status_mapping[data['operation_status']] if data['connection_status'] not in status_mapping: raise ValueError(f"Invalid connection status. Allowed statuses: {list(status_mapping.keys())}") data['connection_status'] = status_mapping[data['connection_status']] @staticmethod def get_all_machines(): return Machine.query.all() @staticmethod def get_machine_slots(machine_id: str): machine = Machine.query.filter_by(machine_id=machine_id).first() if not machine: return None slots = VendingSlot.query.filter_by(machine_id=machine_id).all() product_ids = [slot.product_id for slot in slots if slot.product_id] products = {p.product_id: p for p in Product.query.filter(Product.product_id.in_(product_ids)).all()} if product_ids else {} return [{ 'slot_name': slot.row_id, 'column': int(slot.slot_name[1:]) if slot.slot_name[1:].isdigit() else 1, 'enabled': slot.enabled, 'product_id': slot.product_id, 'units': slot.units or 0, 'price': slot.price or '0.0', 'product_name': products.get(slot.product_id, {'product_name': 'N/A'}).product_name if slot.product_id else 'N/A', 'product_image': products.get(slot.product_id, {'product_image': None}).product_image if slot.product_id else None } for slot in slots] @staticmethod def create_machine(data): # Modified validation to accept client_id instead of client_name required_fields = ['machine_model', 'machine_type', 'client_id', 'branch_name', 'operation_status', 'connection_status'] for field in required_fields: if field not in data: raise ValueError(f"Missing required field: {field}") status_mapping = { 'connected': 'connected', 'disconnected': 'disconnected', 'maintenance':'maintenance', 'active': 'active', 'inactive': 'inactive' } if data['operation_status'] not in status_mapping: raise ValueError(f"Invalid operation status. Allowed statuses: {list(status_mapping.keys())}") data['operation_status'] = status_mapping[data['operation_status']] if data['connection_status'] not in status_mapping: raise ValueError(f"Invalid connection status. Allowed statuses: {list(status_mapping.keys())}") data['connection_status'] = status_mapping[data['connection_status']] machine_id = MachineService.generate_id("M") branch_id = MachineService.generate_id("B") created_on = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") password = MachineService.generate_unique_password() # Validate client_id exists in users client_id = data.get('client_id') if not client_id or not User.query.get(client_id): raise ValueError("Invalid or missing client_id") # Get client_name from user user = User.query.get(client_id) client_name = user.username new_machine = Machine( machine_id=machine_id, machine_model=data['machine_model'], machine_type=data['machine_type'], client_id=client_id, client_name=client_name, # Set client_name from user record branch_id=branch_id, branch_name=data['branch_name'], operation_status=data['operation_status'], connection_status=data['connection_status'], created_on=created_on, password=password ) db.session.add(new_machine) db.session.commit() return new_machine @staticmethod def update_machine(id, data): machine = Machine.query.get_or_404(id) if any(field in data for field in ['machine_model', 'machine_type', 'branch_name', 'operation_status', 'connection_status']): MachineService.validate_machine_data({k: data.get(k, getattr(machine, k)) for k in ['machine_model', 'machine_type', 'branch_name', 'operation_status', 'connection_status']}) machine.machine_model = data.get('machine_model', machine.machine_model) machine.machine_type = data.get('machine_type', machine.machine_type) if 'client_id' in data: client_id = data['client_id'] if not User.query.get(client_id): raise ValueError("Invalid client_id") machine.client_id = client_id machine.branch_name = data.get('branch_name', machine.branch_name) machine.operation_status = data.get('operation_status', machine.operation_status) machine.connection_status = data.get('connection_status', machine.connection_status) db.session.commit() return machine @staticmethod def delete_machine(id): machine = Machine.query.get_or_404(id) db.session.delete(machine) db.session.commit() return True @staticmethod def get_machine_by_id(id): return Machine.query.get_or_404(id) @staticmethod def get_vending_state(machine_id): machine = Machine.query.filter_by(machine_id=machine_id).first_or_404() slots = VendingSlot.query.filter_by(machine_id=machine_id).all() vending_rows = {} for slot in slots: if slot.row_id not in vending_rows: vending_rows[slot.row_id] = [] vending_rows[slot.row_id].append(slot.to_dict()) return { 'vendingRows': [{'rowId': row_id, 'slots': slots} for row_id, slots in vending_rows.items()], 'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } @staticmethod def update_vending_state(machine_id, vending_rows): machine = Machine.query.filter_by(machine_id=machine_id).first_or_404() VendingSlot.query.filter_by(machine_id=machine_id).delete() for row in vending_rows: row_id = row['rowId'] for slot_data in row['slots']: new_slot = VendingSlot( machine_id=machine_id, row_id=row_id, slot_name=slot_data['name'], enabled=slot_data['enabled'], product_id=slot_data['productId'], units=slot_data['units'], price=slot_data['price'] ) db.session.add(new_slot) db.session.commit() return {'success': True, 'message': 'Vending state updated', 'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} # User Service (MODIFIED FOR BREVO SMTP WITH ENV VARIABLES) # Updated UserService in services.py class UserService: @staticmethod def generate_unique_password(): """Generate a secure random password""" return secrets.token_hex(4).upper() # 8 character hex password @staticmethod def save_file(file, folder='user_uploads'): """Save uploaded file and return path""" if not file: return None filename = f"{uuid.uuid4().hex}_{file.filename}" upload_folder = os.path.join(current_app.config['UPLOAD_FOLDER'], folder) os.makedirs(upload_folder, exist_ok=True) file_path = os.path.join(upload_folder, filename) file.save(file_path) return f"/uploads/{folder}/{filename}" @staticmethod def save_multiple_documents(files, metadata_json=None): """Save multiple document files with metadata""" document_paths = [] metadata_list = [] if metadata_json: try: metadata_list = json.loads(metadata_json) print(f"Parsed metadata: {metadata_list}") except Exception as e: print(f"Error parsing metadata: {e}") metadata_list = [] if not isinstance(files, list): files = [files] for idx, file in enumerate(files): if file and hasattr(file, 'filename'): try: path = UserService.save_file(file, 'user_documents') if path: metadata = metadata_list[idx] if idx < len(metadata_list) else {} document_paths.append({ 'filename': file.filename, 'path': path, 'uploaded_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'document_type': metadata.get('document_type', 'other'), 'document_type_other': metadata.get('document_type_other', '') }) print(f"Saved document: {file.filename}") except Exception as e: print(f"Error saving file {file.filename}: {e}") continue return document_paths @staticmethod def send_email_notification(username, email, contact, password, user_id): """Send email notification with user credentials""" subject = "Your Account Credentials" body = f""" Dear {username}, Your account has been created successfully! Login Credentials: ================== User ID: {user_id} Username: {username} Email: {email} Contact: {contact} Password: {password} Important Security Notice: - Please change your password after first login - Keep this information secure and confidential - Do not share your credentials with anyone You can login using either your email or username along with your password. Best regards, System Administration Team """ msg = MIMEText(body) msg['Subject'] = subject msg['From'] = os.getenv('BREVO_SENDER_EMAIL') msg['To'] = email smtp_email = os.getenv('BREVO_SMTP_EMAIL') smtp_key = os.getenv('BREVO_SMTP_KEY') if not all([smtp_email, smtp_key, msg['From']]): print("Error: Brevo SMTP configuration missing in .env") return False try: with smtplib.SMTP('smtp-relay.brevo.com', 587) as server: server.starttls() server.login(smtp_email, smtp_key) server.send_message(msg) print(f"✓ Email sent successfully to {email}") return True except Exception as e: print(f"✗ Failed to send email to {email}: {str(e)}") return False @staticmethod def log_sms_notification(contact, username, email, password, user_id): """Log SMS notification (for future SMS integration)""" notification = { "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "user_id": user_id, "username": username, "email": email, "contact": contact, "password": password, "message": f"Your account credentials: User ID: {user_id}, Username: {username}, Email: {email}, Password: {password}" } notifications_file = os.path.join(current_app.root_path, "sms_notifications.json") try: notifications = [] if os.path.exists(notifications_file): with open(notifications_file, "r") as f: notifications = json.load(f) notifications.append(notification) with open(notifications_file, "w") as f: json.dump(notifications, f, indent=4) print(f"✓ SMS notification logged for {contact}") return True except Exception as e: print(f"✗ Failed to log SMS notification: {str(e)}") return False @staticmethod def validate_user_data(data): """Validate user data before creating/updating""" import re required_fields = ['username', 'email', 'contact', 'roles', 'user_status'] for field in required_fields: if field not in data: raise ValueError(f"Missing required field: {field}") # Email validation - FIXED: Added closing quote email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_regex, data['email']): raise ValueError("Invalid email format") @staticmethod def get_all_users(): """Get all users from database""" return User.query.all() @staticmethod def create_user(data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None): """ Create new user with proper password hashing Password must be provided in data """ print(f"\n{'='*60}") print(f"CREATING NEW USER") print(f"{'='*60}") # Validate user data UserService.validate_user_data(data) # CHECK: Password must be provided if 'password' not in data or not data['password']: raise ValueError("Password is required") # Generate user ID only user_id = User.generate_user_id(data['username'], data['email']) password = data['password'] # Use provided password, not auto-generated print(f"Generated User ID: {user_id}") print(f"Password provided by admin") # Handle file uploads photo_path = UserService.save_file(photo_file, 'user_photos') if photo_file else None logo_path = UserService.save_file(logo_file, 'company_logos') if logo_file else None documents = UserService.save_multiple_documents(document_files, documents_metadata) if document_files else [] # Create new user new_user = User( user_id=user_id, username=data['username'], email=data['email'], contact=data['contact'], roles=data['roles'], user_status=data['user_status'], photo=photo_path, company_logo=logo_path, documents=json.dumps(documents) ) # Set password (this will hash it automatically) new_user.set_password(password) print(f"Password hashed and set for user") # Save to database db.session.add(new_user) db.session.commit() print(f"✓ User created successfully with ID: {new_user.id}") print(f"{'='*60}\n") # Optional: Send email notification with the password they set # UserService.send_email_notification(...) return new_user @staticmethod def update_user(id, data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None): """ Update existing user Password is only updated if explicitly provided in data """ user = User.query.get_or_404(id) print(f"\n{'='*60}") print(f"UPDATING USER: {user.username}") print(f"{'='*60}") # Validate only if fields are being updated if any(field in data for field in ['username', 'email', 'contact', 'roles', 'user_status']): UserService.validate_user_data({ 'username': data.get('username', user.username), 'email': data.get('email', user.email), 'contact': data.get('contact', user.contact), 'roles': data.get('roles', user.roles), 'user_status': data.get('user_status', user.user_status) }) # Update basic fields user.username = data.get('username', user.username) user.email = data.get('email', user.email) user.contact = data.get('contact', user.contact) user.roles = data.get('roles', user.roles) user.user_status = data.get('user_status', user.user_status) # Update password if provided (will be hashed automatically) if 'password' in data and data['password']: print(f"Updating password for user") user.set_password(data['password']) # Handle file updates if photo_file: if user.photo: old_path = os.path.join(current_app.config['UPLOAD_FOLDER'], user.photo.replace('/uploads/', '')) if os.path.exists(old_path): os.remove(old_path) user.photo = UserService.save_file(photo_file, 'user_photos') if logo_file: if user.company_logo: old_path = os.path.join(current_app.config['UPLOAD_FOLDER'], user.company_logo.replace('/uploads/', '')) if os.path.exists(old_path): os.remove(old_path) user.company_logo = UserService.save_file(logo_file, 'company_logos') if document_files: existing_docs = json.loads(user.documents) if user.documents else [] new_docs = UserService.save_multiple_documents(document_files, documents_metadata) existing_docs.extend(new_docs) user.documents = json.dumps(existing_docs) user.updated_at = datetime.datetime.utcnow() db.session.commit() print(f"✓ User updated successfully") print(f"{'='*60}\n") return user @staticmethod def delete_user_document(user_id, document_path): """Delete a specific document from a user""" user = User.query.get_or_404(user_id) if user.documents: documents = json.loads(user.documents) documents = [doc for doc in documents if doc['path'] != document_path] user.documents = json.dumps(documents) # Delete physical file file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_path.replace('/uploads/', '')) if os.path.exists(file_path): os.remove(file_path) db.session.commit() return True return False @staticmethod def delete_user(id): """Delete user and all associated files""" user = User.query.get_or_404(id) # Delete all associated files if user.photo: photo_path = os.path.join(current_app.config['UPLOAD_FOLDER'], user.photo.replace('/uploads/', '')) if os.path.exists(photo_path): os.remove(photo_path) if user.company_logo: logo_path = os.path.join(current_app.config['UPLOAD_FOLDER'], user.company_logo.replace('/uploads/', '')) if os.path.exists(logo_path): os.remove(logo_path) if user.documents: documents = json.loads(user.documents) for doc in documents: doc_path = os.path.join(current_app.config['UPLOAD_FOLDER'], doc['path'].replace('/uploads/', '')) if os.path.exists(doc_path): os.remove(doc_path) db.session.delete(user) db.session.commit() return True @staticmethod def validate_file(file, maxSizeMB=10): """Validate file size""" max_size_bytes = maxSizeMB * 1024 * 1024 if file.size > max_size_bytes: return f"File size exceeds {maxSizeMB}MB limit" return None @staticmethod def is_valid_image_type(file): """Check if file is valid image type""" valid_types = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif'] return valid_types.includes(file.type) if hasattr(file, 'type') else False @staticmethod def is_valid_document_type(file): """Check if file is valid document type""" valid_types = [ 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'image/jpeg', 'image/jpg', 'image/png' ] return valid_types.includes(file.type) if hasattr(file, 'type') else False # Product Service (UNCHANGED) class ProductService: @staticmethod def generate_id(prefix): return f"{prefix}{uuid.uuid4().hex[:4].upper()}" @staticmethod def validate_product_data(data): required_fields = ['product_name', 'price'] for field in required_fields: if field not in data or not data[field]: raise ValueError(f"Missing or empty required field: {field}") try: price = float(data['price']) if price < 0: raise ValueError("Price must be a positive number") except (ValueError, TypeError): raise ValueError("Price must be a valid number") @staticmethod def save_image(file): if not file: raise ValueError("Product image is required") filename = f"{uuid.uuid4().hex}_{file.filename}" file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) file.save(file_path) return f"/uploads/{filename}" @staticmethod def get_all_products(): return Product.query.all() @staticmethod def create_product(data, file): ProductService.validate_product_data(data) image_path = ProductService.save_image(file) product_id = ProductService.generate_id("P") created_date = data.get('created_date', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) new_product = Product( product_id=product_id, product_name=data['product_name'], price=float(data['price']), product_image=image_path, created_date=created_date ) db.session.add(new_product) db.session.commit() return new_product @staticmethod def update_product(id, data, file): product = Product.query.get_or_404(id) ProductService.validate_product_data(data) product.product_name = data.get('product_name', product.product_name) product.price = float(data.get('price', product.price)) product.created_date = data.get('created_date', product.created_date) if file: if product.product_image: old_image_path = os.path.join(current_app.config['UPLOAD_FOLDER'], product.product_image.split('/')[-1]) if os.path.exists(old_image_path): os.remove(old_image_path) product.product_image = ProductService.save_image(file) db.session.commit() return product @staticmethod def delete_product(id): product = Product.query.get_or_404(id) if product.product_image: image_path = os.path.join(current_app.config['UPLOAD_FOLDER'], product.product_image.split('/')[-1]) if os.path.exists(image_path): os.remove(image_path) db.session.delete(product) db.session.commit() return True class TransactionService: @staticmethod def create_transaction(machine_id, product_name, quantity, amount, payment_type, status='Success'): """Create a new transaction record""" transaction_id = f"TXN-{uuid.uuid4().hex[:8].upper()}" new_transaction = Transaction( transaction_id=transaction_id, machine_id=machine_id, product_name=product_name, quantity=quantity, amount=amount, payment_type=payment_type, status=status ) db.session.add(new_transaction) db.session.commit() return new_transaction @staticmethod def update_transaction_status(transaction_id, status, amount_receiving_status=None, return_amount=None): """Update transaction status after dispensing""" transaction = Transaction.query.filter_by(transaction_id=transaction_id).first() if transaction: transaction.status = status if amount_receiving_status: transaction.amount_receiving_status = amount_receiving_status if return_amount: transaction.return_amount = return_amount db.session.commit() return transaction @staticmethod def get_all_transactions(filters=None): """Get all transactions with optional filters""" query = Transaction.query if filters: if 'machine_id' in filters: query = query.filter_by(machine_id=filters['machine_id']) if 'status' in filters: query = query.filter_by(status=filters['status']) if 'date_from' in filters: query = query.filter(Transaction.created_at >= filters['date_from']) if 'date_to' in filters: query = query.filter(Transaction.created_at <= filters['date_to']) return query.order_by(Transaction.created_at.desc()).all() @staticmethod def get_transaction_by_id(transaction_id): """Get single transaction by ID""" return Transaction.query.filter_by(transaction_id=transaction_id).first() class RoleService: @staticmethod def get_all_roles(): """Get all roles""" return Role.query.all() @staticmethod def get_role_by_id(role_id): """Get role by ID""" return Role.query.get_or_404(role_id) @staticmethod def create_role(data): """Create new role""" name = data.get('name') description = data.get('description', '') permissions = data.get('permissions', []) # Check if role already exists existing_role = Role.query.filter_by(name=name).first() if existing_role: raise ValueError(f"Role '{name}' already exists") new_role = Role( name=name, description=description, permissions=json.dumps(permissions) ) db.session.add(new_role) db.session.commit() return new_role @staticmethod def update_role(role_id, data): """Update existing role""" role = Role.query.get_or_404(role_id) if 'name' in data: # Check if new name conflicts with existing role existing = Role.query.filter(Role.name == data['name'], Role.id != role_id).first() if existing: raise ValueError(f"Role '{data['name']}' already exists") role.name = data['name'] if 'description' in data: role.description = data['description'] if 'permissions' in data: role.permissions = json.dumps(data['permissions']) role.updated_at = datetime.datetime.utcnow() db.session.commit() return role @staticmethod def delete_role(role_id): """Delete role""" role = Role.query.get_or_404(role_id) # Check if any users have this role users_with_role = User.query.filter_by(roles=role.name).count() if users_with_role > 0: raise ValueError(f"Cannot delete role '{role.name}' - {users_with_role} user(s) still assigned to it") db.session.delete(role) db.session.commit() return True