Red de conocimiento informático - Material del sitio web - Cómo ejecutar aplicaciones Spark en CDH5

Cómo ejecutar aplicaciones Spark en CDH5

Crear un proyecto maven

Cree un proyecto maven normal usando el siguiente comando:

Pruebe

$ mvn Architect:generate-DgroupId=com.cloud era.spark word count-darti factid = spark word count-DarchetypeArtifactId = maven-architect-quick start-DinteractiveMode = false

Cambie el nombre del directorio sparkwordcount a simplesparkapp y luego agregue el directorio del archivo fuente de Scala en el directorio simplesparkapp:

Pruebe

$ mkdir-p spark word count/src/main/Scala/com/cloud era/spark word count

Modifique pom.xml y agregue scala y dependencias de chispa;

Lenguaje de marcado extensible

ltDependenciesgt

ltDependenciesgt

ltgroupId gtorg lt /groupId gt; /p>

ltartifactId gtscala biblioteca lt/artifact id gt;

lt versión gt2.10.4 lt;/versión gt;

lt/dependency gt; p> ltdependenciesgt

ltgroupId gtorg . Apache ./groupId gt;

ltartifactId gt spark-core_2.10 lt;/artifact id gt ;

lt versión gt1 2 . 0-CD H5 3 0 lt;/versión gt;

lt/dependencia gt;

lt/dependencias gt;

Agregar complemento para compilar Scala:

Lenguaje de marcado extensible

ltplugingt

ltgroupId gtorg .Scala-Toolslt /groupId gt;

ltartifactId gtmaven-Scala-plugin lt;/artifact id gt;

ltexecutegt

ltexecutegt

lt objetivo gt

lt objetivo gt compilar lt /goal gt;

lt objetivo gt prueba compilar lt/goal gt;

lt/goals gt;

lt/execution gt;

lt/executions gt;

lt/plugin gt;

Agregue el almacenamiento necesario para los complementos de compilación de Scala. Bibliotecas:

Lenguaje de marcado extensible

ltpluginrepositorygt

ltpluginRepositorygt

ltid gtScala-tools.org lt;/i

d gt;

lt nombre gtScala-tools Maven2 almacén lt/nombre gt;

lturl gt/content/repositories/releases/ lt;/URL gt; > lt/repository gt;

ltrepositorygt

ltid gtcloud era-repos lt;/id gt;

ltnamegtCloudera Repos lt/name gt ;

lturl gt/arti factory/cloud era-repos/ lt;/URL gt;

lt/repository gt;

lt/repositories gt;

Finalmente, el archivo pom.xml completo se puede encontrar en: /javachen/simple park app/blob/master/POM.

Ejecute el siguiente comando para comprobar si el proyecto se puede compilar correctamente:

Pruebe

paquete mvn

Escriba código de muestra

Tomando WordCount como ejemplo, el programa necesita completar la siguiente lógica:

Leer el archivo de entrada

Cuenta el número de veces que aparece cada palabra.

Filtrar palabras menos de un número determinado de veces.

Cuenta el número de apariciones de cada letra en las palabras restantes.

En MapReduce, la lógica anterior requiere dos tareas de MapReduce, pero en Spark, solo se requiere una tarea simple y la cantidad de código será un 90% menor.

Escribe el programa Scala de la siguiente manera:

Scala

Importar org.apache.spark.SparkContext

Importar org.Apache.spark . contexto de chispa _

Importar org.apache.spark.SparkConf

