clinical_laboratory/lims_management/models/stock_lot.py
Luis Ernesto Portillo Zaldivar 0637f0a9e3 fix(#69): Corregir error 'Expected singleton' en métodos de acción de stock.lot
- Modificar todos los métodos action_* para manejar múltiples registros
- Usar bucle 'for record in self:' en lugar de acceder directamente a self
- Afecta a: action_collect, action_receive, action_start_analysis,
  action_complete_analysis, action_store, action_dispose, action_cancel
- Previene el error cuando se llaman estos métodos con múltiples muestras
2025-07-16 21:49:50 -06:00

603 lines
23 KiB
Python

# -*- coding: utf-8 -*-
from odoo import models, fields, api, _
from odoo.exceptions import UserError
from datetime import datetime
import random
class StockLot(models.Model):
    # Extends 'stock.lot' in place (same _name) and mixes in the mail thread
    # and activity mixins so lots flagged as laboratory samples get a chatter
    # and schedulable activities.
    _name = 'stock.lot'
    _inherit = ['stock.lot', 'mail.thread', 'mail.activity.mixin']

    # ------------------------------------------------------------------
    # Identification
    # ------------------------------------------------------------------
    is_lab_sample = fields.Boolean(string='Es Muestra de Laboratorio')
    # Stored computed field; filled by _compute_barcode.
    barcode = fields.Char(
        string='Código de Barras',
        compute='_compute_barcode',
        store=True,
        readonly=True,
        help="Código de barras único para la muestra en formato YYMMDDNNNNNNC",
    )

    # ------------------------------------------------------------------
    # Origin of the sample: patient, laboratory order, collection data
    # ------------------------------------------------------------------
    patient_id = fields.Many2one(
        'res.partner',
        string='Paciente',
        domain="[('is_patient', '=', True)]",
    )
    request_id = fields.Many2one(
        'sale.order',
        string='Orden de Laboratorio',
        domain="[('is_lab_request', '=', True)]",
    )
    collection_date = fields.Datetime(string='Fecha de Recolección')
    # Legacy selection kept for old data; its own help text marks it as
    # obsolete in favour of sample_type_product_id.
    container_type = fields.Selection(
        [
            ('serum_tube', 'Tubo de Suero'),
            ('edta_tube', 'Tubo EDTA'),
            ('swab', 'Hisopo'),
            ('urine', 'Contenedor de Orina'),
            ('other', 'Otro'),
        ],
        string='Tipo de Contenedor (Obsoleto)',
        help='Campo obsoleto, use sample_type_product_id en su lugar',
    )
    sample_type_product_id = fields.Many2one(
        'product.template',
        string='Tipo de Muestra',
        domain="[('is_sample_type', '=', True)]",
        help="Producto que representa el tipo de contenedor/muestra",
    )
    # Defaults to the user creating the record.
    collector_id = fields.Many2one(
        'res.users',
        string='Recolectado por',
        default=lambda self: self.env.user,
    )
    doctor_id = fields.Many2one(
        'res.partner',
        string='Médico Referente',
        domain="[('is_doctor', '=', True)]",
        help="Médico que ordenó los análisis",
    )
    origin = fields.Char(
        string='Origen',
        help="Referencia a la orden de laboratorio que generó esta muestra",
    )
    volume_ml = fields.Float(
        string='Volumen (ml)',
        help="Volumen total de muestra requerido",
    )
    analysis_names = fields.Char(
        string='Análisis',
        help="Lista de análisis que se realizarán con esta muestra",
    )

    # ------------------------------------------------------------------
    # Workflow state (changes are tracked in the chatter)
    # ------------------------------------------------------------------
    state = fields.Selection(
        [
            ('pending_collection', 'Pendiente de Recolección'),
            ('collected', 'Recolectada'),
            ('received', 'Recibida en Laboratorio'),
            ('in_process', 'En Proceso'),
            ('analyzed', 'Analizada'),
            ('stored', 'Almacenada'),
            ('disposed', 'Desechada'),
            ('cancelled', 'Cancelada'),
            ('rejected', 'Rechazada'),
        ],
        string='Estado',
        default='collected',
        tracking=True,
    )

    # ------------------------------------------------------------------
    # Rejection data
    # ------------------------------------------------------------------
    rejection_reason_id = fields.Many2one(
        'lims.rejection.reason',
        string='Motivo de Rechazo',
        tracking=True,
    )
    rejection_notes = fields.Text(
        string='Notas de Rechazo',
        help="Información adicional sobre el rechazo",
    )
    # Written by action_reject together with rejection_date.
    rejected_by = fields.Many2one(
        'res.users',
        string='Rechazado por',
        readonly=True,
    )
    rejection_date = fields.Datetime(
        string='Fecha de Rechazo',
        readonly=True,
    )

    # ------------------------------------------------------------------
    # Re-sampling: parent/child links between samples plus derived counters
    # ------------------------------------------------------------------
    parent_sample_id = fields.Many2one(
        'stock.lot',
        string='Muestra Original',
        help='Muestra original de la cual esta es un re-muestreo',
        domain="[('is_lab_sample', '=', True)]",
    )
    # Inverse side of parent_sample_id.
    child_sample_ids = fields.One2many(
        'stock.lot',
        'parent_sample_id',
        string='Re-muestras',
        help='Muestras generadas como re-muestreo de esta',
    )
    resample_count = fields.Integer(
        string='Número de Re-muestreo',
        help='Indica cuántas veces se ha re-muestreado esta muestra',
        compute='_compute_resample_count',
        store=True,
    )
    is_resample = fields.Boolean(
        string='Es Re-muestra',
        compute='_compute_is_resample',
        store=True,
    )
    root_sample_id = fields.Many2one(
        'stock.lot',
        string='Muestra Original (Raíz)',
        compute='_compute_root_sample',
        store=True,
        help='Muestra original de la cadena de re-muestreos',
    )
    # Not stored: evaluated on read by _compute_resample_chain_count.
    resample_chain_count = fields.Integer(
        string='Re-muestreos en Cadena',
        compute='_compute_resample_chain_count',
        help='Número total de re-muestreos en toda la cadena',
    )
def action_collect(self):
    """Mark every sample in the recordset as collected.

    For each record this sets ``state`` to ``'collected'``, stamps
    ``collection_date`` with the current datetime and posts a chatter
    notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({
            'state': 'collected',
            'collection_date': fields.Datetime.now(),
        })
        record.message_post(
            body='Muestra recolectada por %s' % user_name,
            subject='Estado actualizado: Recolectada',
            message_type='notification',
        )
def action_receive(self):
    """Mark every sample in the recordset as received in the laboratory.

    For each record this sets ``state`` to ``'received'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'received'})
        record.message_post(
            body='Muestra recibida en laboratorio por %s' % user_name,
            subject='Estado actualizado: Recibida',
            message_type='notification',
        )
