Red de conocimiento informático - Problemas con los teléfonos móviles - Análisis del código fuente de Xstream

Análisis del código fuente de Xstream

Analice el código fuente basado en spark1.3.1.

Spark master inicia el análisis del código fuente

1. Llame al método principal del master en start-master.sh.

def main(arg strings:Array[String]){

SignalLogger.register(log)

val conf =new SparkConf

val args = nuevos argumentos maestros(arg strings, conf)

Val (actor system,_,_,_)= startsystemandactor(args.host,args.port,args.webuiport,conf)// Iniciar el sistema y los actores.

actorSystem.awaitTermination()

}

2. Llame a startSystemAndActor para iniciar el sistema y crear elementos de ejecución.

def startSystemAndActor(

Host: String,

puerto: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

val securityMgr = nuevo administrador de seguridad (conf)

val (actorSystem, puerto vinculado) = akkautils . createactorsystem(nombre del sistema, host, puerto, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf) [Master], host,boundPort, webUiPort, securityMgr, conf), actorName)

val time out = akkautils . Ask time out(conf)

val ports request = actor. (BoundPortsRequest)(tiempo de espera)

respuesta de puertos val = resultado de espera (solicitud de puertos, tiempo de espera). Como instancia de [BoundPortsResponse]

(actorSystem,boundPort, portsResponse.webUIPort, portsResponse.restPort)

3. Llame a AkkaUtils.createActorSystem para crear un ActorSystem.

def createActorSystem(

Nombre: String,

Moderador: String,

puerto: Int,

conf: SparkConf,

administrador de seguridad:administrador de seguridad):(ActorSystem, Int) = {

val startService:Int = & gt; (ActorSystem, Int) = { puerto real = & gt;

doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)

}

4. Llame a Utils.startServiceOnPort para iniciar el servicio en el puerto. Después de que la creación sea exitosa, llame a DocCreator System para crear ActorSystem.

5. Cree Actor después de crear ActorSystem con éxito.

6. Llame al constructor principal de Master y ejecute preStart().

1 y start-slaves.sh llaman al método principal de la clase Worker.

def main(arg strings:Array[String]){

SignalLogger.register(log)

val conf =new SparkConf

val args = nuevos argumentos de trabajador(arg strings, conf)

val (actorSystem, _) = startSystemAndActor(args. host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2. crear elemento ejecutivo.

def startSystemAndActor(

Host: String,

puerto: Int,

webUiPort: Int,

Núcleo: Int,

Memoria: Int,

URL maestra: Matriz[Cadena],

DirTrabajo: Cadena,

Número de trabajador: Option[Int] = Ninguna,

conf:spark conf = new spark conf):(actor system, Int) = {

LocalSparkCluster ejecuta múltiples sistemas de actores locales sparkWorkerX

nombre del sistema val = " trabajador de chispa "+ número de trabajador mapa (_.toString).

getOrElse(" ")

val actorName = "Trabajador "

val securityMgr =nuevo administrador de seguridad (conf)

val (actorSystem, puerto vinculado) = akkautils createactorsystem(nombre del sistema, host, puerto,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = URL maestras (master . toakkaurl(_, AkkaUtils. protocolo(actorSystem. )))

actor system .actor of(Props(clase de[Trabajador], host,boundPort, webUiPort, núcleos, memoria,

masterAkkaUrls, systemName, actorName, workDir, conf. , securityMgr), nombre = actorName)

(actorSystem,boundPort)

}

3. Llame a createActorSystem de AkkaUtils para crear ActorSystem.

def createActorSystem(

Nombre: String,

Moderador: String,

puerto: Int,

conf: SparkConf,

administrador de seguridad:administrador de seguridad):(ActorSystem, Int) = {

val startService:Int = & gt; (ActorSystem, Int) = { puerto real = & gt;

doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, servicio de inicio, configuración, name)

}

4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart.

Anular def preStart() {

Aserción (! Registrada)

logInfo("Iniciando el trabajador Spark %s:%d, hay %d Kernel, %s RAM "). Formato (

Host, puerto, núcleo, utilidad.

Cadena de megabytes (memoria)))

logInfo(s "Ejecutando la versión de Spark $ { org . Apache . Spark . Spark _ VERSION } ")

logInfo("Inicio de Spark : " + sparkHome)

createWorkDir()

contexto del sistema. p>

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

sistema de métricas. fuente del registro (fuente del trabajador)

metricsSystem.start()

// Adjunte el controlador del servlet de métricas del trabajador a la interfaz de usuario web después de iniciar el sistema de métricas.

sistema de métricas. getservlethandlers. foreach(webui. adjuntar controlador)

}

5. Llame al método RegisterWithMaster para registrar el proceso de trabajo iniciado con el proceso principal. .

def RegisterWithMaster() {

// DisassociatedEvent puede activarse varias veces, así que no intentes registrarte

//Si está programado un intento de registro sin finalizar .

Coincidencia del temporizador de reintento de registro {

case Ninguno = & gt

register=false

tryRegisterAllMasters()

Recuento de intentos de conexión = 0

registrationRetryTimer = Algún {

sistema de contexto . INTERVALO, self, registrarseConMaster)

}

case Some(_)= & gt;

logInfo("no generará otra solicitud al servidor maestro A Se realizó un intento de registro porque había un "+

" intento programado ")

}

}

6. Llame a tryRegisterAllMasters para enviar mensajes de trabajo registrados al servidor maestro.

private def tryRegisterAllMasters(){

for(masterAkkaUrl & lt;- masterAkkaUrls) {

logInfo("Conectar al servidor maestro "+ masterAkkaUrl +" . .. ")

val actor = contexto . selección de actor(masterAkkaUrl)

¡Actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)

}

}

7. lo ejecuta.

case RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) = & gt; %s:%d, tiene %d núcleos, %s RAM "). Formato (

workerHost, trabajadorPort, núcleos, Utils.megabytesToString(memoria)))

if (estado == RecoveryState. Standby) {

//ignorar , no envíes una respuesta

} else if(idtoworker . contains(id)){

Remitente! RegisterWorkerFailed("ID de trabajo duplicado ")

} En caso contrario {

val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPuerto, núcleos, memoria,

Enviar remitente , informe de trabajo, dirección pública)

if (registerWorker(trabajador)) {

motor de persistencia .agregar trabajador(trabajador)

¡Remitente! RegisteredWorker(masterUrl, masterWebUiUrl)

plan()

} else {

val dirección del trabajador = dirección del actor . >logWarning("Error en el registro del trabajador. Intentando enviar la misma "+

"dirección:"+dirección del trabajo)

¡Remitente! RegisterWorkerFailed("Intento volver a registrar el programa de trabajo en la misma dirección: "

+dirección de trabajo)

}

}

}

p>

8. Si falla, el mensaje de falla se devuelve al trabajador. Si tiene éxito, se devuelve la información relevante del Maestro.

9. Se llama a la programación después de devolver el mensaje, pero como no hay ninguna aplicación, no se asignarán recursos en este momento.

En este punto, se ha iniciado todo el clúster Spark.