Red de conocimiento informático - Problemas con los teléfonos móviles - Pequeño resumen de tres tipos de operadores de chispa

Pequeño resumen de tres tipos de operadores de chispa

De hecho, quería explicar Spark hace mucho tiempo, pero no pude explicarlo sistemáticamente. Ahora todavía quiero hablar brevemente sobre mi propia comprensión de Spark.

En comparación con mapreduce, el modelo informático de Spark puede proporcionar funciones más poderosas. Utiliza un modelo iterativo. Después de procesar una etapa, podemos continuar procesando muchas etapas, no solo como mapreduce, solo hay dos etapas.

Spark se divide aproximadamente en tres tipos de operadores:

1. Operador de transformación de tipo de datos Valor. Esta transformación no activa el envío de trabajos. Los elementos de datos procesados ​​son de Valor. tipo.

? Aquí describiré 9 operadores: mapa, mapa plano, glom, unión, cartesiano (operación cartesiana), grupo por, filtro, distinto (eliminación de duplicaciones) y resta.

2. Operador de transformación de tipo de datos Key-Value. Esta transformación no activa el envío del trabajo. Los elementos de datos procesados ​​son datos de tipo Key-Value.

En cuanto a los operadores clave-valor, explicaré brevemente mapValues, combineByKey, reduceByKey, particionBy, cogroup, join, leftOutJoin y rightOutJoin.

3. Operador de acción, este tipo de operador activará SparkContext para enviar el trabajo.

Para los operadores de acción, foreach, recopilar, recopilarAsMap, reducirByKeyLocally, buscar, contar, rematar,

reducir, doblar, agregar. Eso es más o menos.

1. Operador de transformación del tipo de datos Valor

1) map

val a = sc.parallelize(List("dog", "salmon", " salmón", "rata", "elefante"), 3)

//rdd tiene 5 elementos, divídelos en 3 particiones

val b = a.map( _.length )//Utilice el método de paralelización para importar datos

val c = a.zip(b)

c?.collect

res0: Array[( String, Int)] = Array((perro, 3), (salmón, 6), (salmón, 6), (rata, 3), (elefante, 8))

El mapa es para mapear las operaciones Entrar cada valor.

El diagrama anterior es:

2) flatMap

val a = sc.parallelize(1 a 10, 5)?

//rdd tiene 10 elementos, divide del 1 al 10 en 5 particiones

a.flatMap(1 a _).collect

res47: Array[Int] = Array(1, 1 , 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6 , 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6 , 7, 8, 9, 10)

// Cada elemento de entrada se puede asignar a 0 o más elementos de salida, y el resultado finalmente se "aplana" y se genera

sc. paralelizar(Lista(1, 2, 3), 2).flatMap(x =gt; Lista(x, x, x)).collect

res85: Array[Int] = Array(1, 1 , 1, 2, 2, 2, 3, 3, 3)

flatMap convierte un valor en una matriz y luego lo divide.

El diagrama de operación de este ejemplo es:

3) restar

?val a = sc.parallelize(1 a 9, 3)

? val b = sc.parallelize(1 a 3, 3)

? val c = a.subtract(b)

c.collect

? res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

Dibuje un diagrama para este ejemplo:

4) glom

p>

val a = sc.parallelize(1 a 100, 3)

a.glom.collect

res8: Array[Array[Int]] = Matriz(Matriz (1, 2, 3,..., 33), Matriz(34, 35,..., 65, 66), Matriz(67,..., 100))

Para este ejemplo Dibuja un diagrama:

5) unión

?val a = sc.parallelize(1 a 3, 1)

?val b = sc .parallelize( 5 a 7, 1)

?(a b).collect

?res0: Array[Int] = Array(1, 2, 3, 5, 6, 7 )

Dibuje un diagrama para este ejemplo:

6) cartesiano (operación cartesiana)

val x =sc.parallelize(List(1, 2, 3, 4, 5))

? val y =sc.parallelize(List(6, 7, 8, 9, 10))

? recoger

? res0: Matriz[(Int, Int)] =Matriz((1,6), (1,7), (1,8), (1,9), (1,10 ), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3 ,9),(3,10), (4,6), (5,6), (4,7), (5,7),

(4,8), ( 5, 8), (4,9), (4,10), (5,9), (5,10))

Dibuja un diagrama para este ejemplo:

7) groupBy (genera las claves correspondientes, junta las mismas claves)

?val a = sc.parallelize(1 a 9, 3)

?a.groupBy(x =gt ; { if (x 2 == 0)??"par" else "impar" }).collect

?res42: Array[(String, Seq[Int])] =Array((eve

n, ArrayBuffer(2, 4, 6, 8)), (impar, ArrayBuffer(1, 3, 5, 7, 9)))

Dibuja un diagrama para este ejemplo:

8) filtro

?val a = sc.parallelize(1 a 10, 3)

?val b = a.filter(_ 2 == 0)

?b.collect

?res3: Array[Int] = Array(2, 4, 6, 8, 10)

Dibuja un diagrama para este ejemplo:

