Red de conocimiento informático - Problemas con los teléfonos móviles - Cómo iniciar trabajadores en Spark

Cómo iniciar trabajadores en Spark

Análisis basado en el código fuente de Spark 1.3.1

Análisis del código fuente de inicio de Spark Master

1 Llame al método principal de master en start-master.sh, la llamada al método principal

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//startSystemAndActor

actorSystem.awaitTermination()

}

2. Llame a startSystemAndActor para iniciar el sistema y crear el rol

def startSystemAndActor(

host: Cadena,

puerto: int,

webUiPort.Int,

conf: SparkConf): (ActorSystem, Int, Int, Opción[Int] ) = {

val securityMgr = new SecurityManager(conf)

val (actorSystem,boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master], host,boundPort, webUiPort, securityMgr, conf), actorName)

val timeout = AkkaUtils.askTimeout(conf)

val portsRequest = actor.ask(BoundPortsRequest)(timeout)

val portsResponse = Await.result(portsRequest, timeout ).asInstanceOf[BoundPortsResponse]

(actorSystem,boundPort, portsResponse. Llame a AkkaUtils.createActorSystem para crear un ActorSystem

def createActorSystem(

nombre: String,

host: Cadena,

puerto: int,

conf: SparkConf,

seguridad

Administrador: SecurityManager): (ActorSystem, Int) = {

val startService: Int =gt; (ActorSystem, Int) = { actualPort =gt; doCreateActorSystem(nombre, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(puerto, startService, conf, nombre)

}

4. startServiceOnPort para iniciar el servicio en el puerto y llamar a doCreateActorSystem para crear ActorSystem después de que la creación sea exitosa

5. Cree el Actor después de que ActorSystem se haya creado exitosamente

6. Llame al constructor principal del Master y ejecutarlo preStart()

1. start-slaves.sh llama al método principal de la clase Worker

def main(argStrings: Array [String]) {

SignalLogger.register(log)

val conf = nueva SparkConf

val args = nueva WorkerArguments(argStrings, conf)

val (actorSystem, _) = startSystemAndActor(args .host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2. Llame a startSystemAndActor para iniciar el sistema y crear un rol

def startSystemAndActor(

host.String,

puerto: int,

webUiPort: Int,

núcleos: Int,

memoria: Int,

masterUrls: Array[String],

workDir.String,

WorkerNumber: Option[Int] = Ninguno,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// LocalSparkCluster ejecuta múltiples sistemas de roles locales sparkWorkerX

val systemName = "sparkWorker" trabajadorNumber.map(_.toString).getOrElse ("")

val actorName = "Trabajador"

val securityMgr = new SecurityManager(conf)

val (ac

torSystem,boundPort) = AkkaUtils.createActorSystem(nombredelsistema, host, puerto,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host,boundPort, webUiPort, núcleos, memoria,

masterAkkaUrls, systemName, actorName, workDir , conf, securityMgr), nombre = actorName)

(actorSystem,boundPort)

}

3. Llame a createActorSystem de AkkaUtils para crear ActorSystem

.

def createActorSystem(

nombre: String,

host: String,

puerto: int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int =gt; (ActorSystem, Int) = { actualPort =gt; > doCreateActorSystem(nombre, host, puerto actual, conf, administrador de seguridad)

}

Utils.startServiceOnPort(puerto, startService, conf, nombre)

}

4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart

override def preStart() {

afirmar (! registrado)

logInfo("Iniciando Spark Workers: d con d núcleos, s RAM".format(

host, puerto, núcleos, Utils.megabytesToString(memoria))

logInfo (s "Ejecutando la versión de Spark ${org.apache.spark.SPARK_VERSION}")

5.

logInfo("Spark home: " sparkHome)

createWorkDir()

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

<

p> shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

RegisterWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Adjunte el controlador de servlet de métricas de trabajador a la WebUI después de que se haya iniciado el sistema de métricas.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5. Llame al método RegisterWithMaster para registrar el trabajador iniciado en el programa principal

def RegisterWithMaster() {

// DisassociatedEvent puede activarse varias veces, así que no intente registrarse si hay intentos de registro pendientes en la programación

// .

registroRetryTimer coincide {

caso Ninguno =gt

registrado = falso

tryRegisterAllMasters()

conexiónAttemptCount = 0

RegistrationRetryTimer = Algunos {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =gt;

logInfo("No se genera otro intento de registrarse con el maestro, ya que hay un"

" intento ya está programado.")

}

}

6. Llame a tryRegisterAllMasters para enviar el mensaje del trabajador registrado a la definición privada del Maestro

tryRegisterAllMasters () {

for (masterAkkaUrl lt; - masterAkkaUrls) {

logInfo("Conectando al maestro " masterAkkaUrl "...")

val actor = contexto.actorSelection(masterAkkaUrl)

actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)

}

}

7. El messageWithLogging de la estación maestra recibe mensajes Ejecución estado

caso RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) =gt;

{

logInfo(" Registrando trabajadores: d con d núcleos, s RAM". format(

) =gt;

8.format(

trabajadorHost, trabajadorPort, núcleos, Utils.megabytesToString ( memoria)))

if ( state == RecoveryState.STANDBY) {

// Ignorar, no enviar respuesta

} else if (idToWorker.contains (id)) {

remitente! RegisterWorkerFailed("Duplicado

e ID del trabajador")

} else {

val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPort, núcleos, memoria,

remitente, trabajadorUiPort, dirección pública )

if (registerWorker(trabajador)) {

persistenceEngine.addWorker(trabajador)

remitente RegisteredWorker(masterUrl, masterWebUiUrl)

Schedule()

} else {

val workAddress = trabajador.actor.path.address

logWarning("Error en el registro del trabajador. Intente lo mismo"

"Dirección para volver a registrar al trabajador:" dirección de trabajador)

Remitente! RegisterWorkerFailed("Se intentó volver a registrar al trabajador en la misma dirección:"

dirección de trabajador)< / p>

}

}

}

8. El fracaso regresa al trabajador. El fracaso devolverá un mensaje de error al trabajador y el éxito lo hará. devolver información sobre la información maestra

9. Se llama a la programación después de que se devuelve la información, pero como no hay una aplicación, no hay asignación de recursos en este momento

Todo Spark. se ha iniciado el clúster

9.