Análisis del código fuente de Xstream
Spark master inicia el análisis del código fuente
1. Llame al método principal del master en start-master.sh.
def main(arg strings:Array[String]){
SignalLogger.register(log)
val conf =new SparkConf
val args = nuevos argumentos maestros(arg strings, conf)
Val (actor system,_,_,_)= startsystemandactor(args.host,args.port,args.webuiport,conf)// Iniciar el sistema y los actores.
actorSystem.awaitTermination()
}
2. Llame a startSystemAndActor para iniciar el sistema y crear elementos de ejecución.
def startSystemAndActor(
Host: String,
puerto: Int,
webUiPort: Int,
conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
val securityMgr = nuevo administrador de seguridad (conf)
val (actorSystem, puerto vinculado) = akkautils . createactorsystem(nombre del sistema, host, puerto, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf) [Master], host,boundPort, webUiPort, securityMgr, conf), actorName)
val time out = akkautils . Ask time out(conf)
val ports request = actor. (BoundPortsRequest)(tiempo de espera)
respuesta de puertos val = resultado de espera (solicitud de puertos, tiempo de espera). Como instancia de [BoundPortsResponse]
(actorSystem,boundPort, portsResponse.webUIPort, portsResponse.restPort)
3. Llame a AkkaUtils.createActorSystem para crear un ActorSystem.
def createActorSystem(
Nombre: String,
Moderador: String,
puerto: Int,
conf: SparkConf,
administrador de seguridad:administrador de seguridad):(ActorSystem, Int) = {
val startService:Int = & gt; (ActorSystem, Int) = { puerto real = & gt;
doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)
}
4. Llame a Utils.startServiceOnPort para iniciar el servicio en el puerto. Después de que la creación sea exitosa, llame a DocCreator System para crear ActorSystem.
5. Cree Actor después de crear ActorSystem con éxito.
6. Llame al constructor principal de Master y ejecute preStart().
1 y start-slaves.sh llaman al método principal de la clase Worker.
def main(arg strings:Array[String]){
SignalLogger.register(log)
val conf =new SparkConf
val args = nuevos argumentos de trabajador(arg strings, conf)
val (actorSystem, _) = startSystemAndActor(args. host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2. crear elemento ejecutivo.
def startSystemAndActor(
Host: String,
puerto: Int,
webUiPort: Int,
Núcleo: Int,
Memoria: Int,
URL maestra: Matriz[Cadena],
DirTrabajo: Cadena,
Número de trabajador: Option[Int] = Ninguna,
conf:spark conf = new spark conf):(actor system, Int) = {
LocalSparkCluster ejecuta múltiples sistemas de actores locales sparkWorkerX
nombre del sistema val = " trabajador de chispa "+ número de trabajador mapa (_.toString).
getOrElse(" ")
val actorName = "Trabajador "
val securityMgr =nuevo administrador de seguridad (conf)
val (actorSystem, puerto vinculado) = akkautils createactorsystem(nombre del sistema, host, puerto,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = URL maestras (master . toakkaurl(_, AkkaUtils. protocolo(actorSystem. )))
actor system .actor of(Props(clase de[Trabajador], host,boundPort, webUiPort, núcleos, memoria,
masterAkkaUrls, systemName, actorName, workDir, conf. , securityMgr), nombre = actorName)
(actorSystem,boundPort)
}
3. Llame a createActorSystem de AkkaUtils para crear ActorSystem.
def createActorSystem(
Nombre: String,
Moderador: String,
puerto: Int,
conf: SparkConf,
administrador de seguridad:administrador de seguridad):(ActorSystem, Int) = {
val startService:Int = & gt; (ActorSystem, Int) = { puerto real = & gt;
doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, servicio de inicio, configuración, name)
}
4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart.
Anular def preStart() {
Aserción (! Registrada)
logInfo("Iniciando el trabajador Spark %s:%d, hay %d Kernel, %s RAM "). Formato (
Host, puerto, núcleo, utilidad.
Cadena de megabytes (memoria)))
logInfo(s "Ejecutando la versión de Spark $ { org . Apache . Spark . Spark _ VERSION } ")
logInfo("Inicio de Spark : " + sparkHome)
createWorkDir()
contexto del sistema. p>
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()
sistema de métricas. fuente del registro (fuente del trabajador)
metricsSystem.start()
// Adjunte el controlador del servlet de métricas del trabajador a la interfaz de usuario web después de iniciar el sistema de métricas.
sistema de métricas. getservlethandlers. foreach(webui. adjuntar controlador)
}
5. Llame al método RegisterWithMaster para registrar el proceso de trabajo iniciado con el proceso principal. .
def RegisterWithMaster() {
// DisassociatedEvent puede activarse varias veces, así que no intentes registrarte
//Si está programado un intento de registro sin finalizar .
Coincidencia del temporizador de reintento de registro {
case Ninguno = & gt
register=false
tryRegisterAllMasters()
Recuento de intentos de conexión = 0
registrationRetryTimer = Algún {
sistema de contexto . INTERVALO, self, registrarseConMaster)
}
case Some(_)= & gt;
logInfo("no generará otra solicitud al servidor maestro A Se realizó un intento de registro porque había un "+
" intento programado ")
}
}
6. Llame a tryRegisterAllMasters para enviar mensajes de trabajo registrados al servidor maestro.
private def tryRegisterAllMasters(){
for(masterAkkaUrl & lt;- masterAkkaUrls) {
logInfo("Conectar al servidor maestro "+ masterAkkaUrl +" . .. ")
val actor = contexto . selección de actor(masterAkkaUrl)
¡Actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)
}
}
7. lo ejecuta.
case RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) = & gt; %s:%d, tiene %d núcleos, %s RAM "). Formato (
workerHost, trabajadorPort, núcleos, Utils.megabytesToString(memoria)))
if (estado == RecoveryState. Standby) {
//ignorar , no envíes una respuesta
} else if(idtoworker . contains(id)){
Remitente! RegisterWorkerFailed("ID de trabajo duplicado ")
} En caso contrario {
val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPuerto, núcleos, memoria,
Enviar remitente , informe de trabajo, dirección pública)
if (registerWorker(trabajador)) {
motor de persistencia .agregar trabajador(trabajador)
¡Remitente! RegisteredWorker(masterUrl, masterWebUiUrl)
plan()
} else {
val dirección del trabajador = dirección del actor . >logWarning("Error en el registro del trabajador. Intentando enviar la misma "+
"dirección:"+dirección del trabajo)
¡Remitente! RegisterWorkerFailed("Intento volver a registrar el programa de trabajo en la misma dirección: "
+dirección de trabajo)
}
}
}
p>8. Si falla, el mensaje de falla se devuelve al trabajador. Si tiene éxito, se devuelve la información relevante del Maestro.
9. Se llama a la programación después de devolver el mensaje, pero como no hay ninguna aplicación, no se asignarán recursos en este momento.
En este punto, se ha iniciado todo el clúster Spark.