Qué archivo de encabezado usar en Scala para manejar el tipo RDD
1. Introducción a RDD:
RDD, Resilient Distributed Dataset, es una colección distribuida de elementos. En Spark, todas las operaciones de datos implican la creación de RDD, la transformación de RDD existentes y la llamada de valores de operación de RDD. Detrás de todas estas operaciones, Spark distribuye automáticamente los datos en el RDD en todo el clúster y paraleliza las operaciones.
Un RDD en Spark es una colección inmutable de objetos distribuidos. Cada RDD se divide en particiones que se ejecutan en diferentes nodos del clúster y puede contener cualquier tipo de objeto desde Python, Java, Scala o incluso objetos definidos por el usuario.
Los usuarios pueden crear RDD de dos maneras: leyendo un conjunto de datos externos o distribuyendo una colección de objetos en el controlador (como una lista o un conjunto) dentro del controlador.
Las operaciones de transformación en RDD se evalúan de forma diferida, lo que significa que cuando llamamos a una operación de transformación en un RDD, la operación no se ejecuta inmediatamente. En cambio, Spark registrará internamente información sobre la operación solicitada. En lugar de pensar en los RDD como conjuntos de datos que contienen datos específicos, piense en cada RDD como una lista de instrucciones sobre cómo calcular los datos que construimos mediante operaciones de transformación. Las operaciones de lectura de datos en RDD también son lentas; los datos solo se leen cuando es necesario. Tanto las operaciones de conversión como las de lectura se pueden ejecutar varias veces.
2. Crear conjunto de datos RDD
(1) Leer conjunto de datos externos
val input = sc.textFile(inputFileDir)
(2) Distribuya colecciones de objetos, tomando la lista como ejemplo
vallines = sc.textFile(inputFileDir)
(3) Distribuya conjuntos de datos RDD a RDD
(4) Distribuya el conjunto de datos RDD en RDD. paraleloize(List("hola mundo", "esto es una prueba"));
3 operación RDD
(1) Operación de conversión
Para ser. operación de conversión de filtro implementada:
vallines =sc.parallelize(List(" error:a", "error:b", "error:c", "test"));
val errores=lines.filter(line => line.contains("error"));
errors.collect().foreach(println);
Salida:
error:a
error:b
error:c
Como puede ver, la lista contiene la palabra error Las entradas de la tabla han sido filtrados correctamente. filter(line => line. contains("advertencias"));
val unionLines =errors.union(advertencias);
unionLines.collect().foreach(println);
Salida:
error:a
error:b
error:c
advertencia:a
Como puede ver, todos los elementos de error y advertencia de la lista original se han filtrado.
(3) Obtener algunos o todos los elementos del conjunto de datos RDD
① Obtener algunos elementos del conjunto de datos RDD .take(int num) ?Lista de valores de retorno
Obtenga los primeros elementos numéricos en el conjunto de datos RDD.
/**
* Obtiene los primeros elementos de RDD. Actualmente, esta operación * escanea las particiones una por una, por lo que
* será lenta si se requiere una gran cantidad de particiones. En este caso, utilice Collect() para obtener el RDD completo: JList[T]
Programa de muestra: Pickup
unionLines.take(2).foreach(println);< / p>
Salida:
error:a
error:b
Como puede ver, genera los 2 elementos principales de unionLines del conjunto de datos RDD p>
② Obtenga el conjunto de datos RDD. Collect() devuelve una lista de valores
Programa de ejemplo:
val all = unionLines.collect();
all.foreach(println) ;
Recorre cada elemento de unionLines en el conjunto de datos RDD de salida
4. Pasa la función a Spark
En Scala, podemos incorporar las funciones definidas, Las referencias a métodos o métodos estáticos se pasan a Spark, al igual que otras API funcionales en Scala. Hay algunos otros detalles que debemos considerar, incluido que la función pasada y los datos a los que hace referencia deben ser serializables (implemente la interfaz serializable de Java). Además, de manera similar a Python, pasar el método o campo de un objeto también contendrá una referencia al objeto completo. Podemos evitar que todo el objeto contenga el campo requerido colocándolo en una variable local.
clase searchFunctions (val consulta:String){
def isMatch(s: String):Boolean = {
s.contains(query)
}
def getMatchFunctionReference(rdd: RDD[String]) :RDD[String] = {
// Problema: isMach representa esto. isMatch, por lo que necesitamos pasar todo this
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd: RDD[String ]) :RDD[String] = {
// Problema: la consulta representa esto.
flatMap(line=>line.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={
//es seguro tomar solo los campos que necesitamos en las variables locales
val query1=this.query;
rdd.flatMap(x=>x.split(query1) p> p>
)
}
}
5. Operación de transformación para cada elemento:
Mapa de operación de transformación( ) toma una función, la aplica a cada elemento del RDD y utiliza el resultado devuelto por la función como el elemento correspondiente en el RDD resultante. Palabras clave: transformar
La operación de transformación filter() recibe una función y coloca los elementos en el RDD que cumplen con los requisitos de la función en el nuevo RDD devuelto. Palabras clave: filtro
El diagrama de ejemplo es el siguiente:
①map()
Calcular el cuadrado de cada valor en RDD
val rdd=sc.parallelize(List(1,2,3,4));
val resultado=rdd.map(valor => ; valor*valor);
println( result.collect().mkString(","));
Salida:
1,4,9,16
filtro()
②?Eliminar el elemento con valor 1 en la colección RDD:
val rdd=sc.parallelize(List(1,2,3,4));
val resultado=rdd.filter(valor => valor!=1);
println(result. Collect().mkString(","));
Resultado: p >
Resultado:
2,3,4
También podemos usar la función de transferencia, por ejemplo:
Función:
def filterFunction(valor:Int):Booleano = {
valor! =1
}
Uso:
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.filter(filterFunction);
println(result.collect().mkString(","));
③ A veces, queremos ingresar elementos generar múltiples elementos de salida. La operación que logra esto se llama flatMap(). Similar a map(), la función que proporcionamos para flatMap() se aplicará a cada elemento del RDD de entrada individualmente. Sin embargo, lo que se devuelve no es un elemento, sino un iterador que devuelve una secuencia de valores. El RDD de salida no está compuesto de iteradores. Lo que obtenemos es un RDD que contiene todos los elementos accesibles para cada iterador.
Un uso simple de flatMap() es dividir la cadena de entrada en palabras, como esta:?
val rdd=sc.parallelize(List("Hola mundo", "hola tú", "mundo que amo usted"));
val result=rdd.flatMap(line => line.split(" "));
println(result.collect().mkString(" \ n"));
Salida:
hola
mundo
hola
tú
mundo
te
te amo
6. Establecer operaciones
En RDD Establecer operaciones
Funciones
Utilice
RDD1.distinct()
para generar un nuevo RDD que contenga solo elementos distintos.
RDD1.union(RDD2)
Devuelve un RDD que contiene todos los elementos de los dos RDD
RDD1.intersection(RDD2)
Devuelve solo elementos que existen en ambos RDD
RDD1.substr(RDD2)
Devuelve un RDD que solo contiene todos los elementos en el primer RDD pero no todos los elementos en el segundo RDD, se requieren mashups de datos.
Operaciones de conjuntos en conjuntos cartesianos:
RDD1.cartesian(RDD2)
Devuelve el conjunto cartesiano de dos conjuntos de datos RDD
Programa ejemplo: generar un conjunto cartesiano de conjuntos RDD {1,2} y {1,2}
val rdd1=sc.parallelize(List(1,2));
val rdd2=sc.parallelize(List(1,2));
val rdd=rdd1.cartesian(rdd2);
println(rdd.mkString("/n ")) ;
Salida:
(1,1)
(1,2)
(2,1)
(2,2)
(2,2)
7. Operación
(1) Restaurar operación
Reducir () recibe una función como parámetro, que procesará datos de tipos de elementos en dos RDD y devolverá nuevos elementos del mismo tipo. Un ejemplo simple es la función +, que puede usarse para acumular nuestro RDD. Con reduce(), puede calcular fácilmente la suma de todos los elementos en un RDD, la cantidad de elementos y otros tipos de operaciones de agregación.
El siguiente es un ejemplo de un programa que encuentra la suma de todos los elementos de un conjunto de datos RDD:
val rdd=sc.parallelize(List(1,2,3, 4,5,6, 7,8,9,10));
val resultados=rdd.reduce((x,y) =>x+y);
println(resultados);
Salida: 55
(2) La operación fold()
recibe una función con la misma firma que la recibida por reduce(), más un valor inicial, como resultado de la primera llamada para cada partición. El valor inicial que proporcione debe ser el elemento unitario de la operación que proporcione, es decir, múltiples cálculos de este valor inicial utilizando su función no cambiarán el resultado (por ejemplo, + corresponde a 0, * corresponde a 1, o la operación de concatenación corresponde a la lista vacía).
Ejemplo de programa:
①Calcule la suma de todos los elementos en el conjunto de datos RDD:
zeroValue=0;//El valor inicial al sumar es 0.
p>val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val resultados= rdd.fold( 0)((x,y) =>x+y);
println(results);
② Calcula el producto de todos los elementos en el conjunto de datos RDD :
zeroValue=1;//Para encontrar el producto, el valor inicial es 1.
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val resultados=rdd. fold(1)((x,y) =>x*y);
println(resultados);
(3) Operación de agregación ()
El tipo de valor de retorno de la función agregada() no tiene que ser el mismo que el tipo de RDD en el que se opera.
De manera similar a fold(), cuando usamos agregado() necesitamos proporcionar un valor inicial para el tipo de retorno que esperamos. A continuación, los elementos del RDD se fusionan en el acumulador mediante una función. Dado que cada nodo se acumula localmente, en última instancia es necesario proporcionar una segunda función para combinar los acumuladores par por par.
El siguiente es un programa de muestra:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10) ) ;
val resultado=rdd.aggregate((0,0))(
(acc,valor) =& gt;(acc._1+valor,acc._2+ 1 ),
(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)
)
val promedio =result._1/result._2;
println(average)
Salida: 5
El retorno final es una Tupla2
Tabla: Operaciones básicas de RDD realizadas en RDD que contienen datos {1,2,3,3}
Resultados del ejemplo de uso del nombre de función
collect() Devuelve todo elementos de RDD rdd. .count() 4
countByValue() El número de veces que aparece cada elemento en RDD rdd.countByValue() {(1,1),
(2,1),
(3,2)
}