Equilibrio de carga de la cola de mensajes del consumidor de RocketMQ
Primero echemos un breve vistazo a todo el proceso de carga de la cola de mensajes.
La carga de la cola de mensajes la ejecuta el subproceso Rebalance cada 20 segundos de forma predeterminada. Se obtienen la información de la cola de temas mqSet y el grupo de consumidores cidAll de todos los consumidores actuales, y luego la cola se asigna de acuerdo con la carga. algoritmo El principio de asignación es el mismo consumidor. El consumidor se puede asignar a varias colas de mensajes, y la misma cola de consumo de mensajes solo se puede asignar a un consumidor al mismo tiempo. En este punto, puede calcular los consumidores actualmente asignados a la colección de colas de mensajes y comparar la cola de carga original y la cola de asignación actual.
El equilibrio de carga se inicia en el subproceso RebalanceService, donde MQClientInstance contiene la implementación de RebalanceService, y se inicia cuando se inicia MQClientInstance.
Como se muestra arriba, MQClientinstance atravesará los consumidores registrados y ejecutará el método doRebalance en el consumidor.
Lo anterior es la cola que atraviesa la información de suscripción para recargar cada tema. A continuación, se ejecutará el método rebalanceByTopic, que lo manejará de manera diferente según el modo de transmisión o el modo de clúster. Aquí solo se explica el método en modo de agrupación.
Obtiene la información de la cola del tema y los ID de todos los consumidores actuales en el grupo de consumidores. Cada DefaultMQPushConsumerImpl contiene un objeto RebalanceImpl separado.
Ordenar la información de la cola bajo este tema y todos los ID de consumidores actuales en este grupo de consumidores garantiza que los miembros del grupo de consumidores vean el mismo orden y evita que la misma cola de consumidores se asigne a varios consumidores.
allocateResult registra la cola de mensajes asignada al consumidor actual
Llame a updateProcessQueueTableInRebalance para comparar si la cola de mensajes ha cambiado
Como se puede ver en lo anterior, ProcessQueueTable registros Se registra la tabla de caché de la cola de mensajes de la carga del consumidor actual, y el mqSet dentro de este método registra el conjunto de cola de mensajes del consumidor actual después de la distribución de la carga. Si la cola de mensajes en ProcessQueueTable no existe en mqSet, significa que la cola de mensajes se ha asignado a otro consumidor, por lo que debe pausar el consumo de mensajes en la cola mediante ** pq.setDropped(true); Esta declaración servirá.
Luego determine si el mq se ha eliminado del caché mediante el método removeUnnecessaryMessageQueue**.
Después de eso, comience a recorrer la cola de mensajes asociada con el mqSet asignado a este consumidor. Si ProcessQueueTable no contiene esta cola de mensajes, significa que es una nueva cola de mensajes agregada para esta carga.
Primero elimine el progreso del mensaje de la cola de mensajes de la memoria y luego llame a computePullFromWhere para leer el progreso del consumo de la cola de mensajes del disco para crear un objeto PullRequest.
Como se puede ver en lo anterior, existen tres métodos principales para calcular el progreso del mensaje, y algunos de ellos son básicamente los mismos.
Primero obtenga el progreso del consumo de la cola de mensajes del disco. Si es mayor que 0, significa que la cola de mensajes se ha consumido y el próximo consumo comenzará desde esta posición.
Si es igual a -1, intente operar con la marca de tiempo de almacenamiento del mensaje como la marca de tiempo cuando comenzó el consumidor, si se puede encontrar, devuelva el desplazamiento encontrado, si no se encuentra, devuelva 0 si es menor que -1, significa Este mensaje; tiene el desplazamiento incorrecto almacenado en el archivo de progreso, por lo que se devuelve -1.
Al final de este método, llamará al método DispatchPullRequest y agregará PullRequest a PullMessageService para activar el hilo PullMessageService para extraer mensajes.
En este punto, finaliza la parte de equilibrio de carga del consumidor.