Cómo usar Scala+Spark para leer y escribir hbase
Continuando con lo anterior, hbase almacena algunos datos en tiempo real. En las últimas dos semanas, los nuevos requisitos han realizado una actualización completa de los datos en la tabla especificada en hbase para cumplir con el desarrollo del negocio. La operación habitual de hbase es una sola cuajada, o insertar listas en lotes, usar la API de Java de hbase es relativamente simple, pero implica actualizaciones completas. Esta vez implica una actualización completa, por lo que si utiliza la API de operación de subproceso único original, definitivamente será mucho más lenta.
Con respecto a las operaciones por lotes de Hbase, generalmente usamos MapReduce para operar, lo que puede acelerar en gran medida la eficiencia del procesamiento. También escribí las operaciones MR de Hbase antes, y el proceso es relativamente engorroso, lo he estado usando recientemente. Scala para el desarrollo de Spark, así que use Scala + Spark directamente para hacer este trabajo. Por supuesto, la capa inferior usa TableOutputFormat y TableOutputFormat de Hbase. Esto es lo mismo que MR. luego haga un filtrado simple Conversión y finalmente escriba los resultados en hbase.
El proceso completo es el siguiente:
(1) Leer todos los datos de la tabla hbase
(2) Ejecutar una serie de ETL
(3) Vuelva a escribir todos los datos en hbase
El código principal es el siguiente:
//Get conf
val conf= HBaseConfiguration.create () //configura la tabla para leer
conf.set(TableInputFormat.INPUT_TABLE,tableName) //configura la tabla para escribir
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)//create sparkConf
val sparkConf=new SparkConf() //establece el nombre de la tarea para spark
sparkConf.setAppName(" lectura y escritura para hbase ") //create contexto spark
val sc=new SparkContext(sparkConf)
//spify formato de salida y nombre de la tabla de salida para el trabajo
val newAPIJobConfiguration1 = Job.getInstance(conf )
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
// Leer completamente la tabla hbase
val rdd= sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result] p>
/Filtrar datos vacíos, luego actualizar cada registro y convertirlo al formato escrito
val final_rdd= rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop( ) p>
Como se puede ver en el código anterior, es muy sencillo utilizar spark+scala para operar hbase.
Echemos un vistazo a varias funciones personalizadas utilizadas en el medio:
Primero: checkNotEmptyKs
Función: filtrar los datos de grupos de columnas vacías
def checkNotEmptyKs ( f:((ImmutableBytesWritable,Resultado)):.Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[ Byte ]]= r.getFamilyMap( Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true
}
Segundo: forDatas p>
Función: leer cada dato, actualizarlo y luego convertirlo en una operación de escritura
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r= f._2 // Obtener resultado
val put:Put=new Put(r.getRow) // Declarar put
val ks=Bytes.toBytes("ks") // Lea la familia de columnas especificada
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]] = r.getFamilyMap(ks ).asScala
map .foreach(kv=>{// Recorra los datos de cada columna en el grupo de columnas especificado debajo de cada clave de fila para realizar la transformación
val kid= Bytes.toString(kv._1)// ID del punto de conocimiento
var value=Bytes.toString(kv._2)//Valor del punto de conocimiento
value="Valor modificado"
put .addColumn(ks, kv._1,Bytes.toBytes(valor))//poner en objeto de colocación
}
) if( put.isEmpty) null else (new ImmutableBytesWritable( ),put)
}
Tercero: función checkNull: filtra datos vacíos en el resultado final
Def checkNull(f:(( ImmutableBytesWritable(value))
}
Def checkNull(f:(( ImmutableBytesWritable,Put)): Boolean={ if(f==null) false else true
}
}
Lo anterior es toda la lógica de procesamiento. Cabe señalar que los datos no válidos en hbase se filtran y los datos no válidos se pueden omitir. La lógica es relativamente simple y la cantidad de código es relativamente pequeña.
La lógica es relativamente simple y la cantidad de código es relativamente pequeña.