Red de conocimiento informático - Problemas con los teléfonos móviles - Consumo continuo de RocketMQ

Consumo continuo de RocketMQ

Una de las preguntas imprescindibles en cualquier entrevista de MQ es: ¿Cómo funciona la mensajería secuencial de RocketMQ? ¿Cuál es el principio?

En primer lugar, aclaremos qué es el consumo secuencial y cuál es la definición de consumo secuencial. Lo que quiero decir con consumo secuencial es que para un tipo específico de mensaje (por ejemplo, pedir un mensaje) se consumirá secuencialmente, similar al primero en entrar, primero en salir. Supongamos que el pedido A tiene varios tipos de mensajes, como creación, pago y finalización, luego debemos consumir primero el mensaje de creación, luego el pago y finalmente el mensaje de finalización del pedido A.

Así que, para toda la cadena, no sólo debemos organizarnos a la hora de rellenar, sino también a la hora de consumir. Incluso si se completa en orden FIFO, si usamos varios subprocesos para consumir el mismo ConsumerQueue y podemos consumir varios mensajes al mismo tiempo, entonces el consumo no será ordenado. La siguiente sección explica cómo hacer esto desde la perspectiva del proveedor y del consumidor.

En primer lugar, para mensajes secuenciales, el productor puede ser multiproceso, por lo que siempre que se asegure de que cada hilo envíe un tipo de mensaje diferente (por ejemplo, una secuencia diferente de mensajes), entonces el productor Puede enviar varios mensajes. Para los proveedores, RocketMQ proporciona una forma de enviar mensajes secuenciales, llamada MessageQueueSelector:

Cuando el proveedor envía un mensaje, solo necesita seleccionar el ConsumerQueue al que se envía el mensaje. Por ejemplo, para los pedidos, la identificación del pedido se utilizará como clave para seleccionar la cola, por lo que los mensajes para el mismo pedido definitivamente se enviarán a la misma cola.

Por tanto, el envío del pedido por parte del proveedor es muy sencillo.

Para los consumidores, es necesario utilizar MessageListenerOrderly para consumir mensajes:

El principio de pedido de los consumidores también es muy simple. Cuando un consumidor consume un mensaje, el hilo de extracción (subproceso único) de PullMessageService extraerá el mensaje y lo colocará en la cola de proceso (cada cola de consumidor tiene una cola de proceso, ya que es una extracción de un solo subproceso, el consumidor no necesita hacerlo). Los mensajes establecen una única cola (aunque los consumidores pueden suscribirse a varias colas, el mismo mensaje será ordenado por la misma cola). Después de ser colocado en ProcessQueue, se llamará a ConsumeMessageConcurrentlyService o ConsumeMessageOrderlyService para consumir el mensaje; en este caso, se llamará a ConsumeMessageOrderlyService para consumir el mensaje. Cuando ConsumeMessageOrderlyService consume, primero adquirirá el bloqueo de cada ConsumerQueue y luego obtendrá el mensaje de ProcessQueue para su consumo, lo que significa que la lógica de consumo de cada mensaje de ConsumerQueue también está en orden.

Si los reintentos de MessageQueue no se pueden reemplazar, MessageQueue debe tener su propia copia y garantizar la disponibilidad de la copia a través de algoritmos como Raft, Paxos u otro almacenamiento de alta disponibilidad de MessageQueue. Podemos intentar equilibrar la distribución de mensajes en diferentes MessageQueue dividiendo MessageQueue y optimizando el método de enrutamiento.

El paralelismo de consumo no es un gran problema en teoría porque el número de MessageQueue se puede ajustar.

La imposibilidad de omitir fallas de consumo es inevitable porque omitir puede causar que todo el procesamiento de datos posterior sea incorrecto. Sin embargo, podemos proporcionar políticas que permitan a los usuarios decidir si omitir mensajes según el tipo de error y proporcionar una cola de reintento u otra funcionalidad que permita a los usuarios volver a consumir mensajes "en otro lugar" después de omitirlos.

De hecho, para el llamado consumo secuencial, que es esencialmente similar al comportamiento de una máquina de estados, por ejemplo, primero se crea una orden, luego se realiza el pago y finalmente finaliza el comportamiento. Se puede definir un estado y el orden de aparición es secuencial. Por lo tanto, no hay absolutamente ninguna necesidad de utilizar ningún consumo secuencial, puede crear primero, enviar el mensaje de creación a mq, obtener el mensaje de creación de mq y consumirlo, luego crear el mensaje de pago y enviarlo a mq, consumir el mensaje de pago de mq y luego marcar el finalizar el pedido. Esto es completamente posible usando la máquina de estado + mq + base de datos, y será más estable y versátil.