Red de conocimiento informático - Aprendizaje de programación - mecanismo de compensación del consumidor de Kafka

mecanismo de compensación del consumidor de Kafka

Los mensajes de Kafka se almacenan en particiones uno por uno. Si hay 10 mensajes en la partición, el desplazamiento es 0-9.

El consumidor consume 5 mensajes y el desplazamiento es 5, apuntando a Para el siguiente. registro que se consumirá, el consumidor

debe informar sus propios datos de compensación a Kafka. Debido a que el consumidor puede consumir múltiples particiones, la granularidad del desplazamiento

es Particionado. El consumidor puede consumir varias particiones, por lo que la granularidad del desplazamiento

es la partición, y el consumidor debe enviar la información de desplazamiento para cada partición que se le asignó por separado.

Desde la perspectiva del usuario, el envío de reemplazo se divide en envío automático y envío manual; desde la perspectiva del consumidor, el envío de reemplazo

se divide en envío sincrónico y envío asincrónico.

Hay un tema llamado "__consumer_offsets" dentro de Kafka. El envío de compensación es para enviar un mensaje a este tema.

El formato del mensaje tiene la forma de valor clave. El valor clave consta de groupId y tema. El cuerpo del mensaje es el valor de desplazamiento, así como datos definidos por el usuario y marca de tiempo. . También hay dos formatos especiales. También hay dos formatos especiales, uno se usa para guardar información del grupo de consumidores para registrar el grupo y el otro se usa para eliminar el desplazamiento de vencimiento del grupo y eliminar mensajes del grupo.

El tema __consumer_offsets se crea cuando se inicia el primer consumidor en el clúster kafka, con 50

particiones y 3 réplicas por defecto.

Cuando el método de envío es el envío automático, incluso si el desplazamiento del consumidor actual ya no se actualiza, Kafka seguirá enviando automáticamente

mensajes de desplazamiento periódicos a __consumer_offsets, por lo que usted debe periódicamente Eliminar mensaje sobre tema desplazamiento.

Si hay dos mensajes A y B para la misma clave, y A se envía antes que B, el mensaje se eliminará.

Si hay dos mensajes para la misma clave y A se envía antes que B, entonces A es el mensaje caducado.

Compacto es un poco como mark-organize de jvm gc. Elimina los mensajes caducados y organiza los mensajes restantes juntos.

Kafka proporciona un hilo de fondo especial que patrulla regularmente. El tema a comprimir es. comprobado para ver si hay datos eliminables elegibles

Este hilo se llama Log Cleaner y se usa cuando encontramos demasiados temas desplazados en el registro. Cuando encontramos demasiados temas de desplazamiento, podemos

verificar si el hilo está colgado.

enable.auto.commit tiene el valor predeterminado verdadero,

auto.commit. intervalo.ms tiene un valor predeterminado de 5 segundos, lo que significa que Kafka enviará automáticamente información de desplazamiento cada 5 segundos.

El envío automático tiene el problema de consumir mensajes repetidamente porque se envía a intervalos. Si se produce un reequilibrio durante el intervalo,

después del reequilibrio, todos los usuarios deben comenzar a consumir desde el último desplazamiento

Luego, entre la última confirmación automática y el reequilibrio, el desplazamiento de datos consumido durante el intervalo se reducirá al mismo nivel que la última confirmación automática.

El desplazamiento de datos consumido durante este período no se ha confirmado, por lo que se consumirá repetidamente Incluso si aumentamos la frecuencia de envío reduciendo el valor de auto.commit.interval.ms,

Simplemente reduce la ventana de tiempo para el doble consumo, así que veamos si podemos evitar el doble consumo enviándolo manualmente.

commitSync() es una API sincrónica para consumidores. La ventaja del envío manual es que podemos controlar el tiempo

y la frecuencia del envío, pero como es una API sincrónica, es así. voluntad El bloqueo no finalizará hasta que el agente devuelva el resultado. Si el bloqueo no se debe a limitaciones de recursos, el sistema naturalmente no quiere hacer esto.

Si no se debe a limitaciones de recursos, el sistema

naturalmente no quiere que esto suceda.

commitAsync() es una API asincrónica para consumidores. commitAsync() no bloquea y por lo tanto no afecta los

tps del consumidor, pero el problema es que el consumidor no puede volver a intentarlo ya que se trata de una confirmación asincrónica cuando la confirmación falla debido a un bloqueo de recursos de la red o del sistema

, el consumidor volverá a intentarlo. Cuando vuelve a intentarlo, es posible que el consumidor haya consumido muchos mensajes

y los haya confirmado, por lo que la compensación confirmada por el reintento no es el último valor y no tiene sentido. Podemos combinar

confirmaciones asincrónicas con confirmaciones sincrónicas y usar confirmaciones sincrónicas para evitar problemas de red o problemas de gc del lado del agente

. Usamos la confirmación sincrónica para evitar problemas de red o problemas de gc en el lado del agente

, porque los problemas de red o gc pueden causar una falla de confirmación instantánea, y luego confirmamos el desplazamiento a través del mecanismo de reintento y usamos la confirmación asincrónica para evitarlo; commit El problema de bloqueo

Los commitsSync() y commitAsync() anteriores son todos consumidores que sondean mensajes para consumirlos.

Si el mensaje sondeado no es el último valor, el último valor será ser presentado compensado. Compensación, ¿qué pasa si hay muchos mensajes de encuesta? Lleva mucho tiempo si el sistema intermedio falla, ¿no sería necesario comenzar de nuevo desde cero, por lo que Kafka proporciona una API de envío segmentada

commitSync(Maplt; TopicPartition, OffsetAndMetadatagt;)?

?commitAsync (Maplt; TopicPartition, OffsetAndMetadatagt;)

Suponiendo que sondeamos un segundo de datos y hay 5000 entradas, podemos usar un contador para acumular hasta 100 entradas,

Luego envíe un desplazamiento a Kafka a través de la API de confirmación segmentada.