Cómo implementar el marco informático mapreduce para implementar la iteración de manera efectiva
Desde su aparición, MapReduce se ha convertido en el trabajo líder del paradigma informático Apache Hadoop. Es perfecto para cada trabajo para el que está diseñado: procesamiento de registros a gran escala, operaciones por lotes ETL, etc.
A medida que el alcance del uso de Hadoop continúa expandiéndose, ha quedado claro que MapReduce no es el mejor marco para todos los cálculos. Hadoop 2 utiliza el administrador de recursos YARN como componente de nivel superior, brindando la posibilidad de conectar otros motores informáticos. La introducción de arquitecturas que no son de MapReduce, como Impala, le da a la plataforma la capacidad de admitir SQL interactivo.
Hoy en día, Apache Spark es otra alternativa de este tipo y se conoce como un paradigma informático de propósito general más allá de MapReduce. Quizás tengas curiosidad: MapReduce ha sido tan útil durante mucho tiempo, ¿cómo podría reemplazarse de repente? Después de todo, todavía queda mucho trabajo de ETL por hacer en Hadoop, incluso si la plataforma ya tiene otras capacidades en tiempo real.
Afortunadamente, es completamente posible volver a implementar cálculos similares a MapReduce en Spark. Pueden ser más sencillos de mantener y, en algunos casos, más rápidos, gracias a la optimización de Spark del vaciado de datos al disco. La reimplementación por parte de Spark del paradigma de programación MapReduce no es más que un regreso a sus raíces. Spark imita la API y el estilo de programación funcional de Scala. La idea de MapReduce proviene del lenguaje de programación funcional LISP.
Aunque la abstracción principal de Spark es RDD (Resilient Distributed Dataset), que implementa operaciones de mapa, reducción y otras, estas no son simulaciones directas de la API Mapper o Reducer de Hadoop. Estos cambios a menudo se convierten en un obstáculo para que los desarrolladores migren de las clases Mapper y Reducer a Spark en paralelo.
En comparación con las funciones de mapa y reducción implementadas en lenguajes funcionales clásicos en Scala o Spark, las API Mapper y Reducer proporcionadas por el Hadoop original son más flexibles y complejas. Es posible que estas diferencias no sean obvias para los desarrolladores que están acostumbrados a MapReduce. El siguiente comportamiento es para la implementación de Hadoop en lugar del concepto abstracto de MapReduce:
·Mapper y Reducer siempre usan pares clave-valor como entrada. y salida.
·Cada Reductor reduce Valor según Clave.
·Cada Mapper y Reducer puede producir 0, 1 o más pares clave-valor para cada conjunto de entradas.
· Mapper y Reducer pueden generar claves y valores arbitrarios, y no se limitan a subconjuntos y transformaciones de la entrada.
El ciclo de vida de los objetos Mapper y Reducer puede abarcar múltiples operaciones de mapa y reducción. Admiten métodos de configuración y limpieza, que se llaman antes de que comience el procesamiento de registros por lotes y después de que finalice.
Este artículo mostrará brevemente cómo reproducir el proceso anterior en Spark. ¡Descubrirá que no es necesario traducir Mapper y Reducer palabra por palabra!
Pares clave-valor como tuplas
Supongamos que necesitamos calcular la longitud de cada línea en un texto grande e informar el número de líneas para cada longitud. En HadoopMapReduce, primero usamos un Mapper para generar un par clave-valor con la longitud de la fila como clave y 1 como valor.
la clase pública LineLengthMapper extiende
Mapperlt LongWritable, Text, IntWritable, IntWritablegt; {
@Override
mapa vacío protegido (LongWritable; número de línea, línea de texto, contexto de contexto)
lanza IOException, InterruptedException {
context.write(new IntWritable(line.getLength()), new IntWritable(1)); p>
p>
}
}
Vale la pena señalar que los asignadores y reductores solo operan en pares clave-valor. Por lo tanto, TextInputFormat proporciona entrada a LineLengthMapper. De hecho, la posición en el texto se usa como clave (rara vez se usa de esta manera, pero siempre se necesita algo como clave), y la línea de texto es un par clave-valor.
La implementación de Spark correspondiente:
lines.map(line =gt; (line.length, 1))
En Spark, la entrada es solo String RDD compuesto por pares clave-valor en lugar de pares clave-valor. La representación de pares clave-valor en Spark es una tupla de Scala, que se crea utilizando la sintaxis (A, B). El resultado de la operación de mapa anterior es un RDD de tuplas (Int, Int). Cuando un RDD contiene muchas tuplas, adquiere múltiples métodos como reduceByKey, que será crucial para reproducir el comportamiento de MapReduce.
Reducir
reduce() y reduceBykey()
Para contar los pares clave-valor de longitudes de fila, cada longitud debe usarse como clave en el Reductor para calcular La suma de sus números de fila se utiliza como valor.
clase pública LineLengthReducer extiende
Reducerlt; IntWritable, IntWritable, IntWritable, IntWritablegt; longitud, Iterablelt; IntWritablegt; recuentos,
Contexto) arroja IOException, InterruptedException {
int sum =
for (IntWritable count: recuentos) {
suma = count.get();
}
context.write(longitud, nuevo IntWritable(suma));
}
}
La implementación correspondiente al Mapper y Reducer anterior en Spark solo requiere una línea de código:
val lengthCounts = lines.map(line = gt; (line .length, 1)).reduceByKey(_ _)
La API RDD de Spark tiene un método de reducción, pero reducirá todos los pares clave-valor a un solo valor. Este no es el comportamiento de Hadoop MapReduce, la contraparte en Spark es ReduceByKey.
Además, el método Reduce de Reducer recibe un flujo de valores múltiples y produce 0, 1 o más resultados. Y reduceByKey, acepta una función que convierte dos valores en un valor. En este caso, es una función de suma simple que asigna dos números a su suma. La persona que llama puede utilizar esta función asociada para reducir varios valores a un solo valor. En comparación con el método Reducer, es una API más simple y precisa que reduce el valor según la clave.
Mapper
map() y flatMap()
Ahora, considere un algoritmo que cuenta el número de palabras que comienzan con una letra mayúscula. Para cada línea de texto de entrada, Mapper puede producir 0, 1 o más pares clave-valor.
clase pública CountUppercaseMapper extiende
Mapperlt LongWritable, Text, Text, IntWritablegt; {
@Override
mapa vacío protegido (LongWritable; número de línea, línea de texto, contexto de contexto)
lanza IOException, InterruptedException {
for (palabra de cadena: line.toString().split(" ")) {
if (Character.isUpperCase(word.charAt(0))) {
context.write(new Text(word), new IntWritable(1));
}
}
}
}
El método de escritura correspondiente para Spark:
lines.flatMap( p>
_.split(" ").filter(palabra =gt; Character.isUpperCase(palabra(0))).map(palabra =gt; (palabra, 1))
) p>
La función simple de mapa de Spark no es adecuada para este escenario, porque el mapa solo puede producir una única salida para cada entrada, pero en este ejemplo una línea necesita producir múltiples salidas. Por lo tanto, en comparación con lo que admite MapperAPI, la función de mapa de Spark tiene una semántica más simple y un alcance de aplicación más limitado.
La solución de Spark es primero asignar cada fila a un conjunto de valores de salida, que pueden ser nulos o tener varios valores. Luego se aplanará usando la función flatMap. Las palabras de la matriz se filtran y se convierten en tuplas en la función. En este ejemplo, lo que realmente imita el comportamiento de Mapper es flatMap, no map.
groupByKey()
Escribir un reductor que cuente el número de veces es simple. En Spark, reduceByKey se puede usar para contar el número total de cada palabra.
Por ejemplo, por alguna razón, cada palabra en el archivo de salida debe mostrarse como una letra mayúscula y su número. En MapReduce, la implementación es la siguiente:
la clase pública CountUppercaseReducer se extiende
.Reducerlt; Texto, IntWritable, Texto, IntWritablegt; {
@Override
reducción de vacío protegido (palabra de texto, Iterablelt; IntWritablegt; recuentos, contexto de contexto)
lanza IOException, InterruptedException {
int suma = 0;
for (IntWritable count: counts) {
sum = count.get();
}
context
.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
}
}
Pero reduceByKey no puede funcionar solo en Spark porque conserva la clave original. Para simular en Spark, necesitamos algunas operaciones más parecidas a la API Reducer. Sabemos que el método de reducción de Reducer acepta una clave y un conjunto de valores, y luego completa un conjunto de transformaciones. groupByKey y una operación de mapa continua pueden lograr este objetivo:
groupByKey().map { case (word, ones) =gt (word.toUpperCase, ones.sum) }
groupByKey solo recopila todos los valores de una determinada clave y no proporciona la función de reducción. En base a esto, se puede realizar cualquier transformación en la clave y un rango de valores. Aquí, la clave se convierte a letras mayúsculas y los valores se suman directamente.
setup() y cleanup()
En MapReduce, Mapper y Reducer pueden declarar un método de configuración que se ejecutará antes de procesar la entrada para asignar recursos costosos, como conexiones de bases de datos, y en Al mismo tiempo, se pueden liberar recursos utilizando la función de limpieza.
La clase pública SetupCleanupMapper extiende
Mapperlt; LongWritable, Text, Text, IntWritablegt; {
Conexión privada dbConnection;
@Override
configuración de vacío protegido(Contexto) {
dbConnection = ...
}
...
<; p>@Overridelimpieza de vacíos protegidos (contexto contextual) {
dbConnection.close();
}
} p>
Los métodos map y flatMap en Spark solo pueden operar en una entrada a la vez y no proporcionan un método para ejecutar código antes y después de convertir una gran cantidad de valores. Parece que el código de configuración y limpieza sí puede. colocarse directamente en la función Sparkmap Antes y después de llamar:
val dbConnection = ...
lines.map(... dbConnection.createStatement(...) .. .)
dbConnection.close() // ¡Incorrecto!
Sin embargo, este método no es factible porque:
· Coloca el objeto dbConnection en el cierre. de la función de mapa. Esto debe ser serializable (por ejemplo, a través de java.io.Serializable). Los objetos como las conexiones de bases de datos generalmente no se pueden serializar.
· Map es una transformación, no una operación, y retrasa la ejecución. El objeto de conexión no se puede cerrar rápidamente.
· Aun así, solo puede cerrar la conexión en el controlador, en lugar de liberar la conexión de recursos asignada por la versión de copia serializada.
De hecho, ni map ni flatMap son la función correspondiente más cercana a Mapper en Spark. La función correspondiente más cercana a Mapper en Spark es el muy importante método mapPartitions(). Este método no solo puede completar un solo valor. mapeo de valor único, también es posible completar el mapeo de un conjunto de valores a otro conjunto de valores, muy similar al método de mapeo por lotes (mapa masivo). Esto significa que el método mapPartitions() puede asignar recursos localmente al principio y liberarlos al final del mapeo por lotes.
Agregar un método de configuración es simple, agregar un método de limpieza es más difícil, porque detectar que la conversión está completa aún es difícil.
Por ejemplo, esto funciona:
lines.mapPartitions { valueIterator =gt;
val dbConnection = ... // OK
val transformIterator = valueIterator .map (... dbConnection ...)
dbConnection.close() // ¡Aún es incorrecto! Puede que no se haya evaluado el iterador
transformedIterator
}
Un paradigma completo debería verse así:
lines.mapPartitions { valueIterator =gt;
if (valueIterator.isEmpty) {
Iterator[ ...]()
} else {
val dbConnection = ...
valueIterator.map { item =gt;
p>val transformItem = ...
if (!valueIterator.hasNext) {
dbConnection.close()
}
transformedItem
}
}
}
Aunque la última traducción del código está destinada a ser menos elegante que la primera, puede De hecho, se completará el trabajo.
El método flatMapPartitions no existe, sin embargo, se puede lograr el mismo efecto llamando a mapPartitions, seguido de una llamada a flatMap(a= gt; a).
El Reductor con configuración y limpieza solo necesita seguir el código anterior y usar groupByKey seguido de una función mapPartition.
No te preocupes, espera, hay más
Los desarrolladores de MapReduce señalarán que hay más API que aún no se han mencionado:
· MapReduce admite un tipo especial de Reductor, también llamado Combinador, que puede reducir el tamaño de los datos mezclados desde el Mapper.
· También admite particiones personalizadas a través de Partitioner y agrupaciones personalizadas a través de Grouping Comparator.
· El objeto Context otorga acceso a Counter API y sus estadísticas acumulativas.
· El Reductor siempre puede ver las claves ordenadas durante su ciclo de vida.
· MapReduce tiene su propio esquema de serialización grabable.
· Mapper y Reducer pueden emitir múltiples conjuntos de resultados al mismo tiempo.
· MapReduce tiene decenas de parámetros de ajuste.
Hay muchas formas de implementar estos escenarios en Spark, usando API como Accumulator, métodos como groupBy y agregando argumentos de partición a varios de estos métodos, serialización Java o Kryo, almacenamiento en caché y más. Debido a limitaciones de espacio, no se presentarán en detalle en este artículo.
Cabe señalar que el concepto de MapReduce sigue siendo útil. Es solo que ahora hay una implementación más poderosa que utiliza lenguajes funcionales para adaptarse mejor a su funcionalidad.
Comprender las diferencias entre la API Spark RDD y la API Mapper y Reducer original puede ayudar a los desarrolladores a comprender mejor cómo funcionan todas estas funciones y a comprender cómo utilizar Spark para su beneficio.