Red de conocimiento informático - Material del sitio web - Cómo usar Scala+Spark para leer y escribir hbase

Cómo usar Scala+Spark para leer y escribir hbase

La empresa tiene algunos proyectos de procesamiento de datos en tiempo real, que se almacenan en hbase para proporcionar recuperación en tiempo real. Por supuesto, el modelo de datos almacenado en hbase es relativamente simple y los resultados de recuperación multidimensional complejos se almacenan en es. También está presentando Kylin como un motor de análisis de datos OLAP. Esto es algo de lo que hablaré más adelante, lo estudiaré más adelante cuando tenga tiempo.

Continuando con lo anterior, hbase almacena algunos datos en tiempo real. En las últimas dos semanas, los nuevos requisitos han realizado una actualización completa de los datos en la tabla especificada en hbase para cumplir con el desarrollo del negocio. La operación habitual de hbase es una sola cuajada, o insertar listas en lotes, usar la API de Java de hbase es relativamente simple, pero implica actualizaciones completas. Esta vez implica una actualización completa, por lo que si utiliza la API de operación de subproceso único original, definitivamente será mucho más lenta.

Con respecto a las operaciones por lotes de Hbase, generalmente usamos MapReduce para operar, lo que puede acelerar en gran medida la eficiencia del procesamiento. También escribí las operaciones MR de Hbase antes, y el proceso es relativamente engorroso, lo he estado usando recientemente. Scala para el desarrollo de Spark, así que use Scala + Spark directamente para hacer este trabajo. Por supuesto, la capa inferior usa TableOutputFormat y TableOutputFormat de Hbase. Esto es lo mismo que MR. luego haga un filtrado simple Conversión y finalmente escriba los resultados en hbase.

El proceso completo es el siguiente:

(1) Leer todos los datos de la tabla hbase

(2) Ejecutar una serie de ETL

(3) Vuelva a escribir todos los datos en hbase

El código principal es el siguiente:

//Get conf

val conf= HBaseConfiguration.create () //configura la tabla para leer

conf.set(TableInputFormat.INPUT_TABLE,tableName) //configura la tabla para escribir

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)//create sparkConf

val sparkConf=new SparkConf() //establece el nombre de la tarea para spark

sparkConf.setAppName(" lectura y escritura para hbase ") //create contexto spark

val sc=new SparkContext(sparkConf)

//spify formato de salida y nombre de la tabla de salida para el trabajo

val newAPIJobConfiguration1 = Job.getInstance(conf )

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)

newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Leer completamente la tabla hbase

val rdd= sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

/Filtrar datos vacíos, luego actualizar cada registro y convertirlo al formato escrito

val final_rdd= rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)

sc.stop( )

Como se puede ver en el código anterior, es muy sencillo utilizar spark+scala para operar hbase.

Echemos un vistazo a varias funciones personalizadas utilizadas en el medio:

Primero: checkNotEmptyKs

Función: filtrar los datos de grupos de columnas vacías

def checkNotEmptyKs ( f:((ImmutableBytesWritable,Resultado)):.Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[ Byte ]]= r.getFamilyMap( Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true

}

Segundo: forDatas

Función: leer cada dato, actualizarlo y luego convertirlo en una operación de escritura

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r= f._2 // Obtener resultado

val put:Put=new Put(r.getRow) // Declarar put

val ks=Bytes.toBytes("ks") // Lea la familia de columnas especificada

val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]] = r.getFamilyMap(ks ).asScala

map .foreach(kv=>{// Recorra los datos de cada columna en el grupo de columnas especificado debajo de cada clave de fila para realizar la transformación

val kid= Bytes.toString(kv._1)// ID del punto de conocimiento

var value=Bytes.toString(kv._2)//Valor del punto de conocimiento

value="Valor modificado"

put .addColumn(ks, kv._1,Bytes.toBytes(valor))//poner en objeto de colocación

}

) if( put.isEmpty) null else (new ImmutableBytesWritable( ),put)

}

Tercero: función checkNull: filtra datos vacíos en el resultado final

Def checkNull(f:(( ImmutableBytesWritable(value))

}

Def checkNull(f:(( ImmutableBytesWritable,Put)): Boolean={ if(f==null) false else true

}

}

Lo anterior es toda la lógica de procesamiento. Cabe señalar que los datos no válidos en hbase se filtran y los datos no válidos se pueden omitir. La lógica es relativamente simple y la cantidad de código es relativamente pequeña.

La lógica es relativamente simple y la cantidad de código es relativamente pequeña.