Mecanismo de verificación de compensación de RocketMQ
El desplazamiento que se analizará en este artículo se refiere al desplazamiento de la cola en la figura anterior.
Para guardar el progreso del consumo y evitar el consumo repetido, necesitamos guardar la compensación.
Para el consumo del clúster, el desplazamiento se guarda en el broker, utilizando el RemoteBrokerOffsetStore del cliente.
Para el consumo de transmisión, el desplazamiento se guarda localmente, utilizando el LocalFileOffsetStore del cliente.
Por último, y lo que es más importante, el desplazamiento guardado se refiere al desplazamiento de la cola en la imagen de arriba.
Por ejemplo, si consume un mensaje con un desplazamiento de 0 en la primera cola en la imagen de arriba, en realidad guarda un desplazamiento de 1, lo que significa que la próxima vez que este mensaje se consumirá desde compensación = 1.
Del lado del agente, el progreso del consumo de cada ConsumerGroup bajo el tema se guarda a través de offsetTable en ConsumerOffsetManager.
En la estructura del mapa de dos capas de offsetTable, también puede ver que el progreso del consumo que mencioné anteriormente se refiere al progreso del consumo de cada cola en ConsumerGroup en el tema.
Después de todo, offsetTable es solo una estructura de memoria, por lo que ConsumerOffsetManager hereda ConfigManager para su persistencia.
Implementamos tres métodos de plantilla: codificación, decodificación y configFilePath. Se utiliza para especificar la serialización, la lógica de deserialización y la ubicación de almacenamiento.
La lógica de serialización y deserialización es muy simple, solo use nuestro FastJson.
Cargar el inicio del Broker desde un archivo local
org.BrokerController#initialize<
BrokerController#shutdown
Los consumidores lo utilizan para la sincronización programada.
El desplazamiento se confirma al extraer el mensaje
Activado por el seguimiento de la transacción.
En este artículo, solo analizamos el consumo del clúster en modo PUSH, en este En el modo, el desplazamiento local se almacena en caché en la tabla de desplazamiento de RemoteBrokerOffsetStore y se sincroniza con el intermediario periódicamente.
Dado que la compensación se recuperará cada vez que el consumidor reinicie y es solo un almacén temporal, el diseño de la tabla de compensación de RemoteBrokerOffsetStore no es tan complejo como el de ConsumerOffsetManager.
El primer reequilibrio se produce después de que se inicia el consumo y periódicamente a partir de entonces.
Después de que el reequilibrio asigna una cola de mensajes, genera una cola de proceso basada en la cola de mensajes para extraer mensajes de la cola de proceso.
Antes de extraer el mensaje, hay una operación clave, que consiste en extraer el desplazamiento de la cola de mensajes correspondiente.
RebalanceImpl#updateProcessQueueTableInRebalance
Hay tres estrategias para obtener la posición inicial de extracción del mensaje
CONSUME_FROM_LAST_OFFSET último desplazamiento
CONSUME_FROM_FIRST_OFFSET El primer desplazamiento
CONSUME_FROM_TIMESTAMP Obtiene el desplazamiento según la marca de tiempo
Sin embargo, como se puede ver en el código fuente, la lógica real es algo diferente de lo que imaginamos. Los tres anteriores La premisa de activación. La lógica es extraer la compensación del agente, pero no el progreso de la compensación.
Esto debería ser para evitar el consumo repetido y el consumo insuficiente; después de todo, rocketmq es un mq relacionado con los negocios.
En el lado del consumidor, las actualizaciones de offsetTable, por supuesto, se activan mediante mensajes del consumidor.
ConsumeMessageConcurrentlyService#processConsumeResult
Para compensaciones consumidas simultáneamente, el valor actualizado proviene del método ProcessQueue#removeMessage
La lógica de removeMessage utiliza el algoritmo de ventana deslizante.
Supongamos que hay 10 mensajes con compensaciones del 0 al 9.
En un escenario de consumo concurrente de subprocesos múltiples
Supongamos que mi primer hilo consume un mensaje con un desplazamiento de 0, luego el desplazamiento en la tabla de desplazamiento se actualizará a 1
Entonces mi segundo hilo consume un mensaje en el desplazamiento 5. El desplazamiento devuelto por removeMessage sigue siendo 1
La ventana se deslizará solo cuando se hayan consumido todos los mensajes anteriores
ConsumeMessageOrderlyService#processConsumeResult
Presione Los mensajes se consumen secuencialmente y Esta cuestión no se considera por el momento.
Las compensaciones finales se basan en proxy, por lo que las compensaciones locales persisten en las compensaciones periódicamente.
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
persistenceAll y la lógica de persistencia son aproximadamente las mismas, y la lógica central se pasa persistentemente al corredor a través de updateConsumeOffsetToBroker .
Hay cuatro momentos para activar la lógica de persistencia
MQClientInstance#startScheduledTask
DefaultMQPullConsumerImpl#shutdown
DefaultMQPushConsumerImpl#shutdown
Cuando la cola ya no pertenece al consumidor actual, es necesario sincronizarla y enviarla al intermediario para que los nuevos consumidores que obtienen la cola comiencen a extraer los últimos mensajes no consumidos
RebalancePullImpl#removeUnnecessaryMessageQueue p> p>
Envíe el desplazamiento al extraer el mensaje
DefaultMQPushConsumerImpl#pullMessage
PullMessageProcessor#processRequest
Normalmente, la falla en el consumo del mensaje no afectará el La ventana se desliza porque el cliente enviará mensajes que no se consumen.
Después de devolverlo, el mensaje se retrasará y se enviará al reintento en Topic=%RETRY%{CONSUMERGROUP}
ConsumeMessageConcurrentlyService#processConsumeResult
ConsumeMessageConcurrentlyService#processConsumeResult en cola.
ConsumeMessageProcessor#procesoResultado