Cómo extender InputFormat de Hadoop a otros delimitadores
En la práctica, a menudo nos encontramos con situaciones en las que los registros contienen varias líneas, como por ejemplo:
lt;docgt;
....
lt;/docgt;
lt;doc gt;
.. .
lt;/doc gt;
. En este punto, es necesario ampliar TextInputFormat para lograr este objetivo.
Echemos un vistazo primero a la implementación original:
Java
clase pública TextInputFormat extiende FileInputFormatlt; @Override
public RecordReaderlt; LongWritable, Textgt;
createRecordReader(InputSplit split,
TaskAttemptContext context) {
// De forma predeterminada, textinputformat.record .delimiter = '/n' (establecido en el archivo de configuración)
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter"); p>
byte[] recordDelimiterBytes = null;
if (null ! = delimitador)
recordDelimiterBytes = delimitador.getBytes();
return new LineRecordReader(recordDelimiterBytes);
}
@Override
booleano protegido isSplitable(contexto JobContext, archivo de ruta) {
códec CompressionCodec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
return codec == null;
}
}
p>la clase pública TextInputFormat extiende FileInputFormat lt; LongWritable, Text gt; {
@Override
public RecordReader lt; p>
createRecordReader (división de InputSplit,
contexto TaskAttemptContext) {
// De forma predeterminada, textinputformat.record.delimiter = '/n' (establecido en el archivo de configuración)
Delimitador de cadena = contexto . getConfiguration() get (
"textinputformat.record.delimiter" );
byte [ ] recordDelimiterBytes =. nulo;
p>
si (nulo! = delimitador)
recordDelimiterBytes = delimitador
.getBytes ();
devuelve nuevo LineRecordReader (recordDelimiterBytes);
}
@Override
bosoleano protegido isSplitable (contexto JobContext, Archivo de ruta) {
CompressionCodec codec =
new CompressionCodecFactory (contexto .getConfiguration() .getCodec (archivo);
return codec == null; p>
p>
}
}
Según el código anterior, no es difícil encontrar que el carácter de nueva línea en realidad es generado por "textinputformat.record .delimiter"
Entonces tenemos una solución:
(1) Configure textinputformat.record.delimiter directamente en el Trabajo en "lt;/docgt;\n". Tal programa es más complicado y fácil. Afecta la ejecución normal de otros códigos.
(2) Hereda TextInputFormat y usa delimitadores personalizados al devolver LineRecordReader.
Este artículo utiliza el segundo método, el código es el siguiente:
Java
clase pública DocInputFormat extiende TextInputFormat {
estática privada final String RECORD_ DELIMITER = "lt;/docgt;\n";
@Override
public RecordReaderlt; Escribible, Textgt; createRecordReader(
InputSplit split, TaskAttemptContext tac) {
byte[] recordDelimiterBytes = null;
recordDelimiterBytes = RECORD_DELIMITER.getBytes();
devuelve nuevo LineRecordReader(recordDelimiterBytes);
}
@Override
public boolean isSplitable(contexto JobContext, archivo de ruta) {
códec CompressionCodec = new CompressionCodecFactory(
contexto .getConfiguration()).getCodec(archivo);
códec de retorno == null;
}
}
clase pública DocInputFormat extiende TextInputFormat {
cadena final estática privada RECORD_DELIMITER = "lt;/docgt;\n";
@Override
public RecordReader lt; , Texto gt ; createRecordReader (
InputSplit split, TaskAttemptContext tac ) {
byte [ ] recordDelimiterBytes = null ;
recordDelimiterBytes = RECORD_DELIMITER .getBytes ( );
devuelve nuevo LineRecordReader (recordDelimiterBytes);
}
@Override
public boolean isSplitable (contexto JobContext, archivo de ruta) { p>
CódecCompressionCodec = new CompressionCodecFactory (
context. getConfiguration()).
getCodec (file)
return codec == null
}
}
Cabe señalar que InputFormat es solo a Los archivos HDFS sin formato se dividen en registros de cadenas, y si su