Mensajes de transacciones de RocketMQ
El mensaje de transacción de RocketMQ significa que el envío de eventos de mensajes y otros eventos deben tener éxito o fallar al mismo tiempo. Por ejemplo, si desea transferir $10,000 de una cuenta en el Banco A a una cuenta en el Banco B, entonces el mensaje que el Banco A envía "Agregar $10,000 a la cuenta del Banco B" debe ser el mismo que "Debitar $10,000 de la cuenta del Banco A". la operación tiene éxito o falla al mismo tiempo.
RocketMQ utiliza un método de confirmación de dos fases para implementar mensajes de transacción. Cuando TransactionMQ Producer maneja la situación anterior, primero envía el mensaje "Prepárese para agregar $10,000 al banco B" y luego deduce $10,000 del banco A después de que el mensaje se envía exitosamente. Durante la operación, se juzga si el mensaje anterior "preparándose para agregar 10,000 yuanes de la cuenta bancaria de B" se envía o se revierte en función del éxito o fracaso de la operación. El proceso específico es el siguiente:
2) El remitente envía un mensaje de "confirmación pendiente" a RocketMQ.
2) RocketMQ persiste con éxito el mensaje recibido "por confirmar" y responde al remitente que el mensaje se envió correctamente. En este momento, se completa la primera fase del envío del mensaje.
3) El remitente comienza a ejecutar la lógica de eventos local.
4) El remitente envía un segundo mensaje de confirmación (confirmación o reversión) a RocketMQ según el resultado de la ejecución del evento local. Cuando RocketMQ reciba el estado de confirmación, marcará el mensaje de la primera fase como entregable y el suscriptor podrá recibir el mensaje; cuando RocketMQ reciba el estado de reversión, eliminará el mensaje de la primera fase y el suscriptor no podrá recibir el mensaje; ser recibido.
5) Si se produce una excepción y la confirmación secundaria enviada en el paso 4 no llega a RocketMQ, el servidor iniciará una solicitud de sondeo después de un tiempo fijo para obtener el mensaje a confirmar.
6) Cuando el remitente recibe una solicitud de reversión de mensaje (si el productor que envió el mensaje de la primera fase no está funcionando, la solicitud de reversión se enviará a otro productor en el mismo grupo de productores), que devolverá un estado de Commit o Roolback comprobando los resultados de ejecución del evento local del mensaje correspondiente.
7) RocketMQ recibe la solicitud Roolback y sigue la lógica del paso 4).
La lógica anterior parece implementar bien la funcionalidad de mensajería transaccional y es la misma lógica utilizada para implementar la mensajería transaccional en versiones anteriores de RocketMQ.
Sin embargo, dado que RocketMQ depende de la capacidad de escribir datos secuencialmente en el disco para mejorar el rendimiento, el paso 4) requiere cambiar el estado de la primera etapa del mensaje, lo que puede provocar demasiadas capturas de disco Sucio. páginas, reduciendo así el rendimiento del sistema. Por lo tanto, RocketMQ eliminó esta funcionalidad en la versión 4.x. Todavía existen algunas clases de nivel superior en el sistema y los usuarios pueden implementar sus propias funciones de transacción de acuerdo con sus necesidades reales.
El cliente tiene tres clases para ayudar a los usuarios a implementar mensajes de transacción.
La primera clase es LocalTransaction-Executer, que se utiliza para crear una instancia de la lógica en el paso 3) y devolver LocalTransactionState según. a la situación .ROLLBACK_MESSAGE o
LocalTransactionState.COMMIT_MESSAGE.
La segunda clase es TransactionMQProducer. Su uso es similar al de DefaultMQProducer. Inicia un Productor al mismo tiempo a través de mensajes, pero tiene más funciones de transacciones locales y funciones de verificación de estado que DefaultMQProducer.
La tercera clase es TransactionCheckListener, que implementa la solicitud de verificación del servidor MQ en el paso 5 y devuelve LocalTransactionState.ROLLBACK_MESSAGE o LocalTransactionState.COMMIT_MESSAGE
La tercera clase es TransactionCheckListener. Solicitud de verificación del servidor MQ en el paso 5 y devuelve LocalTransactionState.ROLLBACK_MESSAGE o LocalTransactionState.COMMIT_MESSAGE
La tercera clase es TransactionCheckListener. MENSAJE
La figura anterior ilustra el esquema general de mensajes de transacción, que se divide en dos procesos: el envío y envío de mensajes de transacción normales y el proceso de compensación de mensajes de transacción.
1. Envío y envío de mensajes de transacciones:
(1) Enviar mensajes (MEDIOS mensajes).
(2) El servidor responde al resultado de la escritura del mensaje.
(3) Ejecute transacciones locales en función de los resultados del envío (si la escritura falla, la empresa no verá el medio mensaje en este momento y la lógica local no se ejecutará).
(4) Realice una confirmación o reversión según el estado de la transacción local (la operación de confirmación generará un índice de mensaje y los consumidores podrán ver el mensaje).
2. Proceso de compensación:
(1) Para mensajes de transacción que no se han confirmado/revertido (mensajes en estado pendiente), inicie una "reversión" desde el servidor.
(2) El productor recibe el mensaje de reversión y verifica el estado de la transacción local correspondiente al mensaje de reversión.
(3) Reenviar o revertir según el estado de la transacción local.
La fase de compensación se utiliza para resolver situaciones en las que los mensajes de confirmación o reversión caducan o fallan.
En el proceso principal de los mensajes de transacción de RocketMQ, los usuarios no pueden ver cómo se ejecutan los mensajes. En comparación con los mensajes normales, una de las características más importantes de los mensajes de transacciones es que los usuarios no pueden ver los mensajes enviados en una determinada etapa. La mensajería de transacciones RocketMQ hace esto: si el mensaje es medio mensaje, realiza una copia de seguridad del tema del mensaje original con la cola de consumo de mensajes y luego cambia el tema a RMQ_SYS_TRANS_HALF_TOPIC. Dado que el grupo de consumidores no se suscribe al tema, el consumidor no puede consumir mensajes a medias. La segunda fase muestra la ejecución de un mensaje MEDIO de confirmación o reversión (lápida). Por supuesto, para evitar el fallo de la operación de dos fases, RocketMQ inicia una tarea programada para extraer mensajes del tema como RMQ_SYS_TRANS_HALF_TOPIC para su consumo y obtiene el proveedor de servicios según el grupo de productores para enviar una solicitud para revisar la transacción. estado y, de acuerdo con el estado de la transacción, decida si confirmar o revertir el mensaje.
En RocketMQ, los mensajes se almacenan en el lado del servidor en la siguiente estructura. Cada mensaje tendrá una información de índice correspondiente. El consumidor lee el contenido de la entidad del mensaje a través del índice secundario de ConsumeQueue. siguiente:
La estrategia de implementación específica de RocketMQ es: al escribir un mensaje de transacción, si se reemplazan los atributos de Tema y Cola del mensaje, y la información original de Tema y Cola se almacena en los atributos del mensaje, el El mensaje no se actualizará debido al reemplazo del tema del mensaje. Se reenviará a la cola de consumo de mensajes del tema original. El consumidor no puede percibir la existencia del mensaje y no se consumirá. De hecho, cambiar el tema de los mensajes es una práctica común en RocketMQ, lo que recuerda cómo se implementan los mensajes diferidos.
RMQ_SYS_TRANS_HALF_TOPIC
Después de completar la primera fase de escribir el mensaje que es invisible para el usuario, la segunda fase (si es una operación de confirmación) debe hacer que el mensaje sea visible para el usuario (si; es una operación de confirmación) necesita hacer que el mensaje sea visible para el usuario. Visible para el usuario; la segunda fase (en el caso de una operación de confirmación) necesita hacer que el mensaje sea visible para el usuario. Si se trata de una operación de reversión, es necesario revocar la información de la primera etapa. Comencemos con la reversión. En el caso de una reversión, el usuario no puede ver el mensaje, por lo que no es necesario deshacerlo (de hecho, RocketMQ no puede eliminar el mensaje porque escribe el archivo secuencialmente). Sin embargo, dado que el mensaje no está en un estado definido (Pendiente, transacción pendiente), se necesita una operación para identificar el estado final del mensaje. El esquema de mensajería de transacciones de RocketMQ introduce el concepto de mensaje Op, que identifica el estado definido de la transacción. mensaje (confirmar o revertir). Si el mensaje de transacción no tiene un mensaje Op, no se puede determinar el estado de la transacción (es posible que haya fallado en la segunda fase). Cuando se introduce un mensaje Op, el mensaje de transacción, ya sea Commit o Rollback, registrará una operación Op; en comparación con Rollback, Commit simplemente crea un índice del medio mensaje antes de escribir el mensaje Op.
RocketMQ escribe el mensaje Op en un tema global específico a través de un método en el código fuente:
TransactionalMessageUtil.buildOpTopic(); este tema es un tema interno (al igual que el medio mensaje); Tema), no será utilizado por Half. El contenido del mensaje Op es el desplazamiento almacenado del mensaje Half correspondiente, lo que permite que el mensaje Op se indexe en el mensaje Half en operaciones retrospectivas posteriores.
Al realizar una operación de confirmación de dos fases, es necesario establecer el índice del mensaje Half. El medio mensaje en la primera etapa se escribe en un tema especial, por lo que la segunda etapa de indexación debe leer el medio mensaje, reemplazar el tema y la cola con el tema y la cola de destino reales, y luego generarlo mediante operaciones normales de escritura de mensajes. Mensajes visibles para el usuario. Por lo tanto, la segunda etapa del mensaje de transacción RocketMQ en realidad utiliza el contenido del mensaje almacenado en la primera etapa, restaura el mensaje normal completo en la segunda etapa y luego completa el proceso de escritura del mensaje.
Si un mensaje de transacción de RocketMQ falla en la segunda fase, como un problema de red durante la operación de confirmación que hace que la confirmación falle, existe una estrategia que permite que el mensaje finalmente se confirme. RocketMQ utiliza un mecanismo de compensación llamado "retrospectiva". RocketMQ utiliza un mecanismo de compensación llamado "retrospectiva". En este mecanismo, el corredor inicia una retrospectiva de mensajes con estado incierto y envía los mensajes a los productores correspondientes (el mismo grupo). Los productores verifican el estado de los mensajes locales. transacción y luego realizar una operación de confirmación o reversión. Al comparar el mensaje Half y el mensaje Op, el corredor verifica los mensajes de transacción y avanza el CheckPoint (registrando si el estado de estos mensajes de transacción es normal).
Vale la pena señalar que rocketmq no verificará continuamente la información del estado de la transacción; lo hará 15 veces de forma predeterminada. Si el estado de la transacción aún se desconoce después de 15 veces, rocketmq se ejecutará. devolver el mensaje de forma predeterminada.
Implementación de la clase TxConsumer