Componentes principales de Kafka: controladores y coordinadores
[TOC]
Ya sabemos que un clúster de Kafka consta de n intermediarios, cada uno de los cuales es una instancia de Kafka o un servicio de Kafka.
De hecho, el controlador también es un agente, y el controlador también se llama agente líder.
Además de las funciones generales del agente, también se encarga de seleccionar el líder de la partición, es decir, se encarga de elegir la réplica líder de la partición.
Cuando se inicia Kafka, creará una instancia de Kafka del servicio Kafka, que servirá como réplica líder de la partición.
Incluyendo el inicio del clúster, hay tres situaciones que activarán la elección del controlador:
1. Inicio del clúster
2. El agente donde se encuentra el controlador falla
3. El latido de Zookeeper detecta que el controlador expira junto con su propia sesión
Como de costumbre, veamos primero el diagrama. Expliquemos el proceso de elección del controlador al inicio del clúster según el siguiente diagrama.
Supongamos que el clúster tiene tres agentes iniciados simultáneamente.
(a) Los tres agentes obtienen la información del nodo temporal /controlador del cuidador del zoológico. /controller almacena información del líder elegido.
(ii) Si aún no se ha elegido un líder, el nodo no existe y se devuelve -1. Si no es -1 sino los datos json del líder, significa que el líder ya existe y la elección terminó.
(c) Tres agentes descubren -1 y se dan cuenta de que no hay ningún líder, por lo que todos activan escrituras en el nodo/controlador efímero.
(d) Suponiendo que el intermediario 0 es el más rápido, primero escribe en el nodo /controller y luego se convierte en el líder.
Después de estos cuatro pasos, el intermediario 0 escribió con éxito en el nodo /controller, pero otros intermediarios no pudieron escribir correctamente, por lo que el intermediario 0 fue elegido exitosamente como líder.
Además, hay un nodo controlador_epoch en zk, que se utiliza para almacenar el número de cambios de líder. Todas las solicitudes al controlador vienen con este valor. Si el controlador compara con su propia memoria y el valor de la solicitud es menor, significa que se ha producido una nueva elección en el clúster de Kafka, esta solicitud ha caducado y esta solicitud no es válida. Si el valor de la solicitud es mayor que el valor en la memoria del controlador, significa que se ha seleccionado un nuevo controlador, ha abdicado y la solicitud no es válida. Kafka garantiza la unicidad del controlador del clúster y la coherencia de las operaciones a través de controller_epoch.
Como puede ver, la clave para la elección del controlador Kafka es quién ingresa primero al nodo /controller para escribir su propia información.
La inicialización del controlador en realidad inicializa los componentes y oyentes utilizados por el controlador y prepara los metadatos.
Como se mencionó anteriormente, cada agente crea una instancia e inicia un KafkaController. La relación entre KafkaController y sus componentes y la introducción de cada componente se muestran en la siguiente figura:
En la figura. La flecha representa la jerarquía de componentes, con otros componentes inicializados debajo del componente. Se puede ver que el controlador todavía es algo complejo, con los siguientes componentes principales:
1. ControllerContext Este objeto almacena toda la información contextual necesaria para que el controlador funcione, incluido el estado de supervivencia del agente, todo. temas y plan de asignación de particiones, AR, líder, ISR y otra información para cada partición.
2. Una serie de oyentes, al escuchar al cuidador del zoológico, activan las operaciones correspondientes. Los cuadros amarillos son todos oyentes.
3. Particionar y copiar máquinas de estado, administrar particiones y copias.
4. ZookeeperLeaderElector, el agente elector actual, tiene métodos de devolución de llamada arriba y abajo.
5.PartitionLeaderElector, PartitionLeaderSelector
6.TopicDeletionManager, TopicDeletionManager
7. Comunicación por lotes de Leader al broker ControllerBrokerRequestBatch. La máquina de estado de caché procesa las solicitudes generadas y las envía de manera uniforme.
8. KafkaScheduler para operaciones de equilibrio del controlador, solo válido cuando el broker es el líder.
ZK registra información importante sobre el clúster de Kafka, como todos los nodos intermediarios del clúster, todas las particiones del tema y la información de réplica de la partición (conjunto de réplicas, réplica principal, conjunto de réplicas sincronizadas) . Cada agente tiene un controlador Para administrar todo el clúster, Kafka utiliza el modelo de elección zk para seleccionar un "controlador central" o "controlador maestro" para todo el clúster. Este controlador es en realidad un nodo de agente, excepto el. funciones de los agentes generales. Además de la función de intermediario general, también tiene una función de elección del líder de partición. El controlador central gestiona la información de todos los nodos y gestiona la elección y el reequilibrio de los nodos del clúster y los líderes de partición registrando varios eventos de escucha con ZK. Los eventos externos actualizarán los datos en ZK Una vez que los datos en ZK cambien, todos los controladores deben responder de manera diferente.
La conmutación por error significa que el agente del líder falla y el líder se transfiere a un agente diferente. El proceso de transferencia es el proceso de volver a seleccionar al líder.
Después de reelegir al líder, debe registrar los permisos apropiados para el agente y llamar al método onControllerFailover() de ZookeeperLeaderElector. En este método, se inicializarán y comenzarán una serie de componentes para completar varias operaciones del líder. Los detalles específicos son los siguientes y, en realidad, son muy similares a la inicialización del controlador.
1. Registrar oyentes relacionados con la gestión de particiones
2. Registrar oyentes relacionados con la gestión de temas
3. Registrar oyentes de cambio de agente
p>
4. Reinicialice ControllerContext,
5. Inicie el controlador y otros agentes que se comunican con ControllerChannelManager
6. Cree un objeto TopicDeletionManager para eliminar temas e inícielo. .
7. Inicie la máquina de estado de partición y la máquina de estado de réplica
8. Sondee cada tema y agregue un PartitionModificationsListener para escuchar los cambios de partición
9. Si Cuando se configura la operación programada de equilibrio de partición, cree una tarea programada de equilibrio de partición (el valor predeterminado es 300 segundos) para verificar y realizar la operación.
Además de iniciar estos componentes, el método onControllerFailover también realiza las siguientes operaciones:
1. +1 en el valor /controller_epoch y lo actualiza al ControllerContext
2. Compruebe si desea reasignar la partición inicial y realizar operaciones relacionadas
3. Compruebe si necesita seleccionar la réplica preferida como líder y realizar operaciones relacionadas
4. Envíe elementos de actualización a todos los agentes en las solicitudes de datos del clúster de Kafka.
Echemos un vistazo al método onControllerResignation llamado al cancelar los permisos del líder
1. Cancele los permisos del controlador en este método. Deje de escuchar a los oyentes correspondientes en zookeeper para conocer la partición y la réplica.
2. Cierre el componente de inicio
3. Finalmente, ControllerContext registra el valor de la versión del controlador como cero y establece el broker actual en RunnignAsBroker, convirtiéndolo en un broker normal.
Al iniciar el proceso a través del controlador, ya deberíamos saber que, en esencia, escucha los nodos relevantes en zookeeper y desencadena acciones cuando los nodos cambian.
Cuando un nuevo agente se une al clúster, se dice que se conecta. En cambio, cuando un agente se apaga y sale del clúster, se dice que es un agente fuera de línea.
El broker se conecta:
1. Cuando se inicia el nuevo broker, escribe datos en /brokers/ids
2. BrokerChangeListener escucha los cambios. Llame a ControllerChannelManager.addBroker() en el nuevo nodo en línea para completar la inicialización de la nueva capa de red proxy en línea
3. Llame a KafkaController.onBrokerStartup() para manejar el proceso
3.5 Restaurar eliminación debido a un nuevo subproceso de operación de subproceso proxy que está en línea pero suspendido
El agente está fuera de línea:
1. Encuentre la colección de nodos fuera de línea
2. Sondee los fuera de línea nodos y llame a controllerChannelManager.removeBroker() y cierre la conexión de red de cada nodo fuera de línea. Borre la cola de mensajes del nodo fuera de línea y cierre la solicitud del nodo fuera de línea
3. Sondee el nodo fuera de línea y llame a KafkaController.onBrokerFailure para su procesamiento
4. Envíe mensajes a todos los brokers supervivientes en el cluster Enviar una solicitud updateMetadataRequest
Como sugiere el nombre, el coordinador es responsable de coordinar el trabajo. Como se describe en esta sección, el coordinador se utiliza para coordinar la distribución del trabajo a los consumidores. En pocas palabras, es el trabajo de inicialización después de que el consumidor comienza y antes del consumo normal. Los consumidores confían en que el coordinador funcione correctamente.
Hay dos tipos principales de coordinadores:
1.ConsumerCoordinator
2.GroupCoordinator
Hay otras razones para presentar un coordinador en el proceso histórico kafka. Resulta que la información del consumidor depende del guardián del zoológico para el almacenamiento. Cuando el agente o el consumidor cambia, se activa el equilibrio del consumidor. En este momento, cada consumidor se comunica con el guardián del zoológico de forma independiente, lo que fácilmente puede causar un efecto de manada y un cerebro dividido. pregunta.
Para solucionar estos problemas, Kafka presentó al coordinador. El lado del servidor es GroupCoordinator y el lado del consumidor es ConsumerCoordinator. Cuando se inicia cada agente, crea una instancia de GroupCoordinator, que es responsable de administrar el grupo de consumidores parcial (equilibrio de carga del clúster) y la compensación de consumo de cada consumidor del grupo. Cuando se crea una instancia de cada consumidor, también se crea una instancia de un objeto ConsumerCoordinator, que es responsable de la comunicación entre cada consumidor del mismo grupo de consumidores y el GroupCoordinator anterior del lado del servidor. El siguiente es un diagrama esquemático:
ConsumerCoordinator puede considerarse como una clase proxy para que operen los consumidores (de hecho, no lo es). Muchas operaciones de los consumidores son manejadas por ConsumerCoordinator.
El coordinador del consumidor es principalmente responsable de las siguientes tareas:
1. Actualizar los metadatos almacenados en caché por el consumidor
2. Solicitar al coordinador del grupo que se una al grupo
3. Procesamiento correspondiente después de que el consumidor se une al grupo
4. Solicitud para salir del grupo de consumidores
5. Enviar la compensación al coordinador del grupo
6. Mantener la conciencia de conexión con el coordinador del grupo a través de latidos del corazón
7.
7. El coordinador del consumidor seleccionado como LÍDER por el coordinador del grupo es el responsable para la asignación de particiones del consumidor. Los resultados de la asignación se envían al coordinador del grupo.
8. Los consumidores que no son LEADER distribuyen los resultados de forma sincrónica a través del coordinador de consumidores y el coordinador de grupo.
En la siguiente figura se muestra una descripción de los componentes y principales dependencias del coordinador de consumidores:
Como se puede observar, estos componentes son equivalentes al trabajo realizado por el coordinador de consumidores. .
El coordinador de grupo es responsable de atender diversas solicitudes del coordinador de consumidores. Proporciona la siguiente funcionalidad:
Se crean instancias de los coordinadores de grupo cuando se inicia el corredor y cada coordinador de grupo es responsable de administrar una parte del grupo de consumidores. Los componentes de los que depende se muestran en la siguiente figura:
Estos componentes también corresponden a las funciones del coordinador del grupo. No entraremos en detalles.
El siguiente diagrama muestra cómo un consumidor inicia el proceso de seleccionar un líder de grupo y unirse a un grupo.
El proceso de unir un consumidor a un grupo es un buen ejemplo de cómo el coordinador de consumidores y el coordinador de grupo trabajan juntos. El consumidor líder se hace cargo del trabajo de partición, lo que reduce en gran medida la presión sobre el clúster de Kafka. El coordinador del grupo mantiene sincronizados a los consumidores del mismo grupo. La correspondencia entre consumidores y particiones persiste en los temas internos de Kafka.
Cuando un consumidor consume, mantendrá localmente la posición (compensación) en la que consume, es decir, la compensación, para que sepa por dónde empezar la próxima vez que consuma. Esto es suficiente si el entorno general no cambia. Sin embargo, una vez que la operación de equilibrio del consumidor o la partición cambia, el consumidor ya no corresponde a la partición original y el desplazamiento de cada consumidor no se sincronizará con el servidor, por lo que el trabajo del consumidor anterior no se puede continuar.
Por lo tanto, la única forma es ser administrado centralmente por GroupCoordinator, enviando periódicamente compensaciones de consumidores al servidor y luego, después de redistribuir las particiones, cada consumidor lee el desplazamiento de su partición correspondiente del volumen de GroupCoordinator y continuar el trabajo de su predecesor en la nueva partición.
El siguiente diagrama ilustra el problema de no enviar compensaciones al servidor:
Inicialmente, el consumidor 0 consume las particiones 0 y 1, luego, cuando el nuevo consumidor 2 se une al grupo, las particiones se redistribuyen. El consumidor 0 ya no consume la partición 2. En cambio, el consumidor 2 consume la partición 2. Sin embargo, dado que los consumidores no pueden comunicarse entre sí, todos los consumidores 2 no saben por dónde empezar a consumir.
Por lo tanto, los consumidores deben enviar periódicamente sus compensaciones de consumo al servidor, de modo que después de la operación de repartición, cada consumidor pueda encontrar la compensación consumida por la partición asignada a sí mismo en la cantidad del servidor y continuar consumiendo.
Dado que Kafka tiene alta disponibilidad y escalabilidad horizontal, cuando aparece una nueva partición o se agrega un nuevo consumidor al grupo, la partición correspondiente al consumidor debe reasignarse, por lo que si hay un problema con el desplazamiento envío, los mensajes se consumirán repetidamente o se perderán. ¡Preste especial atención al momento y método de envío de la compensación!
1. Confirmar compensaciones automáticamente
Establezca enable.auto.commit en verdadero y establezca el período, que por defecto es 5 segundos. Cada vez que el consumidor llama al método poll() para sondear mensajes, verifica si el desplazamiento no se ha confirmado durante más de 5 segundos. Si es así, confirma el desplazamiento devuelto por la última encuesta.
Esto es muy conveniente, pero traerá el problema del consumo repetido. Si se activa un reequilibrio 3 segundos después de que se confirmó el último desplazamiento y el servidor almacenó el mismo desplazamiento que el último compromiso, una vez que se complete el reequilibrio, los nuevos consumidores comenzarán desde el último desplazamiento confirmado. Se recuperarán los mensajes y se consumirán los mensajes. dentro de estos 3 segundos se consumirá repetidamente.
2. Confirmar compensaciones manualmente
Establezca enable.auto.commit en falso. Llame manualmente a commitSync () en el programa para confirmar el desplazamiento y confirmar el último desplazamiento devuelto por el método de sondeo.
commitSync() confirmará el desplazamiento sincrónicamente y el programa principal se bloqueará hasta que se envíe el desplazamiento. Esto limita el rendimiento del programa. Si se reduce la frecuencia de envío, fácilmente puede provocar un consumo repetido.
Aquí podemos usar commitAsync() para confirmar el desplazamiento de forma asincrónica. commitAsync no vuelve a intentarlo, un error es un fracaso. commitAsync no vuelve a intentarlo porque cuando vuelve a intentar la confirmación, es posible que haya otras compensaciones confirmadas con éxito con compensaciones más grandes. Si vuelve a intentar la confirmación con éxito en este momento, la compensación más pequeña sobrescribirá la cantidad de desplazamiento mayor. Luego, si se produce un reequilibrio en este momento, el nuevo consumidor consumirá los mensajes nuevamente.