def action_start_analysis(self):
    """Move every sample in the recordset to the in-process state.

    For each record this sets ``state`` to ``'in_process'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'in_process'})
        record.message_post(
            body='Análisis iniciado por %s' % user_name,
            subject='Estado actualizado: En Proceso',
            message_type='notification',
        )
def action_complete_analysis(self):
    """Mark the analysis of every sample in the recordset as completed.

    For each record this sets ``state`` to ``'analyzed'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'analyzed'})
        record.message_post(
            body='Análisis completado por %s' % user_name,
            subject='Estado actualizado: Analizada',
            message_type='notification',
        )
def action_store(self):
    """Mark every sample in the recordset as stored.

    For each record this sets ``state`` to ``'stored'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'stored'})
        record.message_post(
            body='Muestra almacenada por %s' % user_name,
            subject='Estado actualizado: Almacenada',
            message_type='notification',
        )
def action_dispose(self):
    """Mark every sample in the recordset as disposed.

    For each record this sets ``state`` to ``'disposed'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'disposed'})
        record.message_post(
            body='Muestra desechada por %s. Motivo de disposición registrado.' % user_name,
            subject='Estado actualizado: Desechada',
            message_type='notification',
        )
def action_cancel(self):
    """Cancel every sample in the recordset.

    For each record this sets ``state`` to ``'cancelled'`` and posts a
    chatter notification naming the current user.
    """
    # The acting user is the same for the whole batch; read it once.
    user_name = self.env.user.name
    for record in self:
        record.write({'state': 'cancelled'})
        record.message_post(
            body='Muestra cancelada por %s' % user_name,
            subject='Estado actualizado: Cancelada',
            message_type='notification',
        )
