Red de conocimiento informático - Problemas con los teléfonos móviles - Significación de datos de Spark y sus soluciones

Significación de datos de Spark y sus soluciones

Este artículo presenta la distorsión de datos de Spark y sus soluciones, desde los aspectos básicos hasta los aspectos más profundos de los peligros, fenómenos y causas de la distorsión de datos.

En primer lugar, ¿qué es la distorsión de datos?

Para los sistemas de big data distribuidos como Spark/Hadoop, una gran cantidad de datos no da miedo, pero la distorsión de datos sí lo es.

Idealmente, para sistemas distribuidos, a medida que aumenta el tamaño del sistema (número de nodos), el tiempo total de aplicación disminuirá linealmente. Si una máquina tarda 120 minutos en procesar un gran lote de datos, cuando el número de máquinas aumenta a 3, el tiempo ideal es 120/3 = 40 minutos. Sin embargo, para implementar un esquema distribuido donde el tiempo de ejecución de cada máquina es 1/N del de una sola máquina, es necesario asegurarse de que el número de tareas por máquina sea igual. Desafortunadamente, en muchos casos, la distribución de tareas es desigual, incluso tan desigual que la mayoría de las tareas se asignan a una sola máquina, mientras que a la mayoría de las otras máquinas se les asigna solo una pequeña porción del número total de tareas. Por ejemplo, una máquina maneja 80 tareas y las otras dos máquinas manejan 10 tareas cada una.

El mayor problema de los entornos distribuidos es que no hay demasiadas tareas. Esto significa que en lugar de escalar linealmente, hay un efecto de atajo: el tiempo pasado en un escenario está determinado por la tarea más lenta.

Dado que todas las tareas en una etapa realizan el mismo cálculo, la diferencia en el tiempo empleado por diferentes tareas está determinada principalmente por la cantidad de datos procesados ​​por la tarea, en lugar de por la diferencia en la potencia informática de los diferentes nodos informáticos. . Por lo tanto, para aprovechar la computación paralela en sistemas distribuidos, se debe resolver el problema de la distorsión de datos.

En segundo lugar, el daño de la distorsión de datos

Cuando se produce una distorsión de datos, algunas tareas toman mucho más tiempo que otras, lo que hace que el tiempo total invertido sea demasiado grande y no pueda utilizarse por completo. Aproveche la computación paralela en sistemas distribuidos.

Además, cuando se produce una distorsión de datos, la cantidad de datos procesados ​​por algunas tareas es demasiado grande, lo que provocará una memoria insuficiente, lo que provocará fallas en la tarea y luego provocará que falle toda la aplicación.

3. Asimetría de los datos

Cuando encuentre los siguientes fenómenos, es probable que los datos estén sesgados:

4. Razón de la asimetría de los datos

En una combinación aleatoria, debe extraer las mismas claves en cada nodo para una tarea en el nodo para procesarlas como agregación o esto se puede lograr mediante agregación o unión por clave, etc. Si la cantidad de datos correspondientes a una determinada clave es particularmente grande, los datos estarán sesgados. Por ejemplo, si la mayoría de las palabras clave corresponden a 10 datos y una sola palabra clave corresponde a un millón de datos, entonces la mayoría de las tareas pueden asignarse a solo 10 datos y ejecutarse en un segundo, mientras que se puede asignar una sola tarea. a cien Diez mil datos y se ejecuta durante una o dos horas.

Por lo tanto, cuando se produce una desviación de datos, los trabajos de Spark se ejecutarán muy lentamente e incluso pueden causar un desbordamiento de memoria porque la tarea procesa demasiados datos.

5. Identificación y ubicación del problema

1. A través de Spark Web UI

Vea la cantidad de datos asignados a cada tarea en la etapa de ejecución actual a través de Spark Web UI (tamaño/registros de lectura aleatoria) para determinar aún más si las tareas están distribuidas de manera desigual.

Una vez que sabemos en qué etapa ocurren los datos sesgados, debemos averiguar qué parte del código corresponde a la etapa donde ocurre el sesgo según el principio de división de etapas. Esta parte del código definitivamente tendrá. un mensaje similar al operador Shuffle. Puedes ver la distribución de claves con countByKey.

2. Estadísticas de claves

También puedes muestrear el número de apariciones de claves.

Debido a la gran cantidad de datos, el muestreo se puede utilizar para muestrear los datos, contar el número de ocurrencias y eliminar los primeros en orden según la cantidad de ocurrencias:

Si se encuentra la mayoría de los datos Si la distribución es relativamente uniforme y los datos individuales son un orden de magnitud mayor que otros datos, significa que los datos están sesgados.

VI.Cómo aliviar la asimetría de datos

Ideas básicas

Idea 1. Filtrar datos anormales

Si la clave para la asimetría de datos es Datos anormales, luego simplemente filtrelos.

El primer paso es analizar las palabras clave para determinar qué palabras clave están provocando el sesgo. El método específico se presentó anteriormente y no se repetirá aquí.

A continuación analiza los registros correspondientes a estas claves:

