Red de conocimiento informático - Material del sitio web - Cómo leer y escribir HDFS usando la API de Java

Cómo leer y escribir HDFS usando la API de Java

paquete?com.wyc.hadoop.fs;

importar?java.io.BufferedInputStream;

importar?java.io.FileInputStream;

importar?java.io.FileNotFoundException;

importar?java.io.IOException;

importar?java.io.InputStream;

importar ?java.io.OutputStream;

importar?java.net.URI;

importar?java.util.Date;

importar?org.apache. hadoop.conf.Configuration;

importar?org.apache.hadoop.fs.BlockLocation;

importar?org.apache.hadoop.fs.FSDataOutputStream;

importar?org.apache.hadoop.fs.FileStatus;

importar?org.apache.hadoop.fs.FileSystem;

importar?org.apache.hadoop.fs.Path ;

importar?org.apache.hadoop.hdfs.DistributedFileSystem;

importar?org.apache.hadoop.hdfs.protocol.DatanodeInfo;

¿importar? org.apache.hadoop.io.IOUtils;

importar?org.apache.hadoop.util.Progressable;

¿clase pública?FSOptr?{

/** *?@param?args */

public?static?void?main(String[]?args)?throws?Exception?{

//?TODO? ¿Método?stub generado automáticamente

Configuración?conf?=?new?Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

//?Crear directorio de archivos

privado?static?void?makeDir(¿Configuración? conf)?throws?Exception?{

FileSystem?fs?=?FileSystem.get(conf);

Ruta?dir?=?new?Path("/user/hadoop /data/20140318");

boolean?result?=?fs.mkdirs(dir);//?Crear carpeta

System.out.println("make?dir ? :"?+?resultado);

//?Crea un archivo y escribe el contenido

Path?dst?=?new?Path("/user/hadoop/data / 20140318/tmp");

<

p>byte[]?buff?=?"hola,hadoop!".getBytes();

FSDataOutputStream?outputStream?=?fs.create(dst);

outputStream. write(buff,?0,?buff.length);

outputStream.close();

FileStatus?files[]?=?fs.listStatus(dst);

p>

¿para?(FileStatus?archivo?:?archivos)?{

System.out.println(file.getPath());

}

fs.close();

}

//?Renombrar archivo

privado?static?void?rename(Configuración ?conf)? throws?Exception?{

FileSystem?fs?=?FileSystem.get(conf);

Ruta?oldName?=?new?Path("/usuario/ hadoop/data/ 20140318/1.txt");

Ruta?newName?=?new?Path("/user/hadoop/data/20140318/2.txt");

fs.rename(oldName,?newName);

FileStatus?files[]?=?fs.listStatus(new?Path(

"/user/hadoop/data/ 20140318")) ;

para?(FileStatus?file?:?files)?{

System.out.println(file.getPath());

}

fs.close();

}

//?Eliminar archivos

@SuppressWarnings("deprecation")

privado?static?void?delete(Configuration?conf)?throws?Exception?{

FileSystem?fs?=?FileSystem.get(conf);

¿Ruta?=?new?Path("/user/hadoop/data/20140318");

if?(fs.isDirectory(ruta))?{

FileStatus?files[ ]?=?fs.listStatus(ruta);

para?(FileStatus?file?:?files)?{

fs.delete(file.getPath( ));

}

}?else?{

fs.delete(ruta);

}

// ?O

fs.delete(ruta,?true

fs.close();

}

/** *?Descargar, descargar el archivo hdfs al disco local*? *?@param?localSrc1 *La dirección del archivo local, es decir, la ruta del archivo*?@param?hdfsSrc1 *La dirección del archivo almacenado en hdfs*/

público?boolean?sendFromHdfs(S

tring?hdfsSrc1,?String?localSrc1)?{

Configuración?conf?=?new?Configuration();

Sistema de archivos?fs?=?null;

intentar?{

fs?=?FileSystem.get(URI.create(hdfsSrc1),?conf);

Ruta?hdfs_path?=?new?Path(hdfsSrc1) ;

Ruta?local_path?=?new?Path(localSrc1);

fs.copyToLocalFile(hdfs_path,?local_path;

return?true;

}?catch?(IOException?e)?{

e.printStackTrace();

}

return?false;

}

/** *?Cargar, copiar el archivo local al sistema hdfs*? *?@param?localSrc *Dirección del archivo local, es decir, la ruta del archivo *?@ param?hdfsSrc *Dirección del archivo almacenado en hdfs*/

public?boolean?sendToHdfs1(String?localSrc,?String?hdfsSrc)?{

InputStream?in;

¿intentar?{

in?=?new?BufferedInputStream(new?FileInputStream(localSrc));

Configuración?conf?=?new?Configuration() ;/ /?Obtener el objeto de configuración

FileSystem?fs;?//?Sistema de archivos

probar?{

fs?=?FileSystem.get( URI.create (hdfsSrc),?conf);

//?Flujo de salida, crear un flujo de salida

OutputStream?out?=?fs.create(new?Path(hdfsSrc) ),

new?Progressable()?{

//?Anular el método de progreso

public?void?progress()?{

//System.out.println("¡Terminamos de cargar un archivo con un tamaño y capacidad de búfer establecidos! ");

}

});

//?Conecte dos flujos para formar un canal de modo que el flujo de entrada transmita datos al flujo de salida,

IOUtils.copyBytes(in,?out,?10240,?true);?//?in es el objeto de flujo de entrada, out es el objeto de flujo de salida, 4096 es el tamaño del búfer y verdadero significa cerrar la transmisión después de cargar

return?true;

}?catch?(IOException?e)?{

e.printStackTrace();

}

p>

}?catch?(FileNotFoundException?e)?{

e.printStackTrace();

}

return?false;

p>

}

/** *?Move*? *?La ruta donde se almacenó originalmente @param?old_st *?@param?new_stLa ruta a la que se movió*/

public?b

oolean?moveFileName(String?old_st,?String?new_st)?{

¿intentar?{

//?Descargar al servidor local

boolean?down_flag ? =?sendFromHdfs(old_st,?"/home/hadoop/documents/temp");

Configuración?conf?=?new?Configuration();

FileSystem?fs? = ?null;

//?Eliminar archivo fuente

intenta?{

fs?=?FileSystem.get(URI.create(old_st),? conf );

Ruta?hdfs_path?=?new?Path(old_st);

fs.delete(hdfs_path);

}?catch?(IOException ? e)?{

e.printStackTrace();

}

//?Transferir localmente desde el servidor a la nueva ruta

new_st ?=?new_st?+?old_st.substring(old_st.lastIndexOf("/"));

boolean?uplod_flag?=?sendToHdfs1("/home/hadoop/documentation/temp", ?new_st)

si?(down_flag?&?uplod_flag)?{

return?true;

}

}? ¿captura? (¿Excepción?e)?{

e.printStackTrace();

}

return?false;

}

//?copiar archivos locales a hdfs

private?static?void?CopyFromLocalFile(Configuration?conf)?throws?Exception?{

FileSystem?fs? =?FileSystem .get(conf);

Ruta?src?=?new?Path("/home/hadoop/word.txt");

Ruta?dst?= ?new? Ruta("/user/hadoop/data/");

fs.copyFromLocalFile(src,?dst);

fs.close();

}

//?Obtener todos los subdirectorios y subarchivos en el directorio dado

private?static?void?getAllChildFile(Configuration?conf)?throws?Exception?{

FileSystem?fs?=?FileSystem.get(conf);

Ruta?path?=?new?Path("/user/hadoop");

getFile(ruta,?fs);

}

privado?estático?void?getFile(Ruta?ruta,?FileSystem?fs)lanza?Excepción?{

FileStatus[]?fileStatus?=?fs.listStatus(ruta);

for?(int?i?=?0;?i?

us.length;?i++)?{

if?(fileStatus[i].isDir())?{

Ruta?p?=?new?Path(fileStatus[i ].getPath().toString());

getFile(p,?fs);

}?else?{

System.out.println (fileStatus[i].getPath().toString());

}

}

}

//Juzga el archivo ¿Existe?

privado?static?boolean?isExist(Configuration?conf,String?path)throws?Exception{

FileSystem?fileSystem?=?FileSystem.get(conf) ;

return?fileSystem.exists(new?Path(path));

}

//Obtener todos los datos del nodo host del clúster HDFS

privado?static?void?getAllClusterNodeInfo(Configuration?conf)throws?Exception{

FileSystem?fs?=?FileSystem.get(conf);

DistributedFileSystem?hdfs ?= ?(DistributedFileSystem)fs;

DatanodeInfo[]?dataNodeStats?=?hdfs.getDataNodeStats();

String[]?names?=?new?String[dataNodeStats. longitud] ;

System.out.println("lista?de?todos?los?nodos?en?HDFS?cluster:");?//print?info

para (int ?i=0;?i?

nombres[i]?=?dataNodeStats[i].getHostName();

System.out.println(nombres[i]);?//print?info

}

}

//obtener?las?ubicaciones?de ?un ?archivo?en?HDFS

privado?estático?void?getFileLocation(Configuration?conf)throws?Exception{

FileSystem?fs?=?FileSystem.get(conf) ;

Ruta?f?=?new?Path("/user/cluster/dfs.txt");

FileStatus?filestatus?=?fs.getFileStatus(f);

p>

BlockLocation[]?blkLocations?=?fs.getFileBlockLocations(filestatus,0,filestatus.getLen());

int?blkCount?=?blkLocations. longitud;

for(int?i=0;?i?

S

tring[]?hosts?=?blkLocations[i].getHosts();

//¿Hacer?algo?con?the?block?hosts

System.out.println( hosts);

}

} //get?HDFS?file?last?modification?time

privado?static?void?getModificationTime(Configuration?conf )lanza?Excepción{

FileSystem?fs?=?FileSystem.get(conf);

Ruta?f?=?new?Path("/user/cluster/dfs. txt");

FileStatus?filestatus?=?fs.getFileStatus(f);

long?modificationTime?=?filestatus.getModificationTime();?//?medido?in ?milisegundos?desde?la?época

Fecha?d?=?new?Date(modificationTime);

System.out.println(d);

}

}