9) distinto (eliminación)

val c =sc.parallelize(List("Gnu", "Gato", "Rata", "Perro", "Gnu", " Rata "), 2)

RDD tiene 6 elementos, divida estos seis elementos en 2 particiones

c.distinct.collect//Utilice la función distinta para elementos recurrentes Eliminar y luego formar una array

res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

Dibuja un diagrama para este ejemplo:

II , Operador de transformación del tipo de datos Key-Value

?1) mapValues

val a = sc.parallelize(List("perro", "tigre", "león", "gato", " pantera", "águila"), 2)

//RDD tiene 6 elementos, es decir, perro, león, gato..., divídelos en 2 particiones

val b = a .map(x =gt; (x.length, x))

b.mapValues("x" _ "x").collect

res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

Dibujar un diagrama para este ejemplo: 2) combineByKey

? val a =sc.parallelize(List("perro", "gato", "gnu", "salmón", "conejo", "pavo", "lobo", "oso", "abeja"), 3)

? val b = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2) , 3)

? val c = b.zip(a)

val d = c.combineByKey(List(_), (x: Lista[Cadena], y: Cadena) =gt; y:: x, (x: Lista[Cadena], y: Lista[Cadena]) =gt::: y;

)

? d.collect

? res16: Array[(Int, List[String])] = Array((1, List(gato, perro, pavo)), ( 2. List(ñu, conejo, salmón, abeja, oso, lobo)))

? El diagrama de operación de este ejemplo es: 3) reduceByKey

val a = sc. paralelizar (List("perro", "tigre", "perro", "gato", "perro", "águila", "gato"), 2)

//rdd tiene 7 elementos, Dividirlos en 2 particiones

val b = a.map(x =gt; (x.length, x))

? // ¿Cómo usar reduceByKey(_ _)

? res87: Array[(Int, String)] = Array((3, perro), (1, tigre), (2, gato), ( 1, águila))

? El diagrama de operación de este ejemplo es: 4) particiónPor

Para los elementos KV en dos RDD, los elementos en la misma clave en cada RDD se dividen en Agregado en una colección. A diferencia de reduceByKey, fusiona elementos con la misma clave en dos RDD

5) cogrupo

Para elementos KV en dos RDD, cada RDD tiene la misma clave Los elementos se agregan en un colocar.

La diferencia con reduceByKey es fusionar elementos con la misma clave en dos RDD

? val a = sc.parallelize(List(1, 2, 1, 3), 1)

? val b = a.map((_, "b"))

? val c = a.map((_, "c"))

b. (c).collect

? res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2, (ArrayBuffer(b), ArrayBuffer(c)) ), (3, (ArrayBuffer(b), ArrayBuffer(c))), (1, (ArrayBuffer(b, b), ArrayBuffer(c, c))))

? El diagrama de operación es: 6) unirse

? val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

? val b = a.keyBy(_.length)

? val c =sc.parallelize(List("perro","gato","gnu","salmón"," conejo", "pavo", "lobo", "oso", "abeja"), 3)

? val d = c.keyBy(_.length)

? b .join(d).collect

? res0: Array[(Int, (String, String))] = Array((6, (salmón, salmón)), (6, (salmón, conejo) ), (6, (salmón, pavo)), (6, (salmón, salmón)), (6, (salmón, conejo)), (6, (salmón, pavo)), (3, (perro, perro) ), (3, (perro, gato)), (3, (perro, ñu)), (3, (perro, abeja)), (3, (rata, perro)), (3, (rata, gato) ), (3, (rata, gnu)), (3, (rata, abeja)))

? El diagrama de funcionamiento de este ejemplo es: 7) leftOutJoin

Cambiar el lado izquierdo de IZQUIERDA Se conservan todos los registros del nombre de la tabla 1 y se muestran los registros correspondientes al campo B en el nombre de la tabla 2 a la derecha y al nombre de la tabla 1.campo A

val a = sc.parallelize ( List("perro", "salmón", "salmón", "rata", "elefante"), 3)

val b = a.keyBy(_.length)