Solución

Para los casos 1 y 2, filtra los datos directamente.

El tercer caso requiere un manejo especial, que detallaremos a continuación.

Idea n.º 2: aumentar el paralelismo de la reproducción aleatoria

Spark utiliza de forma predeterminada un HashPartitioner (en lugar de una Hash aleatoria) para particionar los datos durante la reproducción aleatoria. Si el paralelismo no se configura correctamente, se puede asignar una gran cantidad de datos correspondientes a diferentes claves a la misma tarea, lo que hace que esa tarea procese muchos más datos que otras tareas, sesgando así los datos.

Si ajusta el paralelismo de Shuffle para que diferentes claves originalmente asignadas a la misma tarea se asignen a diferentes tareas, entonces puede reducir la cantidad de datos que la tarea original necesita procesar, mitigando así el problema de sesgo de datos. Efecto tabla corta.

(1) Proceso de operación

Las operaciones RDD se pueden configurar directamente en el operador de operación que debe mezclarse o usar spark.default.parallelism. Para Spark SQL, también puede establecer el paralelismo usando SET spark.sql.shuffle.partitions=[num_tasks]. Los parámetros predeterminados están controlados por diferentes administradores de clústeres.

dataFrame y sparkSql pueden configurar el parámetro spark.sql.shuffle.partitions=[num_tasks] para controlar la concurrencia de la reproducción aleatoria.

(2) Escenarios aplicables

Se asigna una gran cantidad de claves diferentes a la misma tarea, lo que hace que la tarea tenga demasiados datos.

(3) Solución

Ajustar el paralelismo. Esto normalmente se logra aumentando el grado de paralelismo, pero a veces también se puede lograr disminuyendo el grado de paralelismo.

(4) Ventajas

Fácil de implementar, basta con ajustar los parámetros. Los problemas se pueden resolver con un coste mínimo. En términos generales, si los datos están sesgados, puede probar este método varias veces y, si el problema no se resuelve, probar otros métodos.

(5) Desventajas

Tiene menos escenarios aplicables y solo permite que cada tarea ejecute menos claves diferentes. No hay forma de resolver la asimetría causada por una única clave que es particularmente grande. Si el tamaño de algunas claves es muy grande, incluso si una tarea la ejecuta sola, se verá afectada por la asimetría de datos. Además, este método sólo puede aliviar la distorsión de los datos, pero no puede eliminar por completo el problema. En la práctica, este enfoque es generalmente ineficaz.

Idea 3. Particionador personalizado

(1) Principio

Utilice un particionador personalizado (el valor predeterminado es HashPartitioner) para asignar diferentes claves originalmente asignadas a la misma tarea a diferentes tareas.

Por ejemplo, utilizamos un particionador personalizado en el operador groupByKey:

(2) Escenario

Se asigna una gran cantidad de claves diferentes a la misma tarea , lo que lleva a una sobrecarga de tareas.

(3) Solución

Utilice una clase de implementación de Partitioner personalizada en lugar del HashPartitioner predeterminado para intentar distribuir todas las claves diferentes por igual a diferentes tareas.

(4) Ventajas

No afectará al diseño paralelo original.

(5) Desventajas

Los escenarios de aplicación son limitados y solo pueden dispersar diferentes claves. No es adecuado para escenarios donde el conjunto de datos correspondiente a la misma clave es muy grande. Además, es necesario personalizar el particionador en función de las características de los datos, lo cual no es lo suficientemente flexible.

Idea 4. Convertir unión del lado reducido en unión del lado del mapa

A través del mecanismo de transmisión de Spark, la unión del lado reducido se convierte en unión del lado del mapa, lo que significa que Spark ahora ya no se mezcla entre nodos. Conéctese directamente a través de archivos locales; . Esto significa que Spark ahora ya no mezcla entre nodos, sino que une directamente archivos locales, eliminando por completo la distorsión de datos causada por la mezcla.

Donde A es un marco de datos más pequeño que se puede almacenar en la memoria del ejecutor.

(1) Escenario

El conjunto de datos en un lado de la conexión es lo suficientemente pequeño como para cargarse en el controlador y transmitirse a cada ejecutor mediante el método de transmisión.

(2) Solución

Introduzca los datos del conjunto de datos pequeños en el controlador en código Java/Scala y luego transmita los datos del conjunto de datos pequeños al ejecutor a través del esquema de transmisión, o Ajuste el umbral de transmisión antes de usar SQL para que sea lo suficientemente grande como para que la transmisión sea efectiva. Alternativamente, antes de usar SQL, ajuste el umbral de transmisión a un valor lo suficientemente grande como para que la transmisión surta efecto.

(3) Ventajas

Evitar la mezcla elimina por completo las condiciones de sesgo de datos, lo que mejora enormemente el rendimiento.

(4) Desventajas

Dado que los datos pequeños se envían primero a cada ejecutor a través de Broadcase, el conjunto de datos de la parte que participa en Join debe ser lo suficientemente pequeño y adecuado principalmente para el escenario de unión Adecuado para escenarios de agregación, las condiciones de aplicación son limitadas.

