Código fuente de la cola de reintento de Rocketmq
En una transacción distribuida, los participantes, los servidores de soporte de transacciones, los servidores de recursos y los administradores de transacciones están ubicados en diferentes nodos en diferentes sistemas distribuidos. Las transacciones distribuidas tienen como objetivo garantizar la coherencia de los datos entre diferentes nodos.
1. Esquema 2PC (envío en dos fases): gran coherencia
Esquema 2.3PC (envío en tres fases)
3.TCC (intentar-confirmar) - Cancelar) Transacción-Consistencia final
4. Transacción Saga-Consistencia final
5. Tabla de mensajes locales-Consistencia final
6.Transacción MQ- Consistencia final
Además de su propia lógica empresarial, el productor de mensajes también necesita mantener una tabla de mensajes. Esta tabla de mensajes registra información que debe sincronizarse con otros servicios. Por supuesto, en esta tabla de mensajes, cada mensaje tiene un valor de estado para identificar si el mensaje se procesó correctamente.
La lógica empresarial de enviar y reproducir y la inserción de datos en la tabla de mensajes se completarán en una sola transacción, lo que evita el problema de un procesamiento comercial exitoso y la falla al enviar mensajes de transacción, o fallas en el procesamiento comercial. y envío exitoso de mensajes de transacción.
Por ejemplo:
Asumimos que actualmente existen dos servicios, el servicio de pedidos y el servicio de carrito de compras. Los usuarios combinan varios artículos en su carrito de compras para realizar un pedido y luego necesitan información sobre los artículos que acaban de poner en su carrito.
1. El productor del mensaje, es decir, el servicio de pedido, completa su propia lógica (pedido de productos) y luego envía el mensaje a otros servicios que requieren sincronización de datos a través de mq, que es el carrito de compras. en nuestro Servir de castañas.
2. Otros servicios (servicio de carrito de compras) escucharán esta cola;
1. Si se recibe este mensaje y la sincronización de datos se realiza con éxito, por supuesto, también es local. transacción, entonces el mensaje del productor de este mensaje (servicio de pedido) se ha procesado a través de mq, y luego el productor puede reconocer que la transacción ha finalizado. Si se trata de un error empresarial, el productor del mensaje de respuesta debe revertir los datos.
2. No he recibido este mensaje desde hace mucho tiempo, así que esto no sucederá. El remitente del mensaje tendrá una tarea programada y periódicamente volverá a intentar enviar mensajes no procesados en la tabla de mensajes.
3. Si el productor del mensaje (servicio de pedidos) recibe el recibo del mensaje;
1 si tiene éxito, la modificación es que el mensaje ha sido procesado, es decir, la sincronización de. la transacción distribuida se ha completado;
2. Si el resultado del mensaje es que la ejecución falla, la transacción se revertirá localmente al mismo tiempo, lo que indica que el mensaje ha sido procesado;
3. Si el mensaje se pierde, no se recibe el mensaje de recepción, es poco probable que esto suceda. El remitente del mensaje (servicio de pedidos) tendrá una tarea de programación para volver a intentar enviar mensajes no procesados en la tabla de mensajes. El servicio descendente debe ser idempotente y puede recibir mensajes duplicados varias veces. Si se pierde el mensaje de recepción del productor que respondió al mensaje, continuará recibiendo mensajes mq del productor y luego responderá nuevamente con el mensaje de recepción del productor de mensajes.
Hay dos operaciones muy importantes:
1. El servidor debe ser idempotente al procesar mensajes, y tanto el productor como el receptor del mensaje deben ser idempotentes;
2. El remitente necesita agregar un temporizador para recorrer y reenviar mensajes no procesados para evitar la interrupción de la ejecución de la transacción causada por la pérdida de mensajes.
Ventajas y desventajas de esta solución
Ventajas:
1. Logra la confiabilidad de los datos del mensaje a nivel de diseño y no depende del middleware de mensajes. debilitando la dependencia de las características mq.
2. Sencillo y fácil de implementar.
Desventajas:
La razón principal es que debe estar vinculado a datos comerciales y está altamente acoplado.
El uso de la misma base de datos ocupará algunos recursos de la base de datos empresarial.
A continuación se analiza el soporte de transacciones de varias colas de mensajes.
El problema con las transacciones en RocketMQ es garantizar que tanto las transacciones locales como los mensajes enviados tengan éxito o fracasen. Además, RocketMQ ha agregado un mecanismo de seguimiento de transacciones para mejorar la tasa de éxito de la ejecución de transacciones y la coherencia de los datos.
Hay principalmente dos aspectos: el envío normal de transacciones y la compensación de mensajes de transacciones.
Envío de transacción normal
1. Enviar un mensaje (medio mensaje). La diferencia entre este semimensaje y un mensaje normal es que el consumidor no puede ver este mensaje hasta que se envía la transacción.
2. El servidor MQ escribe información y devuelve el resultado de la respuesta;
3. Según el resultado de la respuesta del servidor MQ, decida si ejecutar la transacción local. Si el servidor MQ escribe la información, la transacción local se ejecuta con éxito; de lo contrario, no se ejecuta.
Si el servidor MQ no recibe un mensaje de confirmación o reversión, esta situación debe compensarse.
Proceso de compensación
1. Si MQ SERVER no recibe el mensaje de confirmación o reversión del remitente del mensaje, iniciará una consulta al remitente del mensaje, nuestro servidor, para consultar el estado actual del mensaje;
2. El remitente del mensaje recibe la solicitud de consulta correspondiente, consulta el estado de la transacción y luego envía el estado a MQ SERVER para que MQ SERVER pueda ejecutar procesos posteriores.
En comparación con la tabla de mensajes local que procesa transacciones distribuidas, las transacciones MQ se completan colocando la lógica que debe procesarse en la tabla de mensajes local en MQ.
Las transacciones en Kafka resuelven este problema y garantizan que varios mensajes enviados en una transacción tengan éxito o fracasen. Es decir, se garantiza la atomicidad de las operaciones de escritura en múltiples particiones.
Al cooperar con el mecanismo idempotente de Kafka, se implementa la implementación exactamente una vez de Kafka, satisfaciendo las aplicaciones en el modo lectura-proceso-escritura. Por supuesto, las transacciones en Kafka se refieren principalmente a este modelo.
¿Qué es el patrón lectura-proceso-escritura?
Li Ru: En la computación de flujo, Kafka se utiliza como fuente de datos y los resultados del cálculo se guardan en Kafka. Los datos se consumen de un tema en Kafka, se calculan en un clúster informático y luego se almacenan en otros temas en Kafka. En este proceso, es necesario asegurarse de que cada mensaje se procese solo una vez, para garantizar el éxito del resultado final. La atomicidad de las transacciones de Kafka garantiza que las lecturas y escrituras sean atómicas y que se realicen juntas o no se recuperen.
Analicemos cómo se implementan las transacciones de Kafka.
Su principio de implementación es similar al de las transacciones RocketMQ. Se basa en el envío en dos fases y su implementación puede ser problemática.
Primero permítanme presentarles al coordinador de transacciones. Para resolver el problema de las transacciones distribuidas, Kafka introduce el rol de coordinador de transacciones, que es responsable de coordinar toda la transacción en el lado del servidor. Este coordinador no es un proceso separado, sino parte del proceso del agente. El coordinador, al igual que las particiones, se elige para garantizar su disponibilidad.
El clúster Kafka también tiene un tema dedicado a registrar registros de transacciones, que registra todos los registros de transacciones. Habrá varios coordinadores al mismo tiempo, cada coordinador es responsable de administrar y utilizar varias particiones en el registro de transacciones. Esto permite que las transacciones se ejecuten en paralelo y mejora el rendimiento.
Echemos un vistazo al proceso específico
Envío de transacciones
1. El coordinador establece el estado de la transacción en PrepareCommit y lo escribe en el registro de transacciones. ;
2. El coordinador escribe la identificación del final de la transacción en cada partición, y luego el cliente puede liberar los mensajes de transacción no confirmados previamente filtrados al consumidor para su consumo;
La transacción devuelve Roll
1. El coordinador establece el estado de la transacción en PrepareAbort y lo escribe en el registro de transacciones
2. El coordinador escribe la información de reversión de la transacción en cada partición. Identificación y luego podrá descartar los mensajes de transacciones no confirmados previamente;
Aquí hay una imagen de la clase magistral de Message Queue.
El problema que resuelven las transacciones en RabbitMQ es garantizar que los mensajes del productor lleguen al servidor MQ. Esto es un poco diferente de otras transacciones MQ y no se discutirá aquí.
Analicemos primero las etapas por las que pasa el siguiente mensaje en MQ.
Fase de producción: El productor genera mensajes y los envía al agente a través de la red.
Fase de almacenamiento: cuando el agente recibe el mensaje, necesita descargarlo. Si es la versión agrupada de MQ, necesita sincronizar datos con otros nodos.
Etapa de consumo: el consumidor extrae datos del corredor y los transmite al consumidor a través de la red.
La pérdida de paquetes de red, fallas de red, etc. causarán la pérdida de mensajes.
Antes de que el productor envíe el mensaje, inicie una transacción a través de channel.txSelect y luego envíe el mensaje. Si el mensaje no se entrega al servidor, la transacción se revierte a channel.txRollback y se envía nuevamente. Si el servidor recibe el mensaje, confirma la transacción canal.txCommit.
Pero el rendimiento del uso de transacciones no es bueno. Esta es una operación sincrónica. Después de enviar un mensaje, el remitente será bloqueado y esperará una respuesta del servidor RabbitMQ antes de enviar el siguiente mensaje. El rendimiento y el rendimiento de los mensajes del productor se reducirán significativamente.
Utilizando el mecanismo de confirmación, el productor pone el canal en modo de confirmación. Una vez que un canal ingresa al modo de confirmación, a todos los mensajes publicados en el canal se les asignará una identificación única (comenzando desde 1). Una vez que el mensaje se haya entregado a todas las colas coincidentes, RabbitMQ enviará una confirmación al productor (incluida la etiqueta de entrega única del mensaje y múltiples parámetros), lo que le permitirá saber que el mensaje se ha entregado.
Cuando múltiple es verdadero, significa confirmación del mensaje por lotes; cuando es verdadero, significa que la identificación del mensaje menor o igual a la etiqueta de entrega devuelta ha sido confirmada; cuando es falso, significa que el mensaje es verdadero; Se ha confirmado el ID del mensaje con la etiqueta de entrega devuelta. La noticia ha sido confirmada.
Existen tres tipos de mecanismos de confirmación.
1. Confirmación sincrónica
2. Confirmación por lotes
3. Confirmación asincrónica
La eficiencia del modo sincrónico es muy baja porque Cada Mensaje debe esperar la confirmación antes de poder ser procesado.
El modo de confirmación por lotes es más eficiente que el modo síncrono, pero tiene un defecto fatal. Una vez que falla la confirmación de respuesta, todos los mensajes del lote de confirmación actual se retransmitirán, lo que provocará la duplicación de mensajes.
El modo asíncrono es una buena opción. No tiene el problema de bloqueo del modo síncrono y es muy eficiente.
Kafka presentó un intermediario. El corredor confirma el mensaje al productor y al consumidor, y el productor envía el mensaje al corredor. Si el corredor no recibe la confirmación, puede optar por continuar enviando.
Siempre que el productor reciba la respuesta de confirmación del agente, puede garantizar que el mensaje no se perderá durante la fase de producción. Algunas colas de mensajes volverán a intentarlo automáticamente después de no recibir una respuesta de confirmación durante un tiempo prolongado. Si el reintento falla nuevamente, se notificará al usuario mediante un valor de retorno o una excepción.
Siempre que la respuesta de confirmación del Broker se procese correctamente, se puede evitar la pérdida de mensajes.
RocketMQ proporciona tres formas de enviar mensajes, a saber:
Envío sincrónico: el productor envía un mensaje al corredor, bloqueando el hilo actual para que no espere a que el corredor responda y envíe el mensaje. resultado.
Envío asincrónico: el productor primero crea una tarea para enviar un mensaje al intermediario, envía la tarea al grupo de subprocesos y vuelve a llamar a una función de devolución de llamada personalizada después de que se completa la tarea para ejecutar el resultado del procesamiento.
Envío unidireccional: Oneway solo es responsable de enviar solicitudes sin esperar respuestas, y el Productor solo es responsable de enviar solicitudes sin procesar los resultados de las respuestas.
En la fase de almacenamiento normal, siempre que el agente se ejecute normalmente, no habrá problema de pérdida de mensajes. Sin embargo, si el agente falla, como si el proceso muere o el servidor falla, el mensaje. todavía puede estar perdido.
Para evitar la pérdida de volumen de mensajes durante la fase de almacenamiento, se puede persistir para evitar situaciones anormales (reinicio, apagado, apagado). . .
La persistencia de RabbitMQ tiene tres partes:
Para la persistencia del mensaje, especifique delivery_mode=2 (1 es no persistente) al entregar. Para la persistencia de los mensajes, debe hacer coincidir la persistencia de la cola y solo configurar la persistencia de los mensajes. Después de reiniciar, la cola desaparece y luego los mensajes se pierden. Por lo tanto, no tiene mucho sentido configurar la persistencia de mensajes sin configurar la persistencia de la cola.
Para la persistencia, si todos los mensajes están configurados para persistencia, el rendimiento de escritura se verá afectado, por lo que puede elegir mensajes con requisitos de confiabilidad de persistencia altos.
Sin embargo, la persistencia de los mensajes no puede evitar por completo la pérdida de mensajes.
Por ejemplo, si los datos se pierden durante el proceso de desinstalación y los mensajes no se sincronizan con la memoria a tiempo, los datos también se perderán. Este problema se puede resolver introduciendo una cola espejo.
Función de la cola reflejada: al introducir la cola reflejada, la cola se puede reflejar en otros nodos de agente en el clúster. Si falla un nodo del clúster, la cola puede cambiar automáticamente a otro nodo en el espejo para garantizar la disponibilidad del servicio. (No se discutirán más detalles aquí.)
El sistema operativo en sí tiene una capa de caché llamada caché de página. Al escribir en un archivo de disco, el sistema primero escribe el flujo de datos en la caché.
Después de que Kafka reciba el mensaje, también se almacenará primero en el caché de la página y luego el sistema operativo vaciará el disco de acuerdo con su propia política o forzará el vaciado del disco mediante el comando fsync. Si el sistema se bloquea, los datos de PageCache se perderán. Es decir, se perderán los datos en el agente correspondiente.
Procesamiento de ideas
1. Controlar el agente del responsable de la campaña. Si un corredor va demasiado por detrás del líder original, inevitablemente provocará la pérdida de mensajes una vez que se convierta en el nuevo líder.
2. El mensaje de control solo se puede enviar después de escribir varias copias, evitando el problema 1 anterior.
1. Cambie el modo de cepillado de disco a cepillado de disco sincrónico;
2 para agentes con múltiples nodos, es necesario configurar el clúster de agentes para enviar mensajes a al menos dos nodos. y luego Enviar una respuesta de confirmación al cliente. De esta manera, cuando un corredor cae, otros corredores pueden reemplazar al corredor caído y no se perderá ningún mensaje.
La etapa de consumo es muy sencilla. Si se pierde en la transmisión de la red, el mensaje seguirá enviándose al consumidor. En la etapa de consumo, solo necesitamos controlar la confirmación del consumo una vez que se completa el procesamiento de la lógica de negocios.
Resumen: para la pérdida de mensajes, también podemos usar la idea de tablas de mensajes locales para descartar los mensajes cuando se generan y enviar los mensajes que no se han procesado durante mucho tiempo de regreso al cola a través del tiempo.
La transmisión de mensajes en MQ se puede dividir aproximadamente en los siguientes tres tipos:
1 Como máximo una vez: como máximo una vez. Cuando se entrega un mensaje, se entrega como máximo una vez. No es seguro, puede ocurrir pérdida de datos.
2. Al menos una vez: al menos una vez. Los mensajes se entregan al menos una vez. En otras palabras, no se permiten mensajes perdidos, pero sí una pequeña cantidad de mensajes duplicados.
3. Exactamente una vez: Exactamente una vez. Cuando se entrega un mensaje, se entrega solo una vez y no se permite pérdida ni duplicación. Este es el estado más alto.
La mayoría de las colas de mensajes tienen al menos una vez, lo que significa que se pueden permitir mensajes duplicados.
Los consumidores necesitamos satisfacer la idempotencia y normalmente existen las siguientes soluciones.
1. Utilice la unicidad de la base de datos
Según la situación empresarial, seleccione el valor único en la empresa como clave única de la base de datos, cree una nueva lista en ejecución y luego ejecútelo en la misma transacción Operaciones comerciales e insertando datos de la lista de ejecución. Si los datos de la lista de ejecución ya existen, la ejecución no garantizará la idempotencia. También puede consultar primero los datos de la lista en ejecución, luego ejecutar el negocio si no hay datos e insertar los datos de la lista en ejecución.
Pero tenga en cuenta la latencia al leer y escribir en la base de datos.
2. Agregar requisitos previos para actualizar la base de datos.
3. Asigne al mensaje una identificación única.
Se agrega una identificación única a cada mensaje. El consumo de mensajes repetidos se maneja agregando una lista en ejecución en el método 1 con la ayuda de la unicidad de la base de datos.