def action_open_rejection_wizard(self):
    """Return the window action that opens the sample rejection wizard.

    Must be called on a single record; that record's id is handed to the
    wizard through the ``default_sample_id`` context key.
    """
    self.ensure_one()
    wizard_context = {'default_sample_id': self.id}
    action = dict(
        type='ir.actions.act_window',
        name='Rechazar Muestra',
        res_model='lims.sample.rejection.wizard',
        view_mode='form',
        target='new',
        context=wizard_context,
    )
    return action
def action_reject(self, create_resample=None):
    """Reject the sample; meant to be called from the rejection wizard.

    Sets the sample to ``'rejected'``, records who rejected it and when,
    posts a notification on the sample and on the related laboratory order
    (if any) and, depending on ``create_resample`` or the system
    configuration, tries to create a re-sample.

    Args:
        create_resample: Boolean to force (True) or skip (False) re-sample
            creation. If None, the ``lims_management.auto_resample_on_rejection``
            system parameter decides (defaults to ``'True'``).

    Raises:
        UserError: if the analysis of the sample is already completed.
    """
    self.ensure_one()
    # 'analyzed' is the state written by action_complete_analysis. The
    # literal 'completed' is not a member of the ``state`` selection, so a
    # check against it alone can never trigger; it is kept only for
    # compatibility with any legacy data.
    if self.state in ('analyzed', 'completed'):
        raise UserError(_('No se puede rechazar una muestra ya completada'))

    # This method is called from the wizard, so rejection fields should
    # already be set on the record.
    self.write({
        'state': 'rejected',
        'rejected_by': self.env.user.id,
        'rejection_date': fields.Datetime.now(),
    })

    reason_name = self.rejection_reason_id.name if self.rejection_reason_id else 'Sin especificar'
    notes = self.rejection_notes or ''
    body = f'Muestra rechazada por {self.env.user.name}<br/>Motivo: {reason_name}'
    if notes:
        body += f'<br/>Notas: {notes}'
    self.message_post(
        body=body,
        subject='Estado actualizado: Rechazada',
        message_type='notification',
    )

    # Notify the related laboratory order, if there is one.
    if self.request_id:
        self.request_id.message_post(
            body=f'La muestra {self.name} ha sido rechazada. Motivo: {reason_name}',
            subject='Muestra Rechazada',
            message_type='notification',
        )

    # An explicit value from the wizard wins; otherwise fall back to the
    # system configuration.
    if create_resample is None:
        IrConfig = self.env['ir.config_parameter'].sudo()
        create_resample = IrConfig.get_param(
            'lims_management.auto_resample_on_rejection', 'True') == 'True'
    if not create_resample:
        return

    try:
        # The returned window action is irrelevant here; only the side
        # effect (the new sample) matters.
        self.action_create_resample()
    except UserError as e:
        # Re-sample creation can legitimately fail (e.g. max attempts
        # reached); the rejection itself stays in place and the failure is
        # logged in the chatter.
        self.message_post(
            body=_('No se pudo generar re-muestra automática: %s') % str(e),
            subject='Error en Re-muestreo',
            message_type='notification',
        )
    else:
        self.message_post(
            body=_('Re-muestra generada automáticamente debido al rechazo'),
            subject='Re-muestreo Automático',
            message_type='notification',
        )
@api.onchange('sample_type_product_id')
def _onchange_sample_type_product_id(self):
    """Keep the legacy ``container_type`` in step with the chosen product.

    Does nothing when no sample-type product is set. Otherwise the
    lower-cased product name is searched for known keywords, in order, and
    the first match decides the legacy code; ``'other'`` is used when no
    keyword matches.
    """
    product = self.sample_type_product_id
    if not product:
        return

    lowered_name = product.name.lower()
    # (keywords, legacy selection code) -- order matters, first hit wins.
    keyword_table = (
        (('suero', 'serum'), 'serum_tube'),
        (('edta',), 'edta_tube'),
        (('hisopo', 'swab'), 'swab'),
        (('orina', 'urine'), 'urine'),
    )
    legacy_code = 'other'
    for keywords, code in keyword_table:
        if any(word in lowered_name for word in keywords):
            legacy_code = code
            break
    self.container_type = legacy_code
