mecanismo de compensación del consumidor de Kafka
El consumidor consume 5 mensajes y el desplazamiento es 5, apuntando a Para el siguiente. registro que se consumirá, el consumidor
debe informar sus propios datos de compensación a Kafka. Debido a que el consumidor puede consumir múltiples particiones, la granularidad del desplazamiento
es Particionado. El consumidor puede consumir varias particiones, por lo que la granularidad del desplazamiento
es la partición, y el consumidor debe enviar la información de desplazamiento para cada partición que se le asignó por separado.
Desde la perspectiva del usuario, el envío de reemplazo se divide en envío automático y envío manual; desde la perspectiva del consumidor, el envío de reemplazo
se divide en envío sincrónico y envío asincrónico.
Hay un tema llamado "__consumer_offsets" dentro de Kafka. El envío de compensación es para enviar un mensaje a este tema.
El formato del mensaje tiene la forma de valor clave. El valor clave consta de groupId y tema. El cuerpo del mensaje es el valor de desplazamiento, así como datos definidos por el usuario y marca de tiempo. . También hay dos formatos especiales. También hay dos formatos especiales, uno se usa para guardar información del grupo de consumidores para registrar el grupo y el otro se usa para eliminar el desplazamiento de vencimiento del grupo y eliminar mensajes del grupo.
El tema __consumer_offsets se crea cuando se inicia el primer consumidor en el clúster kafka, con 50
particiones y 3 réplicas por defecto.
Cuando el método de envío es el envío automático, incluso si el desplazamiento del consumidor actual ya no se actualiza, Kafka seguirá enviando automáticamente
mensajes de desplazamiento periódicos a __consumer_offsets, por lo que usted debe periódicamente Eliminar mensaje sobre tema desplazamiento.
Si hay dos mensajes A y B para la misma clave, y A se envía antes que B, el mensaje se eliminará.
Si hay dos mensajes para la misma clave y A se envía antes que B, entonces A es el mensaje caducado.
Compacto es un poco como mark-organize de jvm gc. Elimina los mensajes caducados y organiza los mensajes restantes juntos.
Kafka proporciona un hilo de fondo especial que patrulla regularmente. El tema a comprimir es. comprobado para ver si hay datos eliminables elegibles
Este hilo se llama Log Cleaner y se usa cuando encontramos demasiados temas desplazados en el registro. Cuando encontramos demasiados temas de desplazamiento, podemos
verificar si el hilo está colgado.
enable.auto.commit tiene el valor predeterminado verdadero,
auto.commit. intervalo.ms tiene un valor predeterminado de 5 segundos, lo que significa que Kafka enviará automáticamente información de desplazamiento cada 5 segundos.
El envío automático tiene el problema de consumir mensajes repetidamente porque se envía a intervalos. Si se produce un reequilibrio durante el intervalo,
después del reequilibrio, todos los usuarios deben comenzar a consumir desde el último desplazamiento
Luego, entre la última confirmación automática y el reequilibrio, el desplazamiento de datos consumido durante el intervalo se reducirá al mismo nivel que la última confirmación automática.
El desplazamiento de datos consumido durante este período no se ha confirmado, por lo que se consumirá repetidamente Incluso si aumentamos la frecuencia de envío reduciendo el valor de auto.commit.interval.ms,
Simplemente reduce la ventana de tiempo para el doble consumo, así que veamos si podemos evitar el doble consumo enviándolo manualmente.
commitSync() es una API sincrónica para consumidores. La ventaja del envío manual es que podemos controlar el tiempo
y la frecuencia del envío, pero como es una API sincrónica, es así. voluntad El bloqueo no finalizará hasta que el agente devuelva el resultado. Si el bloqueo no se debe a limitaciones de recursos, el sistema naturalmente no quiere hacer esto.
Si no se debe a limitaciones de recursos, el sistema
naturalmente no quiere que esto suceda.
commitAsync() es una API asincrónica para consumidores. commitAsync() no bloquea y por lo tanto no afecta los
tps del consumidor, pero el problema es que el consumidor no puede volver a intentarlo ya que se trata de una confirmación asincrónica cuando la confirmación falla debido a un bloqueo de recursos de la red o del sistema p>
, el consumidor volverá a intentarlo. Cuando vuelve a intentarlo, es posible que el consumidor haya consumido muchos mensajes
y los haya confirmado, por lo que la compensación confirmada por el reintento no es el último valor y no tiene sentido. Podemos combinar
confirmaciones asincrónicas con confirmaciones sincrónicas y usar confirmaciones sincrónicas para evitar problemas de red o problemas de gc del lado del agente
. Usamos la confirmación sincrónica para evitar problemas de red o problemas de gc en el lado del agente
, porque los problemas de red o gc pueden causar una falla de confirmación instantánea, y luego confirmamos el desplazamiento a través del mecanismo de reintento y usamos la confirmación asincrónica para evitarlo; commit El problema de bloqueo
Los commitsSync() y commitAsync() anteriores son todos consumidores que sondean mensajes para consumirlos.
Si el mensaje sondeado no es el último valor, el último valor será ser presentado compensado. Compensación, ¿qué pasa si hay muchos mensajes de encuesta? Lleva mucho tiempo si el sistema intermedio falla, ¿no sería necesario comenzar de nuevo desde cero, por lo que Kafka proporciona una API de envío segmentada
commitSync(Maplt; TopicPartition, OffsetAndMetadatagt;)?
?commitAsync (Maplt; TopicPartition, OffsetAndMetadatagt;)
Suponiendo que sondeamos un segundo de datos y hay 5000 entradas, podemos usar un contador para acumular hasta 100 entradas,
Luego envíe un desplazamiento a Kafka a través de la API de confirmación segmentada.