Red de conocimiento informático - Material del sitio web - Cómo utilizar JobControl de Hadoop

Cómo utilizar JobControl de Hadoop

Paquete com. Prueba de Hadoop.

Importar org .Apache .fs .

Importar org .Apache io . writable;

Importar org.Apache.Hadoop.io.text;

Importar org.Apache.Hadoop.io.writable comparador;

Importar org.Apache .Hadoop .MapReduce

Importar org. p>Importar organización. Apache Hadoop. .MapReduce.lib.control de trabajo.

importar .hadoop. .lib salida. formato de salida del archivo;

/**

* La versión de Hadoop es 1.2.

*Entorno JDK 1.6

* Utiliza la nueva API de ControlledJob+JobControl.

*Completa la tarea de combinación

*La primera tarea es contar la frecuencia de las palabras.

*La segunda tarea es ordenar en orden descendente.

*

*Si utiliza el trabajo de MapReduce para completarlo, debe ejecutar 2 tareas de MR.

*Pero si usamos JobControl+ControlledJob, se puede usar.

*Los trabajos relacionados con el tipo DAG se completan en clases.

*

*

* @autor Qin Dongliang

*

*

*

* ***/

Clase pública MyHadoopControl {

/***

*

*Mapper 1 del *trabajo MapReduce

*

* longwritetable1 representa el valor de la clave de entrada, que por defecto es el desplazamiento de posición del texto.

*El contenido específico de cada línea de texto 2

*El tipo de clave generada por el texto 3.

* texto 4 La salida del tipo de valor.

*

* */

Clase estática privada SumMapper extiende Mapper & ltLongWritable, Text, Text, IntWritable & gt{

Privado texto t = nuevo Texto();

privado int escribible uno = nuevo int escribible(1);

/**

*

*Frecuencia de palabra de salida de fase de mapa

*

*

* **/

@overlay

Mapa vacío protegido (clave LongWritable, valor de texto, contexto)

Lanza IOException, InterruptedException {

string data = tostring();

string. palabras[]= datos . split(";");

if(palabra[0]. trim()!=null){

t set(" +palabras[. 0]); //Asignar valor k

one . set(integer . parse int(words[1]);

context.write(t, one);

}

}

}

/**

Reductor 1 para trabajos de MapReduce

*Responsable de la acumulación y salida de frecuencia de palabras

*

* **/

La clase estática privada SumReduce extiende Reducer & ltText, IntWritable, IntWritable, Text & gt {

//Objeto de frecuencia de palabras de almacenamiento

private int writable iw = new int writable();

@overwrite

protected void reducir(Clave de texto, Iterable & ltIntWritable & gt valor, contexto)

Lanza IOException, InterruptedException {

int sum = 0;

for(IntWritable count: value){

sum+= count . get();//Frecuencia acumulada de palabras

}

iw .

context.write(iw, key); //Datos de salida

}

}

/**

Mapeador ordenado del trabajo 2 de MapReduce

*

* **/

La clase estática privada SortMapper extiende Mapper & ltLongWritable, Text, IntWritable, Text & gt {

int writable iw = new int writable(); //Frecuencia de palabras de almacenamiento

Texto privado t = new Text() //Texto de almacenamiento

@Override

El mapa vacío protegido (clave LongWritable, valor de texto, contexto de contexto) arroja IOException, InterruptedException {

String palabras[]=value.toString (). split(" ");

System.out.println("longitud de la matriz:"+palabras . longitud);

System.

out . println("Texto leído por mapa: "+value . tostring());

system out . = = = = >+palabras[1]);

if(palabras[0]!=null){

iw . ). trim()));

t.set(words[1].trim());

context.write(iw, t); , clasificación de claves predeterminada.

}

}

}

/**

*Reducer ordenado por trabajo MapReduce 2< / p>

*

* **/

La clase estática privada SortReduce extiende Reducer & ltIntWritable, Text, Text, IntWritable & gt{

/ * *

*

*Salida de contenido ordenado

*

* **/

@overwrite< / p>

Reducción de vacío protegido (clave IntWritable, Iterable y ltText y valor gt, contexto)

Lanza IOException, InterruptedException {

for(Text t: value){

context.write(t, clave); //Salida ordenada k, v.

}

}

}

/***

* Componente de clasificación, en operación de clasificación, debes usar

* para ordenar por palabra clave en orden descendente.

*

* **/