def get_container_name(self):
    """Return a display name for the sample container.

    The name of ``sample_type_product_id`` is preferred. Without a product,
    the label of the legacy ``container_type`` selection is looked up (the
    lookup yields None if the code is not part of the selection). When
    neither field is set, ``'Unknown'`` is returned.
    """
    product = self.sample_type_product_id
    if product:
        return product.name

    legacy_code = self.container_type
    if not legacy_code:
        return 'Unknown'

    labels_by_code = dict(self._fields['container_type'].selection)
    return labels_by_code.get(legacy_code)
@api.depends('is_lab_sample', 'create_date')
def _compute_barcode(self):
    """Assign a barcode to laboratory samples that do not have one yet.

    Records that are not laboratory samples get their barcode cleared.
    Laboratory samples that already carry a barcode are left untouched.
    """
    for sample in self:
        if not sample.is_lab_sample:
            sample.barcode = False
            continue
        if not sample.barcode:
            sample.barcode = sample._generate_unique_barcode()
def _generate_unique_barcode(self):
"""Generate a unique barcode in format YYMMDDNNNNNNC
YY: Year (2 digits)
MM: Month (2 digits)
DD: Day (2 digits)
NNNNNN: Sequential number (6 digits)
C: Check digit
"""
self.ensure_one()
now = datetime.now()
date_prefix = now.strftime('%y%m%d')
# Get the highest sequence number for today
domain = [
('is_lab_sample', '=', True),
('barcode', 'like', date_prefix + '%'),
('id', '!=', self.id)
]
max_barcode = self.search(domain, order='barcode desc', limit=1)
if max_barcode and max_barcode.barcode:
# Extract sequence number from existing barcode
try:
sequence = int(max_barcode.barcode[6:12]) + 1
except:
sequence = 1
else:
sequence = 1
# Ensure we don't exceed 6 digits
if sequence > 999999:
# Add prefix based on sample type to allow more barcodes
prefix_map = {
'suero': '1',
'edta': '2',
'orina': '3',
'hisopo': '4',
'other': '9'
}
type_prefix = '9' # default
if self.sample_type_product_id:
name_lower = self.sample_type_product_id.name.lower()
for key, val in prefix_map.items():
if key in name_lower:
type_prefix = val
break
sequence = int(type_prefix + str(sequence % 100000).zfill(5))
# Format sequence with leading zeros
sequence_str = str(sequence).zfill(6)
# Calculate check digit using Luhn algorithm
barcode_without_check = date_prefix + sequence_str
check_digit = self._calculate_luhn_check_digit(barcode_without_check)
final_barcode = barcode_without_check + str(check_digit)
# Verify uniqueness
existing = self.search([
('barcode', '=', final_barcode),
('id', '!=', self.id)
], limit=1)
if existing:
# If collision, add random component and retry
sequence = sequence * 10 + random.randint(0, 9)
sequence_str = str(sequence % 1000000).zfill(6)
barcode_without_check = date_prefix + sequence_str
check_digit = self._calculate_luhn_check_digit(barcode_without_check)
final_barcode = barcode_without_check + str(check_digit)
return final_barcode
def _calculate_luhn_check_digit(self, number_str):
"""Calculate Luhn check digit for barcode validation"""
digits = [int(d) for d in number_str]
odd_sum = sum(digits[-1::-2])
even_sum = sum([sum(divmod(2 * d, 10)) for d in digits[-2::-2]])
total = odd_sum + even_sum
return (10 - (total % 10)) % 10
def _ensure_barcode(self):
"""Ensure all lab samples have a barcode"""
for record in self:
if record.is_lab_sample and not record.barcode:
record.barcode = record._generate_unique_barcode()
return True
@api.depends('parent_sample_id')
def _compute_is_resample(self):
    """Flag a sample as a re-sample when it has a parent sample."""
    for sample in self:
        has_parent = True if sample.parent_sample_id else False
        sample.is_resample = has_parent
