Cómo iniciar trabajadores en Spark
Análisis del código fuente de inicio de Spark Master
1 Llame al método principal de master en start-master.sh, la llamada al método principal
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//startSystemAndActor
actorSystem.awaitTermination()
}
2. Llame a startSystemAndActor para iniciar el sistema y crear el rol
def startSystemAndActor(
host: Cadena,
puerto: int,
webUiPort.Int,
conf: SparkConf): (ActorSystem, Int, Int, Opción[Int] ) = { p>
val securityMgr = new SecurityManager(conf)
val (actorSystem,boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host,boundPort, webUiPort, securityMgr, conf), actorName) p>
val timeout = AkkaUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout ).asInstanceOf[BoundPortsResponse]
(actorSystem,boundPort, portsResponse. Llame a AkkaUtils.createActorSystem para crear un ActorSystem
def createActorSystem(
nombre: String,
host: Cadena,
puerto: int,
conf: SparkConf,
seguridad
Administrador: SecurityManager): (ActorSystem, Int) = {
val startService: Int =gt; (ActorSystem, Int) = { actualPort =gt; doCreateActorSystem(nombre, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(puerto, startService, conf, nombre)
}
4. startServiceOnPort para iniciar el servicio en el puerto y llamar a doCreateActorSystem para crear ActorSystem después de que la creación sea exitosa
5. Cree el Actor después de que ActorSystem se haya creado exitosamente
6. Llame al constructor principal del Master y ejecutarlo preStart()
1. start-slaves.sh llama al método principal de la clase Worker
def main(argStrings: Array [String]) {
SignalLogger.register(log)
val conf = nueva SparkConf
val args = nueva WorkerArguments(argStrings, conf)
val (actorSystem, _) = startSystemAndActor(args .host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2. Llame a startSystemAndActor para iniciar el sistema y crear un rol
def startSystemAndActor(
host.String,
puerto: int,
webUiPort: Int,
núcleos: Int,
memoria: Int, p>
masterUrls: Array[String],
workDir.String,
WorkerNumber: Option[Int] = Ninguno,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = { p>
// LocalSparkCluster ejecuta múltiples sistemas de roles locales sparkWorkerX
val systemName = "sparkWorker" trabajadorNumber.map(_.toString).getOrElse ("")
val actorName = "Trabajador"
val securityMgr = new SecurityManager(conf)
val (ac
torSystem,boundPort) = AkkaUtils.createActorSystem(nombredelsistema, host, puerto,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host,boundPort, webUiPort, núcleos, memoria,
masterAkkaUrls, systemName, actorName, workDir , conf, securityMgr), nombre = actorName)
(actorSystem,boundPort)
}
3. Llame a createActorSystem de AkkaUtils para crear ActorSystem
.def createActorSystem(
nombre: String,
host: String,
puerto: int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int =gt; (ActorSystem, Int) = { actualPort =gt; > doCreateActorSystem(nombre, host, puerto actual, conf, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, startService, conf, nombre)
} p>
4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart
override def preStart() {
afirmar (! registrado)
logInfo("Iniciando Spark Workers: d con d núcleos, s RAM".format(
host, puerto, núcleos, Utils.megabytesToString(memoria))
logInfo (s "Ejecutando la versión de Spark ${org.apache.spark.SPARK_VERSION}")
5.
logInfo("Spark home: " sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
<p> shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
RegisterWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Adjunte el controlador de servlet de métricas de trabajador a la WebUI después de que se haya iniciado el sistema de métricas.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
5. Llame al método RegisterWithMaster para registrar el trabajador iniciado en el programa principal
def RegisterWithMaster() {
// DisassociatedEvent puede activarse varias veces, así que no intente registrarse si hay intentos de registro pendientes en la programación
// .
registroRetryTimer coincide {
caso Ninguno =gt
registrado = falso
tryRegisterAllMasters()
conexiónAttemptCount = 0
RegistrationRetryTimer = Algunos {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =gt;
logInfo("No se genera otro intento de registrarse con el maestro, ya que hay un"
" intento ya está programado.")
}
}
6. Llame a tryRegisterAllMasters para enviar el mensaje del trabajador registrado a la definición privada del Maestro
tryRegisterAllMasters () {
for (masterAkkaUrl lt; - masterAkkaUrls) {
logInfo("Conectando al maestro " masterAkkaUrl "...")
val actor = contexto.actorSelection(masterAkkaUrl)
actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)
}
}
7. El messageWithLogging de la estación maestra recibe mensajes Ejecución estado
caso RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) =gt;
{
logInfo(" Registrando trabajadores: d con d núcleos, s RAM". format(
) =gt;
8.format(
trabajadorHost, trabajadorPort, núcleos, Utils.megabytesToString ( memoria)))
if ( state == RecoveryState.STANDBY) {
// Ignorar, no enviar respuesta
} else if (idToWorker.contains (id)) {
remitente! RegisterWorkerFailed("Duplicado
e ID del trabajador")
} else {
val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPort, núcleos, memoria,
remitente, trabajadorUiPort, dirección pública )
if (registerWorker(trabajador)) {
persistenceEngine.addWorker(trabajador)
remitente RegisteredWorker(masterUrl, masterWebUiUrl)
Schedule()
} else {
val workAddress = trabajador.actor.path.address
logWarning("Error en el registro del trabajador. Intente lo mismo"
"Dirección para volver a registrar al trabajador:" dirección de trabajador)
Remitente! RegisterWorkerFailed("Se intentó volver a registrar al trabajador en la misma dirección:"
dirección de trabajador)< / p>
}
}
}
8. El fracaso regresa al trabajador. El fracaso devolverá un mensaje de error al trabajador y el éxito lo hará. devolver información sobre la información maestra
9. Se llama a la programación después de que se devuelve la información, pero como no hay una aplicación, no hay asignación de recursos en este momento
Todo Spark. se ha iniciado el clúster
9.