Red de conocimiento informático - Problemas con los teléfonos móviles - Error del trabajador de inicio de Spark, ¡responda!

Error del trabajador de inicio de Spark, ¡responda!

Análisis basado en el código fuente de Spark 1.3.1

Análisis del código fuente de inicio de Spark Master

1 Llame al método principal de master en start-master.sh, la llamada al método principal

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//startSystemAndActor

actorSystem.awaitTermination()

}

2. Llame a startSystemAndActor para iniciar el sistema y crear el rol

def startSystemAndActor(

host: Cadena,

puerto: int,

webUiPort.Int,

conf: SparkConf): (ActorSystem, Int, Int, Opción[Int] ) = {

val securityMgr = new SecurityManager(conf)

val (actorSystem ,boundPort) = AkkaUtils.createActorSystem(systemName, host, puerto, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master], host,boundPort, webUiPort, securityMgr, conf), actorName)

val timeout = AkkaUtils.askTimeout(conf)

val portsRequest = actor.ask(BoundPortsRequest)(timeout)

val portsResponse = Await.result(portsRequest, timeout ).asInstanceOf[BoundPortsResponse]

(actorSystem,boundPort, portsResponse. Llame a AkkaUtils.createActorSystem para crear un ActorSystem

def createActorSystem(

nombre: String,

host: String,

puerto: int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int => (ActorSy

madre, Int) = { actualPort =>

doCreateActorSystem(nombre, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(puerto, startService , conf, nombre)

}

4.startServiceOnPort inicia el servicio en el puerto y llama a doCreateActorSystem para crear ActorSystem después de que la creación sea exitosa

5 , ActorSystem se creó con éxito Luego cree el Actor

6. Llame al constructor principal del Master y ejecute preStart()

1. Clase de trabajador

def main(argStrings: Array [String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new WorkerArguments(argStrings , conf)

val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2. un rol

def startSystemAndActor(

host.String,

puerto: int,

webUiPort: Int,

núcleos: Int,

Memoria: Int,

masterUrls: Array[String],

workDir.String,

WorkerNumber : Option[Int] = Ninguno,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// LocalSparkCluster ejecuta múltiples sistemas de roles locales sparkWorkerX

val systemName = "sparkWorker" + trabajadorNumber .map(_.toString).getOrElse("")

val actorName = "Trabajador"

val securityMgr = new SecurityManager( conf)

val ( actorSystem,boundPort) = AkkaUtils.createActorSystem( systemName, host, puerto,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_,

AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host,boundPort, webUiPort, núcleos, memoria,

masterAkkaUrls, systemName, actorName, workDir , conf, securityMgr), nombre = actorName)

(actorSystem,boundPort)

}

3. Llame a createActorSystem de AkkaUtils para crear ActorSystem

.

def createActorSystem(

nombre: String,

host: String,

puerto: int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int => (ActorSystem, Int) = { actualPort =>

doCreateActorSystem (nombre, host, puerto actual, conf, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, startService, conf, nombre)

}

4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart

override def preStart() {

afirmar (! registrado)

logInfo("Iniciando el trabajador de Spark %s:%d con %d núcleos, %s RAM".format(

host, puerto, núcleos, Utils.megabytesToString(memoria))

logInfo(s "Ejecutando Spark versión ${org.apache.spark.SPARK_VERSION}")

5.

logInfo("Spark home: " + sparkHome)

createWorkDir()

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

shuffleService.bind()

RegisterWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Adjunte el controlador de servlet de métricas de trabajador a la WebUI después del sistema de métricas se inicia.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5. Llame al método RegisterWithMaster para registrar el trabajador iniciado en el programa principal

def RegisterWithMaster() {

// DisassociatedEvent puede activarse varias veces, así que no intente registrarse si hay programado un intento de registro pendiente

// .

registroRetryTimer coincide {

caso Ninguno =>

registrado = falso

tryRegisterAllMasters()

conexiónAttemptCount = 0

registrationRetryTimer = Algunos {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =>

logInfo("No se genera otro intento de registrarse con el maestro, ya que hay un" +

" intento programado ya."")

}

}

6. Llame a tryRegisterAllMasters para enviar un mensaje de trabajador registrado al Maestro

def privado tryRegisterAllMasters () {

for (masterAkkaUrl <- masterAkkaUrls) {

logInfo("Conectando al maestro " + masterAkkaUrl + "...")

val actor = contexto .actorSelection(masterAkkaUrl)

actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)

}

}

7. El messageWithLogging de la estación maestra recibe mensajes Ejecución estado

caso RegisterWorker(id, trabajadorHost, trabajadorPuerto, núcleos, memoria, trabajadorUiPort, dirección pública) =>

{

logInfo(" Registrando trabajador %s: %d con %d núcleos, %s RAM". format(

) =>

8.format(

trabajadorHost, trabajadorPuerto, núcleos, Utils. megabytesToString(memoria)))

if ( state == RecoveryState.STANDBY) {

// Ignorar, no enviar respuesta

} else if (idToWorker .contiene(id)) {

remitente! RegisterWorkerFailed("ID de trabajador duplicado")

} else {

val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPort,

núcleos, memoria,

remitente, trabajadorUiPort, dirección pública)

if (registerWorker(trabajador)) {

persistenceEngine.addWorker(trabajador)

remitente! RegisteredWorker(masterUrl, masterWebUiUrl)

Schedule()

} else {

val WorkersAddress = trabajador.actor.path.address

logWarning("Error en el registro del trabajador. Intente volver a registrar al trabajador en la misma dirección " +

": " + dirección del trabajador)

¡Remitente! RegisterWorkerFailed("Se intentó volver a registrar al trabajador en la misma dirección:"

+ WorkersAddress)

}

}

}

8. El fracaso regresa al Trabajador. El fracaso devolverá un mensaje de error al Trabajador, mientras que el éxito devolverá información sobre el Maestro.

9. Se llama al programa después de devolver la información, pero como no hay aplicación, por lo que no hay asignación de recursos en este momento

Se ha iniciado todo el clúster Spark

9.