Pequeño resumen de tres tipos de operadores de chispa
De hecho, quería explicar Spark hace mucho tiempo, pero no pude explicarlo sistemáticamente. Ahora todavía quiero hablar brevemente sobre mi propia comprensión de Spark.
En comparación con mapreduce, el modelo informático de Spark puede proporcionar funciones más poderosas. Utiliza un modelo iterativo. Después de procesar una etapa, podemos continuar procesando muchas etapas, no solo como mapreduce, solo hay dos etapas.
Spark se divide aproximadamente en tres tipos de operadores:
1. Operador de transformación de tipo de datos Valor. Esta transformación no activa el envío de trabajos. Los elementos de datos procesados son de Valor. tipo.
? Aquí describiré 9 operadores: mapa, mapa plano, glom, unión, cartesiano (operación cartesiana), grupo por, filtro, distinto (eliminación de duplicaciones) y resta.
2. Operador de transformación de tipo de datos Key-Value. Esta transformación no activa el envío del trabajo. Los elementos de datos procesados son datos de tipo Key-Value.
En cuanto a los operadores clave-valor, explicaré brevemente mapValues, combineByKey, reduceByKey, particionBy, cogroup, join, leftOutJoin y rightOutJoin.
3. Operador de acción, este tipo de operador activará SparkContext para enviar el trabajo.
Para los operadores de acción, foreach, recopilar, recopilarAsMap, reducirByKeyLocally, buscar, contar, rematar,
reducir, doblar, agregar. Eso es más o menos.
1. Operador de transformación del tipo de datos Valor
1) map
val a = sc.parallelize(List("dog", "salmon", " salmón", "rata", "elefante"), 3)
//rdd tiene 5 elementos, divídelos en 3 particiones
val b = a.map( _.length )//Utilice el método de paralelización para importar datos
val c = a.zip(b)
c?.collect
res0: Array[( String, Int)] = Array((perro, 3), (salmón, 6), (salmón, 6), (rata, 3), (elefante, 8))
El mapa es para mapear las operaciones Entrar cada valor.
El diagrama anterior es:
2) flatMap
val a = sc.parallelize(1 a 10, 5)?
//rdd tiene 10 elementos, divide del 1 al 10 en 5 particiones
a.flatMap(1 a _).collect
res47: Array[Int] = Array(1, 1 , 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6 , 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6 , 7, 8, 9, 10)
// Cada elemento de entrada se puede asignar a 0 o más elementos de salida, y el resultado finalmente se "aplana" y se genera
sc. paralelizar(Lista(1, 2, 3), 2).flatMap(x =gt; Lista(x, x, x)).collect
res85: Array[Int] = Array(1, 1 , 1, 2, 2, 2, 3, 3, 3)
flatMap convierte un valor en una matriz y luego lo divide.
El diagrama de operación de este ejemplo es:
3) restar
?val a = sc.parallelize(1 a 9, 3)
? val b = sc.parallelize(1 a 3, 3)
? val c = a.subtract(b)
c.collect
? res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)
Dibuje un diagrama para este ejemplo:
4) glom
p>val a = sc.parallelize(1 a 100, 3)
a.glom.collect
res8: Array[Array[Int]] = Matriz(Matriz (1, 2, 3,..., 33), Matriz(34, 35,..., 65, 66), Matriz(67,..., 100))
Para este ejemplo Dibuja un diagrama:
5) unión
?val a = sc.parallelize(1 a 3, 1)
?val b = sc .parallelize( 5 a 7, 1)
?(a b).collect
?res0: Array[Int] = Array(1, 2, 3, 5, 6, 7 ) p>
Dibuje un diagrama para este ejemplo:
6) cartesiano (operación cartesiana)
val x =sc.parallelize(List(1, 2, 3, 4, 5))
? val y =sc.parallelize(List(6, 7, 8, 9, 10))
? recoger
? res0: Matriz[(Int, Int)] =Matriz((1,6), (1,7), (1,8), (1,9), (1,10 ), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3 ,9),(3,10), (4,6), (5,6), (4,7), (5,7),
(4,8), ( 5, 8), (4,9), (4,10), (5,9), (5,10))
Dibuja un diagrama para este ejemplo:
7) groupBy (genera las claves correspondientes, junta las mismas claves)
?val a = sc.parallelize(1 a 9, 3)
?a.groupBy(x =gt ; { if (x 2 == 0)??"par" else "impar" }).collect
?res42: Array[(String, Seq[Int])] =Array((eve
n, ArrayBuffer(2, 4, 6, 8)), (impar, ArrayBuffer(1, 3, 5, 7, 9)))
Dibuja un diagrama para este ejemplo:
8) filtro
?val a = sc.parallelize(1 a 10, 3)
?val b = a.filter(_ 2 == 0)
?b.collect
?res3: Array[Int] = Array(2, 4, 6, 8, 10)
Dibuja un diagrama para este ejemplo:
9) distinto (eliminación)
val c =sc.parallelize(List("Gnu", "Gato", "Rata", "Perro", "Gnu", " Rata "), 2)
RDD tiene 6 elementos, divida estos seis elementos en 2 particiones
c.distinct.collect//Utilice la función distinta para elementos recurrentes Eliminar y luego formar una array
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
Dibuja un diagrama para este ejemplo:
II , Operador de transformación del tipo de datos Key-Value
?1) mapValues
val a = sc.parallelize(List("perro", "tigre", "león", "gato", " pantera", "águila"), 2)
//RDD tiene 6 elementos, es decir, perro, león, gato..., divídelos en 2 particiones
val b = a .map(x =gt; (x.length, x))
b.mapValues("x" _ "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
Dibujar un diagrama para este ejemplo: 2) combineByKey
? val a =sc.parallelize(List("perro", "gato", "gnu", "salmón", "conejo", "pavo", "lobo", "oso", "abeja"), 3)
? val b = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2) , 3)
? val c = b.zip(a)
val d = c.combineByKey(List(_), (x: Lista[Cadena], y: Cadena) =gt; y:: x, (x: Lista[Cadena], y: Lista[Cadena]) =gt::: y;
)
? d.collect
? res16: Array[(Int, List[String])] = Array((1, List(gato, perro, pavo)), ( 2. List(ñu, conejo, salmón, abeja, oso, lobo)))
? El diagrama de operación de este ejemplo es: 3) reduceByKey
val a = sc. paralelizar (List("perro", "tigre", "perro", "gato", "perro", "águila", "gato"), 2)
//rdd tiene 7 elementos, Dividirlos en 2 particiones
val b = a.map(x =gt; (x.length, x))
? // ¿Cómo usar reduceByKey(_ _)
? res87: Array[(Int, String)] = Array((3, perro), (1, tigre), (2, gato), ( 1, águila))
? El diagrama de operación de este ejemplo es: 4) particiónPor
Para los elementos KV en dos RDD, los elementos en la misma clave en cada RDD se dividen en Agregado en una colección. A diferencia de reduceByKey, fusiona elementos con la misma clave en dos RDD
5) cogrupo
Para elementos KV en dos RDD, cada RDD tiene la misma clave Los elementos se agregan en un colocar.
La diferencia con reduceByKey es fusionar elementos con la misma clave en dos RDD
? val a = sc.parallelize(List(1, 2, 1, 3), 1)
? val b = a.map((_, "b"))
? val c = a.map((_, "c"))
b. (c).collect
? res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2, (ArrayBuffer(b), ArrayBuffer(c)) ), (3, (ArrayBuffer(b), ArrayBuffer(c))), (1, (ArrayBuffer(b, b), ArrayBuffer(c, c))))
? El diagrama de operación es: 6) unirse
? val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
? val b = a.keyBy(_.length)
? val c =sc.parallelize(List("perro","gato","gnu","salmón"," conejo", "pavo", "lobo", "oso", "abeja"), 3)
? val d = c.keyBy(_.length)
? b .join(d).collect
? res0: Array[(Int, (String, String))] = Array((6, (salmón, salmón)), (6, (salmón, conejo) ), (6, (salmón, pavo)), (6, (salmón, salmón)), (6, (salmón, conejo)), (6, (salmón, pavo)), (3, (perro, perro) ), (3, (perro, gato)), (3, (perro, ñu)), (3, (perro, abeja)), (3, (rata, perro)), (3, (rata, gato) ), (3, (rata, gnu)), (3, (rata, abeja)))
? El diagrama de funcionamiento de este ejemplo es: 7) leftOutJoin
Cambiar el lado izquierdo de IZQUIERDA Se conservan todos los registros del nombre de la tabla 1 y se muestran los registros correspondientes al campo B en el nombre de la tabla 2 a la derecha y al nombre de la tabla 1.campo A
val a = sc.parallelize ( List("perro", "salmón", "salmón", "rata", "elefante"), 3)
val b = a.keyBy(_.length)
? val c =sc.parallelize(List("perro","gato","ñu","salmón","conejo","pavo","wo
lf", "oso", "abeja"), 3)
? val d = c.keyBy(_.length)
? b.leftOuterJoin(d).collect
? res1: Array[(Int, (String, Option[String]))] = Array((6, (salmón, Algunos(salmón))), (6, (salmón, Algunos(conejo) )), (6, (salmón, algunos (pavo))), (6, (salmón, algunos (salmón))), (6, (salmón, algunos (conejo))), (6, (salmón, algunos ( pavo))), (3, (perro, Algunos(perro))), (3, (perro, Algunos(gato))), (3, (perro, Algunos(gnu))), (3, (perro, Algunos (abeja))), (3, (rata, Algunos (perro))), (3, (rata, Algunos (gato))), (3, (rata, Algunos (ñu))), (3, ( rat, Some(bee))), (8, (elefante, None)))
? El diagrama de operación de este ejemplo es: 8) rightOutJoin (unión externa derecha)
? val a = sc.parallelize(List("perro", "salmón", "salmón", "rata", "elefante"), 3)
val b = a.keyBy(_. longitud )
? val c =sc.parallelize(List("perro", "gato", "ñu", "salmón", "conejo", "pavo", "lobo", "oso" , "abeja"), 3)
? val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
? res2: Array[(Int, (Option[String], String))] = Array((6, (Algunos (salmón), salmón)), (6, (Algunos (salmón), conejo)), (6, ( Algunos (salmón), pavo)), (6, (Algunos (salmón), salmón)), (6, (Algunos (salmón), conejo)), (6, (Algunos (salmón), pavo)), ( 3 , (Algunos(perro), perro)), (3, (Algunos(perro), gato)), (3, (Algunos(perro), ñu)), (3, (Algunos(perro), abeja)) , (3, (Algunos (rata), perro)), (3, (Algunos (rata), gato)), (3, (Algunos (rata), ñu)), (3, (Algunos (rata), abeja ) ), (4, (Ninguno, lobo)), (4, (Ninguno, oso)))
? El diagrama de operación de este ejemplo es:
3.
1) para cada
val c = sc.parallelize(List("gato", "perro", "tigre", "león", "ñu", "cocodrilo", "hormiga", "ballena", "delfín", "araña "), 3)//Utilice el método de paralelización para importar datos
c.foreach(x =gt; println(x "s son deliciosos"))//Procese un dato después de obtener un dato data
El diagrama de operación de este ejemplo es: 2) fold
val a = sc.parallelize(List(1, 2, 3), 3)
a.fold(0 )(_ _)
res59: Int = 6
Dibuja un diagrama para este ejemplo:
?3) agregado p>
val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// primero imprimamos el contenido del RDD con las etiquetas de partición
def myfunc(index: Int, iter: Iterador[(Int)]): Iterador[Cadena] = {
iter.toList.map(x =gt; "[partID : " index ", val : " x "]").iterator
}
z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String ] = Array([partID:0, val: 1], [partID: 0, val: 2], [partID: 0, val: 3], [partID: 1, val: 4], [partID: 1, val : 5], [partID: 1, val: 6])
z.aggregate(0)(math.max(_, _), _ _)
res40: Int = 9
Dibuje un diagrama para este ejemplo: 4) recolectar
? val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog) ", "Gnu", "Rat"), 2)
? c.collect ?//Combinando dos particiones en una mediante el operador de recopilación
? res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
Dibuje un diagrama para este ejemplo: 5) CollectAsMap
En RDD
Si hay varios Valores en la misma Clave, el Valor posterior sobrescribirá el Valor anterior. El resultado final es que la Clave es única y corresponde a un Valor
val a = sc.parallelize(List. (1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection .Map [Int, Int] = Map(2 -gt; 2, 1 -gt; 1, 3 -gt; 3)
Dibuja un diagrama para este ejemplo: 6) reduceByKeyLocally
Esta función opera el valor V correspondiente a cada K en RDD [K, V] de acuerdo con la función de mapeo, y el resultado de la operación se asigna a un Mapa [K, V] en lugar de RDD [K, V].
val a =sc.parallelize(List("perro", "gato", "búho", "ñu", "hormiga"), 2)
val b = a .map(x =gt; (x.length, x))
b.reduceByKey(_ _).collect
res86: Array[(Int, String)] =Array ((3, dogcatowlgnuant))
Dibuja un diagrama para este ejemplo: 7) lookup
val a =sc.parallelize(List("perro", "tigre", "león ", "gato", "pantera", "águila"), 2)
val b = a.map(x =gt; (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tigre, eagle)
Dibuja un diagrama para este ejemplo: 8) count
val c = sc.parallelize(List("Gnu", "Gato", "Rata", "Perro"), 2)
c.count
res2: Long = 4
Dibuja un diagrama para este ejemplo: 9) top?
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)
Dibuja un diagrama para este ejemplo: 10) reducir
val a = sc.parallelize(1 a 100, 3)
a.reduce(_ _)
res41: Int = 5050
Dibuje un diagrama para este ejemplo:
Este artículo es el resultado de una semana de escritura. No estoy muy familiarizado con él. Necesito entender un operador a la vez. muy preciso, pero está dentro del alcance de mi comprensión. Es difícil encontrar conocimiento sobre esto en el sitio web, por lo que la mayor parte se basa en el autoestudio. Mi comprensión de mapreduce y spark se limita a hacer referencia al conocimiento en línea más algunas de mis propias opiniones.
------------------------------------------------- ---- Referencia: blogs.com/MOBIN/p/5414490.html#12------------------------------- --- --------------------