Cómo ejecutar un trabajo mapreduce en el sistema de archivos localAnálisis del código fuente de envío del trabajo mapreduce Cuando escribimos un programa mapreduce, primero debemos escribir la función map y la función reducir. Después de completar la escritura del asignador y el reductor, configure el trabajo; una vez completada la configuración del trabajo, llame al método job.submit () para completar el envío del trabajo. Entonces, pensemos en ello, ¿cómo completa finalmente el trabajo el envío del trabajo (trabajo)? Si lo piensa en términos generales, el trabajo debe estar conectado al jobtracker de alguna manera, porque solo de esta manera el trabajo puede enviarse al jobtracker para su programación y ejecución. También es importante considerar cómo entregar mapeadores y reductores (es decir, archivos jar que escribimos nosotros mismos) al jobtracker. Una de las formas más sencillas e intuitivas es transferirlos directamente al jobtracker a través de un socket, que a su vez los transfiere al tasktracker (nota: mapreduce no utiliza este método). El tercer factor a considerar es cómo Jobtracker personaliza la configuración del trabajo en tareas de mapa y tareas reducidas. Primero, el envío del trabajo se completa a través de jobclient en la clase job y, finalmente, jobclient completa la función de interacción con jobtracker. En el constructor de jobclient, el establecimiento de la conexión con jobtracker se completa llamando a rpc. Después de completar el establecimiento, el cliente de trabajo primero debe determinar la ubicación de los archivos relacionados con el trabajo (mencionamos anteriormente que mapreduce no usa jar, es decir, la forma en que otros archivos se transfieren a jobtracker, pero guarda estos archivos en hdfs y puede hacerlo de acuerdo con al usuario Se almacenan varias copias de la configuración). En cuanto a la asignación del directorio de almacenamiento, se asigna llamando a rpc para acceder a jobtracker. Veamos el código de asignación de jobtracker: ruta final stagingrootdir = new path(conf.get("mapreduce.jobtracker.staging.root.dir" ," /tmp/hadoop/mapred/staging")); sistema de archivos final fs = stagingrootdir.getfilesystem(conf); return fs.makequalified(new path(stagingrootdir, usuario "/.staging")).string(); el stagingrootdir generado por el código anterior es el directorio donde se almacenan todos los archivos de trabajo, un directorio raíz, y no se refiere únicamente al trabajo actual. jobtracker solicitará un jobid (a través de rpc, tenga en cuenta que toda la comunicación entre jobclient y jobtracker es Básicamente, todo se realiza a través de rpc. Si no se explica a continuación, los famosos también deberían hacerlo en este caso).
La siguiente es la implementación específica de jobsubmitclient.getnewjobid: publicsynchronized jobid getnewjobid() throws ioexception {returnnew jobid(gettrackeridentifier(), nextjobid);} Después de obtener el jobid, combine el jobid con el stagingrootdir anterior para formar el almacenamiento específico del trabajo. dirección del archivo. Una vez realizado todo este trabajo, jobclient almacenará los archivos relevantes en hdfs.