Files
IOT_application/Machine-Backend/app/services/services.py

967 lines
35 KiB
Python

import uuid
import datetime
import datetime
import re
import os
import json
import serial
import time
import threading
from typing import Optional, Dict, Any
import smtplib
from email.mime.text import MIMEText
from app import db
from app.models.models import Machine, User, Product, VendingSlot, Transaction, Role
from flask import current_app
import secrets
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
class SerialCommunicationService:
def __init__(self):
self.serial_port: Optional[serial.Serial] = None
self.is_connected = False
self.response_buffer = ""
self.last_response = None
self.response_lock = threading.Lock()
def connect_to_machine(self, port: str = '/dev/ttyUSB0', baudrate: int = 9600) -> bool:
"""
Connect to vending machine via serial port
Common ports:
- Linux: /dev/ttyUSB0, /dev/ttyACM0
- Windows: COM3, COM4, etc.
- macOS: /dev/cu.usbserial-*
"""
try:
self.serial_port = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1 # 1 second timeout for reads
)
# Test connection with a status command
time.sleep(2) # Allow time for Arduino to reset
self.serial_port.write(b'STATUS\n')
time.sleep(1)
if self.serial_port.in_waiting > 0:
response = self.serial_port.readline().decode('utf-8').strip()
print(f"Machine connected successfully. Status: {response}")
self.is_connected = True
return True
else:
print("No response from machine")
self.serial_port.close()
return False
except Exception as e:
print(f"Failed to connect to machine: {e}")
self.is_connected = False
return False
def send_dispense_command(self, slot_id: str, timeout: int = 10) -> Dict[str, Any]:
"""
Send dispense command and wait for drop sensor response
"""
if not self.is_connected or not self.serial_port:
return {
'success': False,
'error': 'Not connected to machine',
'response': 'NO_CONNECTION'
}
try:
# Clear any pending data
self.serial_port.reset_input_buffer()
# Send dispense command
command = f"DISPENSE:{slot_id}\n"
self.serial_port.write(command.encode('utf-8'))
print(f"Sent command: {command.strip()}")
# Wait for response with timeout
start_time = time.time()
response = ""
while time.time() - start_time < timeout:
if self.serial_port.in_waiting > 0:
line = self.serial_port.readline().decode('utf-8').strip()
print(f"Received: {line}")
if line in ['PRODUCT_DISPENSED', 'DISPENSE_FAILED', 'SLOT_EMPTY', 'MOTOR_ERROR']:
response = line
break
time.sleep(0.1) # Small delay to prevent busy waiting
if not response:
return {
'success': False,
'error': 'Timeout waiting for drop sensor',
'response': 'TIMEOUT'
}
success = response == 'PRODUCT_DISPENSED'
return {
'success': success,
'response': response,
'slot_id': slot_id,
'message': self._get_response_message(response, slot_id)
}
except Exception as e:
print(f"Error during dispensing: {e}")
return {
'success': False,
'error': str(e),
'response': 'COMMUNICATION_ERROR'
}
def _get_response_message(self, response: str, slot_id: str) -> str:
"""Get user-friendly message for response codes"""
messages = {
'PRODUCT_DISPENSED': f'Product successfully dispensed from slot {slot_id}',
'DISPENSE_FAILED': f'Dispensing failed - no product detected from slot {slot_id}',
'SLOT_EMPTY': f'Slot {slot_id} is empty',
'MOTOR_ERROR': f'Motor malfunction in slot {slot_id}',
'TIMEOUT': f'Timeout - no response from slot {slot_id}',
'COMMUNICATION_ERROR': 'Communication error with machine'
}
return messages.get(response, f'Unknown response: {response}')
def check_machine_status(self) -> Dict[str, Any]:
"""Check overall machine status"""
if not self.is_connected or not self.serial_port:
return {'status': 'disconnected', 'message': 'Not connected to machine'}
try:
self.serial_port.reset_input_buffer()
self.serial_port.write(b'STATUS\n')
time.sleep(1)
if self.serial_port.in_waiting > 0:
response = self.serial_port.readline().decode('utf-8').strip()
return {
'status': 'connected',
'response': response,
'message': 'Machine is responding'
}
else:
return {
'status': 'no_response',
'message': 'Machine not responding'
}
except Exception as e:
return {
'status': 'error',
'error': str(e),
'message': 'Communication error'
}
def reset_machine(self) -> Dict[str, Any]:
"""Send reset command to clear any error states"""
if not self.is_connected or not self.serial_port:
return {'success': False, 'error': 'Not connected to machine'}
try:
self.serial_port.write(b'RESET\n')
time.sleep(2)
response = self.serial_port.readline().decode('utf-8').strip()
return {
'success': True,
'response': response,
'message': 'Reset command sent'
}
except Exception as e:
return {'success': False, 'error': str(e)}
def disconnect(self):
"""Disconnect from serial port"""
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self.is_connected = False
print("Disconnected from vending machine")
# Global instance
serial_service = SerialCommunicationService()
# Machine Service (UNCHANGED)
class MachineService:
@staticmethod
def generate_id(prefix):
return f"{prefix}{uuid.uuid4().hex[:8].upper()}"
@staticmethod
def generate_unique_password():
while True:
password = secrets.token_hex(4).upper() # e.g., "A1B2C3D4"
if not Machine.query.filter_by(password=password).first():
return password
@staticmethod
def validate_machine_data(data):
required_fields = ['machine_model', 'machine_type', 'client_name', 'branch_name', 'operation_status', 'connection_status']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
status_mapping = {
'connected': 'connected',
'disconnected': 'disconnected',
'maintenance':'maintenance',
'active': 'active',
'inactive': 'inactive'
}
if data['operation_status'] not in status_mapping:
raise ValueError(f"Invalid operation status. Allowed statuses: {list(status_mapping.keys())}")
data['operation_status'] = status_mapping[data['operation_status']]
if data['connection_status'] not in status_mapping:
raise ValueError(f"Invalid connection status. Allowed statuses: {list(status_mapping.keys())}")
data['connection_status'] = status_mapping[data['connection_status']]
@staticmethod
def get_all_machines():
return Machine.query.all()
@staticmethod
def get_machine_slots(machine_id: str):
machine = Machine.query.filter_by(machine_id=machine_id).first()
if not machine:
return None
slots = VendingSlot.query.filter_by(machine_id=machine_id).all()
product_ids = [slot.product_id for slot in slots if slot.product_id]
products = {p.product_id: p for p in Product.query.filter(Product.product_id.in_(product_ids)).all()} if product_ids else {}
return [{
'slot_name': slot.row_id,
'column': int(slot.slot_name[1:]) if slot.slot_name[1:].isdigit() else 1,
'enabled': slot.enabled,
'product_id': slot.product_id,
'units': slot.units or 0,
'price': slot.price or '0.0',
'product_name': products.get(slot.product_id, {'product_name': 'N/A'}).product_name if slot.product_id else 'N/A',
'product_image': products.get(slot.product_id, {'product_image': None}).product_image if slot.product_id else None
} for slot in slots]
@staticmethod
def create_machine(data):
# Modified validation to accept client_id instead of client_name
required_fields = ['machine_model', 'machine_type', 'client_id', 'branch_name', 'operation_status', 'connection_status']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
status_mapping = {
'connected': 'connected',
'disconnected': 'disconnected',
'maintenance':'maintenance',
'active': 'active',
'inactive': 'inactive'
}
if data['operation_status'] not in status_mapping:
raise ValueError(f"Invalid operation status. Allowed statuses: {list(status_mapping.keys())}")
data['operation_status'] = status_mapping[data['operation_status']]
if data['connection_status'] not in status_mapping:
raise ValueError(f"Invalid connection status. Allowed statuses: {list(status_mapping.keys())}")
data['connection_status'] = status_mapping[data['connection_status']]
machine_id = MachineService.generate_id("M")
branch_id = MachineService.generate_id("B")
created_on = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
password = MachineService.generate_unique_password()
# Validate client_id exists in users
client_id = data.get('client_id')
if not client_id or not User.query.get(client_id):
raise ValueError("Invalid or missing client_id")
# Get client_name from user
user = User.query.get(client_id)
client_name = user.username
new_machine = Machine(
machine_id=machine_id,
machine_model=data['machine_model'],
machine_type=data['machine_type'],
client_id=client_id,
client_name=client_name, # Set client_name from user record
branch_id=branch_id,
branch_name=data['branch_name'],
operation_status=data['operation_status'],
connection_status=data['connection_status'],
created_on=created_on,
password=password
)
db.session.add(new_machine)
db.session.commit()
return new_machine
@staticmethod
def update_machine(id, data):
machine = Machine.query.get_or_404(id)
if any(field in data for field in ['machine_model', 'machine_type', 'branch_name', 'operation_status', 'connection_status']):
MachineService.validate_machine_data({k: data.get(k, getattr(machine, k)) for k in ['machine_model', 'machine_type', 'branch_name', 'operation_status', 'connection_status']})
machine.machine_model = data.get('machine_model', machine.machine_model)
machine.machine_type = data.get('machine_type', machine.machine_type)
if 'client_id' in data:
client_id = data['client_id']
if not User.query.get(client_id):
raise ValueError("Invalid client_id")
machine.client_id = client_id
machine.branch_name = data.get('branch_name', machine.branch_name)
machine.operation_status = data.get('operation_status', machine.operation_status)
machine.connection_status = data.get('connection_status', machine.connection_status)
db.session.commit()
return machine
@staticmethod
def delete_machine(id):
machine = Machine.query.get_or_404(id)
db.session.delete(machine)
db.session.commit()
return True
@staticmethod
def get_machine_by_id(id):
return Machine.query.get_or_404(id)
@staticmethod
def get_vending_state(machine_id):
machine = Machine.query.filter_by(machine_id=machine_id).first_or_404()
slots = VendingSlot.query.filter_by(machine_id=machine_id).all()
vending_rows = {}
for slot in slots:
if slot.row_id not in vending_rows:
vending_rows[slot.row_id] = []
vending_rows[slot.row_id].append(slot.to_dict())
return {
'vendingRows': [{'rowId': row_id, 'slots': slots} for row_id, slots in vending_rows.items()],
'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
@staticmethod
def update_vending_state(machine_id, vending_rows):
machine = Machine.query.filter_by(machine_id=machine_id).first_or_404()
VendingSlot.query.filter_by(machine_id=machine_id).delete()
for row in vending_rows:
row_id = row['rowId']
for slot_data in row['slots']:
new_slot = VendingSlot(
machine_id=machine_id,
row_id=row_id,
slot_name=slot_data['name'],
enabled=slot_data['enabled'],
product_id=slot_data['productId'],
units=slot_data['units'],
price=slot_data['price']
)
db.session.add(new_slot)
db.session.commit()
return {'success': True, 'message': 'Vending state updated', 'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
# User Service (MODIFIED FOR BREVO SMTP WITH ENV VARIABLES)
# Updated UserService in services.py
class UserService:
@staticmethod
def generate_unique_password():
"""Generate a secure random password"""
return secrets.token_hex(4).upper() # 8 character hex password
@staticmethod
def save_file(file, folder='user_uploads'):
"""Save uploaded file and return path"""
if not file:
return None
filename = f"{uuid.uuid4().hex}_{file.filename}"
upload_folder = os.path.join(current_app.config['UPLOAD_FOLDER'], folder)
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
file.save(file_path)
return f"/uploads/{folder}/{filename}"
@staticmethod
def save_multiple_documents(files, metadata_json=None):
"""Save multiple document files with metadata"""
document_paths = []
metadata_list = []
if metadata_json:
try:
metadata_list = json.loads(metadata_json)
print(f"Parsed metadata: {metadata_list}")
except Exception as e:
print(f"Error parsing metadata: {e}")
metadata_list = []
if not isinstance(files, list):
files = [files]
for idx, file in enumerate(files):
if file and hasattr(file, 'filename'):
try:
path = UserService.save_file(file, 'user_documents')
if path:
metadata = metadata_list[idx] if idx < len(metadata_list) else {}
document_paths.append({
'filename': file.filename,
'path': path,
'uploaded_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'document_type': metadata.get('document_type', 'other'),
'document_type_other': metadata.get('document_type_other', '')
})
print(f"Saved document: {file.filename}")
except Exception as e:
print(f"Error saving file {file.filename}: {e}")
continue
return document_paths
@staticmethod
def send_email_notification(username, email, contact, password, user_id):
"""Send email notification with user credentials"""
subject = "Your Account Credentials"
body = f"""
Dear {username},
Your account has been created successfully!
Login Credentials:
==================
User ID: {user_id}
Username: {username}
Email: {email}
Contact: {contact}
Password: {password}
Important Security Notice:
- Please change your password after first login
- Keep this information secure and confidential
- Do not share your credentials with anyone
You can login using either your email or username along with your password.
Best regards,
System Administration Team
"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = os.getenv('BREVO_SENDER_EMAIL')
msg['To'] = email
smtp_email = os.getenv('BREVO_SMTP_EMAIL')
smtp_key = os.getenv('BREVO_SMTP_KEY')
if not all([smtp_email, smtp_key, msg['From']]):
print("Error: Brevo SMTP configuration missing in .env")
return False
try:
with smtplib.SMTP('smtp-relay.brevo.com', 587) as server:
server.starttls()
server.login(smtp_email, smtp_key)
server.send_message(msg)
print(f"✓ Email sent successfully to {email}")
return True
except Exception as e:
print(f"✗ Failed to send email to {email}: {str(e)}")
return False
@staticmethod
def log_sms_notification(contact, username, email, password, user_id):
"""Log SMS notification (for future SMS integration)"""
notification = {
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"user_id": user_id,
"username": username,
"email": email,
"contact": contact,
"password": password,
"message": f"Your account credentials: User ID: {user_id}, Username: {username}, Email: {email}, Password: {password}"
}
notifications_file = os.path.join(current_app.root_path, "sms_notifications.json")
try:
notifications = []
if os.path.exists(notifications_file):
with open(notifications_file, "r") as f:
notifications = json.load(f)
notifications.append(notification)
with open(notifications_file, "w") as f:
json.dump(notifications, f, indent=4)
print(f"✓ SMS notification logged for {contact}")
return True
except Exception as e:
print(f"✗ Failed to log SMS notification: {str(e)}")
return False
@staticmethod
def validate_user_data(data):
"""Validate user data before creating/updating"""
import re
required_fields = ['username', 'email', 'contact', 'roles', 'user_status']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
# Email validation - FIXED: Added closing quote
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, data['email']):
raise ValueError("Invalid email format")
@staticmethod
def get_all_users():
"""Get all users from database"""
return User.query.all()
@staticmethod
def create_user(data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None):
"""
Create new user with proper password hashing
Password must be provided in data
"""
print(f"\n{'='*60}")
print(f"CREATING NEW USER")
print(f"{'='*60}")
# Validate user data
UserService.validate_user_data(data)
# CHECK: Password must be provided
if 'password' not in data or not data['password']:
raise ValueError("Password is required")
# Generate user ID only
user_id = User.generate_user_id(data['username'], data['email'])
password = data['password'] # Use provided password, not auto-generated
print(f"Generated User ID: {user_id}")
print(f"Password provided by admin")
# Handle file uploads
photo_path = UserService.save_file(photo_file, 'user_photos') if photo_file else None
logo_path = UserService.save_file(logo_file, 'company_logos') if logo_file else None
documents = UserService.save_multiple_documents(document_files, documents_metadata) if document_files else []
# Create new user
new_user = User(
user_id=user_id,
username=data['username'],
email=data['email'],
contact=data['contact'],
roles=data['roles'],
user_status=data['user_status'],
photo=photo_path,
company_logo=logo_path,
documents=json.dumps(documents)
)
# Set password (this will hash it automatically)
new_user.set_password(password)
print(f"Password hashed and set for user")
# Save to database
db.session.add(new_user)
db.session.commit()
print(f"✓ User created successfully with ID: {new_user.id}")
print(f"{'='*60}\n")
# Optional: Send email notification with the password they set
# UserService.send_email_notification(...)
return new_user
@staticmethod
def update_user(id, data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None):
"""
Update existing user
Password is only updated if explicitly provided in data
"""
user = User.query.get_or_404(id)
print(f"\n{'='*60}")
print(f"UPDATING USER: {user.username}")
print(f"{'='*60}")
# Validate only if fields are being updated
if any(field in data for field in ['username', 'email', 'contact', 'roles', 'user_status']):
UserService.validate_user_data({
'username': data.get('username', user.username),
'email': data.get('email', user.email),
'contact': data.get('contact', user.contact),
'roles': data.get('roles', user.roles),
'user_status': data.get('user_status', user.user_status)
})
# Update basic fields
user.username = data.get('username', user.username)
user.email = data.get('email', user.email)
user.contact = data.get('contact', user.contact)
user.roles = data.get('roles', user.roles)
user.user_status = data.get('user_status', user.user_status)
# Update password if provided (will be hashed automatically)
if 'password' in data and data['password']:
print(f"Updating password for user")
user.set_password(data['password'])
# Handle file updates
if photo_file:
if user.photo:
old_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
user.photo.replace('/uploads/', ''))
if os.path.exists(old_path):
os.remove(old_path)
user.photo = UserService.save_file(photo_file, 'user_photos')
if logo_file:
if user.company_logo:
old_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
user.company_logo.replace('/uploads/', ''))
if os.path.exists(old_path):
os.remove(old_path)
user.company_logo = UserService.save_file(logo_file, 'company_logos')
if document_files:
existing_docs = json.loads(user.documents) if user.documents else []
new_docs = UserService.save_multiple_documents(document_files, documents_metadata)
existing_docs.extend(new_docs)
user.documents = json.dumps(existing_docs)
user.updated_at = datetime.datetime.utcnow()
db.session.commit()
print(f"✓ User updated successfully")
print(f"{'='*60}\n")
return user
@staticmethod
def delete_user_document(user_id, document_path):
"""Delete a specific document from a user"""
user = User.query.get_or_404(user_id)
if user.documents:
documents = json.loads(user.documents)
documents = [doc for doc in documents if doc['path'] != document_path]
user.documents = json.dumps(documents)
# Delete physical file
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
document_path.replace('/uploads/', ''))
if os.path.exists(file_path):
os.remove(file_path)
db.session.commit()
return True
return False
@staticmethod
def delete_user(id):
"""Delete user and all associated files"""
user = User.query.get_or_404(id)
# Delete all associated files
if user.photo:
photo_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
user.photo.replace('/uploads/', ''))
if os.path.exists(photo_path):
os.remove(photo_path)
if user.company_logo:
logo_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
user.company_logo.replace('/uploads/', ''))
if os.path.exists(logo_path):
os.remove(logo_path)
if user.documents:
documents = json.loads(user.documents)
for doc in documents:
doc_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
doc['path'].replace('/uploads/', ''))
if os.path.exists(doc_path):
os.remove(doc_path)
db.session.delete(user)
db.session.commit()
return True
@staticmethod
def validate_file(file, maxSizeMB=10):
"""Validate file size"""
max_size_bytes = maxSizeMB * 1024 * 1024
if file.size > max_size_bytes:
return f"File size exceeds {maxSizeMB}MB limit"
return None
@staticmethod
def is_valid_image_type(file):
"""Check if file is valid image type"""
valid_types = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif']
return valid_types.includes(file.type) if hasattr(file, 'type') else False
@staticmethod
def is_valid_document_type(file):
"""Check if file is valid document type"""
valid_types = [
'application/pdf',
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'image/jpeg',
'image/jpg',
'image/png'
]
return valid_types.includes(file.type) if hasattr(file, 'type') else False
# Product Service (UNCHANGED)
class ProductService:
@staticmethod
def generate_id(prefix):
return f"{prefix}{uuid.uuid4().hex[:4].upper()}"
@staticmethod
def validate_product_data(data):
required_fields = ['product_name', 'price']
for field in required_fields:
if field not in data or not data[field]:
raise ValueError(f"Missing or empty required field: {field}")
try:
price = float(data['price'])
if price < 0:
raise ValueError("Price must be a positive number")
except (ValueError, TypeError):
raise ValueError("Price must be a valid number")
@staticmethod
def save_image(file):
if not file:
raise ValueError("Product image is required")
filename = f"{uuid.uuid4().hex}_{file.filename}"
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
return f"/uploads/{filename}"
@staticmethod
def get_all_products():
return Product.query.all()
@staticmethod
def create_product(data, file):
ProductService.validate_product_data(data)
image_path = ProductService.save_image(file)
product_id = ProductService.generate_id("P")
created_date = data.get('created_date', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
new_product = Product(
product_id=product_id,
product_name=data['product_name'],
price=float(data['price']),
product_image=image_path,
created_date=created_date
)
db.session.add(new_product)
db.session.commit()
return new_product
@staticmethod
def update_product(id, data, file):
product = Product.query.get_or_404(id)
ProductService.validate_product_data(data)
product.product_name = data.get('product_name', product.product_name)
product.price = float(data.get('price', product.price))
product.created_date = data.get('created_date', product.created_date)
if file:
if product.product_image:
old_image_path = os.path.join(current_app.config['UPLOAD_FOLDER'], product.product_image.split('/')[-1])
if os.path.exists(old_image_path):
os.remove(old_image_path)
product.product_image = ProductService.save_image(file)
db.session.commit()
return product
@staticmethod
def delete_product(id):
product = Product.query.get_or_404(id)
if product.product_image:
image_path = os.path.join(current_app.config['UPLOAD_FOLDER'], product.product_image.split('/')[-1])
if os.path.exists(image_path):
os.remove(image_path)
db.session.delete(product)
db.session.commit()
return True
class TransactionService:
@staticmethod
def create_transaction(machine_id, product_name, quantity, amount, payment_type, status='Success'):
"""Create a new transaction record"""
transaction_id = f"TXN-{uuid.uuid4().hex[:8].upper()}"
new_transaction = Transaction(
transaction_id=transaction_id,
machine_id=machine_id,
product_name=product_name,
quantity=quantity,
amount=amount,
payment_type=payment_type,
status=status
)
db.session.add(new_transaction)
db.session.commit()
return new_transaction
@staticmethod
def update_transaction_status(transaction_id, status, amount_receiving_status=None, return_amount=None):
"""Update transaction status after dispensing"""
transaction = Transaction.query.filter_by(transaction_id=transaction_id).first()
if transaction:
transaction.status = status
if amount_receiving_status:
transaction.amount_receiving_status = amount_receiving_status
if return_amount:
transaction.return_amount = return_amount
db.session.commit()
return transaction
@staticmethod
def get_all_transactions(filters=None):
"""Get all transactions with optional filters"""
query = Transaction.query
if filters:
if 'machine_id' in filters:
query = query.filter_by(machine_id=filters['machine_id'])
if 'status' in filters:
query = query.filter_by(status=filters['status'])
if 'date_from' in filters:
query = query.filter(Transaction.created_at >= filters['date_from'])
if 'date_to' in filters:
query = query.filter(Transaction.created_at <= filters['date_to'])
return query.order_by(Transaction.created_at.desc()).all()
@staticmethod
def get_transaction_by_id(transaction_id):
"""Get single transaction by ID"""
return Transaction.query.filter_by(transaction_id=transaction_id).first()
class RoleService:
@staticmethod
def get_all_roles():
"""Get all roles"""
return Role.query.all()
@staticmethod
def get_role_by_id(role_id):
"""Get role by ID"""
return Role.query.get_or_404(role_id)
@staticmethod
def create_role(data):
"""Create new role"""
name = data.get('name')
description = data.get('description', '')
permissions = data.get('permissions', [])
# Check if role already exists
existing_role = Role.query.filter_by(name=name).first()
if existing_role:
raise ValueError(f"Role '{name}' already exists")
new_role = Role(
name=name,
description=description,
permissions=json.dumps(permissions)
)
db.session.add(new_role)
db.session.commit()
return new_role
@staticmethod
def update_role(role_id, data):
"""Update existing role"""
role = Role.query.get_or_404(role_id)
if 'name' in data:
# Check if new name conflicts with existing role
existing = Role.query.filter(Role.name == data['name'], Role.id != role_id).first()
if existing:
raise ValueError(f"Role '{data['name']}' already exists")
role.name = data['name']
if 'description' in data:
role.description = data['description']
if 'permissions' in data:
role.permissions = json.dumps(data['permissions'])
role.updated_at = datetime.datetime.utcnow()
db.session.commit()
return role
@staticmethod
def delete_role(role_id):
"""Delete role"""
role = Role.query.get_or_404(role_id)
# Check if any users have this role
users_with_role = User.query.filter_by(roles=role.name).count()
if users_with_role > 0:
raise ValueError(f"Cannot delete role '{role.name}' - {users_with_role} user(s) still assigned to it")
db.session.delete(role)
db.session.commit()
return True