Error en la compilación del código fuente de Spark
Spark master inicia el análisis del código fuente
1. Llame al método principal del master en start-master.sh.
def main(arg strings: Array[String]){
SignalLogger.register(log)
val conf = new SparkConf
val args = nuevos argumentos maestros(arg strings, conf)
Val (actor system,_,_,_)= startsystemandactor(args.host,args.port,args.webuiport,conf)// Iniciar el sistema y los actores.
actorSystem.awaitTermination()
}
2. Llame a startSystemAndActor para iniciar el sistema y crear elementos de ejecución.
def startSystemAndActor(
Host: String,
puerto: Int,
webUiPort: Int,
conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
val securityMgr = nuevo administrador de seguridad (conf)
val (actorSystem, puerto vinculado) = akkautils . createactorsystem(nombre del sistema, host, puerto, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf) [Master], host,boundPort, webUiPort, securityMgr, conf), actorName)
val time out = akkautils . Ask time out(conf)
val ports request = actor. (BoundPortsRequest)(tiempo de espera)
respuesta de puertos val = resultado de espera (solicitud de puertos, tiempo de espera). Como instancia de [BoundPortsResponse]
(actorSystem,boundPort, portsResponse.webUIPort, portsResponse.restPort)
3. Llame a AkkaUtils.createActorSystem para crear un ActorSystem.
def createActorSystem(
Nombre: Cadena,
Host: Cadena,
puerto: Int,
conf: SparkConf,
administrador de seguridad: administrador de seguridad): (ActorSystem, Int) = {
val startService: Int = gt; (ActorSystem, Int) = { puerto real = gt; ;
doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)
}
4. Llame a Utils.startServiceOnPort para iniciar el servicio en el puerto. Después de que la creación sea exitosa, llame a DocCreator System para crear ActorSystem.
5. Cree Actor después de crear ActorSystem con éxito.
6. Llame al constructor principal de Master y ejecute preStart().
1 y start-slaves.sh llaman al método principal de la clase Worker.
def main(arg strings: Array[String]){
SignalLogger.register(log)
val conf = new SparkConf
val args = nuevos argumentos de trabajador(arg strings, conf)
val (actorSystem, _) = startSystemAndActor(args. host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
2. crear elemento ejecutivo.
def startSystemAndActor(
Host: String,
puerto: Int,
webUiPort: Int,
Núcleo: Int,
Memoria: Int,
URL maestra: Matriz[Cadena],
DirTrabajo: Cadena,
Número de trabajador: Opción[Int] = Ninguna,
conf: spark conf = new spark conf): (actor system, Int) = {
LocalSparkCluster ejecuta múltiples sistemas de actores locales sparkWorkerX
nombre del sistema val = "trabajador de chispa" número de trabajador (_.toString).
getOrElse(" ")
val actorName = "Trabajador "
val securityMgr =nuevo administrador de seguridad (conf)
val (actorSystem, puerto vinculado) = akkautils createactorsystem(nombre del sistema, host, puerto,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = URL maestras (master . toakkaurl(_, AkkaUtils. protocolo(actorSystem. )))
actor system .actor of(Props(clase de[Trabajador], host,boundPort, webUiPort, núcleos, memoria,
masterAkkaUrls, systemName, actorName, workDir, conf. , securityMgr), nombre = actorName)
(actorSystem,boundPort)
}
3. Llame a createActorSystem de AkkaUtils para crear ActorSystem.
def createActorSystem(
Nombre: Cadena,
Host: Cadena,
puerto: Int,
conf: SparkConf,
administrador de seguridad: administrador de seguridad): (ActorSystem, Int) = {
val startService: Int = gt; (ActorSystem, Int) = { puerto real = gt; ;
doCreateActorSystem(nombre, host, puerto real, configuración, administrador de seguridad)
}
Utils.startServiceOnPort(puerto, servicio de inicio, configuración, nombre)
}
4. Después de crear ActorSystem, llame al constructor principal de Worker y ejecute el método preStart.
Anular def preStart() {
Assert(!Registered)
logInfo("Iniciando Spark Workers: d, con d núcleos, s RAM ") . Formato (
Host, puerto, núcleo, utilidad.
Cadena de megabytes (memoria)))
logInfo(s "Ejecutando la versión de Spark $ { org . Apache . Spark . Spark _ VERSION } ")
logInfo("Inicio de Spark : " sparkHome )
createWorkDir()
contexto .sistema de flujo de eventos.suscríbete(self, clase de[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()
fuente de registro del sistema de métricas (. fuente del trabajador)
metricsSystem.start()
// Adjunte el controlador del servlet de métricas del trabajador a la interfaz de usuario web después de iniciar el sistema de métricas.
sistema de métricas. getservlethandlers. foreach(webui. adjuntar controlador)
}
5. Llame al método RegisterWithMaster para registrar el proceso de trabajo iniciado con el proceso principal. .
def RegisterWithMaster() {
// DisassociatedEvent puede activarse varias veces, así que no intente registrarse
//Si está programado un intento de registro sin finalizar .
El temporizador de reintento de registro coincide {
case Ninguno = gt
register=false
tryRegisterAllMasters()
Recuento de intentos de conexión = 0
registrationRetryTimer = Algún {
sistema de contexto . , self, RegisterWithMaster)
}
case Some(_)= gt;
logInfo("no generará otro registro con el intento del servidor maestro porque no es un "
"Intento ya programado ")
}
}
6. Llamar a tryRegisterAllMasters lo hará. Se envían mensajes de trabajo registrados al servidor maestro.
private def tryRegisterAllMasters(){
for(masterAkkaUrl lt; - masterAkkaUrls) {
logInfo("Conectando al servidor maestro " masterAkkaUrl " ... " )
val actor = contexto . selección de actor(masterAkkaUrl)
¡Actor! RegisterWorker(workerId, host, puerto, núcleos, memoria, webUi.boundPort, publicAddress)
}
}
7. lo ejecuta.
case RegisterWorker(id, trabajadorHost, trabajadorPort, núcleos, memoria, trabajadorUiPort, dirección pública) = gt
{
logInfo("Registro de procesos de trabajo; : d, con d núcleos, s RAM "). Formato (
workerHost, trabajadorPort, núcleos, Utils.megabytesToString(memoria)))
if (estado == RecoveryState. Standby) {
//ignorar , no envíes una respuesta
} else if(idtoworker . contains(id)){
Remitente! RegisterWorkerFailed("ID de trabajo duplicado ")
} En caso contrario {
val trabajador = new WorkerInfo(id, trabajadorHost, trabajadorPuerto, núcleos, memoria,
Enviar remitente , informe de trabajo, dirección pública)
if (registerWorker(trabajador)) {
motor de persistencia .agregar trabajador(trabajador)
¡Remitente! RegisteredWorker(masterUrl, masterWebUiUrl)
plan()
} else {
val dirección del trabajador = dirección del actor . >logWarning("Error en el registro del trabajador. Intentando enviar un mensaje a la misma "
"Dirección:" dirección de trabajo)
¡Remitente! RegisterWorkerFailed("Intento volver a registrar el programa de trabajo en la misma dirección: "
Dirección de trabajo)
}
}
}
8. Si falla, se devuelve el mensaje de error al trabajador. Si tiene éxito, se devuelve la información relevante del Maestro.
9. Se llama a la programación después de devolver el mensaje, pero como no hay ninguna aplicación, no se asignarán recursos en este momento.
En este punto, se ha iniciado todo el clúster de Spark.