# -*- coding: utf-8 -*- from odoo import models, fields, api, _ from odoo.exceptions import UserError from datetime import datetime import random class StockLot(models.Model): _name = 'stock.lot' _inherit = ['stock.lot', 'mail.thread', 'mail.activity.mixin'] is_lab_sample = fields.Boolean(string='Es Muestra de Laboratorio') barcode = fields.Char( string='Código de Barras', compute='_compute_barcode', store=True, readonly=True, help="Código de barras único para la muestra en formato YYMMDDNNNNNNC" ) patient_id = fields.Many2one( 'res.partner', string='Paciente', domain="[('is_patient', '=', True)]" ) request_id = fields.Many2one( 'sale.order', string='Orden de Laboratorio', domain="[('is_lab_request', '=', True)]" ) collection_date = fields.Datetime(string='Fecha de Recolección') container_type = fields.Selection([ ('serum_tube', 'Tubo de Suero'), ('edta_tube', 'Tubo EDTA'), ('swab', 'Hisopo'), ('urine', 'Contenedor de Orina'), ('other', 'Otro') ], string='Tipo de Contenedor (Obsoleto)', help='Campo obsoleto, use sample_type_product_id en su lugar') sample_type_product_id = fields.Many2one( 'product.template', string='Tipo de Muestra', domain="[('is_sample_type', '=', True)]", help="Producto que representa el tipo de contenedor/muestra" ) collector_id = fields.Many2one( 'res.users', string='Recolectado por', default=lambda self: self.env.user ) doctor_id = fields.Many2one( 'res.partner', string='Médico Referente', domain="[('is_doctor', '=', True)]", help="Médico que ordenó los análisis" ) origin = fields.Char( string='Origen', help="Referencia a la orden de laboratorio que generó esta muestra" ) volume_ml = fields.Float( string='Volumen (ml)', help="Volumen total de muestra requerido" ) analysis_names = fields.Char( string='Análisis', help="Lista de análisis que se realizarán con esta muestra" ) state = fields.Selection([ ('pending_collection', 'Pendiente de Recolección'), ('collected', 'Recolectada'), ('received', 'Recibida en Laboratorio'), ('in_process', 'En Proceso'), ('analyzed', 'Analizada'), ('stored', 'Almacenada'), ('disposed', 'Desechada'), ('cancelled', 'Cancelada'), ('rejected', 'Rechazada') ], string='Estado', default='collected', tracking=True) # Rejection fields rejection_reason_id = fields.Many2one( 'lims.rejection.reason', string='Motivo de Rechazo', tracking=True ) rejection_notes = fields.Text( string='Notas de Rechazo', help="Información adicional sobre el rechazo" ) rejected_by = fields.Many2one( 'res.users', string='Rechazado por', readonly=True ) rejection_date = fields.Datetime( string='Fecha de Rechazo', readonly=True ) # Re-sampling fields parent_sample_id = fields.Many2one( 'stock.lot', string='Muestra Original', help='Muestra original de la cual esta es un re-muestreo', domain="[('is_lab_sample', '=', True)]" ) child_sample_ids = fields.One2many( 'stock.lot', 'parent_sample_id', string='Re-muestras', help='Muestras generadas como re-muestreo de esta' ) resample_count = fields.Integer( string='Número de Re-muestreo', help='Indica cuántas veces se ha re-muestreado esta muestra', compute='_compute_resample_count', store=True ) is_resample = fields.Boolean( string='Es Re-muestra', compute='_compute_is_resample', store=True ) def action_collect(self): """Mark sample as collected""" old_state = self.state self.write({'state': 'collected', 'collection_date': fields.Datetime.now()}) self.message_post( body='Muestra recolectada por %s' % self.env.user.name, subject='Estado actualizado: Recolectada', message_type='notification' ) def action_receive(self): """Mark sample as received in laboratory""" old_state = self.state self.write({'state': 'received'}) self.message_post( body='Muestra recibida en laboratorio por %s' % self.env.user.name, subject='Estado actualizado: Recibida', message_type='notification' ) def action_start_analysis(self): """Start analysis process""" old_state = self.state self.write({'state': 'in_process'}) self.message_post( body='Análisis iniciado por %s' % self.env.user.name, subject='Estado actualizado: En Proceso', message_type='notification' ) def action_complete_analysis(self): """Mark analysis as completed""" old_state = self.state self.write({'state': 'analyzed'}) self.message_post( body='Análisis completado por %s' % self.env.user.name, subject='Estado actualizado: Analizada', message_type='notification' ) def action_store(self): """Store the sample""" old_state = self.state self.write({'state': 'stored'}) self.message_post( body='Muestra almacenada por %s' % self.env.user.name, subject='Estado actualizado: Almacenada', message_type='notification' ) def action_dispose(self): """Dispose of the sample""" old_state = self.state self.write({'state': 'disposed'}) self.message_post( body='Muestra desechada por %s. Motivo de disposición registrado.' % self.env.user.name, subject='Estado actualizado: Desechada', message_type='notification' ) def action_cancel(self): """Cancel the sample""" old_state = self.state self.write({'state': 'cancelled'}) self.message_post( body='Muestra cancelada por %s' % self.env.user.name, subject='Estado actualizado: Cancelada', message_type='notification' ) def action_open_rejection_wizard(self): """Open the rejection wizard""" self.ensure_one() return { 'type': 'ir.actions.act_window', 'name': 'Rechazar Muestra', 'res_model': 'lims.sample.rejection.wizard', 'view_mode': 'form', 'target': 'new', 'context': { 'default_sample_id': self.id, } } def action_reject(self): """Reject the sample - to be called from wizard""" self.ensure_one() if self.state == 'completed': raise ValueError('No se puede rechazar una muestra ya completada') # This method is called from the wizard, so rejection fields should already be set self.write({ 'state': 'rejected', 'rejected_by': self.env.user.id, 'rejection_date': fields.Datetime.now() }) reason_name = self.rejection_reason_id.name if self.rejection_reason_id else 'Sin especificar' notes = self.rejection_notes or '' body = f'Muestra rechazada por {self.env.user.name}
Motivo: {reason_name}' if notes: body += f'
Notas: {notes}' self.message_post( body=body, subject='Estado actualizado: Rechazada', message_type='notification' ) # Notify related sale order if exists if self.request_id: self.request_id.message_post( body=f'La muestra {self.name} ha sido rechazada. Motivo: {reason_name}', subject='Muestra Rechazada', message_type='notification' ) # Check if automatic resample is enabled IrConfig = self.env['ir.config_parameter'].sudo() auto_resample = IrConfig.get_param('lims_management.auto_resample_on_rejection', 'True') == 'True' if auto_resample: try: # Create resample automatically resample_action = self.action_create_resample() self.message_post( body=_('Re-muestra generada automáticamente debido al rechazo'), subject='Re-muestreo Automático', message_type='notification' ) except UserError as e: # If resample creation fails (e.g., max attempts reached), log it self.message_post( body=_('No se pudo generar re-muestra automática: %s') % str(e), subject='Error en Re-muestreo', message_type='notification' ) @api.onchange('sample_type_product_id') def _onchange_sample_type_product_id(self): """Synchronize container_type when sample_type_product_id changes""" if self.sample_type_product_id: # Try to map product name to legacy container type product_name = self.sample_type_product_id.name.lower() if 'suero' in product_name or 'serum' in product_name: self.container_type = 'serum_tube' elif 'edta' in product_name: self.container_type = 'edta_tube' elif 'hisopo' in product_name or 'swab' in product_name: self.container_type = 'swab' elif 'orina' in product_name or 'urine' in product_name: self.container_type = 'urine' else: self.container_type = 'other' def get_container_name(self): """Get container name from product or legacy field""" if self.sample_type_product_id: return self.sample_type_product_id.name elif self.container_type: return dict(self._fields['container_type'].selection).get(self.container_type) return 'Unknown' @api.depends('is_lab_sample', 'create_date') def _compute_barcode(self): """Generate unique barcode for laboratory samples""" for record in self: if record.is_lab_sample and not record.barcode: record.barcode = record._generate_unique_barcode() elif not record.is_lab_sample: record.barcode = False def _generate_unique_barcode(self): """Generate a unique barcode in format YYMMDDNNNNNNC YY: Year (2 digits) MM: Month (2 digits) DD: Day (2 digits) NNNNNN: Sequential number (6 digits) C: Check digit """ self.ensure_one() now = datetime.now() date_prefix = now.strftime('%y%m%d') # Get the highest sequence number for today domain = [ ('is_lab_sample', '=', True), ('barcode', 'like', date_prefix + '%'), ('id', '!=', self.id) ] max_barcode = self.search(domain, order='barcode desc', limit=1) if max_barcode and max_barcode.barcode: # Extract sequence number from existing barcode try: sequence = int(max_barcode.barcode[6:12]) + 1 except: sequence = 1 else: sequence = 1 # Ensure we don't exceed 6 digits if sequence > 999999: # Add prefix based on sample type to allow more barcodes prefix_map = { 'suero': '1', 'edta': '2', 'orina': '3', 'hisopo': '4', 'other': '9' } type_prefix = '9' # default if self.sample_type_product_id: name_lower = self.sample_type_product_id.name.lower() for key, val in prefix_map.items(): if key in name_lower: type_prefix = val break sequence = int(type_prefix + str(sequence % 100000).zfill(5)) # Format sequence with leading zeros sequence_str = str(sequence).zfill(6) # Calculate check digit using Luhn algorithm barcode_without_check = date_prefix + sequence_str check_digit = self._calculate_luhn_check_digit(barcode_without_check) final_barcode = barcode_without_check + str(check_digit) # Verify uniqueness existing = self.search([ ('barcode', '=', final_barcode), ('id', '!=', self.id) ], limit=1) if existing: # If collision, add random component and retry sequence = sequence * 10 + random.randint(0, 9) sequence_str = str(sequence % 1000000).zfill(6) barcode_without_check = date_prefix + sequence_str check_digit = self._calculate_luhn_check_digit(barcode_without_check) final_barcode = barcode_without_check + str(check_digit) return final_barcode def _calculate_luhn_check_digit(self, number_str): """Calculate Luhn check digit for barcode validation""" digits = [int(d) for d in number_str] odd_sum = sum(digits[-1::-2]) even_sum = sum([sum(divmod(2 * d, 10)) for d in digits[-2::-2]]) total = odd_sum + even_sum return (10 - (total % 10)) % 10 def _ensure_barcode(self): """Ensure all lab samples have a barcode""" for record in self: if record.is_lab_sample and not record.barcode: record.barcode = record._generate_unique_barcode() return True @api.depends('parent_sample_id') def _compute_is_resample(self): """Compute if this sample is a resample""" for record in self: record.is_resample = bool(record.parent_sample_id) @api.depends('child_sample_ids') def _compute_resample_count(self): """Compute the number of times this sample has been resampled""" for record in self: record.resample_count = len(record.child_sample_ids) def action_create_resample(self): """Create a new sample as a resample of the current one""" self.ensure_one() # Get configuration IrConfig = self.env['ir.config_parameter'].sudo() auto_resample = IrConfig.get_param('lims_management.auto_resample_on_rejection', 'True') == 'True' initial_state = IrConfig.get_param('lims_management.resample_state', 'pending_collection') prefix = IrConfig.get_param('lims_management.resample_prefix', 'RE-') max_attempts = int(IrConfig.get_param('lims_management.max_resample_attempts', '3')) # Check maximum resample attempts if max_attempts > 0 and self.resample_count >= max_attempts: raise UserError(_('Se ha alcanzado el número máximo de re-muestreos (%d) para esta muestra.') % max_attempts) # Calculate resample number for naming resample_number = self.resample_count + 1 # Prepare values for new sample vals = { 'name': f"{prefix}{self.name}-{resample_number}", 'product_id': self.product_id.id, 'patient_id': self.patient_id.id, 'doctor_id': self.doctor_id.id, 'origin': self.origin, 'sample_type_product_id': self.sample_type_product_id.id, 'volume_ml': self.volume_ml, 'is_lab_sample': True, 'state': initial_state, 'analysis_names': self.analysis_names, 'parent_sample_id': self.id, 'request_id': self.request_id.id if self.request_id else False, } # Create the resample resample = self.create(vals) # Post message in both samples self.message_post( body=_('Re-muestra creada: %s') % resample.name, subject='Re-muestreo', message_type='notification' ) resample.message_post( body=_('Esta es una re-muestra de: %s
Motivo: %s') % (self.name, self.rejection_reason_id.name if self.rejection_reason_id else 'No especificado'), subject='Re-muestra creada', message_type='notification' ) # Notify receptionist if configured auto_notify = IrConfig.get_param('lims_management.auto_notify_resample', 'True') == 'True' if auto_notify: self._notify_resample_created(resample) # If there's a related order, update it if self.request_id: self.request_id.message_post( body=_('Se ha creado una re-muestra (%s) para la muestra rechazada %s') % (resample.name, self.name), subject='Re-muestra creada', message_type='notification' ) # Add the new sample to the order's generated samples self.request_id.generated_sample_ids = [(4, resample.id)] return { 'type': 'ir.actions.act_window', 'name': 'Re-muestra Creada', 'res_model': 'stock.lot', 'res_id': resample.id, 'view_mode': 'form', 'target': 'current', } def _notify_resample_created(self, resample): """Notify receptionist users about the created resample""" # Find receptionist users receptionist_group = self.env.ref('lims_management.group_lims_receptionist', raise_if_not_found=False) if receptionist_group: receptionist_users = receptionist_group.users # Get the model id for stock.lot model_id = self.env['ir.model'].search([('model', '=', 'stock.lot')], limit=1).id # Create activities for receptionists for user in receptionist_users: self.env['mail.activity'].create({ 'res_model': 'stock.lot', 'res_model_id': model_id, # Campo obligatorio 'res_id': resample.id, 'activity_type_id': self.env.ref('mail.mail_activity_data_todo').id, 'summary': _('Nueva re-muestra pendiente de recolección'), 'note': _('Se ha generado una re-muestra (%s) que requiere recolección. Muestra original: %s') % (resample.name, self.name), 'user_id': user.id, 'date_deadline': fields.Date.today(), })