Red de conocimiento informático - Conocimiento informático - Cómo determinar el número de particiones, claves y subprocesos de consumo de Kafka

Cómo determinar el número de particiones, claves y subprocesos de consumo de Kafka

R. Cuanta más memoria necesite usar el cliente/servidor

Primero, hablemos del cliente. Kafka 0.8.2 agregó un nuevo productor después de introducir la versión Java. El productor tiene un parámetro tamaño de lote, cuyo valor predeterminado es 16 KB. Almacena en caché los mensajes para cada partición y, una vez llena, los mensajes empaquetados se envían en lotes. Almacenará en caché la información de cada partición y, una vez llena, enviará la información en lotes. Este parece un buen diseño para mejorar el rendimiento. Sin embargo, obviamente, dado que este parámetro está en el nivel de partición, cuantas más particiones haya, más memoria ocupará esta parte del caché. Suponiendo 10.000 particiones, esta parte del caché ocupará aproximadamente 157 MB de memoria según la configuración predeterminada. ¿Qué pasa con el lado del consumidor? Dejemos de lado la memoria necesaria para recuperar los datos y hablemos únicamente de la sobrecarga del subproceso. Si aún asumimos que hay 10,000 particiones y la cantidad de subprocesos del consumidor debe coincidir con la cantidad de particiones (que es la mejor configuración para el rendimiento del consumidor en la mayoría de los casos), entonces el cliente consumidor tendría que crear 10,000 subprocesos y también aproximadamente 10,000. Es necesario crear sockets para obtener datos de partición. No se puede subestimar la sobrecarga del cambio de subprocesos.

La sobrecarga en el lado del servidor no es pequeña. Si lee el código fuente de Kafka, puede encontrar que muchos componentes en el lado del servidor mantienen cachés a nivel de partición en la memoria, como el controlador, FetcherManager, etc. ., por lo que cuantas más particiones, más tiempo llevará. El coste de este caché es mayor.

II. Gastos generales de procesamiento de archivos

Cada partición tiene su propio directorio en el sistema de archivos subyacente. Generalmente hay dos archivos en este directorio: base_offset.log y base_offset.index, que el controlador de Kafak y ReplicaManager almacenan para cada agente. Obviamente, cuantas más particiones tenga, más controladores de archivos necesitará abrir y eventualmente podrá exceder el límite ulimit -n.

En tercer lugar, reducir la alta disponibilidad

Kafka garantiza una alta disponibilidad a través del mecanismo de replicación. Esto se logra manteniendo una cierta cantidad de réplicas para cada partición (replica_factor especifica la cantidad de réplicas). Cada copia se almacena en un corredor diferente. Una de las réplicas actúa como réplica líder y es responsable de procesar las solicitudes de productores y consumidores. Las otras réplicas actúan como réplicas seguidoras y el controlador Kafka las sincroniza con el líder. Si el agente donde se encuentra el líder se cuelga, el controlador lo detectará y volverá a seleccionar un nuevo líder con la ayuda del cuidador del zoológico; habrá una breve ventana de indisponibilidad, pero en la mayoría de los casos es posible solo unos pocos milisegundos. Pero si tiene 10.000 particiones y 10 corredores, eso significa que hay un promedio de 1.000 particiones en cada corredor. En este punto, después de que el agente se cuelga, el cuidador del zoológico y el controlador deben realizar la elección del líder para estas 1000 particiones inmediatamente. Inevitablemente, esto llevará más tiempo que un número muy pequeño de elecciones de líderes de partición y, por lo general, no es linealmente acumulativo. La situación es aún peor si el agente es también el responsable del tratamiento.

