Red de conocimiento informático - Material del sitio web - Cómo convertir MapReduce a Spark

Cómo convertir MapReduce a Spark

MapReduce to Spark

Spark es un motor informático similar a MapReduce. Propone un método en memoria para resolver la dificultad de la lenta velocidad de lectura del disco de MapReduce. Además, se basa en el estilo de programación funcional y la API de Scala. , lo que lo hace muy eficiente al realizar cálculos paralelos.

Dado que Spark usa RDD (Resilient Distributed Result Dataset) para calcular datos, que es diferente de Map() y Reduce() de MapReduce

, es difícil usar Mapper y Reducer. API, que también es un obstáculo para convertir MapReduce a Spark. Convierta MapReduce a Spark.

En comparación con los métodos map() y reduce() en Scala o Spark en comparación con los métodos map() y reduce() en Hadoop MapReduce, la API de Hadoop

MapReduce es más flexible y complejo. Las siguientes son algunas características de Hadoop MapReduce:

Los mapeadores y reductores generalmente usan pares clave-valor como entrada y salida;

Una clave corresponde a una reducción para un reductor;

p>

Cada asignador o reductor puede emitir pares clave-valor (como 0, 1) como salida para cada salida;

Los asignadores y reductores pueden emitir claves o valores arbitrarios, en lugar del conjunto de datos estándar. métodos;

Los objetos mapeador y reductor tienen un ciclo de vida cada vez que se llama a map() y reduce(). Admiten el método setup() y el método cleanup(), que se pueden utilizar para operaciones antes de procesar datos por lotes.

Imagínese un escenario en el que necesitamos contar el número de caracteres por línea en un archivo de texto. En Hadoop

MapReduce, debemos proporcionar un par clave-valor al método Mapper, donde la clave se usa como el número de filas en la fila y el valor es el número de caracteres en la fila.

Listado 9.

Método MapReduce Función de mapa

clase pública LineLengthCountMapper

extiende Mapperlt;

@Override

mapa vacío protegido (Número de línea de escritura larga, línea de texto, contexto de contexto)

arroja IOException, InterruptedException {

contexto. escribir(nuevo IntWritable(line.getLength()), nuevo IntWritable(1));

}

}

}

El código del Listado 9 muestra que, dado que los asignadores y reductores solo manejan pares clave-valor, para la clase

p>

son todos caracteres en la línea. El código después de cambiar a Spark se muestra en el Listado 10.

Listado 10. Función Mapa de Spark

lines.map(line =gt; (line.length, 1))

En Spark, la entrada es Resilient conjuntos de datos distribuidos

. Spark no requiere pares clave-valor, sino que utiliza tuplas de Scala, que se crean utilizando la sintaxis (a, b) (línea.longitud,

1). La operación map() en el código anterior es una tupla RDD, (line.length,

1). Cuando un RDD contiene tuplas, se basa en otros métodos como reduceByKey(), que es importante para regenerar la funcionalidad MapReduce.

El código que se muestra en el Listado 11 es Hadoop MapReduce que cuenta el número de caracteres en cada línea y luego lo genera como Reducir. IntWritable, IntWritablegt; {

@Override

protected void reduce(IntWritable length, Iterablelt; IntWritablegt; counts, Context context)

lanza IOException, InterruptedException {

int suma = 0;

for (Recuento IntWritable: recuentos) {

suma = count.get(); /p>

contexto.write(longitud, nuevo IntWritable(suma));

}

}

}

El código correspondiente dentro de Spark se muestra en el Listado 12.

Listado 12.

Función de reducción de Spark

val lengthCounts = lines.map(line =gt; (line.length, 1)).reduceByKey( _ _)

La API RDD de Spark tiene un método reduce().

Ahora necesitamos contar el número de palabras que comienzan con una letra mayúscula. Para cada línea de texto, es posible que el asignador necesite contar muchos pares clave-valor, como se muestra en el Listado 13. IntWritablegt; {

@Override

mapa vacío protegido (Número de línea de escritura larga, línea de texto, contexto de contexto)

arroja IOException, InterruptedException {

for (palabra de cadena: line.toString().split(" "))){

if (Character.isUpperCase(word.charAt(0))){

contexto .write(nuevo Texto(palabra), nuevo IntWritable(1));

}

}

}

}

}

Dentro de Spark, el código correspondiente se muestra en el Listado 14.

Listado 14. Contando el número de caracteres en Spark

lines.flatMap(

_.split(" ").filter(word=gt;Character . isUpperCase(word(0))).map(word=gt;(word,1))

)

MapReduce se basa en el método Map

. , pero este enfoque no funciona aquí porque cada entrada debe corresponder a una salida, en cuyo caso cada fila puede ocupar una gran cantidad de salida. Por el contrario, el método Map

en Spark es más simple; el método

de Spark primero resume cada fila de datos en una matriz de salida, que puede estar vacía o contener muchos valores, en última instancia como un RDD

como salida. Esto es lo que hace el método flatMap(), filtra las palabras en cada línea de texto después de convertirlas en tuplas dentro de la función.

Dentro de Spark, el método reduceByKey() se puede utilizar para contar el número de letras que aparecen en cada artículo. Si tuviéramos que contar el número de letras mayúsculas en cada artículo, el programa en MapReduce sería similar al Listado 15.

Listado 15. Método MapReduce

clase pública CountUppercaseReducer

extiende Reducerlt; Text, IntWritable, Text, IntWritablegt; {

@Override

protected void reduce(Texto palabra, Iterablelt; recuentos, contexto de contexto)

arroja IOException, InterruptedException {

int sum = 0

for (Recuento de IntWritable: recuentos) {

suma = count.get();

}

contexto.write(new Text(word.toString().toUpperCase()), new IntWritable(suma)) ;

}

}

En Spark, el código se muestra en el Listado 16.

Listado 16. Métodos Spark

groupByKey().map { case (word, ones) =gt; (word.toUpperCase, ones.sum) }

El método groupByKey() es responsable de recopilar claves Todos los valores no deben usarse en un método de reducción. En este caso, las claves se convierten a letras mayúsculas y los valores se suman directamente. Sin embargo, tenga en cuenta que si una clave está asociada con muchos valores, puede ocurrir un error de falta de memoria.

Spark proporciona una forma sencilla de convertir el valor correspondiente a una clave, entregando así el flujo del método de reducción a Spark y evitando excepciones OOM.

reduceByKey(_ _).map { case (palabra, total) =gt (word.toUpperCase, total)

}

MapReduce setup() El método es procesar la entrada antes de que comience el método del mapa. Un escenario de aplicación común es conectarse a una base de datos, en cuyo caso el método cleanup() liberará los recursos utilizados en el método setup().

Lista 17.Método MapReduce

Clase pública SetupCleanupMapper extiende Mapperlt; LongWritable, Text, Text, IntWritablegt; {

Conexión privada dbConnection; p> @Override

configuración de vacío protegido (contexto contextual) {

dbConnection = ... ;;

}

.. .

@Override

limpieza de vacíos protegidos (contexto contextual) {

dbConnection.close()

}

;

}