Red de conocimiento informático - Problemas con los teléfonos móviles - Error en la compilación del código fuente de Spark

Error en la compilación del código fuente de Spark

Analice el código fuente basado en spark1.3.1.

Spark master inicia el análisis del código fuente

1. Llame al método principal del master en start-master.sh.

def main(arg strings: Array[String]){

SignalLogger.register(log)

val conf = new SparkConf

val args = nuevos argumentos maestros(arg strings, conf)

Val (actor system,_,_,_)= startsystemandactor(args.host,args.port,args.webuiport,conf)// Iniciar el sistema y los actores.

actorSystem.awaitTermination()

}

2. Llame a startSystemAndActor para iniciar el sistema y crear elementos de ejecución.

def startSystemAndActor(

Host: String,

puerto: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

val securityMgr = nuevo administrador de seguridad (conf)

val (actorSystem, puerto vinculado) = akkautils . createactorsystem(nombre del sistema, host, puerto, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf) [Master], host,boundPort, webUiPort, securityMgr, conf), actorName)

val time out = akkautils . Ask time out(conf)

val ports request = actor. (BoundPortsRequest)(tiempo de espera)

respuesta de puertos val = resultado de espera (solicitud de puertos, tiempo de espera). Como instancia de [BoundPortsResponse]

(actorSystem,boundPort, portsResponse.webUIPort, portsResponse.restPort)

3. Llame a AkkaUtils.createActorSystem para crear un ActorSystem.

def createActorSystem(

Nombre: Cadena,

Host: Cadena,

puerto: Int,

conf: SparkConf,

administrador de seguridad: administrador de seguridad): (ActorSystem, Int) = {

val startService: Int = gt; (ActorSystem, Int) = { puerto real = gt; ;

doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)

}

4. Llame a Utils.startServiceOnPort para iniciar el servicio en el puerto. Después de que la creación sea exitosa, llame a DocCreator System para crear ActorSystem.

5. Cree Actor después de crear ActorSystem con éxito.

6. Llame al constructor principal de Master y ejecute preStart().

1 y start-slaves.sh llaman al método principal de la clase Worker.

def main(arg strings: Array[String]){

SignalLogger.register(log)

val conf = new SparkConf

val args = nuevos argumentos de trabajador(arg strings, conf)

val (actorSystem, _) = startSystemAndActor(args. host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2. crear elemento ejecutivo.

def startSystemAndActor(

Host: String,

puerto: Int,

webUiPort: Int,

Núcleo: Int,

Memoria: Int,

URL maestra: Matriz[Cadena],

DirTrabajo: Cadena,

Número de trabajador: Opción[Int] = Ninguna,

conf: spark conf = new spark conf): (actor system, Int) = {

LocalSparkCluster ejecuta múltiples sistemas de actores locales sparkWorkerX

nombre del sistema val = "trabajador de chispa" número de trabajador (_.toString).

getOrElse(" ")

val actorName = "Trabajador "

val securityMgr =nuevo administrador de seguridad (conf)

val (actorSystem, puerto vinculado) = akkautils createactorsystem(nombre del sistema, host, puerto,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = URL maestras (master . toakkaurl(_, AkkaUtils. protocolo(actorSystem. )))

actor system .actor of(Props(clase de[Trabajador], host,boundPort, webUiPort, núcleos, memoria,

masterAkkaUrls, systemName, actorName, workDir, conf. , securityMgr), nombre = actorName)

(actorSystem,boundPort)

}

3. Llame a createActorSystem de AkkaUtils para crear ActorSystem.

def createActorSystem(

Nombre: Cadena,

Host: Cadena,

puerto: Int,

conf: SparkConf,

administrador de seguridad: administrador de seguridad): (ActorSystem, Int) = {

val startService: Int = gt; (ActorSystem, Int) = { puerto real = gt; ;

doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)

}

4. Después de crear ActorSystem, llame al constructor principal de Worker y ejecute el método preStart.

Anular def preStart() {

Assert(!Registered)

logInfo("Iniciando Spark Workers: d, con d núcleos, s RAM ") . Formato (

Host, puerto, núcleo, utilidad.

Cadena de megabytes (memoria)))

logInfo(s "Ejecutando la versión de Spark $ { org . Apache . Spark . Spark _ VERSION } ")

logInfo("Inicio de Spark : " sparkHome )

createWorkDir()

contexto .sistema de flujo de eventos.suscríbete(self, clase de[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

fuente de registro del sistema de métricas (. fuente del trabajador)

metricsSystem.start()

// Adjunte el controlador del servlet de métricas del trabajador a la interfaz de usuario web después de iniciar el sistema de métricas.

sistema de métricas. getservlethandlers. foreach(webui. adjuntar controlador)

}

5. Llame al método RegisterWithMaster para registrar el proceso de trabajo iniciado con el proceso principal. .

def RegisterWithMaster() {

// DisassociatedEvent puede activarse varias veces, así que no intente registrarse

//Si está programado un intento de registro sin finalizar .

El temporizador de reintento de registro coincide {

case Ninguno = gt

register=false

tryRegisterAllMasters()

Recuento de intentos de conexión = 0

registrationRetryTimer = Algún {

sistema de contexto . , self, RegisterWithMaster)

}

case Some(_)= gt;

logInfo("no generará otro registro con el intento del servidor maestro porque no es un "

"Intento ya programado ")

}

}

6. Llamar a tryRegisterAllMasters lo hará. Se envían mensajes de trabajo registrados al servidor maestro.

private def tryRegisterAllMasters(){

for(masterAkkaUrl lt; - masterAkkaUrls) {

logInfo("Conectando al servidor maestro " masterAkkaUrl " ... " )

val actor = contexto . selección de actor(masterAkkaUrl)

¡Actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)

}

}

7. lo ejecuta.

case RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) = gt

{

logInfo("Registro de procesos de trabajo; : d, con d núcleos, s RAM "). Formato (

workerHost, trabajadorPort, núcleos, Utils.megabytesToString(memoria)))

if (estado == RecoveryState. Standby) {

//ignorar , no envíes una respuesta

} else if(idtoworker . contains(id)){

Remitente! RegisterWorkerFailed("ID de trabajo duplicado ")

} En caso contrario {

val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPuerto, núcleos, memoria,

Enviar remitente , informe de trabajo, dirección pública)

if (registerWorker(trabajador)) {

motor de persistencia .agregar trabajador(trabajador)

¡Remitente! RegisteredWorker(masterUrl, masterWebUiUrl)

plan()

} else {

val dirección del trabajador = dirección del actor . >logWarning("Error en el registro del trabajador. Intentando enviar un mensaje a la misma "

"Dirección:" dirección de trabajo)

¡Remitente! RegisterWorkerFailed("Intento volver a registrar el programa de trabajo en la misma dirección: "

Dirección de trabajo)

}

}

}

8. Si falla, se devuelve el mensaje de error al trabajador. Si tiene éxito, se devuelve la información relevante del Maestro.

9. Se llama a la programación después de devolver el mensaje, pero como no hay ninguna aplicación, no se asignarán recursos en este momento.

En este punto, se ha iniciado todo el clúster de Spark.