Preguntas sobre programación de Spark
Los más de 100.000 datos existentes se almacenan en varios archivos en la carpeta de información de usuario en HDFS. El formato de datos es el siguiente:
Zhang San|Hombre|23|Soltero. |Beijing|Haidian
Li Si|Mujer|25|Casado|Hebei|Shijiazhuang
Solicitud:
1. La edad promedio de todas las personas en los datos
2. El número de hombres solteros y el número de mujeres solteras en los datos
3. Las tres provincias con mayor número de personas casadas entre 20 y 30 en los datos
Respuesta:
paquete spark08
importar org.apache.spark.rdd.RDD
importar org.apache.spark.util. LongAccumulator
import org.apache. spark.{SparkConf, SparkContext}
/**
*Zhang San|Hombre|23|Soltero|Beijing|Haidian
*Li Si|Mujer| 25|Casada|Hebei|Shijiazhuang
?*
*Estadísticas:
* 1. El edad promedio de todas las personas en los datos
* 2. El número de hombres solteros y el número de mujeres solteras en los datos
* 3. Las tres provincias principales con 20-30 personas casadas en los datos
* 4. La proporción de solteros (las 3 ciudades principales con el mayor número de personas solteras/número total de personas en la ciudad)
?*/
objeto UserInfo {
?def main(args: Array[String ]): Unidad = {
val conf: SparkConf = new SparkConf() p>
?.setAppName(this.getClass.getSimpleName)
?.setMaster( "local[*]")
val sc = new SparkContext(conf) p>
//Leer el archivo original
val strFile: RDD[String] = sc.textFile("D:\\data\\data\\userinfo")
val srcRdd: RDD[(String, String, Int, String, String, String)] = strFile.map(t => {
?val strings: Array[String] = t.split(" \\|")
?val nombre: String = strings(0)
?val género = strings(1)
?val edad = strings( 2).toInt
?val isMarry: String = strings(3)
?val provincia = strings(4)
?val ciudad = strings(5 )
?(nombre, sexo, edad, esCasarse, provincia, ciudad)
})
srcRdd.cache()
//1. La edad promedio de todas las personas en los datos Li Si|Mujer|25|Casado|Hebei|Shijiazhuang p>
val ageAccumulator: LongAccumulator = sc.longAccumulator //Use el acumulador para contar el número total de personas
val ageCount: I
nt = srcRdd.map(t => {
?ageAccumulator.add(1)
?t._3
}).reduce(_ + _ )
val ageNumber = ageAccumulator.value
val avgAge = ageCount.toLong/(ageNumber*1.0)
println(s"La edad promedio de todas las personas es ${avgAge}")
//2. El número de hombres solteros y el número de mujeres solteras en los datos
val GenderAndMarryRDD: RDD[(String, Iterable[( String, String) ])] = srcRdd.map(t => {
(t._1, t._3) //Género, estado civil
}).filter( _._2. equals("No casado")).groupBy(_._1) //Agrupar por género
val res2RDD: RDD[(String, Int)] = GenderAndMarryRDD.mapValues(t=>t .size)
res2RDD.collect().foreach(println)
// En los datos, las tres provincias principales con entre 20 y 30 personas casadas son Li Si| Mujer|25|Casada|Hebei|Shijiazhuang
val res3: Array[(Int, String)] = srcRdd.filter(t => {
t._3 >= 20 && t._3 <= 30 && t. _4.equals("married")
})// Elimina los datos que satisfacen al grupo de 20-30 casados, agrúpalos por provincia y encuentra el tamaño de v cuál es el número de 20-30 personas casadas p>
?.groupBy(_._5).mapValues(_.size)
//k, v intercambia los primeros 3
?.map(t = > (t._2, t._1)).top(3)
res3.foreach(println)
// (ciudad, (número de personas solteras, número de personas casadas))
//Las 3 ciudades principales con la proporción más alta de personas solteras (número de personas solteras/número total de personas en la ciudad) Li Si| Mujer|25|Casada|Hebei|Shijiazhuang