Cómo determinar el número de particiones, claves y subprocesos de consumo en Kafka, y soluciones a problemas que no son de consumo
¿Cómo determinar el número de particiones?
"¿Cuántas particiones debo elegir? --Si es miembro de la comunidad china de Kafka, a menudo se encontrará con esta pregunta. Desafortunadamente, no parece que tengamos una respuesta muy autorizada. Esto no No es sorprendente, después de todo, generalmente no hay una respuesta fija para este tipo de preguntas. El sitio web oficial de Kafka lo anuncia como un "sistema de mensajería distribuida de alto rendimiento", es decir, un motor de mensajería distribuida de alto rendimiento. Alto rendimiento. ¿Qué? Kafka abandona el mecanismo de almacenamiento en caché del montón de Java en la parte inferior, utiliza el almacenamiento en caché de páginas a nivel del sistema operativo y convierte operaciones de escritura aleatoria en escrituras secuenciales. Sin embargo, este es solo un aspecto, lo que mejora enormemente el rendimiento de IO. la capacidad de optimización independiente tiene un límite superior. ¿Cómo mejorar aún más el rendimiento mediante la expansión horizontal o incluso la expansión lineal? Kafka usa particiones para lograr esto dividiendo los mensajes temáticos en múltiples particiones y distribuyéndolos entre diferentes intermediarios. ya sea productores o consumidores)
Los productores y consumidores de Kafka pueden operar en paralelo en múltiples subprocesos, y cada subproceso procesa una partición de datos. En realidad, es la unidad más pequeña para ajustar el paralelismo de Kafka. Para los productores, en realidad utiliza múltiples subprocesos. para iniciar conexiones de socket a diferentes particiones del intermediario al mismo tiempo y enviar mensajes a estas particiones para los consumidores. Todos los subprocesos de consumidores en el mismo grupo de consumidores están asignados para consumir una determinada partición del tema (cómo determinar el número de consumidores); subprocesos, lo explicaremos más adelante) (Cómo determinar el número de subprocesos consumidores, lo explicaremos más adelante. Entonces, cuantas más particiones tenga un tema, mayor será el rendimiento que teóricamente puede lograr el clúster
Pero es). ¿Es mejor tener más particiones? Aparentemente no, ya que cada partición tiene la suya propia:
Primero, el cliente/servidor necesita usar más memoria
Primero, hablemos de Situación del cliente. Después de la introducción de la versión Java, Kafka 0.8.2 agregó un parámetro. Este productor tiene un parámetro de tamaño de lote, que por defecto es 16 KB. Una vez que esté lleno, almacenará en caché los mensajes. Los mensajes se publicarán en lotes, pero es obvio que puede mejorar el rendimiento. Este parámetro está en el nivel de partición. Cuantas más particiones haya, más memoria ocupará esta parte del caché. del caché ocupará aproximadamente 157 MB de memoria. Entonces, ¿qué pasa con el lado del consumidor? Dejando de lado la memoria necesaria para obtener los datos, hablemos simplemente de la sobrecarga de subprocesos. Si todavía asumimos que hay 10,000 particiones, la cantidad de subprocesos de consumidor. debe coincidir con el número de particiones (que es el máximo para el rendimiento del consumidor en la mayoría de los casos (configuración óptima), entonces el cliente consumidor debe crear 10,000 subprocesos y aproximadamente 10,000 sockets para obtener datos de partición). No se puede subestimar la sobrecarga del cambio de subprocesos en sí.
La sobrecarga en el lado del servidor no es pequeña. Si lee el código fuente de Kafka, puede encontrar que muchos componentes en el lado del servidor mantienen cachés a nivel de partición en la memoria, como el controlador, FetcherManager, etc. ., por lo que cuantas más particiones, más tiempo llevará. El coste de este caché es mayor.
II. Gastos generales de procesamiento de archivos
Cada partición tiene su propio directorio en el sistema de archivos subyacente.
Generalmente hay dos archivos en este directorio: base_offset.log y base_offset.index, que el controlador de Kafak y ReplicaManager almacenan para cada agente. Obviamente, cuantas más particiones tenga, más controladores de archivos necesitará abrir y eventualmente podrá exceder el límite ulimit -n.
En tercer lugar, reducir la alta disponibilidad
Kafka garantiza una alta disponibilidad a través del mecanismo de replicación. Esto se logra manteniendo una cierta cantidad de réplicas para cada partición (replica_factor especifica la cantidad de réplicas). Cada copia se almacena en un corredor diferente. Una de las réplicas actúa como réplica líder y es responsable de procesar las solicitudes de productores y consumidores. Las otras réplicas actúan como réplicas seguidoras y el controlador Kafka las sincroniza con el líder. Si el agente donde se encuentra el líder se cuelga, el controlador lo detectará y volverá a seleccionar un nuevo líder con la ayuda del cuidador del zoológico; habrá una breve ventana de indisponibilidad, pero en la mayoría de los casos es posible solo unos pocos milisegundos. Pero si tiene 10.000 particiones y 10 corredores, eso significa que hay un promedio de 1.000 particiones en cada corredor. En este punto, después de que el agente se cuelga, el cuidador del zoológico y el controlador deben realizar la elección del líder para estas 1000 particiones inmediatamente. Inevitablemente, esto llevará más tiempo que un número muy pequeño de elecciones de líderes de partición y, por lo general, no es linealmente acumulativo. La situación es aún peor si el agente es también el responsable del tratamiento.
Después de decir tantas “tonterías”, mucha gente debe haberse impacientado. Entonces usted puede preguntarse, ¿cómo determinar en última instancia el número de particiones? La respuesta es: depende. Básicamente, todavía necesitas pasar por una serie de experimentos y pruebas para descubrirlo. Por supuesto, las pruebas deben basarse en el rendimiento. Aunque este artículo de LinkedIn realiza una prueba comparativa de Kafka, sus resultados en realidad significan poco para usted, porque los resultados de diferentes pruebas de hardware, software y carga seguramente serán diferentes. A menudo me encuentro con este problema: el sitio web oficial dice que puede alcanzar los 10 MB por segundo, pero ¿por qué mi productor solo tiene 1 MB por segundo? --Sin mencionar las condiciones del hardware, finalmente se descubrió que el cuerpo del mensaje que usó era de 1 KB, mientras que la prueba comparativa en el sitio web oficial midió 100 B, por lo que no hay comparabilidad. Sin embargo, aún puede seguir ciertos pasos para intentar determinar la cantidad de particiones: cree un tema con solo 1 partición y luego pruebe el rendimiento del productor y el rendimiento del consumidor de este tema. Supongamos que sus valores son Tp y Tc (unidad: MB/s) respectivamente. Luego, suponiendo que el rendimiento objetivo total es Tt, entonces el número de particiones = Tt / max(Tp, Tc)
Tp es el rendimiento del productor. Probar un productor suele ser fácil porque la lógica es tan simple como enviar mensajes directamente a Kafka. La prueba de Tc generalmente es más específica de la aplicación porque el valor de Tc depende de lo que se hace con el mensaje, por lo que probar Tc suele ser complicado.
Además, Kafka no escala linealmente (como lo hace cualquier sistema, en realidad), por lo que es mejor planificar un poco más las particiones para que sea más fácil escalar en el futuro.
Asignación de particiones de mensajes
De forma predeterminada, Kafka asigna particiones según la clave del mensaje entregado (es decir, hash(key) numPartitions), como se muestra a continuación:
def partición(clave: Cualquiera, numPartitions: Int).Int = {
Utils.abs(key.hashCode) numPartitions
}
Esto asegurará tener los mismos mensajes para una clave siempre se enrutan a la misma partición.
Si no especifica una clave, ¿cómo determina Kafka a qué partición se enviará el mensaje?
if(key == null) { // Si no especificó la clave
val id = sendPartitionPerTopicCache.get(topic) // Primero, vea si Kafka tiene cachés existentes ID de partición
id match {
case Some(partitionId) =gt
particionId // Si hay uno, use el ID de partición
<; p > case None =gt; // Si no, use el ID de particiónval availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) // Descubra en qué broker están los líderes de todas las particiones disponibles
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No hay líder para ninguna partición en el tema " tema)
val index = Utils.Random.Random. nextInt = Aleatorio.Aleatorio.abs(Aleatorio.Aleatorio.siguienteInt)