Idea 5. Dividir unión y unión

Esta idea es muy simple: dividir una unión en una unión de conjunto de datos sesgado y una unión de conjunto de datos no sesgado, y finalmente realizar unión:

(1) Escenarios aplicables

Ambas tablas son demasiado grandes para usar la unión del lado del mapa. El tamaño de los datos de varias claves en un RDD es demasiado grande, mientras que el conjunto de claves en el otro RDD está distribuido de manera más uniforme.

(2) Solución

En el RDD con datos sesgados, extraiga el conjunto de datos correspondiente a la clave sesgada por separado, agregue un prefijo aleatorio y luego agregue cada conjunto de datos en otro RDD Los datos se combinan con prefijos aleatorios para formar un nuevo RDD (equivalente a aumentar los datos a N veces el tamaño original, donde N es el número total de prefijos aleatorios). Luego concatena los dos datos y elimina el prefijo. Luego conecte los datos restantes que no contienen la clave sesgada y finalmente combine los conjuntos de resultados de las dos conexiones mediante la unión para obtener todos los resultados de la conexión.

(3) Ventajas

En comparación con Map Join, es más adaptable a la unión de grandes conjuntos de datos y, si los recursos son suficientes, las partes sesgadas y no sesgadas de los datos. set Se puede ejecutar en paralelo, lo que mejora significativamente la eficiencia. La expansión de datos solo apunta a la parte sesgada de los datos, lo que aumenta el consumo de recursos hasta cierto punto.

(4) Desventajas

Si hay muchos puntos clave inclinados y la expansión de datos en el otro lado es muy grande, esta solución no es adecuada. Además, si maneja las claves sesgadas y no sesgadas por separado, deberá escanear el conjunto de datos dos veces, lo que agregará gastos generales.

Idea 6: eliminar las claves de la tabla grande y extender la tabla pequeña N veces el valor de Joan

Si hay muchas claves sesgadas, divida estas grandes cantidades de claves sesgadas antes El método no tiene mucho sentido. En este momento, un método más apropiado es agregar directamente prefijos aleatorios a todos los conjuntos de datos con datos asimétricos y luego realizar un producto cartesiano de todo el conjunto de datos con prefijos aleatorios con otro conjunto de datos sin datos muy asimétricos (es decir, como expandir el volumen de datos N veces).

Este es en realidad un caso especial o una simplificación del método anterior.

(1) Escenario

Un conjunto de datos tiene una gran cantidad de claves sesgadas, mientras que el otro conjunto de datos tiene una distribución de datos más uniforme.

(2) Ventajas

Aplicable a la mayoría de escenarios.

(3) Desventajas

El conjunto de datos debe ampliarse N veces, lo que aumentará el consumo de recursos.

Idea 7: Agregación local en el lado del mapa

Agregue una función combinadora en el lado del mapa para la agregación local. Agregar un combinador equivale a una restauración temprana, que agrega las mismas claves en el asignador, lo que reduce la cantidad de datos en el proceso de reproducción aleatoria y la cantidad de cálculo en el lado de la restauración. Este enfoque puede mitigar eficazmente los problemas de sesgo de datos, pero no es muy efectivo si las claves que causan el sesgo de datos se distribuyen entre diferentes mapeadores.

Idea #8: Agregación local salada Agregación global desalada

La idea central de este esquema es realizar una agregación en dos etapas. 1) (2_hola, 1) (1_hola, 1) (2_hola, 1). Luego realice operaciones de agregación como reduceByKey en los datos después de ingresar números aleatorios y realice una agregación parcial. Luego, el resultado de la agregación parcial será (1_hola, 2) (2_hola, 2) (3_hola, 1). Luego elimine el prefijo de cada clave y se convertirá en (hola, 2) (hola, 2) (hola, 1), y luego realice la agregación global nuevamente y obtendrá el resultado final, como (hola, 5) .

Sin embargo, ejecutar mapreduce dos veces es ligeramente peor que ejecutarlo una vez.

VII. Sesgo de datos en Hadoop

En Hadoop, los programas Mapreduce y Hive se utilizan directamente cerca de los usuarios, aunque Hive también es el último MR que se ejecuta (al menos por ahora). (La informática de memoria de Hive no es popular), pero después de todo, la lógica del contenido escrito es muy diferente: uno es un programa y el otro es Sql, por lo que todavía hay una ligera diferencia aquí.

La principal manifestación del sesgo de datos en Hadoop es que la fase ruduce permanece en 99,99 y no puede finalizar hasta 99,99.

Aquí, si observa en detalle el registro o la interfaz de monitoreo, encontrará:

Experiencia: la desviación de datos de Hive generalmente ocurre en Sql Group y On, y está más profundamente vinculada a la lógica de datos.

Métodos de optimización

A continuación se enumeran algunos métodos e ideas. Simplemente consulte el sitio web oficial para conocer los parámetros y el uso específicos.

Descripción

8.