Red de conocimiento informático - Problemas con los teléfonos móviles - Características clave de RocketMQ

Características clave de RocketMQ

Como componente clave para lograr la escalabilidad y extensibilidad de los sistemas distribuidos, los sistemas de mensajería distribuida deben tener alto rendimiento, alta disponibilidad y otras características. Las características que un sistema de mensajería distribuida debe implementar incluyen:

I. Ordenamiento de mensajes

Ordenamiento de mensajes significa que un tipo de mensaje se puede consumir en el orden en que se envía. Por ejemplo: un pedido generará 3 mensajes, a saber, creación del pedido, pago del pedido y finalización del pedido. Tiene sentido consumirlos en este orden. Pero al mismo tiempo, los pedidos también se pueden consumir en paralelo.

RocketMQ implementa la entrega secuencial de mensajes enviando "mensajes con el mismo ID a la misma cola, y los mensajes en una cola son procesados ​​por un solo consumidor".

De esta forma, para un mismo mensaje de creación, pago y finalización de pedido, su orden de envío y consumo es el mismo.

Lado del productor

Para garantizar el orden de los mensajes, lo único que el lado del productor tiene que hacer es enrutar los mensajes a particiones específicas, lo cual se hace en RocketMQ a través de MessageQueueSelector.

La siguiente implementación garantiza que los mensajes para el mismo pedido se enruten a la misma partición:

long orderId = ((Order) object).getOrderId return mqs.get(orderId mqs; .size ());

Lado del consumidor

Hay dos tipos de consumidores de RocketMQ:

MQPullConsumer es un hilo controlado por el usuario, que obtiene datos activamente del servidor, uno a la vez, de MessageQueue.PullResult List?

Para PushConsumer, los mensajes se almacenan en la cola de mensajes en un solo hilo y, por lo tanto, están en orden. Cada vez que hay un mensaje en espera de ser consumido, ConsumeMessageOrderlyService se asegurará de que cada subproceso necesite obtener un bloqueo en la cola de mensajes cuando consuma el mensaje, y solo el subproceso que obtiene el bloqueo puede consumir el mensaje. Dado que los mensajes en una cola de mensajes están en orden, también lo estarán cada vez que se consumen.

En segundo lugar, la duplicación de mensajes

Si un consumidor recibe dos mensajes idénticos, ¿cómo debe manejarse?

El método de procesamiento de RocketMQ es el siguiente:

1. La lógica empresarial de procesar mensajes en el lado del consumidor sigue siendo vaga.

2. Asegúrese de que cada mensaje tenga un número único y asegúrese de que el registro de éxito del mensaje y la tabla de deduplicación aparezcan al mismo tiempo

El primer caso es fácil de entender. El primer principio es muy fácil de entender. Mientras se mantenga la idempotencia, no importa cuántos mensajes duplicados haya, el resultado final del procesamiento será el mismo. El segundo principio es utilizar una tabla de registro para registrar los ID de los paquetes procesados ​​con éxito. Si el ID de un paquete recién llegado ya está en la tabla de registro, el paquete ya no se procesará.

Podemos ver que la solución en el punto 1 obviamente debe implementarse en el lado del consumidor y no es una función que deba implementar el sistema de mensajería. El punto 2 se puede implementar mediante el sistema de mensajería o desde el lado comercial. En circunstancias normales, la probabilidad de duplicación de mensajes no es necesariamente muy alta. Si se implementa mediante un sistema de mensajes, inevitablemente afectará el rendimiento y la alta disponibilidad del sistema de mensajes. Por lo tanto, es mejor que el lado empresarial maneje el problema. duplicación de mensajes. Esto también es lo que RocketMQ no puede resolver el problema de la duplicación de mensajes.

En tercer lugar, mensajes de transacciones

La clave para la implementación de mensajes de transacciones por parte de RocketMQ radica en la mirada retrospectiva de las transacciones.

Tomemos la transmisión como ejemplo para comprender los mensajes de transacciones de RocketMQ.

Cuando RocketMQ envía un mensaje preparado en la primera etapa, obtiene la dirección del mensaje, la segunda etapa realiza transacciones locales y la tercera etapa accede al mensaje a través de la dirección obtenida en la primera etapa y lo modifica. el estado. Si tiene cuidado, puede volver a encontrar problemas. ¿Qué pasa si no se envía el mensaje de confirmación? RocketMQ escaneará periódicamente los mensajes de eventos en el grupo de mensajes. Cuando encuentre el mensaje preparado, confirmará al remitente del mensaje si el dinero se ha reducido o no. Si es así, ¿revertir o continuar enviando mensajes de confirmación? RocketMQ decidirá si revertir o continuar enviando mensajes de confirmación según la política establecida por el remitente. Esto garantiza que los mensajes se envíen y que las transacciones locales tengan éxito o fallen al mismo tiempo.

Si se envía un mensaje de transacción a MQ en el estado de preparación, el consumidor aún no ha visto el mensaje y requiere que el ejecutor de la transacción local devuelva un acuse de recibo a MQ. Que los consumidores puedan ver los mensajes de transacción depende completamente del mensaje de confirmación devuelto por el ejecutor de transacciones local. Si hay un retraso en la recepción de acuses de recibo, MQ utiliza un mecanismo de retrospectiva de transacciones. El principio de implementación es que el mensaje de transacción comienza desde el estado de preparación y RocketMQ lo conserva en el Mysql local. Si se recibe un mensaje de confirmación, el mensaje de preparación se elimina; si se recibe un mensaje de confirmación retrasado, RMQ escaneará el mensaje de preparación; periódicamente y guárdelo y envíelo al equipo de producción para su revisión y confirmación.

Cuarto, equilibrio de carga

El productor sondeará todas las colas bajo el tema para lograr el equilibrio de carga en el lado del remitente. La cola de sondeo aleatorio (Roundbin) mantiene una ID que se incrementa automáticamente a nivel global, se incrementa una vez que se envía un mensaje y toma la cola de envío en modo queueSize.

El hilo RebalanceService en el lado del consumidor se cargará una vez en 10 segundos de acuerdo con todas las colas bajo el tema: recorra todos los temas bajo el Consumidor y luego suscríbase a todos los mensajes según el tema para obtener el mismo tema. y la información en Consumidores Todos los grupos de consumidores, y luego Asigne colas de consumidores de acuerdo con estrategias de asignación específicas, que incluyen: asignación promedio, configuración del consumidor, etc.

En rocketmq, los consumidores se dividen en dos categorías: MQPullConsumer y MQPushConsumer son esencialmente modelos de extracción, y los consumidores extraen mensajes del corredor a través de encuestas. La diferencia entre los dos es que en el modo push, el consumidor encapsula el proceso de sondeo y registra el oyente MessageListener. Después de obtener el mensaje, activa el consumoMessage () del messageListener para consumir el mensaje, lo que hace que el usuario sienta que el mensaje es. empujado. En el modo pull, el proceso de recepción de mensajes debe ser escrito por el propio usuario. Primero, el usuario obtiene la colección de MessageQueue a través del tema que desea consumir, recorre la colección de MessageQueue y luego obtiene los mensajes de cada MessageQueue en lotes. Después de obtener el mensaje una vez, obtiene el desplazamiento inicial del siguiente mensaje. la cola se registra hasta que finaliza la recuperación y luego se reemplaza con otra MessageQueue. /p>