Error del trabajador de inicio de Spark, ¡responda!
Análisis del código fuente de inicio de Spark Master
1 Llame al método principal de master en start-master.sh, la llamada al método principal
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//startSystemAndActor
actorSystem.awaitTermination()
}
2. Llame a startSystemAndActor para iniciar el sistema y crear el rol
def startSystemAndActor(
host: Cadena,
puerto: int,
webUiPort.Int,
conf: SparkConf): (ActorSystem, Int, Int, Opción[Int] ) = { p>
val securityMgr = new SecurityManager(conf)
val (actorSystem ,boundPort) = AkkaUtils.createActorSystem(systemName, host, puerto, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host,boundPort, webUiPort, securityMgr, conf), actorName) p>
val timeout = AkkaUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout ).asInstanceOf[BoundPortsResponse]
(actorSystem,boundPort, portsResponse. Llame a AkkaUtils.createActorSystem para crear un ActorSystem
def createActorSystem(
nombre: String,
host: String,
puerto: int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSy
madre, Int) = { actualPort =>
doCreateActorSystem(nombre, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(puerto, startService , conf, nombre)
}
4.startServiceOnPort inicia el servicio en el puerto y llama a doCreateActorSystem para crear ActorSystem después de que la creación sea exitosa
5 , ActorSystem se creó con éxito Luego cree el Actor
6. Llame al constructor principal del Master y ejecute preStart()
1. Clase de trabajador
def main(argStrings: Array [String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings , conf)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2. un rol
def startSystemAndActor(
host.String,
puerto: int,
webUiPort: Int,
núcleos: Int,
Memoria: Int,
masterUrls: Array[String],
workDir.String,
WorkerNumber : Option[Int] = Ninguno,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
// LocalSparkCluster ejecuta múltiples sistemas de roles locales sparkWorkerX
val systemName = "sparkWorker" + trabajadorNumber .map(_.toString).getOrElse("")
val actorName = "Trabajador"
val securityMgr = new SecurityManager( conf)
val ( actorSystem,boundPort) = AkkaUtils.createActorSystem( systemName, host, puerto,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_,
AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host,boundPort, webUiPort, núcleos, memoria,
masterAkkaUrls, systemName, actorName, workDir , conf, securityMgr), nombre = actorName)
(actorSystem,boundPort)
}
3. Llame a createActorSystem de AkkaUtils para crear ActorSystem
.def createActorSystem(
nombre: String,
host: String,
puerto: int,
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem (nombre, host, puerto actual, conf, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, startService, conf, nombre)
}
4. Después de crear ActorSystem, llame al constructor principal del Worker y ejecute el método preStart
override def preStart() {
afirmar (! registrado)
logInfo("Iniciando el trabajador de Spark %s:%d con %d núcleos, %s RAM".format(
host, puerto, núcleos, Utils.megabytesToString(memoria))
logInfo(s "Ejecutando Spark versión ${org.apache.spark.SPARK_VERSION}")
5.
logInfo("Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
shuffleService.bind()
RegisterWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Adjunte el controlador de servlet de métricas de trabajador a la WebUI después del sistema de métricas se inicia.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
5. Llame al método RegisterWithMaster para registrar el trabajador iniciado en el programa principal
def RegisterWithMaster() {
// DisassociatedEvent puede activarse varias veces, así que no intente registrarse si hay programado un intento de registro pendiente
// .
registroRetryTimer coincide {
caso Ninguno =>
registrado = falso
tryRegisterAllMasters()
conexiónAttemptCount = 0
registrationRetryTimer = Algunos {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =>
logInfo("No se genera otro intento de registrarse con el maestro, ya que hay un" +
" intento programado ya."")
}
}
6. Llame a tryRegisterAllMasters para enviar un mensaje de trabajador registrado al Maestro
def privado tryRegisterAllMasters () {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Conectando al maestro " + masterAkkaUrl + "...")
val actor = contexto .actorSelection(masterAkkaUrl)
actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)
}
}
7. El messageWithLogging de la estación maestra recibe mensajes Ejecución estado
caso RegisterWorker(id, trabajadorHost, trabajadorPuerto, núcleos, memoria, trabajadorUiPort, dirección pública) =>
{
logInfo(" Registrando trabajador %s: %d con %d núcleos, %s RAM". format(
) =>
8.format(
trabajadorHost, trabajadorPuerto, núcleos, Utils. megabytesToString(memoria)))
if ( state == RecoveryState.STANDBY) {
// Ignorar, no enviar respuesta
} else if (idToWorker .contiene(id)) {
remitente! RegisterWorkerFailed("ID de trabajador duplicado")
} else {
val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPort,
núcleos, memoria,
remitente, trabajadorUiPort, dirección pública)
if (registerWorker(trabajador)) {
persistenceEngine.addWorker(trabajador)
remitente! RegisteredWorker(masterUrl, masterWebUiUrl)
Schedule()
} else {
val WorkersAddress = trabajador.actor.path.address
logWarning("Error en el registro del trabajador. Intente volver a registrar al trabajador en la misma dirección " +
": " + dirección del trabajador)
¡Remitente! RegisterWorkerFailed("Se intentó volver a registrar al trabajador en la misma dirección:"
+ WorkersAddress)
}
}
} p>
8. El fracaso regresa al Trabajador. El fracaso devolverá un mensaje de error al Trabajador, mientras que el éxito devolverá información sobre el Maestro.
9. Se llama al programa después de devolver la información, pero como no hay aplicación, por lo que no hay asignación de recursos en este momento
Se ha iniciado todo el clúster Spark
9.