Cómo ejecutar aplicaciones Spark en CDH5
Cree un proyecto maven normal usando el siguiente comando:
Pruebe
$ mvn Architect:generate-DgroupId=com.cloud era.spark word count-darti factid = spark word count-DarchetypeArtifactId = maven-architect-quick start-DinteractiveMode = false
Cambie el nombre del directorio sparkwordcount a simplesparkapp y luego agregue el directorio del archivo fuente de Scala en el directorio simplesparkapp:
Pruebe
$ mkdir-p spark word count/src/main/Scala/com/cloud era/spark word count
Modifique pom.xml y agregue scala y dependencias de chispa;
Lenguaje de marcado extensible
ltDependenciesgt
ltDependenciesgt
ltgroupId gtorg lt /groupId gt; /p>
ltartifactId gtscala biblioteca lt/artifact id gt;
lt versión gt2.10.4 lt;/versión gt;
lt/dependency gt; p> ltdependenciesgt
ltgroupId gtorg . Apache ./groupId gt;
ltartifactId gt spark-core_2.10 lt;/artifact id gt ;
lt versión gt1 2 . 0-CD H5 3 0 lt;/versión gt;
lt/dependencia gt;
lt/dependencias gt;
Agregar complemento para compilar Scala:
Lenguaje de marcado extensible
ltplugingt
ltgroupId gtorg .Scala-Toolslt /groupId gt;
ltartifactId gtmaven-Scala-plugin lt;/artifact id gt;
ltexecutegt
ltexecutegt
lt objetivo gt
lt objetivo gt compilar lt /goal gt;
lt objetivo gt prueba compilar lt/goal gt;
lt/goals gt;
lt/execution gt;
lt/executions gt;
lt/plugin gt;
Agregue el almacenamiento necesario para los complementos de compilación de Scala. Bibliotecas:
Lenguaje de marcado extensible
ltpluginrepositorygt
ltpluginRepositorygt
ltid gtScala-tools.org lt;/i
d gt;
lt nombre gtScala-tools Maven2 almacén lt/nombre gt;
lturl gt/content/repositories/releases/ lt;/URL gt; > lt/repository gt;
ltrepositorygt
ltid gtcloud era-repos lt;/id gt;
ltnamegtCloudera Repos lt/name gt ;
lturl gt/arti factory/cloud era-repos/ lt;/URL gt;
lt/repository gt;
lt/repositories gt;
Finalmente, el archivo pom.xml completo se puede encontrar en: /javachen/simple park app/blob/master/POM.
Ejecute el siguiente comando para comprobar si el proyecto se puede compilar correctamente:
Pruebe
paquete mvn
Escriba código de muestra p>
Tomando WordCount como ejemplo, el programa necesita completar la siguiente lógica:
Leer el archivo de entrada
Cuenta el número de veces que aparece cada palabra.
Filtrar palabras menos de un número determinado de veces.
Cuenta el número de apariciones de cada letra en las palabras restantes.
En MapReduce, la lógica anterior requiere dos tareas de MapReduce, pero en Spark, solo se requiere una tarea simple y la cantidad de código será un 90% menor.
Escribe el programa Scala de la siguiente manera:
Scala
Importar org.apache.spark.SparkContext
Importar org.Apache.spark . contexto de chispa _
Importar org.apache.spark.SparkConf
Recuento de chispas de objeto {
def main(args: Array[String]) {< / p>
val sc = nuevo contexto de chispa(nueva conf de chispa().setAppName("recuento de chispas"))
umbral de valor = args(1). toInt
//Dividir cada documento en palabras
val tokenized = sc . Plane map(_. Split(" ")
//Cuenta el número de apariciones de cada palabra
val wordCounts = tokenized.map((_, 1)).reduceByKey( _ _)
//Filtrar palabras que aparecen por debajo del umbral
val filtered = word counts filter(_._ 2 gt=threshold)
val char counts = filtered .plan map(_._1.toCharArray).map((_,1))
system out. println(cuenta de caracteres. recoger().
mkString(",")
recuento de caracteres saveastextfile(" resultado-conteo mundial ")
}
}
Spark Utiliza una estrategia de ejecución diferida, lo que significa que la transición solo se ejecutará cuando se ejecute la acción. Las acciones en el ejemplo anterior son recopilar y guardarAsTextFile. La primera es enviar los datos al cliente y la segunda es guardar los datos en HDFS.
A modo de comparación, la versión Java de este programa es la siguiente:
Lenguaje Java (un lenguaje informático, especialmente utilizado para crear sitios web)
Importar Java. util . ArrayList;
Importar Java.util.arrays;
Importar org.Apache.spark.API.Java.*;
Importar org.Apache. Spark.API. Función Java.
Importar org. Tuple2
Clase pública JavaWordCount {
Public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new spark conf().setAppName (" Spark Count "));
final int umbral = entero parse int(args[1]);
//Dividir cada documento en palabras
JavaRDD tokenizado = sc archivo de texto(args[0]).
FlatMap(
new FlatMapFunction() {
Llamada iterable pública (String){
Devuelve matrices . aslist(s . split(" " ); p>
}
}
);
//Cuenta el número de apariciones de cada palabra
JavaPairRDD cuenta = tokenizado . maptopair(
new PairFunction() {
llamada pública de Tuple2(String){
devuelve nuevas Tuple2(s, 1);
}
}
).reduceByKey(
Nueva función 2() {
llamada entera pública (entero i1, entero i2) {
retorno I 1 I2;
}
}
);
p>
Además, la versión Python del programa es la siguiente:
Python
Importar sistema
Importar SparkContext desde pyspark
archivo ="inputfile.txt "
count=2
if __name__ == "__main__ ":
sc = contexto de chispa(appName = " palabra de Python contar " )
líneas = sc.textFile(archivo, 1)
cuentas = líneas . mapa plano(lambda x: x . split(' '))\
. mapa(λx: (x, 1)) \
reducirByKey(λa, b: a b) \
= contar) \
<. p>. flatMap(lambda (a, b): lista(a)) \.map(λx: (x, 1)) \
.reduceByKey(λa, b). : a b)
Imprimir "," .str(t) de t en join(counts.collect())
Detener()
Compilar p>
Ejecute el siguiente comando para generar jar:
Pruebe
paquete $ mvn
Después de la operación exitosa, Spark Word Count-0. Se generará 1 archivo jar de instantánea en el directorio de destino.
Ejecutar
Debido a que la versión de Spark de la que depende el proyecto es 1.2.0-cdh5.3.0, el siguiente comando solo se puede ejecutar en el clúster cdh5.3.
Primero, cargue el archivo de prueba inputfile.txt en HDFS
Pruebe
$ wget/javachen/simplesparkapp/blob/master/data/input. txt
$ hadoop fs -put inputfile.txt
En segundo lugar, cargue spark-0.0.1-snapshot.jar en un nodo del clúster y luego use el script spark-submit; Ejecute la versión Scala del programa:
Pruebe
$ spark-submit-class com . Spark word count-master local spark word count-0. 0 . 1-instantánea . archivo de entrada jar . txt 2
Como alternativa, ejecute la versión Java del programa:
Pruebe
$ spark-submit-class. com. era de la nube. Recuento de palabras de Spark. Recuento de palabras de Java-master. El script es:
Pruebe
$ spark-submit-master recuento de palabras de Python local .py
Si su clúster está implementado en modo independiente, puede reemplazar el valor del parámetro maestro para spark://
La salida de la versión final de Python del programa es la siguiente:
(u'a ', 4), (u 'c', 1), (u' b', 1), (u'e', 6), (u'f', 1), (u'i', 1), (u'h', 1 ), (u'l', 1), (u'o', 2), (u'n', 4), (u'p', 2), (u'r', 2), (u' u', 1), (u't', 2.