Cómo leer y escribir HDFS usando la API de Java
paquete?com.wyc.hadoop.fs;
importar?java.io.BufferedInputStream;
importar?java.io.FileInputStream;
importar?java.io.FileNotFoundException;
importar?java.io.IOException;
importar?java.io.InputStream;
importar ?java.io.OutputStream;
importar?java.net.URI;
importar?java.util.Date;
importar?org.apache. hadoop.conf.Configuration;
importar?org.apache.hadoop.fs.BlockLocation;
importar?org.apache.hadoop.fs.FSDataOutputStream;
importar?org.apache.hadoop.fs.FileStatus;
importar?org.apache.hadoop.fs.FileSystem;
importar?org.apache.hadoop.fs.Path ;
importar?org.apache.hadoop.hdfs.DistributedFileSystem;
importar?org.apache.hadoop.hdfs.protocol.DatanodeInfo;
¿importar? org.apache.hadoop.io.IOUtils;
importar?org.apache.hadoop.util.Progressable;
¿clase pública?FSOptr?{
/** *?@param?args */
public?static?void?main(String[]?args)?throws?Exception?{
//?TODO? ¿Método?stub generado automáticamente
Configuración?conf?=?new?Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
//?Crear directorio de archivos
privado?static?void?makeDir(¿Configuración? conf)?throws?Exception?{
FileSystem?fs?=?FileSystem.get(conf);
Ruta?dir?=?new?Path("/user/hadoop /data/20140318");
boolean?result?=?fs.mkdirs(dir);//?Crear carpeta
System.out.println("make?dir ? :"?+?resultado);
//?Crea un archivo y escribe el contenido
Path?dst?=?new?Path("/user/hadoop/data / 20140318/tmp");
<p>byte[]?buff?=?"hola,hadoop!".getBytes();
FSDataOutputStream?outputStream?=?fs.create(dst);
outputStream. write(buff,?0,?buff.length);
outputStream.close();
FileStatus?files[]?=?fs.listStatus(dst); p>
p>
¿para?(FileStatus?archivo?:?archivos)?{
System.out.println(file.getPath());
}
fs.close();
}
//?Renombrar archivo
privado?static?void?rename(Configuración ?conf)? throws?Exception?{
FileSystem?fs?=?FileSystem.get(conf);
Ruta?oldName?=?new?Path("/usuario/ hadoop/data/ 20140318/1.txt");
Ruta?newName?=?new?Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName,?newName);
FileStatus?files[]?=?fs.listStatus(new?Path(
"/user/hadoop/data/ 20140318")) ;
para?(FileStatus?file?:?files)?{
System.out.println(file.getPath());
}
fs.close();
}
//?Eliminar archivos
@SuppressWarnings("deprecation")
privado?static?void?delete(Configuration?conf)?throws?Exception?{
FileSystem?fs?=?FileSystem.get(conf);
¿Ruta?=?new?Path("/user/hadoop/data/20140318");
if?(fs.isDirectory(ruta))?{
FileStatus?files[ ]?=?fs.listStatus(ruta);
para?(FileStatus?file?:?files)?{
fs.delete(file.getPath( ));
}
}?else?{
fs.delete(ruta);
}
// ?O
fs.delete(ruta,?true
fs.close();
}
/** *?Descargar, descargar el archivo hdfs al disco local*? *?@param?localSrc1 *La dirección del archivo local, es decir, la ruta del archivo*?@param?hdfsSrc1 *La dirección del archivo almacenado en hdfs*/
público?boolean?sendFromHdfs(S
tring?hdfsSrc1,?String?localSrc1)?{
Configuración?conf?=?new?Configuration();
Sistema de archivos?fs?=?null;
intentar?{
fs?=?FileSystem.get(URI.create(hdfsSrc1),?conf);
Ruta?hdfs_path?=?new?Path(hdfsSrc1) ;
Ruta?local_path?=?new?Path(localSrc1);
fs.copyToLocalFile(hdfs_path,?local_path;
return?true;
}?catch?(IOException?e)?{
e.printStackTrace();
}
return?false;
}
/** *?Cargar, copiar el archivo local al sistema hdfs*? *?@param?localSrc *Dirección del archivo local, es decir, la ruta del archivo *?@ param?hdfsSrc *Dirección del archivo almacenado en hdfs*/
public?boolean?sendToHdfs1(String?localSrc,?String?hdfsSrc)?{
InputStream?in;
¿intentar?{
in?=?new?BufferedInputStream(new?FileInputStream(localSrc));
Configuración?conf?=?new?Configuration() ;/ /?Obtener el objeto de configuración
FileSystem?fs;?//?Sistema de archivos
probar?{
fs?=?FileSystem.get( URI.create (hdfsSrc),?conf);
//?Flujo de salida, crear un flujo de salida
OutputStream?out?=?fs.create(new?Path(hdfsSrc) ),
new?Progressable()?{
//?Anular el método de progreso
public?void?progress()?{
//System.out.println("¡Terminamos de cargar un archivo con un tamaño y capacidad de búfer establecidos! ");
}
});
//?Conecte dos flujos para formar un canal de modo que el flujo de entrada transmita datos al flujo de salida,
IOUtils.copyBytes(in,?out,?10240,?true);?//?in es el objeto de flujo de entrada, out es el objeto de flujo de salida, 4096 es el tamaño del búfer y verdadero significa cerrar la transmisión después de cargar p>
return?true;
}?catch?(IOException?e)?{
e.printStackTrace();
}
p>
}?catch?(FileNotFoundException?e)?{
e.printStackTrace();
}
return?false;
p>
}
/** *?Move*? *?La ruta donde se almacenó originalmente @param?old_st *?@param?new_stLa ruta a la que se movió*/
public?b
oolean?moveFileName(String?old_st,?String?new_st)?{
¿intentar?{
//?Descargar al servidor local
boolean?down_flag ? =?sendFromHdfs(old_st,?"/home/hadoop/documents/temp");
Configuración?conf?=?new?Configuration();
FileSystem?fs? = ?null;
//?Eliminar archivo fuente
intenta?{
fs?=?FileSystem.get(URI.create(old_st),? conf );
Ruta?hdfs_path?=?new?Path(old_st);
fs.delete(hdfs_path);
}?catch?(IOException ? e)?{
e.printStackTrace();
}
//?Transferir localmente desde el servidor a la nueva ruta
new_st ?=?new_st?+?old_st.substring(old_st.lastIndexOf("/"));
boolean?uplod_flag?=?sendToHdfs1("/home/hadoop/documentation/temp", ?new_st)
si?(down_flag?&?uplod_flag)?{
return?true;
}
}? ¿captura? (¿Excepción?e)?{
e.printStackTrace();
}
return?false;
}
//?copiar archivos locales a hdfs
private?static?void?CopyFromLocalFile(Configuration?conf)?throws?Exception?{
FileSystem?fs? =?FileSystem .get(conf);
Ruta?src?=?new?Path("/home/hadoop/word.txt");
Ruta?dst?= ?new? Ruta("/user/hadoop/data/");
fs.copyFromLocalFile(src,?dst);
fs.close();
}
//?Obtener todos los subdirectorios y subarchivos en el directorio dado
private?static?void?getAllChildFile(Configuration?conf)?throws?Exception?{ p>
FileSystem?fs?=?FileSystem.get(conf);
Ruta?path?=?new?Path("/user/hadoop");
getFile(ruta,?fs);
}
privado?estático?void?getFile(Ruta?ruta,?FileSystem?fs)lanza?Excepción?{
FileStatus[]?fileStatus?=?fs.listStatus(ruta);
for?(int?i?=?0;?i?
us.length;?i++)?{
if?(fileStatus[i].isDir())?{
Ruta?p?=?new?Path(fileStatus[i ].getPath().toString());
getFile(p,?fs);
}?else?{
System.out.println (fileStatus[i].getPath().toString());
}
}
}
//Juzga el archivo ¿Existe?
privado?static?boolean?isExist(Configuration?conf,String?path)throws?Exception{
FileSystem?fileSystem?=?FileSystem.get(conf) ;
return?fileSystem.exists(new?Path(path));
}
//Obtener todos los datos del nodo host del clúster HDFS
privado?static?void?getAllClusterNodeInfo(Configuration?conf)throws?Exception{
FileSystem?fs?=?FileSystem.get(conf);
DistributedFileSystem?hdfs ?= ?(DistributedFileSystem)fs;
DatanodeInfo[]?dataNodeStats?=?hdfs.getDataNodeStats();
String[]?names?=?new?String[dataNodeStats. longitud] ;
System.out.println("lista?de?todos?los?nodos?en?HDFS?cluster:");?//print?info
para (int ?i=0;?i?
nombres[i]?=?dataNodeStats[i].getHostName();
System.out.println(nombres[i]);?//print?info
}
}
//obtener?las?ubicaciones?de ?un ?archivo?en?HDFS
privado?estático?void?getFileLocation(Configuration?conf)throws?Exception{
FileSystem?fs?=?FileSystem.get(conf) ;
Ruta?f?=?new?Path("/user/cluster/dfs.txt");
FileStatus?filestatus?=?fs.getFileStatus(f);
p>
BlockLocation[]?blkLocations?=?fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int?blkCount?=?blkLocations. longitud;
for(int?i=0;?i?
S
tring[]?hosts?=?blkLocations[i].getHosts();
//¿Hacer?algo?con?the?block?hosts
System.out.println( hosts);
}
} //get?HDFS?file?last?modification?time
privado?static?void?getModificationTime(Configuration?conf )lanza?Excepción{
FileSystem?fs?=?FileSystem.get(conf);
Ruta?f?=?new?Path("/user/cluster/dfs. txt");
FileStatus?filestatus?=?fs.getFileStatus(f);
long?modificationTime?=?filestatus.getModificationTime();?//?medido?in ?milisegundos?desde?la?época
Fecha?d?=?new?Date(modificationTime);
System.out.println(d);
}
}