Código fuente de deduplicación de Redis
Por ejemplo, se envía un mensaje M al middleware de mensajes y el mensaje se entrega al programa consumidor A. A consume el mensaje después de recibirlo, pero el programa se reinicia durante el consumo. En este momento, el mensaje no está marcado como consumido exitosamente y el mensaje continuará enviándose al consumidor hasta que se consuma exitosamente.
Sin embargo, debido a esta naturaleza confiable, los mensajes pueden entregarse varias veces. Por ejemplo, este es el ejemplo. El programa A recibe este mensaje M. Después de completar la lógica de consumo, está a punto de notificar al middleware el mensaje "He consumido correctamente" y el programa se reinicia. Entonces, para el middleware de mensajes, el mensaje no se ha consumido correctamente, por lo que se seguirá entregando. En este momento, para la aplicación A, parece que el mensaje se ha consumido correctamente, pero el middleware del mensaje todavía se entrega repetidamente.
En el escenario RockectMQ, los mensajes con el mismo ID de mensaje se entregan repetidamente.
La confiabilidad de la entrega basada en mensajes (sin pérdida de mensajes) es una prioridad más alta, por lo que las tareas que no requieren muchos mensajes se transferirán a la autoimplementación de la aplicación. Es por eso que la documentación de RocketMQ enfatiza que se necesita lógica de consumo. ser razón idempotente. De hecho, la lógica detrás de esto es: ninguna pérdida ni duplicación son contradictorias (en un escenario distribuido), pero existen soluciones para la replicación de mensajes y la pérdida de mensajes es muy problemática.
Por ejemplo, supongamos que la lógica de consumo de mensajes de nuestro negocio es: insertar datos en una tabla de pedidos y luego actualizar el inventario:
Para lograr la idempotencia de los mensajes, podemos adoptar esta solución:
Muchas veces esto funciona muy bien, pero en escenarios concurrentes, aún pueden surgir problemas.
Suponiendo que la suma de todos los códigos consumidos es 1 segundo y que hay mensajes duplicados que llegan en 1 segundo (suponiendo 100 milisegundos) (como que el productor retransmite rápidamente, el agente se reinicia, etc.), entonces Es muy posible que los datos en el código repetido anterior todavía estén vacíos (porque el último mensaje no se ha consumido y el estado del pedido no se ha actualizado correctamente).
Luego se traspasará la barrera de verificación, lo que eventualmente provocará que la lógica de consumo repetido de mensajes ingrese al código comercial de seguridad no idempotente, lo que resultará en problemas de consumo repetido (como conflictos de clave primaria que generan excepciones, deducciones repetidas de inventario). pero no publicado), etc.)
Para resolver el problema de idempotencia de mensajes en el escenario de concurrencia anterior, una solución ideal es abrir una transacción, cambiar la instrucción de selección a una instrucción de selección para actualización y bloquear el registro.
Sin embargo, esta lógica de consumo hará que el tiempo de consumo de todo el mensaje sea más largo y la concurrencia disminuya debido a la introducción de paquetes de transacciones.
Por supuesto, existen otras soluciones más avanzadas, como utilizar el bloqueo optimista para actualizar el estado del pedido. Si la actualización falla, el mensaje se consumirá nuevamente. Sin embargo, para escenarios comerciales específicos, esto requiere un desarrollo de código y un diseño de tablas de biblioteca más complejos y detallados, lo cual está más allá del alcance de este artículo.
Sin embargo, tanto la solución de selección para actualización como la solución de bloqueo optimista en realidad se basan en la propia tabla de negocios, lo que sin duda aumenta la complejidad del desarrollo empresarial. Una gran parte del procesamiento de solicitudes en los sistemas empresariales se basa en MQ. Si cada lógica de consumo necesita desarrollarse en función del propio negocio, será una carga de trabajo tediosa. Este artículo espera explorar un método general de procesamiento de mensajes idempotentes, abstrayendo así ciertas clases de herramientas para aplicarlas a diversos escenarios comerciales.
En el middleware de mensajes, existe un concepto de semántica de entrega. Una de estas semánticas se llama "exactamente una vez", es decir, el mensaje se consumirá con éxito y solo una vez.
La siguiente es la explicación de Alibaba Cloud de exactamente una vez:
En el campo del procesamiento idempotente de mensajes comerciales, podemos pensar que el código del mensaje comercial definitivamente se ejecutará y solo se ejecutará una vez, entonces podemos pensar. de ello como exactamente una vez.
Sin embargo, encontrar una solución universal en escenarios distribuidos es casi imposible. Pero si se trata de una lógica de consumo basada en transacciones de bases de datos, en realidad es factible.
Supongamos que la lógica de consumo de mensajes de nuestro negocio es: actualizar el estado de una tabla de pedidos en la base de datos MySQL:
Para lograr una precisión única, es decir, el mensaje es solo se consume una vez (y debe consumirse una vez). Podemos hacer esto: agregar una tabla de registro de consumo de mensajes a esta base de datos, insertar el mensaje en esta tabla y enviar la actualización del pedido original junto con la acción de inserción en la misma transacción, de modo que para garantizar que el mensaje solo se consuma una vez.
1. Abra la transacción
2. Inserte la tabla de mensajes (maneje el problema del conflicto de clave principal)
3. lógica)
4. Enviar la transacción
Descripción:
1. En este momento, si el consumo del mensaje es exitoso y la transacción se envía, el mensaje. La tabla se insertará correctamente. En este momento, incluso si RocketMQ no ha recibido la actualización del sitio consumidor y la envía nuevamente, se considerará que la inserción del mensaje se ha consumido y luego el sitio consumidor se actualizará directamente. Esto garantiza que nuestro código de consumidor solo se ejecute una vez. 2. Si el servicio se bloquea (por ejemplo, se reinicia) antes de enviar la transacción, la transacción local no se ejecutará, el pedido no se actualizará y la inserción de la tabla de mensajes para el servidor RocketMQ, el sitio del consumidor, no será exitosa; no se ha actualizado, por lo que el mensaje se seguirá entregando. Después de la entrega, se descubre que el mensaje se insertó correctamente en la tabla de mensajes y el consumo puede continuar. Esto asegura que los mensajes no se pierdan.
De hecho, la implementación semántica exactamente una vez de Alibaba Cloud ONS es similar a esta solución basada en las características de transacción de la base de datos. Para obtener más detalles, consulte: /document_detail/102777.html.
Basado en este método, de hecho se puede extender a diferentes escenarios de aplicación, porque su implementación no tiene nada que ver con el negocio específico en sí: se basa en una tabla de mensajes.
Pero tiene sus limitaciones.
1. La lógica de consumo de mensajes debe basarse en transacciones de bases de datos relacionales. Si se modifican otros datos durante el proceso de consumo, como Redis, una fuente de datos que no admite funciones de transacción, los datos no se pueden revertir.
2. Los datos de la base de datos deben estar en una base de datos y no se pueden resolver entre bases de datos.
Nota: En las empresas, el diseño de la tabla de mensajes no debe identificarse por el ID del mensaje, sino por la clave principal comercial de la empresa, de modo que sea más razonable manejar la retransmisión del productor. La deduplicación de mensajes en Alibaba Cloud es solo el ID del mensaje de RocketMQ. En el escenario en el que el productor retransmite manualmente por algún motivo (como solicitudes de transacciones ascendentes varias veces), no logrará el efecto de deduplicación/idempotencia (porque el ID del mensaje no es el mismo). ).
Como se mencionó anteriormente, la implementación de la semántica exactamente una vez de esta manera en realidad tiene muchas limitaciones, lo que hace que esta solución básicamente no sea digna de una aplicación generalizada. Y debido a que se basa en transacciones, puede causar problemas de rendimiento, como un tiempo prolongado de bloqueo de la tabla.
Como ejemplo, tomemos un mensaje común de una aplicación de pedido. Pueden existir los siguientes pasos (en adelante denominados colectivamente paso X):
1. Verificar inventario (RPC)
2 Bloquear inventario (RPC)
4. Llame a otros servicios posteriores (RPC)
5. >6. Enviar la transacción (MySQL)
En este caso, si adoptamos el modo de implementación de transacción local de la tabla de mensajes, muchos subprocesos en el proceso de consumo de mensajes no admiten la reversión, lo que significa que incluso si Agregamos una transacción, sus operaciones detrás de ella tampoco son atómicas. ¿Cómo decirlo? En otras palabras, es posible que el servicio se haya reiniciado cuando la primera tienda estaba realizando el segundo paso de bloquear el inventario.
En cuanto a la tercera pregunta, siempre que diseñemos una clave de mensaje repetido que admita claves primarias comerciales (como número de pedido, número de serie de solicitud, etc.). ), no solo ID de mensaje. Entonces no es un problema.
Si es un lector cuidadoso, puede encontrar que en realidad hay una laguna lógica aquí. El problema radica en el segundo de los tres problemas mencionados anteriormente (escenario de concurrencia). En un escenario de concurrencia, confiamos en el estado del mensaje para el control de concurrencia, lo que permite que los mensajes duplicados del segundo mensaje se sigan consumiendo con retraso (reintento). Pero, ¿qué pasa si 1 mensaje no se puede consumir debido a motivos anormales (como reinicio de la máquina, excepción externa que provoca una falla en el consumo)? En otras palabras, el consumo retrasado en este momento en realidad ve el estado de cada consumo, y el consumo final se considerará como una falla de consumo y se entregará al tema de letra muerta (RocketMQ puede volver a consumir 16 veces de forma predeterminada).
¡Es correcto tener tales preocupaciones! Nuestra solución a este problema es que la tabla de mensajes insertada debe tener un tiempo máximo de vencimiento de consumo, como 10 minutos, lo que significa que si un mensaje se consume por más de 10 minutos, debe eliminarse de la tabla de mensajes (el programa necesita implementarlo usted mismo). Entonces, al final, el flujo de este mensaje será así:
67_2.png
En realidad, nuestra solución no tiene transacciones y solo requiere un medio de almacenamiento central, por lo que, naturalmente, Puede elegir un medio de almacenamiento más flexible, como Redis. Hay dos ventajas al usar Redis:
1.
2. El tiempo de espera mencionado anteriormente se puede implementar directamente utilizando el propio ttl de Redis.
Por supuesto, la confiabilidad y coherencia de los datos almacenados en Redis no son tan buenas como las de MySQL, y los usuarios deben elegir.
La implementación Java de la solución anterior de RocketMQ se ha incluido en Github como código abierto. Para obtener documentación de uso específica, consulte /jaskey/rocketmqdeduplistener.
El siguiente es solo un ejemplo del uso de Redis para eliminar la duplicación en un archivo Léame, para mostrar lo fácil que es agregar mensajes para eliminar la idempotencia si la herramienta se usa en los negocios:
La mayoría de los códigos anteriores son todos códigos necesarios para el RocketMQ original. La única modificación necesaria es crear un ejemplo de DedupConcurrentListener deduplicado que señale su lógica de consumo y la clave comercial duplicada (messageId de forma predeterminada).
Para más detalles, consulta las instrucciones en Github.
En la actualidad, la solución parece ser bastante completa. Se puede acceder y copiar rápidamente todos los mensajes y están completamente desacoplados de implementaciones comerciales específicas. Entonces, ¿es esta la manera perfecta de hacerlo todo?
Lamentablemente no. La razón es simple: debido a que el mensaje debe consumirse exitosamente al menos una vez, es posible que el mensaje falle durante el proceso de consumo y desencadene un reintento del mensaje. O utilice la secuencia anterior para procesar la tabla (MySQL)
4. Llame a otros servicios posteriores (RPC)
5. Actualizar el estado del pedido
6. transacción (MySQL)
Cuando el mensaje se consume en el paso 3, asumimos que la excepción de MySQL causó la falla y desencadenó el reintento del mensaje. Debido a que eliminaremos los registros en la tabla idempotente antes de volver a intentarlo, el mensaje volverá a ingresar el código de consumo al volver a intentarlo, por lo que los pasos 1 y 2 se realizarán nuevamente. Si el paso 2 en sí no es idempotente, entonces el consumo de mensajes comerciales aún no es completamente idempotente.
Entonces, dado que esto no puede lograr completamente la idempotencia del mensaje, ¿cuánto vale? ¡El valor será genial! Aunque esta no es una solución milagrosa para resolver la idempotencia de mensajes (de hecho, básicamente no existe una solución milagrosa en el campo de la ingeniería de software), se puede resolver mediante medios convenientes:
1. , etc. Los mensajes duplicados se retransmiten por diversos motivos.
2. Duplicación de mensajes a nivel empresarial causada por varios productores ascendentes.
3. El problema de la ventana de control del consumo concurrente de mensajes repetidos. Incluso si se repite, no puede ingresar a la lógica de consumo al mismo tiempo.
En otras palabras, el uso de este método puede garantizar que, en escenarios lógicos de consumo normales (sin excepciones, sin salidas de excepciones), se pueda resolver el trabajo idempotente de los mensajes, ya sea causado por la duplicación de negocios o las características de rocketmq. . repetir.
De hecho, esto ha resuelto el 99% del problema de duplicación de mensajes. Después de todo, definitivamente hay algunas escenas inusuales. Luego, si desea solucionar problemas idempotentes en escenarios anormales, puede hacer lo siguiente para reducir la tasa de problemas:
1. No utilizar mensajes de reversión. Si la falla del consumo de mensajes en sí se devuelve al mecanismo de rotación, entonces, naturalmente, el reintento del mensaje no tendrá efectos secundarios.
2. Los consumidores deben salir con dignidad. Esto es para evitar reintentos de mensajes causados por la salida del programa cuando se consume la mitad del mensaje.
3. Para algunas operaciones que no pueden ser idempotentes, al menos detenga el consumo y llame a la policía. Por ejemplo, en la operación de bloqueo de inventario, si el proceso comercial unificado bloquea con éxito el inventario una vez, el bloqueo de inventario se activará nuevamente. Si no se puede realizar el procesamiento idempotente, al menos el consumo de mensajes desencadenará excepciones (como el consumo anormal causado por conflictos de clave primaria).
4. Bajo la premisa #3, monitorear el consumo de mensajes. Cuando descubra que los reintentos de mensajes fallan continuamente, realice el trabajo de reversión n.° 1 manualmente para que el siguiente reintento de consumo tenga éxito.