@api.depends('child_sample_ids')
def _compute_resample_count(self):
    """Store how many direct re-samples (children) each sample has."""
    for sample in self:
        direct_children = sample.child_sample_ids
        sample.resample_count = len(direct_children)
@api.depends('parent_sample_id')
def _compute_root_sample(self):
    """Compute the root sample of the re-sample chain.

    Follows ``parent_sample_id`` upwards until a sample without parent is
    reached. A sample that is its own root (no parent) gets an empty value.
    """
    for record in self:
        root = record
        # parent_sample_id is freely editable, so a cyclic chain is
        # possible; remember visited ids to guarantee termination.
        seen_ids = {record.id}
        while root.parent_sample_id and root.parent_sample_id.id not in seen_ids:
            root = root.parent_sample_id
            seen_ids.add(root.id)
        record.root_sample_id = root if root != record else False
@api.depends('parent_sample_id', 'child_sample_ids')
def _compute_resample_chain_count(self):
    """Compute the total number of re-samples in each record's chain.

    For every record the root of its chain is located by following
    ``parent_sample_id`` upwards, then all descendants of that root are
    counted with ``_count_all_resamples_in_chain``.
    """
    # Records of the same chain share a root; count each chain only once.
    totals_by_root_id = {}
    for record in self:
        root = record
        # Guard against cyclic parent links so the walk always terminates.
        seen_ids = {record.id}
        while root.parent_sample_id and root.parent_sample_id.id not in seen_ids:
            root = root.parent_sample_id
            seen_ids.add(root.id)
        if root.id not in totals_by_root_id:
            totals_by_root_id[root.id] = self._count_all_resamples_in_chain(root)
        record.resample_chain_count = totals_by_root_id[root.id]
