Red de conocimiento informático - Material del sitio web - Cómo ejecutar trabajos de HadoopMR en MaxCompute

Cómo ejecutar trabajos de HadoopMR en MaxCompute

MaxCompute (anteriormente ODPS) tiene su propio modelo de programación e interfaz MapReduce. En pocas palabras, la entrada y salida de esta interfaz son tablas en MaxCompute y los datos procesados ​​​​se organizan en forma de registro. Puede describir bien el proceso de procesamiento de datos en Table. Sin embargo, en comparación con Hadoop en la comunidad, la interfaz de programación es bastante diferente. Si los usuarios de Hadoop desean migrar el trabajo MR de Hadoop original a la ejecución MR de MaxCompute, deben reescribir el código MR, usar la interfaz MaxCompute para compilar y depurar, y luego convertirlo en un paquete Jar después de que se ejecute normalmente antes de ejecutarlo en el Plataforma MaxCompute. Este proceso es muy engorroso y requiere mucha mano de obra de desarrollo y pruebas. Sería un método ideal si el código original de Hadoop MR pudiera ejecutarse en la plataforma MaxCompute sin ninguna modificación o con una pequeña cantidad de modificación.

Ahora la plataforma MaxCompute proporciona una herramienta de adaptación de HadoopMR a MaxCompute MR, que ha logrado hasta cierto punto compatibilidad a nivel binario de los trabajos de Hadoop MR, es decir, los usuarios pueden especificar Con alguna configuración, puede tomar el Paquete jar MR que se ejecutó originalmente en Hadoop y lo ejecutó directamente en MaxCompute. Actualmente, el complemento se encuentra en la etapa de prueba y actualmente no puede admitir comparadores definidos por el usuario ni tipos de claves personalizados. A continuación se utiliza el programa WordCount como ejemplo para presentar el uso básico de este complemento.

Los pasos básicos para utilizar este complemento para ejecutar un trabajo de HadoopMR en la plataforma MaxCompute son los siguientes:

1. Descargue el complemento HadoopMR.

Descargue el complemento, el nombre del paquete es hadoop2openmr- 1.0.jar. Tenga en cuenta que este jar ya contiene las dependencias relacionadas de la versión hadoop-2.7.2. No incluya dependencias de hadoop en el paquete jar del trabajo para evitar conflictos de versiones. .

2. Prepare el paquete jar del programa HadoopMR.

Compile y exporte el paquete jar WordCount: wordcount_test.jar. El código fuente del programa wordcount es el siguiente:

<. p>paquete com.aliyun.odps.mapred.example.hadoop;

importar org.apache.hadoop.conf.Configuration;

importar org.apache.hadoop.fs.Path ;

importar org.apache.hadoop.io.IntWritable;

importar org.apache.hadoop.io.Text;

importar org.apache.hadoop .mapreduce.Job;

importar org.apache.hadoop.mapreduce.Mapper;

importar org.apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat;

importar org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

importar java.io.IOException;

importar java.util.StringTokenizer;

clase pública WordCount {

clase pública estática TokenizerMapper

extiende Mapperlt; , IntWritablegt; {

IntWritable estático final privado uno = nuevo IntWritable(1);

palabra de texto privado = nuevo texto();

mapa vacío público ( Clave de objeto, valor de texto, contexto de contexto

) arroja IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

mientras ( itr.hasMoreTokens() ) {

palabra.set(itr.nextToken());

context.write(palabra, uno);

}

}

}

clase estática pública IntSumReducer

extiende Reducerlt; Text, IntWritable, Text, IntWritablegt; p>resultado privado de IntWritable = nuevo IntWritable();

reducción de vacío público

e(Clave de texto, Iterablelt; IntWritablegt; valores,

Contexto

) arroja IOException, InterruptedException {

int sum = 0; p>for (IntWritable val: valores) {

suma = val.get();

}

resultado.set(suma);

context.write(clave, resultado);

}

}

public static void main(String[] args) lanza una excepción {

Configuración conf = nueva Configuración();

Trabajo trabajo = Job.getInstance(conf, "recuento de palabras");

job.setJarByClass(WordCount.class );

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(trabajo, nueva ruta(args[) 0]));

FileOutputFormat.setOutputPath(trabajo, nueva ruta(args[1]));

System.exit(job.waitForCompletion(true)? 0: 1) ;

}

}

3. Preparación de datos de prueba

Crear tabla de entrada y tabla de salida

crear tabla si no existe wc_in(cadena de línea);

crear tabla si no existe wc_out(cadena de clave, cnt bigint);

Importar datos a la tabla de entrada a través del túnel

El contenido de datos del archivo de texto data.txt que se importará es el siguiente:

hola maxcompute

hola mapreduce

Por ejemplo, puede utilice el siguiente comando para importar data.txt Importe datos a wc_in,

túnel de carga data.txt wc_in;

4 Prepare la configuración de la relación de mapeo entre la tabla y la ruta del archivo hdfs<. /p>

El archivo de configuración se llama: wordcount-table-res.conf

{

"file:/foo": {

" resolver": {

"resolver": "c.TextFileResolver"

,

"properties": {

"text.resolver.columns.combine.enable": "true",

"text.resolver.seperator" : "\t"

}

},

"tableInfos": [

{

" tblName": "wc_in",

"partSpec": {},

"label": "__default__"

}

],

"matchMode": "exacto"

},

"file:/bar": {

"resolver" : {

"resolver": "openmr.resolver.BinaryFileResolver",

"properties": {

"binary.resolver.input.key.class ": "org.apache.hadoop.io.Text",

"binary.resolver.input.value.class": "org.apache.hadoop.io.LongWritable"

}

},

"tableInfos": [

{

"tblName": "wc_out",

"partSpec": {},

"label": "__default__"

}

],

"matchMode ": "difuso"

}

}