Cómo utilizar JobControl de Hadoop
Importar org .Apache .fs .
Importar org .Apache io . writable;
Importar org.Apache.Hadoop.io.text;
Importar org.Apache.Hadoop.io.writable comparador;
Importar org.Apache .Hadoop .MapReduce
Importar org. p>Importar organización. Apache Hadoop. .MapReduce.lib.control de trabajo.
importar .hadoop. .lib salida. formato de salida del archivo;
/**
* La versión de Hadoop es 1.2.
*Entorno JDK 1.6
* Utiliza la nueva API de ControlledJob+JobControl.
*Completa la tarea de combinación
*La primera tarea es contar la frecuencia de las palabras.
*La segunda tarea es ordenar en orden descendente.
*
*Si utiliza el trabajo de MapReduce para completarlo, debe ejecutar 2 tareas de MR.
*Pero si usamos JobControl+ControlledJob, se puede usar.
*Los trabajos relacionados con el tipo DAG se completan en clases.
*
*
* @autor Qin Dongliang
*
*
*
* ***/
Clase pública MyHadoopControl {
/***
*
*Mapper 1 del *trabajo MapReduce
*
* longwritetable1 representa el valor de la clave de entrada, que por defecto es el desplazamiento de posición del texto.
*El contenido específico de cada línea de texto 2
*El tipo de clave generada por el texto 3.
* texto 4 La salida del tipo de valor.
*
* */
Clase estática privada SumMapper extiende Mapper & ltLongWritable, Text, Text, IntWritable & gt{
Privado texto t = nuevo Texto();
privado int escribible uno = nuevo int escribible(1);
/**
*
*Frecuencia de palabra de salida de fase de mapa
*
*
* **/
@overlay
Mapa vacío protegido (clave LongWritable, valor de texto, contexto)
Lanza IOException, InterruptedException {
string data = tostring();
string. palabras[]= datos . split(";");
if(palabra[0]. trim()!=null){
t set(" +palabras[. 0]); //Asignar valor k
one . set(integer . parse int(words[1]);
context.write(t, one);
}
}
}
/**
Reductor 1 para trabajos de MapReduce
*Responsable de la acumulación y salida de frecuencia de palabras
*
* **/
La clase estática privada SumReduce extiende Reducer & ltText, IntWritable, IntWritable, Text & gt {
//Objeto de frecuencia de palabras de almacenamiento
private int writable iw = new int writable();
@overwrite
protected void reducir(Clave de texto, Iterable & ltIntWritable & gt valor, contexto)
Lanza IOException, InterruptedException {
int sum = 0;
for(IntWritable count: value){
sum+= count . get();//Frecuencia acumulada de palabras
}
iw .
context.write(iw, key); //Datos de salida
}
}
/**
Mapeador ordenado del trabajo 2 de MapReduce
*
* **/
La clase estática privada SortMapper extiende Mapper & ltLongWritable, Text, IntWritable, Text & gt {
int writable iw = new int writable(); //Frecuencia de palabras de almacenamiento
Texto privado t = new Text() //Texto de almacenamiento
@Override
El mapa vacío protegido (clave LongWritable, valor de texto, contexto de contexto) arroja IOException, InterruptedException {
String palabras[]=value.toString (). split(" ");
System.out.println("longitud de la matriz:"+palabras . longitud);
System.
out . println("Texto leído por mapa: "+value . tostring());
system out . = = = = >+palabras[1]);
if(palabras[0]!=null){
iw . ). trim()));
t.set(words[1].trim());
context.write(iw, t); , clasificación de claves predeterminada.
}
}
}
/**
*Reducer ordenado por trabajo MapReduce 2< / p>
*
* **/
La clase estática privada SortReduce extiende Reducer & ltIntWritable, Text, Text, IntWritable & gt{
/ * *
*
*Salida de contenido ordenado
*
* **/
@overwrite< / p>
Reducción de vacío protegido (clave IntWritable, Iterable y ltText y valor gt, contexto)
Lanza IOException, InterruptedException {
for(Text t: value){ p>
context.write(t, clave); //Salida ordenada k, v.
}
}
}
/***
* Componente de clasificación, en operación de clasificación, debes usar
* para ordenar por palabra clave en orden descendente.
*
* **/
La clase estática pública DescSort extiende WritableComparator{
public DescSort() {
super(IntWritable.class, true); //Registrar componente de clasificación
}
@override
Public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
int arg4, int arg5) {
return -super.compare(arg0, arg1, arg2, arg3, arg4, arg 5 ); //Tenga en cuenta que el signo menos se utiliza para completar el orden descendente.
}
@override
public int compare(objeto a, objeto b) {
return -super.compare(a, b); //Tenga en cuenta que el signo menos se utiliza para completar el orden descendente.
}
}
/**
*Categoría de controlador
*
* **/
Public static void main(String[] args) lanza una excepción {
JobConf conf = new JobConf(myhadoop control . class);
conf.set("mapred.job.tracker", "192.168.75. 130:9001");
conf.set jar("TT.jar");
system . out . println(" modo:"+conf . get(" mapred . job . tracker "));;
/**
*
*Configuración del trabajo 1
*Estadística de frecuencia de palabras
*
* **/
Trabajo Trabajo 1 = Nuevo trabajo ( conf, "join 1 ");
trabajo 1. setjarbyclass(myhadoop control. class);
trabajo 1. setmapperclass(summapper. class);
job 1 . sereducerclass(sumreduce . class);
job 1 . setmapoutputkeyclass(text . class); //La salida clave en la etapa de mapeo
setmapoutputvalueclass(int writable). .clase); //El valor de salida de la etapa de mapeo.
trabajo 1. setoutputkey class(int writable . class); //Salida clave en la etapa de reducción
trabajo 1. setoutputvalueclass(text . class); //Reducir el valor de salida del nivel; .
//Añadir contenedor de control
trabajo controlado ctrl trabajo 1 = nuevo trabajo controlado(conf);
ctrl trabajo 1. setjob(trabajo 1);
formato de entrada de archivo . addinputpath(trabajo 1, nueva ruta(" HDFS://192.168.75 . 130:9000/root/input "));
sistema de archivos fs = archivo system . get(conf);
Ruta op = nueva ruta (" HDFS://192.168.75 . 130:9000/root/op "); existe(op)){
fs.delete(op, true);
System.out.println("¡¡¡Esta ruta de salida existe y ha sido eliminada!!!");
}
formato de salida del archivo . setoutputpath(trabajo 1, op)/**=============; ==================================================== =========*/
/**
*
*Configuración del trabajo 2
*Ordenar
*
* **/
Trabajo trabajo2 = nuevo trabajo(conf, "join 2");
trabajo 2. setjarbyclass(myhadoop control. class);
//trabajo 2. setinputformatclass(textinputformat. class);
trabajo 2. setmapper class(sort mapper. class);
trabajo 2 . sereducerclass(sort reduce . class);
trabajo 2 . setsortcompratorclass(desc sort . class); // Ordenar por clave en orden descendente
trabajo 2 . setmapoutputkeyclass(int writable . class); //La salida clave de la etapa de mapeo
job 2. setmapoutputvalueclass(text. class); //El valor de salida de la etapa de mapeo.
trabajo 2. setoutputkey class(text. class); //Salida clave en la etapa de reducción
trabajo 2. setoutputvalueclass(int writable. class); //Reducir el valor de salida del nivel; .
//Tarea 2 Agregar contenedor de control
trabajo controlado ctrl trabajo 2 = nuevo trabajo controlado(conf);
ctrl trabajo 2. establecer trabajo(trabajo 2) );
/***
*
*Establece dependencias directas para múltiples trabajos.
*Escribe lo siguiente:
*Indica que el inicio del trabajo2 depende de la finalización del trabajo1.
*
* **/
ctrl trabajo 2. adddependingjob(ctrl trabajo 1);
//La ruta de entrada es el anterior La ruta de salida de un trabajo.
formato de entrada de archivo . addinputpath(trabajo 2, nueva ruta(" HDFS://192.168.75 . 130:9000/root/op/part * ");
sistema de archivos fs2 = sistema de archivos . get(conf);
Ruta op2 = nueva ruta (" HDFS://192.168.75 . 130:9000/root/op2 "); (fs2.exists(op2)){
fs2.delete(op2, true);
System.out.println("¡¡¡Esta ruta de salida existe y ha sido eliminada!!! ");
}
formato de salida del archivo . setoutputpath(trabajo 2, op2);
//system . exit(trabajo 2 . waitforcompletion(true) ? 0: 1);