? val c =sc.parallelize(List("perro","gato","ñu","salmón","conejo","pavo","wo

lf", "oso", "abeja"), 3)

? val d = c.keyBy(_.length)

? b.leftOuterJoin(d).collect

? res1: Array[(Int, (String, Option[String]))] = Array((6, (salmón, Algunos(salmón))), (6, (salmón, Algunos(conejo) )), (6, (salmón, algunos (pavo))), (6, (salmón, algunos (salmón))), (6, (salmón, algunos (conejo))), (6, (salmón, algunos ( pavo))), (3, (perro, Algunos(perro))), (3, (perro, Algunos(gato))), (3, (perro, Algunos(gnu))), (3, (perro, Algunos (abeja))), (3, (rata, Algunos (perro))), (3, (rata, Algunos (gato))), (3, (rata, Algunos (ñu))), (3, ( rat, Some(bee))), (8, (elefante, None)))

? El diagrama de operación de este ejemplo es: 8) rightOutJoin (unión externa derecha)

? val a = sc.parallelize(List("perro", "salmón", "salmón", "rata", "elefante"), 3)

val b = a.keyBy(_. longitud )

? val c =sc.parallelize(List("perro", "gato", "ñu", "salmón", "conejo", "pavo", "lobo", "oso" , "abeja"), 3)

? val d = c.keyBy(_.length)

b.rightOuterJoin(d).collect

? res2: Array[(Int, (Option[String], String))] = Array((6, (Algunos (salmón), salmón)), (6, (Algunos (salmón), conejo)), (6, ( Algunos (salmón), pavo)), (6, (Algunos (salmón), salmón)), (6, (Algunos (salmón), conejo)), (6, (Algunos (salmón), pavo)), ( 3 , (Algunos(perro), perro)), (3, (Algunos(perro), gato)), (3, (Algunos(perro), ñu)), (3, (Algunos(perro), abeja)) , (3, (Algunos (rata), perro)), (3, (Algunos (rata), gato)), (3, (Algunos (rata), ñu)), (3, (Algunos (rata), abeja ) ), (4, (Ninguno, lobo)), (4, (Ninguno, oso)))

? El diagrama de operación de este ejemplo es:

3.

1) para cada

val c = sc.parallelize(List("gato", "perro", "tigre", "león", "ñu", "cocodrilo", "hormiga", "ballena", "delfín", "araña "), 3)//Utilice el método de paralelización para importar datos

c.foreach(x =gt; println(x "s son deliciosos"))//Procese un dato después de obtener un dato data

El diagrama de operación de este ejemplo es: 2) fold

val a = sc.parallelize(List(1, 2, 3), 3)

a.fold(0 )(_ _)

res59: Int = 6

Dibuja un diagrama para este ejemplo:

?3) agregado

val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

// primero imprimamos el contenido del RDD con las etiquetas de partición

def myfunc(index: Int, iter: Iterador[(Int)]): Iterador[Cadena] = {

iter.toList.map(x =gt; "[partID : " index ", val : " x "]").iterator

}

z.mapPartitionsWithIndex(myfunc).collect

res28: Array[String ] = Array([partID:0, val: 1], [partID: 0, val: 2], [partID: 0, val: 3], [partID: 1, val: 4], [partID: 1, val : 5], [partID: 1, val: 6])

z.aggregate(0)(math.max(_, _), _ _)

res40: Int = 9

Dibuje un diagrama para este ejemplo: 4) recolectar

? val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog) ", "Gnu", "Rat"), 2)

? c.collect ?//Combinando dos particiones en una mediante el operador de recopilación

? res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

Dibuje un diagrama para este ejemplo: 5) CollectAsMap

En RDD

Si hay varios Valores en la misma Clave, el Valor posterior sobrescribirá el Valor anterior. El resultado final es que la Clave es única y corresponde a un Valor

val a = sc.parallelize(List. (1, 2, 1, 3), 1)

val b = a.zip(a)

b.collectAsMap

res1: scala.collection .Map [Int, Int] = Map(2 -gt; 2, 1 -gt; 1, 3 -gt; 3)

Dibuja un diagrama para este ejemplo: 6) reduceByKeyLocally

Esta función opera el valor V correspondiente a cada K en RDD [K, V] de acuerdo con la función de mapeo, y el resultado de la operación se asigna a un Mapa [K, V] en lugar de RDD [K, V].

val a =sc.parallelize(List("perro", "gato", "búho", "ñu", "hormiga"), 2)

val b = a .map(x =gt; (x.length, x))

b.reduceByKey(_ _).collect

res86: Array[(Int, String)] =Array ((3, dogcatowlgnuant))

Dibuja un diagrama para este ejemplo: 7) lookup

val a =sc.parallelize(List("perro", "tigre", "león ", "gato", "pantera", "águila"), 2)

val b = a.map(x =gt; (x.length, x))

b.lookup(5)

res0: Seq[String] = WrappedArray(tigre, eagle)

Dibuja un diagrama para este ejemplo: 8) count

val c = sc.parallelize(List("Gnu", "Gato", "Rata", "Perro"), 2)

c.count

res2: Long = 4

Dibuja un diagrama para este ejemplo: 9) top?

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)

c.top(2)

res28: Array[Int] = Array(9, 8)

Dibuja un diagrama para este ejemplo: 10) reducir

val a = sc.parallelize(1 a 100, 3)

a.reduce(_ _)

res41: Int = 5050

Dibuje un diagrama para este ejemplo:

Este artículo es el resultado de una semana de escritura. No estoy muy familiarizado con él. Necesito entender un operador a la vez. muy preciso, pero está dentro del alcance de mi comprensión. Es difícil encontrar conocimiento sobre esto en el sitio web, por lo que la mayor parte se basa en el autoestudio. Mi comprensión de mapreduce y spark se limita a hacer referencia al conocimiento en línea más algunas de mis propias opiniones.

------------------------------------------------- ---- Referencia: blogs.com/MOBIN/p/5414490.html#12------------------------------- --- --------------------