Red de conocimiento informático - Problemas con los teléfonos móviles - Solución Rocket MQ-zero loss para mensajes de consumidores

Solución Rocket MQ-zero loss para mensajes de consumidores

Mediante el esquema anterior, podemos asegurarnos de que el mensaje llegue a MQ y que el mensaje en MQ no se pierda. Mientras se haga esto, se puede garantizar que el sistema consumidor intermedio pueda recibir el mensaje, pero incluso si el consumidor intermedio recibe el mensaje, ¿no se perderán los datos del mensaje?

La respuesta es no necesariamente. Supongamos que el sistema consumidor descendente ha obtenido el mensaje, pero el mensaje todavía está en su memoria y la lógica empresarial aún no se ha ejecutado. En ese momento, envió directamente el desplazamiento del mensaje al corredor y dijo que había sido procesado. Luego, el sistema de consumo posterior colapsó repentinamente, los mensajes en la memoria desaparecieron y la lógica empresarial no se ejecutó. Como resultado, el corredor recibe el offset del mensaje que envió y considera que ha terminado de procesar el mensaje. Después de reiniciar el sistema consumidor, el mensaje no se consumirá nuevamente y aún habrá pérdida de datos.

La clase MessageListenerConcurrently en el código anterior se utiliza para registrar un oyente. Cuando reciba un lote de mensajes, se le devolverá la llamada para obtener esta función de escucha, lo que le permitirá procesar el lote de mensajes.

Luego, cuando haya terminado de procesar, regrese al estado de ConsumeCumurent. Consume_success sirve como señal de consumo exitoso y le dice a RocketMQ que he terminado de procesar este lote de mensajes. RocketMQ enviará el desplazamiento de este lote de mensajes al corredor.

Por lo tanto, si se procesa un lote de mensajes y la compensación del mensaje se envía al corredor, incluso si el sistema del consumidor no funciona, el mensaje no se perderá en este momento.

Si un lote de mensajes no se ha procesado y no ha regresado al estado Consumeconcurrentstatus. Consume_success, en este momento el consumidor está inactivo, RocketMQ realmente sentirá que el consumidor ha colgado y entregará los mensajes no procesados ​​a otras máquinas para su procesamiento, por lo que en este caso los mensajes nunca se perderán.

Nota: No puedes utilizar mensajes de forma asincrónica.

No podemos procesar mensajes de forma asincrónica en el código porque una vez que se activa el procesamiento asincrónico, el oyente del consumidor volverá al estado ConsumeCumurentstatus. Consumir_éxito. Es posible que la lógica de procesamiento asincrónico haya enviado las compensaciones de este lote de mensajes al intermediario antes de procesar los mensajes, pensando que el procesamiento ha finalizado. En este punto, si la lógica de procesamiento asincrónico informa un error, el mensaje también se perderá.