Cómo aplicar dos datos a un reductor
1.1.InputFormat genera InputSplits y llama a RecordReader para convertir estas unidades lógicas (InputSplits) en entradas para la tarea de mapeo. Entre ellos, InputSplit es la representación lógica de la unidad de entrada más pequeña procesada por la tarea de mapeo.
1.2. Llame a la clase Job en el código del cliente para establecer parámetros y ejecutar el programa MapReduce en el clúster de hadoop.
1.3. Cree una instancia de la clase Mapper en el Trabajo y pase la configuración de los parámetros a través del objeto MapContext. Los parámetros se pueden configurar llamando a Job.getConfiguration().set("myKey", "myVal").
1.4.El método run() de Mapper llamará a su propio setup() para establecer parámetros.
1.5. El método run() de Mapper llama al método map(KeyInType, ValInType, Context) para procesar secuencialmente los pares clave/valor ingresados en InputSplit. También puede utilizar el método Context.write(KeyOutType, ValOutType) del parámetro Context para generar los resultados del procesamiento. Los programas de usuario también pueden usar Contexto para establecer información de estado, etc. Al mismo tiempo, los usuarios también pueden usar Job.setCombinerClass(Class) para configurar el combinador para agregar los datos del estado intermedio generados por el mapa localmente en el Mapper, reduciendo así los datos pasados al reductor.
1.6.El método run() de Mapper llama al método cleanup().
Algunas notas:
Todos los resultados intermedios se agrupan automáticamente mediante el marco MapReduce y luego se pasan al reductor para generar los resultados finales. Los usuarios pueden configurar el comparador a través de Job.setGroupingComparatorClass(Class). Si no se configura este comparador, no se ordenarán todos los datos con la misma clave.
Los resultados del Mapper se ordenan y dividen en R fragmentos (R es el número de reductores).
Los datos del estado intermedio generalmente se almacenan en el formato simple de (key-len, key, value-len, value). Los programas de usuario pueden especificar un CompressionCodec para comprimir datos de estado intermedio.
El número de asignaciones está determinado por el tamaño total de los datos de entrada. En términos generales, el paralelismo es bueno cuando un nodo informático tiene entre 10 y 100 tareas de mapeo. Si la carga computacional de la CPU es pequeña, también es posible realizar un promedio de 300 tareas de mapeo por nodo de cómputo. Sin embargo, la inicialización de cada tarea lleva tiempo, por lo que cada tarea no se puede dividir demasiado y, en promedio, se ejecuta al menos una tarea cada minuto. El número de mapas se puede configurar utilizando el parámetro mapreduce.job.maps. Además, la cantidad de tareas está controlada por el método InputFormat.getSplits(), que el usuario puede anular.
------------------------------------------- ----- --------------------------------------------- ----- ---------------------------------- ----------- ------- -
Reducer se divide principalmente en tres pasos:
1.Shuffle
En este paso, Reducer obtiene el estado intermedio de los datos relevantes del nodo Mapper.
2. Ordenar
Mientras se barajan, los datos adquiridos también deben ordenarse. Durante este proceso, puede utilizar Job.setGroupingComparatorClass(Class) para realizar una clasificación secundaria de datos con la misma clave.
3.Reduce
La secuencia de llamada de Reduce es similar a Map y también se implementa llamando a setup(), reduce() y cleanup() a través de run(). método.
El número de Reducciones se puede configurar a través de Job.setNumReduceTasks(int). En términos generales, el número de reductores es de 0,95 a 1,75 veces el número de nodos.
El número de Reducciones también se puede establecer en 0, de modo que la salida del mapa se escriba directamente en el sistema de archivos.
Reducir también puede utilizar la función de reinicio de marca. En resumen, usted marca los datos del estado intermedio producidos por el mapa a medida que lo recorre y luego usa la función de reinicio en un momento apropiado para regresar a la posición marcada más recientemente. Por supuesto, esto tiene ciertas limitaciones; en el siguiente ejemplo, debe cambiar el nombre del Iterador de reducción en el método Reducir a MarkableIterator para usarlo.
public void reduce(Clave IntWritable, Iterablelt; IntWritablegt; valores, contexto de contexto) arroja IOException, InterruptedException {
MarkableIteratorlt; IntWritablegt; mitr = new IntWritablegt; ());
//Marcar posición
mitr.mark();
while (mitr.hasNext()) {
i = mitr.next();
//Realizar el procesamiento necesario
}
//Restablecer
reset( );
//Reiterar. Dado que se llama a mark antes de la primera
// llamada mitr.next() en este ejemplo, ahora iteraremos sobre todos
//valores
mientras (mitr.hasNext()) {
i = mitr.next();
//Realizar el procesamiento necesario
}
}
}
3.>
3. Particionador
La mitad de la salida del mapa de los datos del estado del particionador está basada en particiones. en función del número de restauraciones, normalmente mediante hash.
La implementación predeterminada es HashPartitioner.
4. Informar del progreso
Los programas MapReduce pueden informar el estado de la aplicación a través del contexto del asignador o reductor.
5. Trabajo
Cuando escribimos un programa MapReduce, normalmente está en la función principal. Cree un objeto Job, establezca su JobName, luego configure las rutas de entrada y salida, configure nuestras clases Mapper y Reducer, establezca el InputFormat y el tipo de salida correcto, etc. Luego enviaremos al JobTracker usando job.waitForCompletion(), esperaremos a que el trabajo se ejecute y regrese; este es el proceso general de configuración del trabajo. JobTracker inicializará el trabajo, obtendrá los fragmentos de entrada y luego asignará las tareas al TaskTracker una por una para su ejecución. TaskTracker obtiene la tarea a través del valor de retorno del latido y luego TaskTracker inicia la JVM para ejecutar la tarea recibida.
Job en realidad proporciona las funciones de configuración del trabajo, obtención de configuración del trabajo, envío de trabajos, así como funciones de seguimiento del progreso del trabajo y control de trabajos. La clase Job hereda de la clase JobContext. JobContext proporciona la función de obtener la configuración del trabajo, como ID del trabajo, clase Mapper del trabajo, clase Reductor, formato de entrada, formato de salida, etc. Excepto el ID del trabajo, todos son de sólo lectura. La clase Job basada en JobContext proporciona funciones para configurar información de configuración del trabajo, rastrear el progreso, así como interfaces para enviar trabajos y métodos para controlar trabajos.
Un objeto Job tiene dos estados, DEFINE y RUNNING. El objeto Job está en el estado DEFINE cuando se crea. Si y sólo si el objeto Job está en el estado DEFINE, se puede utilizar para configurar. ciertas configuraciones del trabajo, como Reducir el número de tareas, clase InputFormat, clase Mapper del trabajo. Clase de particionador, etc., estas configuraciones se logran estableciendo la información de configuración conf; cuando el trabajo se envía a través de enviar (), el estado del objeto Trabajo se establecerá en EN EJECUCIÓN. La configuración anterior no se puede configurar, el trabajo está en la etapa de programación. En el estado EN EJECUCIÓN del trabajo, podemos obtener el progreso del trabajo, maptask y reducir la tarea a través de *Progress() en el código. Estas funciones se obtienen a través de la información del objeto RunningJob, que es el trabajo que se está ejecutando. en el trabajo real. Un conjunto de interfaces de acceso, como Progress.
En waitForCompletion(), primero use submit() para enviar el trabajo y luego espere a que info.waitForCompletion() devuelva que el trabajo se ha ejecutado. Submit() primero verificará si la nueva API. se usa correctamente. Esto se puede lograr configurando UseNewAPI() para ver si las propiedades de la versión anterior están configuradas. Connect() luego se conecta al JobTracker y envía el trabajo. El trabajo enviado real es un objeto JobClient, que devuelve un objeto RunningJob que rastrea el progreso del trabajo y contiene el ID del trabajo establecido por JobTracker.
La función getCounter() se utiliza para devolver la lista de contadores del trabajo. Estos contadores se utilizan para recopilar estadísticas del trabajo, como el número de tareas de mapa fallidas y el número de registros generados por reducción. , etc. Consta de contadores integrados y contadores definidos por el usuario.
Los contadores definidos por el usuario se pueden utilizar para recopilar información específica requerida por el usuario. Primero, cada tarea transfiere periódicamente los contadores al TaskTracker y, finalmente, el TaskTracker los transfiere al JobTracker para su recopilación. Esto significa que el contador es global.