# Listing metadata: 967 lines, 35 KiB, Python
import uuid
|
|
import datetime
|
|
import datetime
|
|
import re
|
|
import os
|
|
import json
|
|
import serial
|
|
import time
|
|
import threading
|
|
from typing import Optional, Dict, Any
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from app import db
|
|
from app.models.models import Machine, User, Product, VendingSlot, Transaction, Role
|
|
from flask import current_app
|
|
import secrets
|
|
from dotenv import load_dotenv
|
|
|
|
# Pull settings from the .env file before anything below calls os.getenv()
load_dotenv()
|
|
|
|
class SerialCommunicationService:
    """Talk to the vending machine controller over a serial line.

    Commands are newline-terminated text (``STATUS``, ``RESET``,
    ``DISPENSE:<slot>``) and replies are read back one line at a time.
    Every command/reply exchange holds ``response_lock`` so that two callers
    sharing one instance cannot interleave traffic on the port.
    """

    # Replies that conclude a dispense attempt; any other line received while
    # waiting is printed and skipped.
    _DISPENSE_OUTCOMES = ('PRODUCT_DISPENSED', 'DISPENSE_FAILED', 'SLOT_EMPTY', 'MOTOR_ERROR')

    def __init__(self):
        self.serial_port: Optional[serial.Serial] = None
        self.is_connected = False
        self.response_buffer = ""
        self.last_response = None
        # Guards each command/reply exchange on the shared port.
        self.response_lock = threading.Lock()

    def _read_line(self) -> str:
        """Read one reply line, replacing bytes that are not valid UTF-8.

        Line noise must not raise: a strict decode would abort an exchange
        whose real reply is still on its way.
        """
        return self.serial_port.readline().decode('utf-8', errors='replace').strip()

    def _release_port(self) -> None:
        """Close and forget the current port (if any) and mark the link down."""
        port, self.serial_port = self.serial_port, None
        self.is_connected = False
        if port is not None:
            try:
                port.close()
            except Exception as e:
                print(f"Error while closing serial port: {e}")

    def connect_to_machine(self, port: str = '/dev/ttyUSB0', baudrate: int = 9600) -> bool:
        """
        Connect to vending machine via serial port

        Common ports:
        - Linux: /dev/ttyUSB0, /dev/ttyACM0
        - Windows: COM3, COM4, etc.
        - macOS: /dev/cu.usbserial-*

        Opens the port as 8N1, waits for the board to reset, sends ``STATUS``
        and treats any reply as proof of life.

        Returns:
            True when the machine answered; False otherwise. On every failure
            path the port handle is closed so it is not leaked.
        """
        with self.response_lock:
            # Drop a handle left over from an earlier connect before opening a new one.
            self._release_port()
            try:
                self.serial_port = serial.Serial(
                    port=port,
                    baudrate=baudrate,
                    bytesize=serial.EIGHTBITS,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    timeout=1  # 1 second timeout for reads
                )

                # Test connection with a status command
                time.sleep(2)  # Allow time for Arduino to reset
                self.serial_port.write(b'STATUS\n')
                time.sleep(1)

                if self.serial_port.in_waiting > 0:
                    response = self._read_line()
                    print(f"Machine connected successfully. Status: {response}")
                    self.is_connected = True
                    return True

                print("No response from machine")
            except Exception as e:
                print(f"Failed to connect to machine: {e}")

            self._release_port()
            return False

    def send_dispense_command(self, slot_id: str, timeout: int = 10) -> Dict[str, Any]:
        """
        Send dispense command and wait for drop sensor response

        Args:
            slot_id: Slot identifier placed after ``DISPENSE:`` on the wire.
            timeout: Seconds to wait for one of the terminal replies.

        Returns:
            A dict with ``success`` and ``response`` keys. ``response`` is the
            machine's reply, or one of ``NO_CONNECTION``, ``INVALID_SLOT``,
            ``TIMEOUT``, ``COMMUNICATION_ERROR``. Failure dicts carry ``error``;
            all results after the connection check carry ``slot_id``, and all
            except ``INVALID_SLOT`` also carry a user-facing ``message``.
        """
        if not self.is_connected or not self.serial_port:
            return {
                'success': False,
                'error': 'Not connected to machine',
                'response': 'NO_CONNECTION'
            }

        # A line break inside the slot id would put a second command on the wire.
        if any(ch in str(slot_id) for ch in '\r\n'):
            return {
                'success': False,
                'error': 'Invalid slot id',
                'response': 'INVALID_SLOT',
                'slot_id': slot_id
            }

        try:
            with self.response_lock:
                # Clear any pending data
                self.serial_port.reset_input_buffer()

                # Send dispense command
                command = f"DISPENSE:{slot_id}\n"
                self.serial_port.write(command.encode('utf-8'))
                print(f"Sent command: {command.strip()}")

                # Monotonic clock: a wall-clock adjustment must not stretch or
                # cut short the wait.
                deadline = time.monotonic() + timeout
                response = ""

                while time.monotonic() < deadline:
                    if self.serial_port.in_waiting > 0:
                        line = self._read_line()
                        print(f"Received: {line}")
                        if line in self._DISPENSE_OUTCOMES:
                            response = line
                            break
                        continue  # more data may already be queued
                    time.sleep(0.1)  # Small delay to prevent busy waiting
        except Exception as e:
            print(f"Error during dispensing: {e}")
            return {
                'success': False,
                'error': str(e),
                'response': 'COMMUNICATION_ERROR',
                'slot_id': slot_id,
                'message': self._get_response_message('COMMUNICATION_ERROR', slot_id)
            }

        if not response:
            return {
                'success': False,
                'error': 'Timeout waiting for drop sensor',
                'response': 'TIMEOUT',
                'slot_id': slot_id,
                'message': self._get_response_message('TIMEOUT', slot_id)
            }

        return {
            'success': response == 'PRODUCT_DISPENSED',
            'response': response,
            'slot_id': slot_id,
            'message': self._get_response_message(response, slot_id)
        }

    def _get_response_message(self, response: str, slot_id: str) -> str:
        """Get user-friendly message for response codes"""
        messages = {
            'PRODUCT_DISPENSED': f'Product successfully dispensed from slot {slot_id}',
            'DISPENSE_FAILED': f'Dispensing failed - no product detected from slot {slot_id}',
            'SLOT_EMPTY': f'Slot {slot_id} is empty',
            'MOTOR_ERROR': f'Motor malfunction in slot {slot_id}',
            'TIMEOUT': f'Timeout - no response from slot {slot_id}',
            'COMMUNICATION_ERROR': 'Communication error with machine'
        }
        return messages.get(response, f'Unknown response: {response}')

    def check_machine_status(self) -> Dict[str, Any]:
        """Check overall machine status

        Sends ``STATUS`` and waits one second for a reply.

        Returns:
            A dict whose ``status`` is ``disconnected``, ``connected``,
            ``no_response`` or ``error``, always with a ``message``.
        """
        if not self.is_connected or not self.serial_port:
            return {'status': 'disconnected', 'message': 'Not connected to machine'}

        try:
            with self.response_lock:
                self.serial_port.reset_input_buffer()
                self.serial_port.write(b'STATUS\n')
                time.sleep(1)

                if self.serial_port.in_waiting > 0:
                    return {
                        'status': 'connected',
                        'response': self._read_line(),
                        'message': 'Machine is responding'
                    }

            return {
                'status': 'no_response',
                'message': 'Machine not responding'
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e),
                'message': 'Communication error'
            }

    def reset_machine(self) -> Dict[str, Any]:
        """Send reset command to clear any error states

        Returns:
            ``{'success': True, 'response': <line>, 'message': ...}`` once the
            command was written (the reply may be empty), or
            ``{'success': False, 'error': ...}``.
        """
        if not self.is_connected or not self.serial_port:
            return {'success': False, 'error': 'Not connected to machine'}

        try:
            with self.response_lock:
                # Drop stale input so the line read below belongs to this reset.
                self.serial_port.reset_input_buffer()
                self.serial_port.write(b'RESET\n')
                time.sleep(2)
                response = self._read_line()

            return {
                'success': True,
                'response': response,
                'message': 'Reset command sent'
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def disconnect(self):
        """Disconnect from serial port"""
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
            print("Disconnected from vending machine")
        # Mark the link down even if the port was already closed elsewhere.
        self.is_connected = False
|
|
|
|
# Module-wide instance shared by every caller that talks to the machine
serial_service = SerialCommunicationService()
|
|
|
|
# Machine Service (UNCHANGED)
|
|
class MachineService:
    """Database operations for machines and their vending-slot layout."""

    # Values accepted for both ``operation_status`` and ``connection_status``.
    _ALLOWED_STATUSES = ('connected', 'disconnected', 'maintenance', 'active', 'inactive')

    # Machine columns that callers may set directly and that are validated together.
    _CORE_FIELDS = ('machine_model', 'machine_type', 'branch_name',
                    'operation_status', 'connection_status')

    @staticmethod
    def generate_id(prefix):
        """Return ``prefix`` followed by 8 upper-case hex characters from a UUID4."""
        return f"{prefix}{uuid.uuid4().hex[:8].upper()}"

    @staticmethod
    def generate_unique_password():
        """Return an 8-character hex password not used by any existing machine."""
        while True:
            password = secrets.token_hex(4).upper()  # e.g., "A1B2C3D4"
            if not Machine.query.filter_by(password=password).first():
                return password

    @staticmethod
    def _check_statuses(data):
        """Raise ValueError unless both status fields hold an allowed value."""
        allowed = MachineService._ALLOWED_STATUSES
        if data['operation_status'] not in allowed:
            raise ValueError(f"Invalid operation status. Allowed statuses: {list(allowed)}")
        if data['connection_status'] not in allowed:
            raise ValueError(f"Invalid connection status. Allowed statuses: {list(allowed)}")

    @staticmethod
    def validate_machine_data(data):
        """Check that ``data`` carries every required machine field.

        The owning client may be identified by ``client_name`` or by
        ``client_id`` (``create_machine`` works from ``client_id``).

        Raises:
            ValueError: a required field is missing or a status is not allowed.
        """
        required_fields = ('machine_model', 'machine_type', 'client_name',
                           'branch_name', 'operation_status', 'connection_status')
        for field in required_fields:
            if field in data:
                continue
            if field == 'client_name' and 'client_id' in data:
                continue
            raise ValueError(f"Missing required field: {field}")

        MachineService._check_statuses(data)

    @staticmethod
    def get_all_machines():
        """Return every machine row."""
        return Machine.query.all()

    @staticmethod
    def get_machine_slots(machine_id: str):
        """Return the slots of a machine as plain dicts, or None if it does not exist.

        Product name and image are looked up in one query; a slot whose product
        is unset or no longer exists reports ``'N/A'`` / ``None``.
        """
        machine = Machine.query.filter_by(machine_id=machine_id).first()
        if not machine:
            return None

        slots = VendingSlot.query.filter_by(machine_id=machine_id).all()

        product_ids = [slot.product_id for slot in slots if slot.product_id]
        products = {}
        if product_ids:
            matches = Product.query.filter(Product.product_id.in_(product_ids)).all()
            products = {p.product_id: p for p in matches}

        layout = []
        for slot in slots:
            product = products.get(slot.product_id) if slot.product_id else None
            # Column number is whatever follows the first character of the slot name.
            # isdecimal() (not isdigit()) so that int() cannot fail on e.g. superscripts.
            suffix = (slot.slot_name or '')[1:]
            layout.append({
                # NOTE(review): 'slot_name' is filled from row_id, not slot_name - confirm intended
                'slot_name': slot.row_id,
                'column': int(suffix) if suffix.isdecimal() else 1,
                'enabled': slot.enabled,
                'product_id': slot.product_id,
                'units': slot.units or 0,
                'price': slot.price or '0.0',
                'product_name': product.product_name if product else 'N/A',
                'product_image': product.product_image if product else None
            })
        return layout

    @staticmethod
    def create_machine(data):
        """Create and persist a machine owned by the user ``data['client_id']``.

        Machine id, branch id and password are generated here; ``client_name``
        is copied from the owning user's ``username``.

        Raises:
            ValueError: missing field, invalid status, or unknown ``client_id``.
        """
        # Modified validation to accept client_id instead of client_name
        required_fields = ('machine_model', 'machine_type', 'client_id',
                           'branch_name', 'operation_status', 'connection_status')
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        MachineService._check_statuses(data)

        # Validate client_id exists in users (single lookup, reused for the name)
        client_id = data.get('client_id')
        owner = User.query.get(client_id) if client_id else None
        if not owner:
            raise ValueError("Invalid or missing client_id")

        new_machine = Machine(
            machine_id=MachineService.generate_id("M"),
            machine_model=data['machine_model'],
            machine_type=data['machine_type'],
            client_id=client_id,
            client_name=owner.username,  # Set client_name from user record
            branch_id=MachineService.generate_id("B"),
            branch_name=data['branch_name'],
            operation_status=data['operation_status'],
            connection_status=data['connection_status'],
            created_on=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            password=MachineService.generate_unique_password()
        )

        db.session.add(new_machine)
        db.session.commit()
        return new_machine

    @staticmethod
    def update_machine(id, data):
        """Apply the fields present in ``data`` to machine ``id`` (404 if absent).

        All validation happens before the row is touched, so a rejected update
        leaves no half-applied changes in the session.

        Raises:
            ValueError: invalid status or unknown ``client_id``.
        """
        machine = Machine.query.get_or_404(id)
        core_fields = MachineService._CORE_FIELDS

        if any(field in data for field in core_fields):
            snapshot = {k: data.get(k, getattr(machine, k)) for k in core_fields}
            # The validator needs a client reference; the stored one stands in
            # when the caller is not changing it.
            snapshot['client_id'] = data.get('client_id', machine.client_id)
            MachineService.validate_machine_data(snapshot)

        owner = None
        if 'client_id' in data:
            owner = User.query.get(data['client_id'])
            if not owner:
                raise ValueError("Invalid client_id")

        machine.machine_model = data.get('machine_model', machine.machine_model)
        machine.machine_type = data.get('machine_type', machine.machine_type)
        if owner:
            machine.client_id = data['client_id']
            # Keep the denormalised name in step with the owner, as create_machine does.
            machine.client_name = owner.username
        machine.branch_name = data.get('branch_name', machine.branch_name)
        machine.operation_status = data.get('operation_status', machine.operation_status)
        machine.connection_status = data.get('connection_status', machine.connection_status)

        db.session.commit()
        return machine

    @staticmethod
    def delete_machine(id):
        """Delete machine ``id`` (404 if absent) and return True."""
        machine = Machine.query.get_or_404(id)
        db.session.delete(machine)
        db.session.commit()
        return True

    @staticmethod
    def get_machine_by_id(id):
        """Return machine ``id`` or abort with 404."""
        return Machine.query.get_or_404(id)

    @staticmethod
    def get_vending_state(machine_id):
        """Return the machine's slots grouped by row, plus a timestamp (404 if absent)."""
        # Looked up only to produce the 404 for an unknown machine.
        Machine.query.filter_by(machine_id=machine_id).first_or_404()
        slots = VendingSlot.query.filter_by(machine_id=machine_id).all()

        vending_rows = {}
        for slot in slots:
            vending_rows.setdefault(slot.row_id, []).append(slot.to_dict())

        return {
            'vendingRows': [{'rowId': row_id, 'slots': row_slots}
                            for row_id, row_slots in vending_rows.items()],
            'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    @staticmethod
    def update_vending_state(machine_id, vending_rows):
        """Replace every slot of the machine with the layout in ``vending_rows``.

        Each row is ``{'rowId': ..., 'slots': [{'name', 'enabled', 'productId',
        'units', 'price'}, ...]}``. The new slots are built before the old ones
        are deleted, so a malformed payload raises ``KeyError`` without
        touching the database; a failure while writing rolls the session back.
        """
        Machine.query.filter_by(machine_id=machine_id).first_or_404()

        replacement = [
            VendingSlot(
                machine_id=machine_id,
                row_id=row['rowId'],
                slot_name=slot_data['name'],
                enabled=slot_data['enabled'],
                product_id=slot_data['productId'],
                units=slot_data['units'],
                price=slot_data['price']
            )
            for row in vending_rows
            for slot_data in row['slots']
        ]

        try:
            VendingSlot.query.filter_by(machine_id=machine_id).delete()
            db.session.add_all(replacement)
            db.session.commit()
        except Exception:
            # Never leave the bulk delete pending in the session.
            db.session.rollback()
            raise

        return {
            'success': True,
            'message': 'Vending state updated',
            'lastUpdated': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
|
|
|
|
# User Service (MODIFIED FOR BREVO SMTP WITH ENV VARIABLES)
|
|
# Updated UserService in services.py
|
|
class UserService:
    """User accounts: validation, persistence, uploads and credential notices."""

    _IMAGE_TYPES = ('image/jpeg', 'image/jpg', 'image/png', 'image/gif')
    _DOCUMENT_TYPES = (
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'image/jpeg',
        'image/jpg',
        'image/png',
    )

    @staticmethod
    def generate_unique_password():
        """Generate a secure random password"""
        return secrets.token_hex(4).upper()  # 8 character hex password

    @staticmethod
    def save_file(file, folder='user_uploads'):
        """Save uploaded file and return path

        The file is stored under ``UPLOAD_FOLDER/<folder>`` with a random
        prefix. Only the final component of the client-supplied name is kept,
        so ``../`` segments or a full client path cannot steer the write.

        Returns:
            The ``/uploads/<folder>/<name>`` path, or None when ``file`` is falsy.
        """
        if not file:
            return None

        client_name = os.path.basename((file.filename or '').replace('\\', '/'))
        filename = f"{uuid.uuid4().hex}_{client_name}"
        upload_folder = os.path.join(current_app.config['UPLOAD_FOLDER'], folder)

        os.makedirs(upload_folder, exist_ok=True)

        file_path = os.path.join(upload_folder, filename)
        file.save(file_path)

        return f"/uploads/{folder}/{filename}"

    @staticmethod
    def _remove_upload(stored_path):
        """Delete the file behind a stored ``/uploads/...`` path.

        Paths that resolve outside ``UPLOAD_FOLDER`` are refused.
        Returns True when a file was removed.
        """
        if not stored_path:
            return False
        root = os.path.abspath(current_app.config['UPLOAD_FOLDER'])
        target = os.path.abspath(os.path.join(root, stored_path.replace('/uploads/', '')))
        try:
            inside = os.path.commonpath([root, target]) == root
        except ValueError:  # e.g. different drives on Windows
            inside = False
        if not inside or not os.path.isfile(target):
            return False
        os.remove(target)
        return True

    @staticmethod
    def save_multiple_documents(files, metadata_json=None):
        """Save multiple document files with metadata

        Args:
            files: One upload or a list of uploads.
            metadata_json: Optional JSON array; entry *i* describes file *i*
                through ``document_type`` / ``document_type_other``.

        Returns:
            One record per stored file: ``filename``, ``path``, ``uploaded_at``,
            ``document_type``, ``document_type_other``. Files that fail to
            save are reported and skipped.
        """
        document_paths = []

        metadata_list = []
        if metadata_json:
            try:
                metadata_list = json.loads(metadata_json)
                print(f"Parsed metadata: {metadata_list}")
            except Exception as e:
                print(f"Error parsing metadata: {e}")
                metadata_list = []
        # Anything but a JSON array cannot be matched to files by position.
        if not isinstance(metadata_list, list):
            metadata_list = []

        if not isinstance(files, list):
            files = [files]

        for idx, file in enumerate(files):
            if not (file and hasattr(file, 'filename')):
                continue
            try:
                path = UserService.save_file(file, 'user_documents')
            except Exception as e:
                print(f"Error saving file {file.filename}: {e}")
                continue
            if not path:
                continue

            metadata = metadata_list[idx] if idx < len(metadata_list) else {}
            if not isinstance(metadata, dict):
                # A malformed entry must not drop a file that is already on disk.
                metadata = {}

            document_paths.append({
                'filename': file.filename,
                'path': path,
                'uploaded_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'document_type': metadata.get('document_type', 'other'),
                'document_type_other': metadata.get('document_type_other', '')
            })
            print(f"Saved document: {file.filename}")

        return document_paths

    @staticmethod
    def send_email_notification(username, email, contact, password, user_id):
        """Send email notification with user credentials

        Uses the Brevo SMTP relay with ``BREVO_SENDER_EMAIL``,
        ``BREVO_SMTP_EMAIL`` and ``BREVO_SMTP_KEY`` from the environment.

        Returns:
            True when the message was handed to the relay, False when the
            configuration is incomplete or sending failed.
        """
        sender = os.getenv('BREVO_SENDER_EMAIL')
        smtp_email = os.getenv('BREVO_SMTP_EMAIL')
        smtp_key = os.getenv('BREVO_SMTP_KEY')

        if not all([smtp_email, smtp_key, sender]):
            print("Error: Brevo SMTP configuration missing in .env")
            return False

        body = "\n".join((
            "",
            f"Dear {username},",
            "",
            "Your account has been created successfully!",
            "",
            "Login Credentials:",
            "==================",
            f"User ID: {user_id}",
            f"Username: {username}",
            f"Email: {email}",
            f"Contact: {contact}",
            f"Password: {password}",
            "",
            "Important Security Notice:",
            "- Please change your password after first login",
            "- Keep this information secure and confidential",
            "- Do not share your credentials with anyone",
            "",
            "You can login using either your email or username along with your password.",
            "",
            "Best regards,",
            "System Administration Team",
            "",
        ))

        msg = MIMEText(body)
        msg['Subject'] = "Your Account Credentials"
        msg['From'] = sender
        msg['To'] = email

        try:
            # Timeout so an unreachable relay cannot hang the caller indefinitely.
            with smtplib.SMTP('smtp-relay.brevo.com', 587, timeout=30) as server:
                server.starttls()
                server.login(smtp_email, smtp_key)
                server.send_message(msg)
            print(f"✓ Email sent successfully to {email}")
            return True
        except Exception as e:
            print(f"✗ Failed to send email to {email}: {str(e)}")
            return False

    @staticmethod
    def log_sms_notification(contact, username, email, password, user_id):
        """Log SMS notification (for future SMS integration)

        Appends one record to ``sms_notifications.json`` in the app root.

        Returns:
            True when the record was written, False on any failure.
        """
        # SECURITY NOTE(review): this record stores the password in plain text
        # on disk. Behaviour kept as-is; confirm it is acceptable or drop the
        # field before this reaches production.
        notification = {
            "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "user_id": user_id,
            "username": username,
            "email": email,
            "contact": contact,
            "password": password,
            "message": f"Your account credentials: User ID: {user_id}, Username: {username}, Email: {email}, Password: {password}"
        }

        notifications_file = os.path.join(current_app.root_path, "sms_notifications.json")

        try:
            notifications = []
            if os.path.exists(notifications_file):
                with open(notifications_file, "r", encoding="utf-8") as f:
                    notifications = json.load(f)

            notifications.append(notification)

            with open(notifications_file, "w", encoding="utf-8") as f:
                json.dump(notifications, f, indent=4)

            print(f"✓ SMS notification logged for {contact}")
            return True
        except Exception as e:
            print(f"✗ Failed to log SMS notification: {str(e)}")
            return False

    @staticmethod
    def validate_user_data(data):
        """Validate user data before creating/updating

        Raises:
            ValueError: a required field is missing or the email is malformed.
        """
        import re

        required_fields = ['username', 'email', 'contact', 'roles', 'user_status']
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        email = data['email']
        # A non-string (e.g. None) is an invalid email, not a TypeError for the caller.
        if not isinstance(email, str) or not re.match(email_regex, email):
            raise ValueError("Invalid email format")

    @staticmethod
    def get_all_users():
        """Get all users from database"""
        return User.query.all()

    @staticmethod
    def create_user(data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None):
        """
        Create new user with proper password hashing
        Password must be provided in data

        Raises:
            ValueError: validation failed or no password was supplied.
        """
        print(f"\n{'='*60}")
        print("CREATING NEW USER")
        print(f"{'='*60}")

        # Validate user data
        UserService.validate_user_data(data)

        # CHECK: Password must be provided
        if 'password' not in data or not data['password']:
            raise ValueError("Password is required")

        # Generate user ID only
        user_id = User.generate_user_id(data['username'], data['email'])
        password = data['password']  # Use provided password, not auto-generated

        print(f"Generated User ID: {user_id}")
        print("Password provided by admin")

        # Handle file uploads
        photo_path = UserService.save_file(photo_file, 'user_photos') if photo_file else None
        logo_path = UserService.save_file(logo_file, 'company_logos') if logo_file else None
        documents = UserService.save_multiple_documents(document_files, documents_metadata) if document_files else []

        # Create new user
        new_user = User(
            user_id=user_id,
            username=data['username'],
            email=data['email'],
            contact=data['contact'],
            roles=data['roles'],
            user_status=data['user_status'],
            photo=photo_path,
            company_logo=logo_path,
            documents=json.dumps(documents)
        )

        # Set password (this will hash it automatically)
        new_user.set_password(password)
        print("Password hashed and set for user")

        # Save to database
        db.session.add(new_user)
        db.session.commit()

        print(f"✓ User created successfully with ID: {new_user.id}")
        print(f"{'='*60}\n")

        # Optional: Send email notification with the password they set
        # UserService.send_email_notification(...)

        return new_user

    @staticmethod
    def update_user(id, data, photo_file=None, logo_file=None, document_files=None, documents_metadata=None):
        """
        Update existing user
        Password is only updated if explicitly provided in data

        A replacement photo/logo is saved first and the old file is removed
        only after the commit, so a failed save or commit never leaves the
        user without an image. New documents are appended to the existing list.
        """
        user = User.query.get_or_404(id)

        print(f"\n{'='*60}")
        print(f"UPDATING USER: {user.username}")
        print(f"{'='*60}")

        # Validate only if fields are being updated
        if any(field in data for field in ['username', 'email', 'contact', 'roles', 'user_status']):
            UserService.validate_user_data({
                'username': data.get('username', user.username),
                'email': data.get('email', user.email),
                'contact': data.get('contact', user.contact),
                'roles': data.get('roles', user.roles),
                'user_status': data.get('user_status', user.user_status)
            })

        # Update basic fields
        user.username = data.get('username', user.username)
        user.email = data.get('email', user.email)
        user.contact = data.get('contact', user.contact)
        user.roles = data.get('roles', user.roles)
        user.user_status = data.get('user_status', user.user_status)

        # Update password if provided (will be hashed automatically)
        if 'password' in data and data['password']:
            print("Updating password for user")
            user.set_password(data['password'])

        # Handle file updates
        superseded = []
        if photo_file:
            superseded.append(user.photo)
            user.photo = UserService.save_file(photo_file, 'user_photos')

        if logo_file:
            superseded.append(user.company_logo)
            user.company_logo = UserService.save_file(logo_file, 'company_logos')

        if document_files:
            existing_docs = json.loads(user.documents) if user.documents else []
            new_docs = UserService.save_multiple_documents(document_files, documents_metadata)
            existing_docs.extend(new_docs)
            user.documents = json.dumps(existing_docs)

        user.updated_at = datetime.datetime.utcnow()

        db.session.commit()

        # The row now points at the new files; the old ones can go.
        for old_path in superseded:
            UserService._remove_upload(old_path)

        print("✓ User updated successfully")
        print(f"{'='*60}\n")

        return user

    @staticmethod
    def delete_user_document(user_id, document_path):
        """Delete a specific document from a user

        Only a path recorded in the user's own document list is removed, from
        both the list and the disk.

        Returns:
            True when the document was found and removed, False otherwise.
        """
        user = User.query.get_or_404(user_id)

        if not user.documents:
            return False

        documents = json.loads(user.documents)
        remaining = [doc for doc in documents if doc['path'] != document_path]
        if len(remaining) == len(documents):
            # Not one of this user's documents: leave database and disk alone.
            return False

        user.documents = json.dumps(remaining)
        db.session.commit()

        # Delete physical file only once the record is gone.
        UserService._remove_upload(document_path)
        return True

    @staticmethod
    def delete_user(id):
        """Delete user and all associated files"""
        user = User.query.get_or_404(id)

        # Collect the file paths first; the files are removed after the commit
        # so a failed delete does not leave a user row pointing at missing files.
        stored_paths = [user.photo, user.company_logo]
        if user.documents:
            for doc in json.loads(user.documents):
                if isinstance(doc, dict):
                    stored_paths.append(doc.get('path'))

        db.session.delete(user)
        db.session.commit()

        for stored_path in stored_paths:
            UserService._remove_upload(stored_path)
        return True

    @staticmethod
    def _upload_size(file):
        """Return the size in bytes of an upload.

        Uses a ``size`` attribute when the object has one; otherwise measures
        a seekable file-like object and restores its read position.
        """
        size = getattr(file, 'size', None)
        if size is not None:
            return size
        position = file.tell()
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(position)
        return size

    @staticmethod
    def _mime_type(file):
        """Return the declared MIME type (``type`` or ``mimetype``), or None."""
        return getattr(file, 'type', None) or getattr(file, 'mimetype', None)

    @staticmethod
    def validate_file(file, maxSizeMB=10):
        """Validate file size

        Returns:
            An error message when the file exceeds ``maxSizeMB``, else None.
        """
        max_size_bytes = maxSizeMB * 1024 * 1024
        if UserService._upload_size(file) > max_size_bytes:
            return f"File size exceeds {maxSizeMB}MB limit"
        return None

    @staticmethod
    def is_valid_image_type(file):
        """Check if file is valid image type"""
        return UserService._mime_type(file) in UserService._IMAGE_TYPES

    @staticmethod
    def is_valid_document_type(file):
        """Check if file is valid document type"""
        return UserService._mime_type(file) in UserService._DOCUMENT_TYPES
|
|
|
|
# Product Service (UNCHANGED)
|
|
class ProductService:
    """Product catalogue: validation, image storage and persistence."""

    @staticmethod
    def generate_id(prefix):
        """Return ``prefix`` followed by 4 upper-case hex characters from a UUID4."""
        return f"{prefix}{uuid.uuid4().hex[:4].upper()}"

    @staticmethod
    def validate_product_data(data):
        """Check that ``data`` has a non-empty name and a non-negative numeric price.

        A price of ``0`` is accepted; only ``None`` or an empty string count
        as a missing price.

        Raises:
            ValueError: field missing/empty, price not numeric, or price negative.
        """
        if 'product_name' not in data or not data['product_name']:
            raise ValueError("Missing or empty required field: product_name")
        if 'price' not in data or data['price'] is None or data['price'] == '':
            raise ValueError("Missing or empty required field: price")

        try:
            price = float(data['price'])
        except (ValueError, TypeError):
            raise ValueError("Price must be a valid number") from None

        # Checked outside the try so this message is not rewritten by the handler above.
        if price < 0:
            raise ValueError("Price must be a positive number")

    @staticmethod
    def save_image(file):
        """Store the uploaded image in ``UPLOAD_FOLDER`` and return its ``/uploads/`` path.

        Only the final component of the client-supplied name is kept, so it
        cannot steer the write outside the folder.

        Raises:
            ValueError: no file was supplied.
        """
        if not file:
            raise ValueError("Product image is required")

        client_name = os.path.basename((file.filename or '').replace('\\', '/'))
        filename = f"{uuid.uuid4().hex}_{client_name}"
        file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)

        file.save(file_path)

        return f"/uploads/{filename}"

    @staticmethod
    def _remove_image(stored_path):
        """Delete the stored image file, if it still exists."""
        if not stored_path:
            return
        image_path = os.path.join(current_app.config['UPLOAD_FOLDER'], stored_path.split('/')[-1])
        if os.path.exists(image_path):
            os.remove(image_path)

    @staticmethod
    def get_all_products():
        """Return every product row."""
        return Product.query.all()

    @staticmethod
    def create_product(data, file):
        """Validate ``data``, store the image and persist a new product.

        Raises:
            ValueError: validation failed or no image was supplied.
        """
        ProductService.validate_product_data(data)

        image_path = ProductService.save_image(file)

        # Four hex characters collide quickly; retry until the id is free.
        product_id = ProductService.generate_id("P")
        while Product.query.filter_by(product_id=product_id).first():
            product_id = ProductService.generate_id("P")

        created_date = data.get('created_date', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        new_product = Product(
            product_id=product_id,
            product_name=data['product_name'],
            price=float(data['price']),
            product_image=image_path,
            created_date=created_date
        )

        db.session.add(new_product)
        db.session.commit()
        return new_product

    @staticmethod
    def update_product(id, data, file):
        """Apply the fields present in ``data`` (and an optional new image) to product ``id``.

        Fields absent from ``data`` keep their stored value; the merged result
        is validated, so a partial update is accepted.

        Raises:
            ValueError: the merged name/price is invalid.
        """
        product = Product.query.get_or_404(id)

        merged = {
            'product_name': data.get('product_name', product.product_name),
            'price': data.get('price', product.price),
        }
        ProductService.validate_product_data(merged)

        product.product_name = merged['product_name']
        product.price = float(merged['price'])
        product.created_date = data.get('created_date', product.created_date)

        previous_image = None
        if file:
            previous_image = product.product_image
            product.product_image = ProductService.save_image(file)

        db.session.commit()

        # Remove the old image only after the row points at the new one.
        ProductService._remove_image(previous_image)
        return product

    @staticmethod
    def delete_product(id):
        """Delete product ``id`` (404 if absent) together with its image file."""
        product = Product.query.get_or_404(id)

        ProductService._remove_image(product.product_image)

        db.session.delete(product)
        db.session.commit()
        return True
|
|
|
|
class TransactionService:
    """Create, update and query purchase transactions."""

    @staticmethod
    def create_transaction(machine_id, product_name, quantity, amount, payment_type, status='Success'):
        """Create a new transaction record

        The transaction id is ``TXN-`` plus 8 upper-case hex characters.
        Returns the persisted ``Transaction``.
        """
        new_transaction = Transaction(
            transaction_id=f"TXN-{uuid.uuid4().hex[:8].upper()}",
            machine_id=machine_id,
            product_name=product_name,
            quantity=quantity,
            amount=amount,
            payment_type=payment_type,
            status=status
        )

        db.session.add(new_transaction)
        db.session.commit()
        return new_transaction

    @staticmethod
    def update_transaction_status(transaction_id, status, amount_receiving_status=None, return_amount=None):
        """Update transaction status after dispensing

        Optional fields are written whenever they are supplied (not None), so
        a ``return_amount`` of ``0`` is stored and not silently skipped.

        Returns:
            The updated ``Transaction``, or None when the id is unknown.
        """
        transaction = Transaction.query.filter_by(transaction_id=transaction_id).first()
        if transaction is None:
            return None

        transaction.status = status
        if amount_receiving_status is not None:
            transaction.amount_receiving_status = amount_receiving_status
        if return_amount is not None:
            transaction.return_amount = return_amount
        db.session.commit()
        return transaction

    @staticmethod
    def get_all_transactions(filters=None):
        """Get all transactions with optional filters

        Recognised filter keys: ``machine_id``, ``status``, ``date_from``,
        ``date_to`` (both date bounds inclusive, compared with ``created_at``).
        Results are ordered newest first.
        """
        query = Transaction.query

        if filters:
            if 'machine_id' in filters:
                query = query.filter_by(machine_id=filters['machine_id'])
            if 'status' in filters:
                query = query.filter_by(status=filters['status'])
            if 'date_from' in filters:
                query = query.filter(Transaction.created_at >= filters['date_from'])
            if 'date_to' in filters:
                query = query.filter(Transaction.created_at <= filters['date_to'])

        return query.order_by(Transaction.created_at.desc()).all()

    @staticmethod
    def get_transaction_by_id(transaction_id):
        """Get single transaction by ID"""
        return Transaction.query.filter_by(transaction_id=transaction_id).first()
|
|
|
|
class RoleService:
    """Role records and the permission lists attached to them."""

    @staticmethod
    def _clean_name(name):
        """Return ``name`` stripped of surrounding whitespace.

        Raises:
            ValueError: ``name`` is missing, not a string, or blank.
        """
        if not isinstance(name, str) or not name.strip():
            raise ValueError("Role name is required")
        return name.strip()

    @staticmethod
    def get_all_roles():
        """Get all roles"""
        return Role.query.all()

    @staticmethod
    def get_role_by_id(role_id):
        """Get role by ID"""
        return Role.query.get_or_404(role_id)

    @staticmethod
    def create_role(data):
        """Create new role

        ``data`` supplies ``name`` (required), ``description`` and
        ``permissions``; permissions are stored as a JSON string.

        Raises:
            ValueError: the name is missing/blank or already taken.
        """
        # Reject a missing name before touching the database.
        name = RoleService._clean_name(data.get('name'))
        description = data.get('description', '')
        permissions = data.get('permissions', [])

        # Check if role already exists
        if Role.query.filter_by(name=name).first():
            raise ValueError(f"Role '{name}' already exists")

        new_role = Role(
            name=name,
            description=description,
            permissions=json.dumps(permissions)
        )

        db.session.add(new_role)
        db.session.commit()

        return new_role

    @staticmethod
    def update_role(role_id, data):
        """Update existing role

        Only the keys present in ``data`` are changed.

        Raises:
            ValueError: the new name is blank or belongs to another role.
        """
        role = Role.query.get_or_404(role_id)

        if 'name' in data:
            new_name = RoleService._clean_name(data['name'])
            # Check if new name conflicts with existing role
            clash = Role.query.filter(Role.name == new_name, Role.id != role_id).first()
            if clash:
                raise ValueError(f"Role '{new_name}' already exists")
            role.name = new_name

        if 'description' in data:
            role.description = data['description']

        if 'permissions' in data:
            role.permissions = json.dumps(data['permissions'])

        role.updated_at = datetime.datetime.utcnow()
        db.session.commit()

        return role

    @staticmethod
    def delete_role(role_id):
        """Delete role

        Raises:
            ValueError: at least one user still has this role assigned.
        """
        role = Role.query.get_or_404(role_id)

        # Check if any users have this role
        users_with_role = User.query.filter_by(roles=role.name).count()
        if users_with_role > 0:
            raise ValueError(f"Cannot delete role '{role.name}' - {users_with_role} user(s) still assigned to it")

        db.session.delete(role)
        db.session.commit()

        return True