def action_create_resample(self):
    """Create a new sample as a re-sample of the current one.

    The new sample is always attached to the determined parent: the parent
    of the current sample when the current sample is itself a re-sample,
    otherwise the current sample. Initial state, name prefix and the
    maximum number of re-samples per chain are read from system
    parameters. Notifications are posted on the current sample, the parent
    (when different), the new sample and the related laboratory order.

    Returns:
        dict: window action opening the form view of the new re-sample.

    Raises:
        UserError: if the parent already has an active re-sample, or if the
            maximum number of re-samples for the chain has been reached.
    """
    self.ensure_one()

    # If the current sample is already a re-sample, use its parent;
    # otherwise the current sample becomes the parent.
    parent_for_resample = self.parent_sample_id or self

    # Only one active (not rejected/cancelled/disposed) re-sample may exist
    # per parent at any time.
    active_resamples = parent_for_resample.child_sample_ids.filtered(
        lambda s: s.state not in ['rejected', 'cancelled', 'disposed']
    )
    if active_resamples:
        raise UserError(_('La muestra %s ya tiene una re-muestra activa (%s). No se puede crear otra hasta que se procese o rechace la existente.') %
                        (parent_for_resample.name, ', '.join(active_resamples.mapped('name'))))

    # Get configuration
    IrConfig = self.env['ir.config_parameter'].sudo()
    initial_state = IrConfig.get_param('lims_management.resample_state', 'pending_collection')
    prefix = IrConfig.get_param('lims_management.resample_prefix', 'RE-')
    max_attempts = int(IrConfig.get_param('lims_management.max_resample_attempts', '3'))

    # Find the original sample (root of the re-sample chain). Visited ids
    # are tracked so a cyclic parent chain cannot loop forever.
    original_sample = parent_for_resample
    seen_ids = {original_sample.id}
    while (original_sample.parent_sample_id
           and original_sample.parent_sample_id.id not in seen_ids):
        original_sample = original_sample.parent_sample_id
        seen_ids.add(original_sample.id)

    # Check maximum re-sample attempts based on the entire chain; a limit
    # of 0 or less disables the check.
    total_resamples = self._count_all_resamples_in_chain(original_sample)
    if max_attempts > 0 and total_resamples >= max_attempts:
        raise UserError(_('Se ha alcanzado el número máximo de re-muestreos (%d) para esta cadena de muestras.') % max_attempts)

    # Re-sample number used for naming (based on the parent's child count).
    resample_number = len(parent_for_resample.child_sample_ids) + 1

    # Prepare values for the new sample
    vals = {
        'name': f"{prefix}{parent_for_resample.name}-{resample_number}",
        'product_id': self.product_id.id,
        'patient_id': self.patient_id.id,
        'doctor_id': self.doctor_id.id,
        'origin': self.origin,
        'sample_type_product_id': self.sample_type_product_id.id,
        'volume_ml': self.volume_ml,
        'is_lab_sample': True,
        'state': initial_state,
        'analysis_names': self.analysis_names,
        'parent_sample_id': parent_for_resample.id,  # Always use the determined parent
        'request_id': self.request_id.id if self.request_id else False,
    }
    resample = self.create(vals)

    # Post a message in all relevant samples
    self.message_post(
        body=_('Re-muestra creada: %s') % resample.name,
        subject='Re-muestreo',
        message_type='notification',
    )
    if self != parent_for_resample:
        # Created from a re-sample: also notify the parent
        parent_for_resample.message_post(
            body=_('Nueva re-muestra creada: %s (debido al rechazo de %s)') % (resample.name, self.name),
            subject='Re-muestreo',
            message_type='notification',
        )
    resample.message_post(
        body=_('Esta es una re-muestra de: %s<br/>Creada debido al rechazo de: %s<br/>Motivo: %s') %
        (parent_for_resample.name, self.name, self.rejection_reason_id.name if self.rejection_reason_id else 'No especificado'),
        subject='Re-muestra creada',
        message_type='notification',
    )

    # Notify receptionists if configured
    auto_notify = IrConfig.get_param('lims_management.auto_notify_resample', 'True') == 'True'
    if auto_notify:
        self._notify_resample_created(resample)

    # If there's a related order, update it
    if self.request_id:
        self.request_id.message_post(
            body=_('Se ha creado una re-muestra (%s) para la muestra rechazada %s') % (resample.name, self.name),
            subject='Re-muestra creada',
            message_type='notification',
        )
        # Link the new sample to the order's generated samples
        self.request_id.generated_sample_ids = [(4, resample.id)]

    return {
        'type': 'ir.actions.act_window',
        'name': 'Re-muestra Creada',
        'res_model': 'stock.lot',
        'res_id': resample.id,
        'view_mode': 'form',
        'target': 'current',
    }
def _count_all_resamples_in_chain(self, root_sample):
"""Count all resamples in the entire chain starting from root"""
count = 0
samples_to_check = [root_sample]
while samples_to_check:
sample = samples_to_check.pop(0)
# Add all child samples to the check list
for child in sample.child_sample_ids:
count += 1
samples_to_check.append(child)
return count
def _notify_resample_created(self, resample):
    """Schedule a to-do activity on ``resample`` for each receptionist.

    Receptionists are the users of the
    ``lims_management.group_lims_receptionist`` group. Nothing happens
    when that group does not exist or has no users.

    Args:
        resample: the newly created re-sample record the activities are
            attached to.
    """
    receptionist_group = self.env.ref(
        'lims_management.group_lims_receptionist', raise_if_not_found=False)
    if not receptionist_group:
        return
    receptionist_users = receptionist_group.users
    if not receptionist_users:
        return

    # Everything except the assignee is identical for all activities, so
    # it is resolved once instead of once per user.
    model_id = self.env['ir.model'].search([('model', '=', 'stock.lot')], limit=1).id
    todo_type_id = self.env.ref('mail.mail_activity_data_todo').id
    summary = _('Nueva re-muestra pendiente de recolección')
    note = _('Se ha generado una re-muestra (%s) que requiere recolección. Muestra original: %s') % (
        resample.name, self.name)
    deadline = fields.Date.today()

    # Single batch create: one activity per receptionist.
    self.env['mail.activity'].create([
        {
            'res_model': 'stock.lot',
            'res_model_id': model_id,  # required field
            'res_id': resample.id,
            'activity_type_id': todo_type_id,
            'summary': summary,
            'note': note,
            'user_id': user.id,
            'date_deadline': deadline,
        }
        for user in receptionist_users
    ])