Recuento de chispas de objeto {

def main(args: Array[String]) {< / p>

val sc = nuevo contexto de chispa(nueva conf de chispa().setAppName("recuento de chispas"))

umbral de valor = args(1). toInt

//Dividir cada documento en palabras

val tokenized = sc . Plane map(_. Split(" ")

//Cuenta el número de apariciones de cada palabra

val wordCounts = tokenized.map((_, 1)).reduceByKey( _ _)

//Filtrar palabras que aparecen por debajo del umbral

val filtered = word counts filter(_._ 2 gt=threshold)

//Cuenta el número de caracteres

val char counts = filtered .plan map(_._1.toCharArray).map((_,1))

system out. println(cuenta de caracteres. recoger().

mkString(",")

recuento de caracteres saveastextfile(" resultado-conteo mundial ")

}

}

Spark Utiliza una estrategia de ejecución diferida, lo que significa que la transición solo se ejecutará cuando se ejecute la acción. Las acciones en el ejemplo anterior son recopilar y guardarAsTextFile. La primera es enviar los datos al cliente y la segunda es guardar los datos en HDFS.

A modo de comparación, la versión Java de este programa es la siguiente:

Lenguaje Java (un lenguaje informático, especialmente utilizado para crear sitios web)

Importar Java. util . ArrayList;

Importar Java.util.arrays;

Importar org.Apache.spark.API.Java.*;

Importar org.Apache. Spark.API. Función Java.

Importar org. Tuple2

Clase pública JavaWordCount {

Public static void main(String[] args) {

JavaSparkContext sc = new JavaSparkContext(new spark conf().setAppName (" Spark Count "));

final int umbral = entero parse int(args[1]);

//Dividir cada documento en palabras

JavaRDD tokenizado = sc archivo de texto(args[0]).

FlatMap(

new FlatMapFunction() {

Llamada iterable pública (String){

Devuelve matrices . aslist(s . split(" " );

}

}

);

//Cuenta el número de apariciones de cada palabra

JavaPairRDD cuenta = tokenizado . maptopair(

new PairFunction() {

llamada pública de Tuple2(String){

devuelve nuevas Tuple2(s, 1);

}

}

).reduceByKey(

Nueva función 2() {

llamada entera pública (entero i1, entero i2) {

retorno I 1 I2;

}

}

);

p>

Además, la versión Python del programa es la siguiente:

Python

Importar sistema

Importar SparkContext desde pyspark

archivo ="inputfile.txt "

count=2

if __name__ == "__main__ ":

sc = contexto de chispa(appName = " palabra de Python contar " )

líneas = sc.textFile(archivo, 1)

cuentas = líneas . mapa plano(lambda x: x . split(' '))\

. mapa(λx: (x, 1)) \

reducirByKey(λa, b: a b) \

= contar) \

<. p>. flatMap(lambda (a, b): lista(a)) \

.map(λx: (x, 1)) \

.reduceByKey(λa, b). : a b)

Imprimir "," .str(t) de t en join(counts.collect())

Detener()

Compilar

Ejecute el siguiente comando para generar jar:

Pruebe

paquete $ mvn

Después de la operación exitosa, Spark Word Count-0. Se generará 1 archivo jar de instantánea en el directorio de destino.

Ejecutar

Debido a que la versión de Spark de la que depende el proyecto es 1.2.0-cdh5.3.0, el siguiente comando solo se puede ejecutar en el clúster cdh5.3.

Primero, cargue el archivo de prueba inputfile.txt en HDFS

Pruebe

$ wget/javachen/simplesparkapp/blob/master/data/input. txt

$ hadoop fs -put inputfile.txt

En segundo lugar, cargue spark-0.0.1-snapshot.jar en un nodo del clúster y luego use el script spark-submit; Ejecute la versión Scala del programa:

Pruebe

$ spark-submit-class com . Spark word count-master local spark word count-0. 0 . 1-instantánea . archivo de entrada jar . txt 2

Como alternativa, ejecute la versión Java del programa:

Pruebe

$ spark-submit-class. com. era de la nube. Recuento de palabras de Spark. Recuento de palabras de Java-master. El script es:

Pruebe

$ spark-submit-master recuento de palabras de Python local .py

Si su clúster está implementado en modo independiente, puede reemplazar el valor del parámetro maestro para spark://

La salida de la versión final de Python del programa es la siguiente:

(u'a ', 4), (u 'c', 1), (u' b', 1), (u'e', 6), (u'f', 1), (u'i', 1), (u'h', 1 ), (u'l', 1), (u'o', 2), (u'n', 4), (u'p', 2), (u'r', 2), (u' u', 1), (u't', 2.