Después de decir tantas “tonterías”, mucha gente debe haberse impacientado. Entonces usted puede preguntarse, ¿cómo determinar en última instancia el número de particiones? La respuesta es: depende. Básicamente, todavía es necesario pasar por una serie de experimentos y pruebas para determinarlo. Por supuesto, las pruebas deben basarse en el rendimiento. Aunque este artículo de LinkedIn realiza una prueba comparativa de Kafka, sus resultados en realidad significan poco para usted, porque los resultados de diferentes pruebas de hardware, software y carga seguramente serán diferentes. A menudo me encuentro con este problema: el sitio web oficial dice que puede alcanzar los 10 MB por segundo, pero ¿por qué mi productor solo tiene 1 MB por segundo? --Sin mencionar las condiciones del hardware, finalmente se descubrió que el cuerpo del mensaje que usó era de 1 KB, mientras que la prueba comparativa en el sitio web oficial midió 100 B, por lo que no hay comparabilidad. Sin embargo, aún puede seguir ciertos pasos para intentar determinar la cantidad de particiones: cree un tema con solo 1 partición y luego pruebe el rendimiento del productor y el rendimiento del consumidor de este tema. Supongamos que sus valores son Tp y Tc (unidad: MB/s) respectivamente.

Luego, suponiendo que el rendimiento objetivo total es Tt, entonces el número de particiones = Tt / max(Tp, Tc)

Tp es el rendimiento del productor. Probar un productor suele ser fácil porque la lógica es tan simple como enviar mensajes directamente a Kafka. La prueba de Tc generalmente es más específica de la aplicación porque el valor de Tc depende de lo que se hace con el mensaje, por lo que probar Tc suele ser complicado.

Además, Kafka no escala linealmente (en realidad, cualquier sistema lo hace), por lo que es una buena idea planificar sus particiones un poco más para que pueda escalar más fácilmente en el futuro.

Asignación de particiones de mensajes

De forma predeterminada, Kafka asigna particiones según la clave del mensaje entregado (es decir, hash(key) % numPartitions), de la siguiente manera:

def partición(clave: Cualquiera, numPartitions: Int).Int = {

Utils.abs(key.hashCode) % numPartitions

}

Esto Asegúrese de que los mensajes con la misma clave siempre se enruten a la misma partición. Sin especificar un valor clave, ¿cómo determina Kafka a qué partición se enviará el mensaje?

Copiar código

if(key == null) { // Si no especifica la clave

val id = sendPartitionPerTopicCache.get(topic) // Ver ¿Kafka almacena en caché cualquier ID de partición existente?

id coincide con {

case Some(partitionId) =>

partitionId // Si lo hay, úselo

case None => // Si no,

val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) // Encuentre el corredor donde se encuentran los líderes de todas las particiones disponibles

if (availablePartitions.isEmpty)

Lanza una nueva LeaderNotAvailableException("No hay líder para ninguna partición en el tema " + tema)

val index = Utils.abs(Random . nextInt) % availablePartitions.size // Seleccione aleatoriamente uno de la lista

val particionId = availablePartitions(index).partitionId

sendPartitionPerTopicCache.put (tema, particionId) // Actualizar caché para Úselo directamente la próxima vez

partitionId

}

}

Copia el código

Puedes ver que Kafka simplemente busca aleatoriamente una partición a la que enviar el mensaje sin clave y luego agrega ese número de partición al caché para uso directo más adelante (por supuesto),

Cómo configurar el número de subprocesos del consumidor

Mi opinión personal es que si tienes N particiones, es mejor mantener el número de subprocesos en N. Esto suele ser suficiente para conseguir el efecto deseado, pero no supone ningún problema. Mantener el número de subprocesos por debajo de N generalmente maximiza el rendimiento. Las configuraciones superiores a N solo desperdiciarán recursos del sistema ya que los subprocesos adicionales no se asignarán a ninguna partición. Veamos cómo Kafka asigna los hilos.

Las particiones de un tema solo pueden ser consumidas por un único subproceso de consumidor del mismo grupo de consumidores, pero lo contrario no es cierto, es decir, un único subproceso de consumidor puede consumir datos de varias particiones, por ejemplo, el ConsoleConsumer proporcionado por Kafka. De forma predeterminada, solo un subproceso consume datos de todas las particiones. --De hecho, ConsoleConsumer puede utilizar funciones comodín para consumir datos de varios temas al mismo tiempo, pero esto no es relevante para este artículo.

