# -*- coding: utf-8 -*-
from datetime import datetime
import random

from odoo import models, fields, api, _
from odoo.exceptions import UserError

# States in which an existing re-sample no longer blocks creating a new one.
_INACTIVE_RESAMPLE_STATES = ('rejected', 'cancelled', 'disposed')

# Ordered (keywords, legacy container_type) pairs; the first match wins.
_LEGACY_CONTAINER_KEYWORDS = (
    (('suero', 'serum'), 'serum_tube'),
    (('edta',), 'edta_tube'),
    (('hisopo', 'swab'), 'swab'),
    (('orina', 'urine'), 'urine'),
)

# Leading digit used for the sequence once the daily 6-digit counter overflows.
_BARCODE_OVERFLOW_PREFIXES = {
    'suero': '1',
    'edta': '2',
    'orina': '3',
    'hisopo': '4',
    'other': '9',
}

# Upper bound on randomised retries when a generated barcode already exists.
_BARCODE_MAX_COLLISION_RETRIES = 10


class StockLot(models.Model):
    """Extend ``stock.lot`` with laboratory-sample data and workflow.

    Adds patient/order links, a sample state machine with chatter
    notifications, rejection bookkeeping, re-sampling (parent/child chains)
    and barcode generation in the ``YYMMDDNNNNNNC`` format.
    """

    _name = 'stock.lot'
    _inherit = ['stock.lot', 'mail.thread', 'mail.activity.mixin']

    is_lab_sample = fields.Boolean(string='Es Muestra de Laboratorio')
    barcode = fields.Char(
        string='Código de Barras',
        compute='_compute_barcode',
        store=True,
        readonly=True,
        help="Código de barras único para la muestra en formato YYMMDDNNNNNNC"
    )
    patient_id = fields.Many2one(
        'res.partner',
        string='Paciente',
        domain="[('is_patient', '=', True)]"
    )
    request_id = fields.Many2one(
        'sale.order',
        string='Orden de Laboratorio',
        domain="[('is_lab_request', '=', True)]"
    )
    collection_date = fields.Datetime(string='Fecha de Recolección')
    # Legacy field, superseded by sample_type_product_id.
    container_type = fields.Selection([
        ('serum_tube', 'Tubo de Suero'),
        ('edta_tube', 'Tubo EDTA'),
        ('swab', 'Hisopo'),
        ('urine', 'Contenedor de Orina'),
        ('other', 'Otro')
    ], string='Tipo de Contenedor (Obsoleto)',
        help='Campo obsoleto, use sample_type_product_id en su lugar')
    sample_type_product_id = fields.Many2one(
        'product.template',
        string='Tipo de Muestra',
        domain="[('is_sample_type', '=', True)]",
        help="Producto que representa el tipo de contenedor/muestra"
    )
    collector_id = fields.Many2one(
        'res.users',
        string='Recolectado por',
        default=lambda self: self.env.user
    )
    doctor_id = fields.Many2one(
        'res.partner',
        string='Médico Referente',
        domain="[('is_doctor', '=', True)]",
        help="Médico que ordenó los análisis"
    )
    origin = fields.Char(
        string='Origen',
        help="Referencia a la orden de laboratorio que generó esta muestra"
    )
    volume_ml = fields.Float(
        string='Volumen (ml)',
        help="Volumen total de muestra requerido"
    )
    analysis_names = fields.Char(
        string='Análisis',
        help="Lista de análisis que se realizarán con esta muestra"
    )
    state = fields.Selection([
        ('pending_collection', 'Pendiente de Recolección'),
        ('collected', 'Recolectada'),
        ('received', 'Recibida en Laboratorio'),
        ('in_process', 'En Proceso'),
        ('analyzed', 'Analizada'),
        ('stored', 'Almacenada'),
        ('disposed', 'Desechada'),
        ('cancelled', 'Cancelada'),
        ('rejected', 'Rechazada')
    ], string='Estado', default='collected', tracking=True)

    # Rejection fields
    rejection_reason_id = fields.Many2one(
        'lims.rejection.reason',
        string='Motivo de Rechazo',
        tracking=True
    )
    rejection_notes = fields.Text(
        string='Notas de Rechazo',
        help="Información adicional sobre el rechazo"
    )
    rejected_by = fields.Many2one(
        'res.users',
        string='Rechazado por',
        readonly=True
    )
    rejection_date = fields.Datetime(
        string='Fecha de Rechazo',
        readonly=True
    )

    # Re-sampling fields
    parent_sample_id = fields.Many2one(
        'stock.lot',
        string='Muestra Original',
        help='Muestra original de la cual esta es un re-muestreo',
        domain="[('is_lab_sample', '=', True)]"
    )
    child_sample_ids = fields.One2many(
        'stock.lot',
        'parent_sample_id',
        string='Re-muestras',
        help='Muestras generadas como re-muestreo de esta'
    )
    resample_count = fields.Integer(
        string='Número de Re-muestreo',
        help='Indica cuántas veces se ha re-muestreado esta muestra',
        compute='_compute_resample_count',
        store=True
    )
    is_resample = fields.Boolean(
        string='Es Re-muestra',
        compute='_compute_is_resample',
        store=True
    )
    root_sample_id = fields.Many2one(
        'stock.lot',
        string='Muestra Original (Raíz)',
        compute='_compute_root_sample',
        store=True,
        help='Muestra original de la cadena de re-muestreos'
    )
    resample_chain_count = fields.Integer(
        string='Re-muestreos en Cadena',
        compute='_compute_resample_chain_count',
        help='Número total de re-muestreos en toda la cadena'
    )

    # ------------------------------------------------------------------
    # State transitions
    # ------------------------------------------------------------------

    def _set_state_and_notify(self, vals, body_template, subject):
        """Write ``vals`` on each record and post a chatter notification.

        Args:
            vals: values written on every record (includes the new state).
            body_template: ``%s`` template; receives the current user's name.
            subject: subject of the posted notification.
        """
        user_name = self.env.user.name
        for record in self:
            record.write(vals)
            record.message_post(
                body=body_template % user_name,
                subject=subject,
                message_type='notification'
            )

    def action_collect(self):
        """Mark sample(s) as collected and stamp the collection date."""
        self._set_state_and_notify(
            {'state': 'collected', 'collection_date': fields.Datetime.now()},
            'Muestra recolectada por %s',
            'Estado actualizado: Recolectada',
        )

    def action_receive(self):
        """Mark sample(s) as received in the laboratory."""
        self._set_state_and_notify(
            {'state': 'received'},
            'Muestra recibida en laboratorio por %s',
            'Estado actualizado: Recibida',
        )

    def action_start_analysis(self):
        """Move sample(s) to the in-process state."""
        self._set_state_and_notify(
            {'state': 'in_process'},
            'Análisis iniciado por %s',
            'Estado actualizado: En Proceso',
        )

    def action_complete_analysis(self):
        """Mark the analysis of the sample(s) as completed."""
        self._set_state_and_notify(
            {'state': 'analyzed'},
            'Análisis completado por %s',
            'Estado actualizado: Analizada',
        )

    def action_store(self):
        """Mark sample(s) as stored."""
        self._set_state_and_notify(
            {'state': 'stored'},
            'Muestra almacenada por %s',
            'Estado actualizado: Almacenada',
        )

    def action_dispose(self):
        """Mark sample(s) as disposed."""
        self._set_state_and_notify(
            {'state': 'disposed'},
            'Muestra desechada por %s. Motivo de disposición registrado.',
            'Estado actualizado: Desechada',
        )

    def action_cancel(self):
        """Mark sample(s) as cancelled."""
        self._set_state_and_notify(
            {'state': 'cancelled'},
            'Muestra cancelada por %s',
            'Estado actualizado: Cancelada',
        )

    # ------------------------------------------------------------------
    # Rejection
    # ------------------------------------------------------------------

    def action_open_rejection_wizard(self):
        """Return the window action that opens the rejection wizard.

        The current sample is passed as ``default_sample_id`` in the context.
        """
        self.ensure_one()
        return {
            'type': 'ir.actions.act_window',
            'name': 'Rechazar Muestra',
            'res_model': 'lims.sample.rejection.wizard',
            'view_mode': 'form',
            'target': 'new',
            'context': {
                'default_sample_id': self.id,
            }
        }

    def action_reject(self, create_resample=None):
        """Reject the sample; meant to be called from the rejection wizard.

        Sets the state to ``rejected``, records who rejected it and when,
        notifies the sample and its related order, and optionally creates a
        re-sample.

        Args:
            create_resample: Boolean forcing (or preventing) re-sample
                creation. If None, the system parameter
                ``lims_management.auto_resample_on_rejection`` decides.

        Raises:
            ValueError: if the sample state is ``'completed'``.
        """
        self.ensure_one()

        # NOTE(review): 'completed' is not one of the values declared on the
        # ``state`` selection above, so this guard never fires as written.
        # Confirm which state(s) were intended (e.g. 'analyzed') before
        # changing it.
        if self.state == 'completed':
            raise ValueError('No se puede rechazar una muestra ya completada')

        # The wizard is expected to have set the rejection reason/notes
        # already; only the bookkeeping fields are written here.
        self.write({
            'state': 'rejected',
            'rejected_by': self.env.user.id,
            'rejection_date': fields.Datetime.now()
        })

        reason_name = self.rejection_reason_id.name if self.rejection_reason_id else 'Sin especificar'
        notes = self.rejection_notes or ''

        # NOTE(review): line breaks in these bodies are emitted as '\n';
        # confirm that this renders as intended in the chatter.
        body = f'Muestra rechazada por {self.env.user.name}\nMotivo: {reason_name}'
        if notes:
            body += f'\nNotas: {notes}'

        self.message_post(
            body=body,
            subject='Estado actualizado: Rechazada',
            message_type='notification'
        )

        # Notify the related sale order, if any.
        if self.request_id:
            self.request_id.message_post(
                body=f'La muestra {self.name} ha sido rechazada. Motivo: {reason_name}',
                subject='Muestra Rechazada',
                message_type='notification'
            )

        # An explicit value from the wizard wins; otherwise fall back to the
        # system configuration.
        if create_resample is not None:
            should_create_resample = create_resample
        else:
            IrConfig = self.env['ir.config_parameter'].sudo()
            should_create_resample = IrConfig.get_param(
                'lims_management.auto_resample_on_rejection', 'True') == 'True'

        if should_create_resample:
            try:
                self.action_create_resample()
                self.message_post(
                    body=_('Re-muestra generada automáticamente debido al rechazo'),
                    subject='Re-muestreo Automático',
                    message_type='notification'
                )
            except UserError as e:
                # Re-sample creation can legitimately fail (e.g. maximum
                # attempts reached); log it instead of aborting the rejection.
                self.message_post(
                    body=_('No se pudo generar re-muestra automática: %s') % str(e),
                    subject='Error en Re-muestreo',
                    message_type='notification'
                )

    # ------------------------------------------------------------------
    # Container helpers
    # ------------------------------------------------------------------

    @api.onchange('sample_type_product_id')
    def _onchange_sample_type_product_id(self):
        """Keep the legacy ``container_type`` in sync with the product.

        The product name is matched (case-insensitively) against known
        keywords; anything unmatched maps to ``'other'``. Nothing happens
        when no product is set.
        """
        product = self.sample_type_product_id
        if not product:
            return
        # The name may still be empty on a draft product record.
        product_name = (product.name or '').lower()
        self.container_type = next(
            (
                container_type
                for keywords, container_type in _LEGACY_CONTAINER_KEYWORDS
                if any(keyword in product_name for keyword in keywords)
            ),
            'other',
        )

    def get_container_name(self):
        """Return the container name from the product or the legacy field.

        Returns the sample-type product name when set, otherwise the label
        of the legacy ``container_type`` selection, otherwise ``'Unknown'``.
        """
        if self.sample_type_product_id:
            return self.sample_type_product_id.name
        if self.container_type:
            labels = dict(self._fields['container_type'].selection)
            return labels.get(self.container_type)
        return 'Unknown'

    # ------------------------------------------------------------------
    # Barcode
    # ------------------------------------------------------------------

    @api.depends('is_lab_sample', 'create_date')
    def _compute_barcode(self):
        """Assign a barcode to lab samples lacking one; clear it otherwise.

        An existing barcode on a lab sample is left untouched.
        """
        for record in self:
            if not record.is_lab_sample:
                record.barcode = False
            elif not record.barcode:
                record.barcode = record._generate_unique_barcode()

    def _generate_unique_barcode(self):
        """Generate a barcode in format YYMMDDNNNNNNC.

        YY: Year (2 digits)
        MM: Month (2 digits)
        DD: Day (2 digits)
        NNNNNN: Sequential number (6 digits)
        C: Check digit (see ``_calculate_luhn_check_digit``)

        The sequence continues from the highest barcode already issued with
        today's date prefix. If the resulting barcode already exists, a
        bounded number of randomised candidates is tried; the last candidate
        is returned even if it could not be verified as unique.
        """
        self.ensure_one()

        date_prefix = datetime.now().strftime('%y%m%d')

        # '=like' performs a true prefix match. Plain 'like' wraps the value
        # in wildcards and would also match barcodes that merely contain the
        # date digits somewhere else.
        latest = self.search([
            ('is_lab_sample', '=', True),
            ('barcode', '=like', date_prefix + '%'),
            ('id', '!=', self.id)
        ], order='barcode desc', limit=1)

        sequence = 1
        if latest and latest.barcode:
            try:
                sequence = int(latest.barcode[6:12]) + 1
            except ValueError:
                # Existing barcode does not follow the expected layout.
                sequence = 1

        if sequence > 999999:
            # The daily counter overflowed: reuse the 6 digits with a leading
            # digit derived from the sample type.
            type_prefix = '9'  # default
            name_lower = (self.sample_type_product_id.name or '').lower()
            for keyword, prefix in _BARCODE_OVERFLOW_PREFIXES.items():
                if keyword in name_lower:
                    type_prefix = prefix
                    break
            sequence = int(type_prefix + str(sequence % 100000).zfill(5))

        def build(seq):
            """Return date prefix + zero-padded sequence + check digit."""
            payload = date_prefix + str(seq).zfill(6)
            return payload + str(self._calculate_luhn_check_digit(payload))

        candidate = build(sequence)
        for _attempt in range(_BARCODE_MAX_COLLISION_RETRIES):
            clash = self.search([
                ('barcode', '=', candidate),
                ('id', '!=', self.id)
            ], limit=1)
            if not clash:
                break
            # Collision: append a random digit, keep the low 6 digits, retry.
            sequence = (sequence * 10 + random.randint(0, 9)) % 1000000
            candidate = build(sequence)

        return candidate

    def _calculate_luhn_check_digit(self, number_str):
        """Return a Luhn-style check digit for a string of decimal digits.

        Counting from the right, the 1st, 3rd, ... digits are summed as-is
        and the 2nd, 4th, ... digits are doubled (with their decimal digits
        added together) before summing. The result is the digit that brings
        the total up to a multiple of 10.

        NOTE(review): canonical Luhn doubles the rightmost payload digit when
        computing a check digit; this implementation leaves it undoubled.
        Left unchanged because altering it would change the check digit of
        barcodes that were already issued.
        """
        digits = [int(d) for d in number_str]
        plain_sum = sum(digits[-1::-2])
        doubled_sum = sum(sum(divmod(2 * d, 10)) for d in digits[-2::-2])
        total = plain_sum + doubled_sum
        return (10 - (total % 10)) % 10

    def _ensure_barcode(self):
        """Generate a barcode for every lab sample that lacks one.

        Returns:
            True, always.
        """
        for record in self:
            if record.is_lab_sample and not record.barcode:
                record.barcode = record._generate_unique_barcode()
        return True

    # ------------------------------------------------------------------
    # Re-sampling
    # ------------------------------------------------------------------

    def _get_root_sample(self):
        """Return the top-most ancestor following ``parent_sample_id``.

        Returns the record itself when it has no parent. A visited set stops
        the walk if the parent chain ever loops back on itself, so corrupt
        data cannot cause an infinite loop.
        """
        self.ensure_one()
        root = self
        seen_ids = {root.id}
        while root.parent_sample_id and root.parent_sample_id.id not in seen_ids:
            root = root.parent_sample_id
            seen_ids.add(root.id)
        return root

    @api.depends('parent_sample_id')
    def _compute_is_resample(self):
        """Flag the sample as a re-sample when it has a parent sample."""
        for record in self:
            record.is_resample = bool(record.parent_sample_id)

    @api.depends('child_sample_ids')
    def _compute_resample_count(self):
        """Count the direct re-samples (children) of each sample."""
        for record in self:
            record.resample_count = len(record.child_sample_ids)

    @api.depends('parent_sample_id')
    def _compute_root_sample(self):
        """Store the root of the re-sample chain; empty for a root itself."""
        for record in self:
            root = record._get_root_sample()
            record.root_sample_id = root if root != record else False

    @api.depends('parent_sample_id', 'child_sample_ids')
    def _compute_resample_chain_count(self):
        """Count every re-sample in the chain the record belongs to."""
        for record in self:
            root = record._get_root_sample()
            record.resample_chain_count = self._count_all_resamples_in_chain(root)

    def action_create_resample(self):
        """Create a new sample as a re-sample of the current one.

        The new sample is attached to the current sample's parent when the
        current sample is itself a re-sample, otherwise to the current
        sample. Chatter messages are posted on the samples involved and on
        the related order, and receptionists are optionally notified.

        Returns:
            dict: window action opening the newly created re-sample.

        Raises:
            UserError: if the parent already has an active re-sample, or if
                the configured maximum number of re-samples for the chain
                has been reached.
        """
        self.ensure_one()

        # Re-samples of a re-sample hang off the same parent, keeping the
        # tree shallow.
        parent_for_resample = self.parent_sample_id if self.parent_sample_id else self

        # Only one active re-sample per parent is allowed at a time.
        active_resamples = parent_for_resample.child_sample_ids.filtered(
            lambda s: s.state not in _INACTIVE_RESAMPLE_STATES
        )
        if active_resamples:
            raise UserError(_('La muestra %s ya tiene una re-muestra activa (%s). No se puede crear otra hasta que se procese o rechace la existente.') %
                            (parent_for_resample.name, ', '.join(active_resamples.mapped('name'))))

        # Configuration
        IrConfig = self.env['ir.config_parameter'].sudo()
        initial_state = IrConfig.get_param('lims_management.resample_state', 'pending_collection')
        prefix = IrConfig.get_param('lims_management.resample_prefix', 'RE-')
        max_attempts = int(IrConfig.get_param('lims_management.max_resample_attempts', '3'))

        # The attempt limit applies to the whole chain, counted from its root.
        original_sample = parent_for_resample._get_root_sample()
        total_resamples = self._count_all_resamples_in_chain(original_sample)
        if max_attempts > 0 and total_resamples >= max_attempts:
            raise UserError(_('Se ha alcanzado el número máximo de re-muestreos (%d) para esta cadena de muestras.') % max_attempts)

        # The name suffix is based on the parent's direct re-sample count.
        resample_number = len(parent_for_resample.child_sample_ids) + 1

        vals = {
            'name': f"{prefix}{parent_for_resample.name}-{resample_number}",
            'product_id': self.product_id.id,
            'patient_id': self.patient_id.id,
            'doctor_id': self.doctor_id.id,
            'origin': self.origin,
            'sample_type_product_id': self.sample_type_product_id.id,
            'volume_ml': self.volume_ml,
            'is_lab_sample': True,
            'state': initial_state,
            'analysis_names': self.analysis_names,
            'parent_sample_id': parent_for_resample.id,  # always the determined parent
            'request_id': self.request_id.id if self.request_id else False,
        }
        resample = self.create(vals)

        # Post messages on every sample involved.
        self.message_post(
            body=_('Re-muestra creada: %s') % resample.name,
            subject='Re-muestreo',
            message_type='notification'
        )

        if self != parent_for_resample:
            # Created from a re-sample: also notify the parent.
            parent_for_resample.message_post(
                body=_('Nueva re-muestra creada: %s (debido al rechazo de %s)') % (resample.name, self.name),
                subject='Re-muestreo',
                message_type='notification'
            )

        # NOTE(review): line breaks are emitted as '\n'; confirm that this
        # renders as intended in the chatter.
        resample.message_post(
            body=_('Esta es una re-muestra de: %s\nCreada debido al rechazo de: %s\nMotivo: %s') %
            (parent_for_resample.name, self.name,
             self.rejection_reason_id.name if self.rejection_reason_id else 'No especificado'),
            subject='Re-muestra creada',
            message_type='notification'
        )

        # Notify receptionists if configured.
        auto_notify = IrConfig.get_param('lims_management.auto_notify_resample', 'True') == 'True'
        if auto_notify:
            self._notify_resample_created(resample)

        # Update the related order, if any.
        if self.request_id:
            self.request_id.message_post(
                body=_('Se ha creado una re-muestra (%s) para la muestra rechazada %s') % (resample.name, self.name),
                subject='Re-muestra creada',
                message_type='notification'
            )
            # Link the new sample to the order's generated samples.
            self.request_id.generated_sample_ids = [(4, resample.id)]

        return {
            'type': 'ir.actions.act_window',
            'name': 'Re-muestra Creada',
            'res_model': 'stock.lot',
            'res_id': resample.id,
            'view_mode': 'form',
            'target': 'current',
        }

    def _count_all_resamples_in_chain(self, root_sample):
        """Count all descendants of ``root_sample`` via ``child_sample_ids``.

        Each sample is counted at most once, so a cyclic parent/child
        relation cannot cause an infinite loop.

        Args:
            root_sample: sample from which the traversal starts (not counted).

        Returns:
            int: number of re-samples found below ``root_sample``.
        """
        count = 0
        seen_ids = {root_sample.id}
        pending = [root_sample]
        while pending:
            sample = pending.pop()
            for child in sample.child_sample_ids:
                if child.id in seen_ids:
                    continue
                seen_ids.add(child.id)
                count += 1
                pending.append(child)
        return count

    def _notify_resample_created(self, resample):
        """Create a to-do activity on ``resample`` for each receptionist.

        Receptionists are the users of the group
        ``lims_management.group_lims_receptionist``; nothing is done when
        that group does not exist.

        Args:
            resample: the newly created re-sample record.
        """
        receptionist_group = self.env.ref('lims_management.group_lims_receptionist', raise_if_not_found=False)
        if not receptionist_group:
            return

        # Loop-invariant lookups, resolved once.
        model_id = self.env['ir.model'].search([('model', '=', 'stock.lot')], limit=1).id
        todo_type_id = self.env.ref('mail.mail_activity_data_todo').id
        summary = _('Nueva re-muestra pendiente de recolección')
        note = _('Se ha generado una re-muestra (%s) que requiere recolección. Muestra original: %s') % (resample.name, self.name)
        deadline = fields.Date.today()

        self.env['mail.activity'].create([
            {
                'res_model': 'stock.lot',
                'res_model_id': model_id,  # required field
                'res_id': resample.id,
                'activity_type_id': todo_type_id,
                'summary': summary,
                'note': note,
                'user_id': user.id,
                'date_deadline': deadline,
            }
            for user in receptionist_group.users
        ])