Red de conocimiento informático - Material del sitio web - Cómo extender InputFormat de Hadoop a otros delimitadores

Cómo extender InputFormat de Hadoop a otros delimitadores

En Hadoop, el TextInputFormat de uso común utiliza nuevas líneas como separadores de registros.

En la práctica, a menudo nos encontramos con situaciones en las que los registros contienen varias líneas, como por ejemplo:

lt;docgt;

....

lt;/docgt;

lt;doc gt;

.. .

lt;/doc gt;

. En este punto, es necesario ampliar TextInputFormat para lograr este objetivo.

Echemos un vistazo primero a la implementación original:

Java

clase pública TextInputFormat extiende FileInputFormatlt; @Override

public RecordReaderlt; LongWritable, Textgt;

createRecordReader(InputSplit split,

TaskAttemptContext context) {

// De forma predeterminada, textinputformat.record .delimiter = '/n' (establecido en el archivo de configuración)

String delimiter = context.getConfiguration().get(

"textinputformat.record.delimiter");

byte[] recordDelimiterBytes = null;

if (null ! = delimitador)

recordDelimiterBytes = delimitador.getBytes();

return new LineRecordReader(recordDelimiterBytes);

}

@Override

booleano protegido isSplitable(contexto JobContext, archivo de ruta) {

códec CompressionCodec =

new CompressionCodecFactory(context.getConfiguration()).getCodec(file);

return codec == null;

}

}

p>

la clase pública TextInputFormat extiende FileInputFormat lt; LongWritable, Text gt; {

@Override

public RecordReader lt; p>

createRecordReader (división de InputSplit,

contexto TaskAttemptContext) {

// De forma predeterminada, textinputformat.record.delimiter = '/n' (establecido en el archivo de configuración)

Delimitador de cadena = contexto . getConfiguration() get (

"textinputformat.record.delimiter" );

byte [ ] recordDelimiterBytes =. nulo;

p>

si (nulo! = delimitador)

recordDelimiterBytes = delimitador

.getBytes ();

devuelve nuevo LineRecordReader (recordDelimiterBytes);

}

@Override

bosoleano protegido isSplitable (contexto JobContext, Archivo de ruta) {

CompressionCodec codec =

new CompressionCodecFactory (contexto .getConfiguration() .getCodec (archivo);

return codec == null;

p>

}

}

Según el código anterior, no es difícil encontrar que el carácter de nueva línea en realidad es generado por "textinputformat.record .delimiter"

Entonces tenemos una solución:

(1) Configure textinputformat.record.delimiter directamente en el Trabajo en "lt;/docgt;\n". Tal programa es más complicado y fácil. Afecta la ejecución normal de otros códigos.

(2) Hereda TextInputFormat y usa delimitadores personalizados al devolver LineRecordReader.

Este artículo utiliza el segundo método, el código es el siguiente:

Java

clase pública DocInputFormat extiende TextInputFormat {

estática privada final String RECORD_ DELIMITER = "lt;/docgt;\n";

@Override

public RecordReaderlt; Escribible, Textgt; createRecordReader(

InputSplit split, TaskAttemptContext tac) {

byte[] recordDelimiterBytes = null;

recordDelimiterBytes = RECORD_DELIMITER.getBytes();

devuelve nuevo LineRecordReader(recordDelimiterBytes);

}

@Override

public boolean isSplitable(contexto JobContext, archivo de ruta) {

códec CompressionCodec = new CompressionCodecFactory(

contexto .getConfiguration()).getCodec(archivo);

códec de retorno == null;

}

}

clase pública DocInputFormat extiende TextInputFormat {

cadena final estática privada RECORD_DELIMITER = "lt;/docgt;\n";

@Override

public RecordReader lt; , Texto gt ; createRecordReader (

InputSplit split, TaskAttemptContext tac ) {

byte [ ] recordDelimiterBytes = null ;

recordDelimiterBytes = RECORD_DELIMITER .getBytes ( );

devuelve nuevo LineRecordReader (recordDelimiterBytes);

}

@Override

public boolean isSplitable (contexto JobContext, archivo de ruta) {

CódecCompressionCodec = new CompressionCodecFactory (

context. getConfiguration()).

getCodec (file)

return codec == null

}

}

Cabe señalar que InputFormat es solo a Los archivos HDFS sin formato se dividen en registros de cadenas, y si su contiene otros datos estructurados, deberá implementar la lógica empresarial relacionada con la desalinización en el mapa para manejarlos.