Antes de discutir la estrategia de distribución, hablemos primero de KafkaStream: es la clase clave del consumidor y proporciona métodos transversales para que el programa del consumidor llame para consumir datos. Mantiene una cola de bloqueo, por lo que cuando no llegan nuevos mensajes, el consumidor estará en un estado bloqueado, lo que demuestra que el programa del consumidor está esperando la llegada de nuevos mensajes. --Por supuesto, también puede configurar un tiempo de espera para el consumidor; consulte el uso del parámetro consumer.timeout.ms.

Kafka proporciona dos estrategias de asignación: rango y roundrobin, especificadas por partición.assignment.strategy, y la predeterminada es rango. En este artículo, sólo discutiremos la estrategia de rango. El llamado rango es en realidad la fase distribuida uniformemente.

Tomemos un ejemplo para comprenderlo. Supongamos que hay 10 particiones, P0 ~ P9, y el número de subprocesos consumidores es 3, C0 ~ C2. Entonces, ¿a qué partición está asignado cada subproceso?

C0 partición de consumo 0, 1, 2, 3

C1 partición de consumo 4, 5, 6

C2 partición de consumo 7, 8, 9

El algoritmo exacto es:

Copiar código

val nPartsPerConsumer = curPartitions.size / curConsumers.size / curPartitions.size / curConsumers.sizesize // Garantizado para cada consumidor El número mínimo de particiones consumidas

val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // ¿Cuántas particiones quedan que se pueden asignar individualmente al comienzo del hilo?

...

for (consumerThreadId <- consumerThreadIdSet) { // Para cada hilo de consumidor

val myConsumerPosition = curConsumers.indexOf( consumerThreadId) // Calcula que el hilo está en [0, n- 1]

assert(myConsumerPosition >= 0)

// startPart es el número de particiones iniciales que consumirá este hilo

val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)

// nParts es el número total de particiones que consumirá este hilo****

val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

...

}

Copiar código

En este ejemplo, nPartsPerConsumer es 10/3 = 3 y nConsumersWithExtraPart es 10%3 = 1, lo que indica que se garantiza que cada subproceso obtendrá al menos 3 particiones, y la partición restante debe asignarse a varios subprocesos individualmente al principio. Es por eso que C0 ocupa 4 particiones y los siguientes 2 subprocesos ocupan 3 particiones cada uno. Para obtener detalles sobre este proceso, consulte la información de captura de pantalla de depuración a continuación:

ctx.myTopicThreadIds

p>nPartsPerConsumer = 10 / 3 = 3

nConsumersWithExtraPart = 10 % 3 = 1

Primera vez:

myConsumerPosition = 1

startPart = 1 * 3 + min(1, 1) = 4 -- es decir, leído desde la partición 4

nPartes = 3 + ( if (1 + 1 > 1) 0 else 1 ) = 3 lee 3 particiones, es decir, lee desde la partición 4.

e.,4,5,6

La segunda vez:

myConsumerPosition = 0

startPart = 3 * 0 + min(1, 0) = 0 --- Leer desde la partición 0

nPartes = 3 + ( if (0 + 1 > 1) 0 else 1) = 4 Leer 4 particiones, es decir, 0,1,2,3

La tercera vez:

myConsumerPosition = 2

startPart = 3 * 2 + min(2, 1) = 7 --- Comienza a leer datos de la partición 7

nPartes = 3 + if (2 + 1 > 1) 0 else 1) = 3 Leer 3 particiones, 7, 8, 9

Hasta ahora, las 10 Todas las particiones han sido asignadas

En otras palabras, a menudo necesito un subproceso CONSUMIDOR específico para consumir la partición especificada, no otras particiones. Francamente, Kafka no proporciona actualmente estrategias de asignación personalizadas. Esto es difícil de hacer, pero pensándolo bien, tal vez estemos esperando que Kafka haga demasiado; después de todo, es solo un motor de mensajería, y agregar lógica de consumo de mensajes a Kafka probablemente no sea lo que Kafka debería estar haciendo.