Cómo utilizar Spark para implementar programas MapReduce existentes
Este artículo mostrará brevemente cómo reproducir el proceso anterior en Spark. ¡Descubrirá que no es necesario traducir Mapper y Reducer palabra por palabra!
Pares clave-valor como tuplas
Supongamos que necesitamos calcular la longitud de cada línea en un texto grande e informar el número de líneas para cada longitud. En HadoopMapReduce, primero usamos un Mapper para generar un par clave-valor con la longitud de la fila como clave y 1 como valor.
la clase pública LineLengthMapper extiende
Mapperlt LongWritable, Text, IntWritable, IntWritablegt; {
@Override
mapa vacío protegido (LongWritable; número de línea, línea de texto, contexto de contexto)
lanza IOException, InterruptedException {
context.write(new IntWritable(line.getLength()), new IntWritable(1)); p>
p>
}
}
Vale la pena señalar que los asignadores y reductores solo operan en pares clave-valor. Por lo tanto, TextInputFormat proporciona entrada a LineLengthMapper. De hecho, la posición en el texto se usa como clave (rara vez se usa, pero siempre se necesita algo como clave), y el valor de la línea de texto es un par clave-valor.
La implementación de Spark correspondiente:
lines.map(line =gt; (line.length, 1))
En Spark, la entrada es solo String RDD compuesto por pares clave-valor en lugar de pares clave-valor. La representación de pares clave-valor en Spark es una tupla de Scala, que se crea utilizando la sintaxis (A, B). El resultado de la operación de mapa anterior es un RDD de tuplas (Int, Int). Cuando un RDD contiene muchas tuplas, adquiere múltiples métodos como reduceByKey, que será crucial para reproducir el comportamiento de MapReduce.
Reducir
reduce() y reduceBykey()
Para contar los pares clave-valor de longitudes de fila, cada longitud debe usarse como clave en el Reductor para calcular La suma de sus números de fila se utiliza como valor.
clase pública LineLengthReducer extiende
Reducerlt; IntWritable, IntWritable, IntWritable, IntWritablegt; longitud, Iterablelt; IntWritablegt; recuentos,
Contexto) arroja IOException, InterruptedException {
int sum =
for (IntWritable count: recuentos) {
suma = count.get();
}
context.write(longitud, nuevo IntWritable(suma));
}
}
La implementación correspondiente al Mapper y Reducer anterior en Spark solo requiere una línea de código:
val lengthCounts = lines.map(line = gt; (line .length, 1)).reduceByKey(_ _)
La API RDD de Spark tiene un método de reducción, pero reducirá todos los pares clave-valor a un solo valor. Este no es el comportamiento de Hadoop MapReduce, la contraparte en Spark es ReduceByKey.
Además, el método Reduce de Reducer recibe un flujo de valores múltiples y produce 0, 1 o más resultados. Y reduceByKey, acepta una función que convierte dos valores en un valor. En este caso, es una función de suma simple que asigna dos números a su suma. La persona que llama puede utilizar esta función asociada para reducir varios valores a un solo valor. En comparación con el método Reducer, es una API más simple y precisa que reduce el valor según la clave.
Mapper
map() y flatMap()
Ahora, considere un algoritmo que cuenta el número de palabras que comienzan con una letra mayúscula. Para cada línea de texto de entrada, Mapper puede producir 0, 1 o más pares clave-valor.
clase pública CountUppercaseMapper extiende
Mapperlt LongWritable, Text, Text, IntWritablegt; {
@Override
mapa vacío protegido (LongWritable; número de línea, línea de texto, contexto de contexto)
lanza IOException, InterruptedException {
for (palabra de cadena: line.toString().split(" ")) {
if (Character.isUpperCase(word.charAt(0))) {
context.write(new Text(word), new IntWritable(1));
}
}
}
}
El método de escritura correspondiente para Spark:
lines.flatMap( p>
_.split(" ").filter(palabra =gt; Character.isUpperCase(palabra(0))).map(palabra =gt; (palabra, 1))
) p>
La función simple de mapa de Spark no es adecuada para este escenario, porque el mapa solo puede producir una única salida para cada entrada, pero en este ejemplo una línea necesita producir múltiples salidas. Por lo tanto, en comparación con lo que admite MapperAPI, la función de mapa de Spark tiene una semántica más simple y un alcance de aplicación más limitado.
La solución de Spark es primero asignar cada fila a un conjunto de valores de salida, que pueden ser nulos o tener varios valores. Luego se aplanará usando la función flatMap. Las palabras de la matriz se filtran y se convierten en tuplas en la función. En este ejemplo, lo que realmente imita el comportamiento de Mapper es flatMap, no map.
groupByKey()
Escribir un reductor que cuente el número de veces es simple. En Spark, reduceByKey se puede usar para contar el número total de cada palabra.
Por ejemplo, por alguna razón, cada palabra en el archivo de salida debe mostrarse como una letra mayúscula y su número. En MapReduce, la implementación es la siguiente:
la clase pública CountUppercaseReducer se extiende
.Reducerlt; Texto, IntWritable, Texto, IntWritablegt; {
@Override
reducción de vacío protegido (palabra de texto, Iterablelt; IntWritablegt; recuentos, contexto de contexto)
lanza IOException, InterruptedException {
int suma = 0;
for (IntWritable count: counts) {
sum = count.get();
}
context
.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
}
}
Pero reduceByKey no puede funcionar solo en Spark porque conserva la clave original. Para simular en Spark, necesitamos algunas operaciones más parecidas a la API Reducer. Sabemos que el método de reducción de Reducer acepta una clave y un conjunto de valores, y luego completa un conjunto de transformaciones. groupByKey y una operación de mapa continua pueden lograr este objetivo:
groupByKey().map { case (word, ones) =gt (word.toUpperCase, ones.sum) }
groupByKey solo recopila todos los valores de una determinada clave y no proporciona la función de reducción. En base a esto, se puede realizar cualquier transformación en la clave y un rango de valores. Aquí, las claves se convierten a letras mayúsculas y los valores se suman directamente.
setup() y cleanup()
En MapReduce, Mapper y Reducer pueden declarar un método de configuración que se ejecutará antes de procesar la entrada para asignar recursos costosos, como conexiones de bases de datos, y en Al mismo tiempo, se pueden liberar recursos utilizando la función de limpieza.
La clase pública SetupCleanupMapper extiende
Mapperlt; LongWritable, Text, Text, IntWritablegt; {
Conexión privada dbConnection;
@Override
configuración de vacío protegido(Contexto) {
dbConnection = ...
}
...
<; p>@Overridelimpieza de vacíos protegidos (contexto contextual) {
dbConnection.close();
}
} p>
Los métodos map y flatMap en Spark solo pueden operar en una entrada a la vez y no proporcionan un método para ejecutar código antes y después de convertir una gran cantidad de valores. Parece que el código de configuración y limpieza sí puede. colocarse directamente en la función Sparkmap Antes y después de llamar:
val dbConnection = ...
lines.map(... dbConnection.createStatement(...) .. .)
dbConnection.close() // ¡Incorrecto!
Sin embargo, este método no es factible porque:
· Coloca el objeto dbConnection en el cierre. de la función de mapa, que debe ser serializable (por ejemplo, a través de java.io.Serializable). Los objetos como las conexiones de bases de datos generalmente no se pueden serializar.
· Map es una transformación, no una operación, y retrasa la ejecución. El objeto de conexión no se puede cerrar rápidamente.
· Aun así, solo puede cerrar la conexión en el controlador, en lugar de liberar la conexión de recursos asignada por la versión de copia serializada.
De hecho, ni map ni flatMap son la función correspondiente más cercana a Mapper en Spark. La función correspondiente más cercana a Mapper en Spark es el muy importante método mapPartitions(). Este método no solo puede completar un solo valor. mapeo de valor único, también es posible completar el mapeo de un conjunto de valores a otro conjunto de valores, muy similar al método de mapeo por lotes (mapa masivo). Esto significa que el método mapPartitions() puede asignar recursos localmente al principio y liberarlos al final del mapeo por lotes.
Es sencillo agregar el método de configuración, pero es más difícil agregar el método de limpieza, porque aún es difícil detectar que la conversión se ha completado. Por ejemplo, esto funciona:
lines.mapPartitions { valueIterator =gt;
val dbConnection = ... // OK
val transformIterator = valueIterator .map (... dbConnection ...)
dbConnection.close() // ¡Aún es incorrecto! Puede que no se haya evaluado el iterador
transformedIterator
}
Reimprimir