La clase estática pública DescSort extiende WritableComparator{

public DescSort() {

super(IntWritable.class, true); //Registrar componente de clasificación

}

@override

Public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,

int arg4, int arg5) {

return -super.compare(arg0, arg1, arg2, arg3, arg4, arg 5 ); //Tenga en cuenta que el signo menos se utiliza para completar el orden descendente.

}

@override

public int compare(objeto a, objeto b) {

return -super.compare(a, b); //Tenga en cuenta que el signo menos se utiliza para completar el orden descendente.

}

}

/**

*Categoría de controlador

*

* **/

Public static void main(String[] args) lanza una excepción {

JobConf conf = new JobConf(myhadoop control . class);

conf.set("mapred.job.tracker", "192.168.75. 130:9001");

conf.set jar("TT.jar");

system . out . println(" modo:"+conf . get(" mapred . job . tracker "));;

/**

*

*Configuración del trabajo 1

*Estadística de frecuencia de palabras

*

* **/

Trabajo Trabajo 1 = Nuevo trabajo ( conf, "join 1 ");

trabajo 1. setjarbyclass(myhadoop control. class);

trabajo 1. setmapperclass(summapper. class);

job 1 . sereducerclass(sumreduce . class);

job 1 . setmapoutputkeyclass(text . class); //La salida clave en la etapa de mapeo

setmapoutputvalueclass(int writable). .clase); //El valor de salida de la etapa de mapeo.

trabajo 1. setoutputkey class(int writable . class); //Salida clave en la etapa de reducción

trabajo 1. setoutputvalueclass(text . class); //Reducir el valor de salida del nivel; .

//Añadir contenedor de control

trabajo controlado ctrl trabajo 1 = nuevo trabajo controlado(conf);

ctrl trabajo 1. setjob(trabajo 1);

formato de entrada de archivo . addinputpath(trabajo 1, nueva ruta(" HDFS://192.168.75 . 130:9000/root/input "));

sistema de archivos fs = archivo system . get(conf);

Ruta op = nueva ruta (" HDFS://192.168.75 . 130:9000/root/op "); existe(op)){

fs.delete(op, true);

System.out.println("¡¡¡Esta ruta de salida existe y ha sido eliminada!!!");

}

formato de salida del archivo . setoutputpath(trabajo 1, op)/**=============; ==================================================== =========*/

/**

*

*Configuración del trabajo 2

*Ordenar

*

* **/

Trabajo trabajo2 = nuevo trabajo(conf, "join 2");

trabajo 2. setjarbyclass(myhadoop control. class);

//trabajo 2. setinputformatclass(textinputformat. class);

trabajo 2. setmapper class(sort mapper. class);

trabajo 2 . sereducerclass(sort reduce . class);

trabajo 2 . setsortcompratorclass(desc sort . class); // Ordenar por clave en orden descendente

trabajo 2 . setmapoutputkeyclass(int writable . class); //La salida clave de la etapa de mapeo

job 2. setmapoutputvalueclass(text. class); //El valor de salida de la etapa de mapeo.

trabajo 2. setoutputkey class(text. class); //Salida clave en la etapa de reducción

trabajo 2. setoutputvalueclass(int writable. class); //Reducir el valor de salida del nivel; .

//Tarea 2 Agregar contenedor de control

trabajo controlado ctrl trabajo 2 = nuevo trabajo controlado(conf);

ctrl trabajo 2. establecer trabajo(trabajo 2) );

/***

*

*Establece dependencias directas para múltiples trabajos.

*Escribe lo siguiente:

*Indica que el inicio del trabajo2 depende de la finalización del trabajo1.

*

* **/

ctrl trabajo 2. adddependingjob(ctrl trabajo 1);

//La ruta de entrada es el anterior La ruta de salida de un trabajo.

formato de entrada de archivo . addinputpath(trabajo 2, nueva ruta(" HDFS://192.168.75 . 130:9000/root/op/part * ");

sistema de archivos fs2 = sistema de archivos . get(conf);

Ruta op2 = nueva ruta (" HDFS://192.168.75 . 130:9000/root/op2 "); (fs2.exists(op2)){

fs2.delete(op2, true);

System.out.println("¡¡¡Esta ruta de salida existe y ha sido eliminada!!! ");

}

formato de salida del archivo . setoutputpath(trabajo 2, op2);

//system . exit(trabajo 2 . waitforcompletion(true) ? 0: 1);