Sparkrdd analiza el código fuente
RDD y marco de datos
Marco de datos RDD
La figura anterior refleja intuitivamente la diferencia entre DataFrame y RDD. Aunque el RDD [Persona] de la izquierda toma Persona como parámetro de tipo, el marco Spark en sí no lo comprende.
La estructura interna de la clase Persona. El marco de datos de la derecha proporciona información estructural detallada, lo que permite a Spark
SQL saber claramente qué columnas están contenidas en el conjunto de datos y cuál es el nombre y tipo de cada columna. DataFrame tiene más información sobre la estructura de datos, es decir, el esquema. Se distribuye RDD.
Una colección de objetos Java. Un marco de datos es una colección de objetos de fila distribuidos. Además de proporcionar operadores más completos que RDD, DataFrame también tiene una característica más importante que mejora la eficiencia de ejecución.
Reduzca la lectura de datos y optimice los planes de ejecución, como la inserción de filtros, el corte, etc.
Mejorar la eficiencia de ejecución
RDD
La API es funcional y enfatiza la inmutabilidad. En la mayoría de los casos tiende a crear nuevos objetos en lugar de modificar los antiguos. Aunque esta característica brinda una API limpia y ordenada, también permite que se ejecuten aplicaciones Spark.
Las fechas de fila tienden a crear una gran cantidad de objetos temporales, lo que ejerce presión sobre el GC. Sobre la base de la API
RDD existente, por supuesto, puede usar el método mapPartitions para sobrecargar el método de creación de datos en un único fragmento RDD, reutilizando objetos variables para reducir la asignación de objetos y GC.
Gastos generales, pero esto sacrifica la legibilidad del código y requiere que los desarrolladores tengan cierta comprensión del mecanismo de tiempo de ejecución de Spark, y el umbral es alto. Spark, por otro lado,
SQL reutiliza objetos dentro del marco tanto como sea posible, lo que rompe la inmutabilidad internamente, pero cuando los datos se devuelven al usuario, se convierten en datos inmutables. Desarrollado con
API DataFrame, puede disfrutar eficazmente de estos efectos de optimización.
Reducir las lecturas de datos
La forma más rápida de analizar big data es ignorarlo. "Ignorar" aquí no significa hacer la vista gorda, sino podar adecuadamente de acuerdo con las condiciones de la consulta.
La poda de partición mencionada anteriormente cuando se analizan las tablas particionadas es una de ellas: cuando las condiciones del filtro de consulta involucran columnas de partición, podemos usar las condiciones de consulta para particionar directorios que definitivamente no contienen los datos de destino. reducir la IO.
Para algunos formatos de datos "inteligentes", Spark
SQL también puede podar en función de las estadísticas adjuntas al archivo de datos. En pocas palabras, en este formato de datos, los datos se guardan en segmentos y cada segmento de datos tiene un valor máximo, un valor mínimo, el número de valores nulos, etc.
Algunas estadísticas básicas. Cuando un determinado segmento de datos en el nombre de la tabla de información estadística claramente no contiene datos de destino que cumplan con las condiciones de la consulta, puede omitir directamente el segmento de datos (por ejemplo, el valor máximo de un determinado segmento en la columna de enteros A es 100 y consulta.
Las condiciones de consulta requieren más de 200).
Además, Spark SQL también puede aprovechar al máximo las ventajas de los formatos de almacenamiento de columnas como RCFile, ORC y Parquet para escanear únicamente. las columnas realmente involucradas en la consulta e ignorar los datos de otras columnas.
Realización de la optimización
Ejemplo de análisis de datos de población
Para ilustrar la optimización de consultas, veamos el ejemplo de análisis de datos de población que se muestra en la imagen de arriba. Construya dos marcos de datos para concatenar el gráfico y, después de concatenarlos, realice la operación de filtrado nuevamente. Por ejemplo
Si el plan de implementación se implementa sin cambios, la eficiencia de la implementación final no será alta. Dado que la unión es una operación costosa, también puede producir conjuntos de datos más grandes.
Si pudiéramos filtrar
por debajo de la unión, filtrando primero el marco de datos y luego uniendo el conjunto de resultados más pequeño filtrado, podríamos acortar efectivamente el tiempo de ejecución. Y el optimizador de consultas de Spark
SQL hace esto. En resumen, la optimización de planes de consultas lógicas es el proceso de reemplazar operaciones de alto costo por operaciones de bajo costo mediante transformaciones equivalentes basadas en álgebra relacional.
En el proceso de convertir el plan de ejecución optimizado en un plan de ejecución física, las condiciones de filtrado se pueden enviar a la fuente de datos de acuerdo con las características de la fuente de datos específica. El filtro desaparece del plan de ejecución física situado más a la derecha porque se disuelve en el nodo de exploración de la tabla que realiza la operación de lectura final.
Para el desarrollador promedio, el objetivo de un optimizador de consultas es que incluso una consulta subóptima escrita por un programador sin experiencia puede convertirse en una forma eficiente y ejecutarse tanto como sea posible.
RDD y conjuntos de datos
Los conjuntos de datos están representados por planes de ejecución lógica de Catalyst y los datos se almacenan en forma binaria codificada, por lo que operaciones como ordenar y barajar se pueden realizar sin deserialización.
La creación de conjuntos de datos requiere un codificador explícito, que serializa objetos a binario y asigna el esquema del objeto a tipos SparkSQl. Sin embargo, RDD se basa en un mecanismo de reflexión en tiempo de ejecución.
A través de los dos puntos anteriores, el rendimiento del conjunto de datos es mucho mejor que el de RDD.
Marco de datos y conjunto de datos
El conjunto de datos se puede considerar como un caso especial de DataFrame. La principal diferencia es que cada registro del conjunto de datos almacena un valor fuertemente tipado en lugar de una fila. Por lo tanto, tiene las siguientes tres características:
Los conjuntos de datos se pueden verificar en tiempo de compilación.
Y es una interfaz de programación orientada a objetos. Tome el recuento de palabras como ejemplo:
//Marco de datos
//Cargue un archivo de texto e interprete cada línea como java.lang.String
val ds = sqlcontext leer .text("/home/spark/1.6/lines "). como [cadena]
val resultado = ds
. Plan(_. Split(" ") // Dividir por espacios en blanco
. Filter(_! = "") // Filtrar palabras vacías
. toDF() // Convertir para DataFrame para realizar agregación/clasificación
//Cuenta el número de apariciones de cada palabra
agg(count("* ") as " numOccurances ")
Las versiones posteriores de DataFrame heredarán DataSet, y DataFrame es la interfaz de Spark SQL
val word count=
ds.flatMap(_. split(" ")
.filter(_!= "")
.groupBy(_.toLowerCase()) //Pasamos una función lambda en lugar de una agrupación de expresiones por columnas (es decir, $ "valor ")
. Count()
DataFrame y DataSet se pueden convertir entre sí, df.as[ElementType] puede convertir DataFrame en DataSet y ds.toDF() puede convertir Convert DataSet en marco de datos