Red de conocimiento informático - Conocimiento del nombre de dominio - Cómo utilizar Spark Streaming y Hadoop para lograr conexiones de sesión casi en tiempo real

Cómo utilizar Spark Streaming y Hadoop para lograr conexiones de sesión casi en tiempo real

Spark Streaming es uno de los componentes más interesantes de Apache Spark. Puede utilizar Spark Streaming para crear canalizaciones de datos para procesar datos de transmisión utilizando la misma API que los datos de carga por lotes. Además, el método de "microprocesamiento" de Spark Steaming proporciona una flexibilidad considerable para hacer frente a fallas en las tareas causadas por algunos motivos.

En este artículo, lo familiarizaré con algunas características comunes y avanzadas de Spark Streaming a través de una demostración de ejemplo de eco casi en tiempo real de eventos del sitio web y luego cargaré estadísticas relacionadas con la actividad en Apache HBase usando Don' No me gustan las herramientas de BI para gráficos y análisis. (La sesionización se refiere a capturar toda la actividad del flujo de clics dentro del período de tiempo de la sesión del sitio web de un solo visitante). Puede encontrar el código para esta demostración aquí.

Sistemas como este son muy útiles para comprender el comportamiento de los visitantes, ya sean humanos o máquinas. Con un poco de trabajo adicional, también se puede diseñar en modo de ventana para detectar posibles fraudes de forma asincrónica.

Código de Spark Streaming

La clase principal en nuestro ejemplo es:

com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData

Echemos un vistazo a este fragmento (ignore las líneas 1 a 59, que contienen importaciones y otras cosas aburridas).

Líneas 60 a 112: Configuración de Spark Streaming Estas líneas son muy básicas y se utilizan para configurar Spark Streaming, y puede elegir recibir flujos de datos desde HDFS o sockets. Si es nuevo en Spark Streaming, agregué algunos comentarios detallados para ayudarlo a comprender el código. (No voy a entrar en detalles aquí, ya que todavía está en el código de muestra.

)

//Esto es solo crear un objeto Spark Config. No hago mucho aquí, pero

//agrego el nombre de la aplicación. Hay toneladas de opciones para poner en el. Configuración de Spark,

//pero no se necesita ninguna para este ejemplo simple.

val sparkConf = new SparkConf().

setAppName("SessionizeData " + args (0)).

set("spark.cleaner.ttl", "120000")

//Estas dos líneas nos sacarán de SparkContext y nuestro StreamingContext.

//Estos objetos tienen toda la funcionalidad raíz que necesitamos para comenzar.

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds (10))

//Aquí está cargando nuestro objeto de configuración de HBase. Esto tendrá

//toda la información necesaria para conectarnos a nuestro clúster de HBase.

< p. >//No hay nada diferente aquí de cuando normalmente interactúas con HBase.

val conf = HBaseConfiguration.create();

conf.addResource(new Path("/etc / hbase/conf/core-site.xml"));

conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

//Este es un objeto HBaseContext. Esta es una buena abstracción que nos ocultará

//cualquier elemento complejo de HBase para que podamos centrarnos en nuestro caso de negocio

// HBaseContext. es del proyecto SparkOnHBase que se puede encontrar en

// /tmalaska/SparkOnHBase

val hbaseContext = new HBaseContext(sc, conf);

/ / Esto crea una referencia a nuestro DStream raíz. Los DStreams son como RDD pero

//con el contexto de estar en micro lotes.

world. Lo configuré como nulo ahora

//porque luego doy la opción de completar estos datos desde HDFS o desde

//un socket. también estar poblado por Kafka,

//Flume, sistema MQ o cualquier otra cosa. Me concentré en estos porque

//son los más fáciles de configurar.

var líneas: DStream[String] = null

//Opciones para la carga de datos Agregaremos Kafka y Flume en algún momento

if (args(0). igual("socket")) {

val host = args(FIXED_ARGS);

val puerto = args(FIXED_ARGS + 1);

println(" host:" + host)

println("port:" + Integer.parseInt(port))

//Ejemplo simple de cómo configurar un receptor desde un Socket Stream

líneas = ssc.socketTextStream(host, port.toInt)

} else if (args(0).equals("newFile")) {

val directorio = args(FIXED_ARGS)

println("directorio:" + directorio)

//Ejemplo simple de cómo configurar un receptor desde una carpeta HDFS

líneas = ssc.fileStream[LongWritable, Text, TextInputFormat](directorio, (t: Path) => true, true).map(_._2.toString)

} else {

throw new RuntimeException ("tipo de entrada incorrecto")

}

Líneas 114 a 124: análisis de cadenas Aquí es donde comienza Spark Streaming. Mire las siguientes cuatro líneas: :

val ipKeyLines = lines.map[(String, (Long, Long, String))](eventRecord => {

//Obtener la hora y la dirección IP del evento original

val time = dateFormat.parse(

eventRecord.substring(evento

Record.indexOf('[') + 1, eventRecord.indexOf(']'))).

getTime()

val ipAddress = eventRecord.substring(0, eventRecord. indexOf(' '))

//Devolvemos la hora dos veces porque usaremos la primera como hora de inicio

//y la segunda como hora de finalización

(ipAddress, (time, time, eventRecord))

})

El primer comando anterior es una función de mapa y análisis de las "líneas" del objeto DSTREAM. El evento sin formato está separado por la dirección IP, la marca de tiempo y el cuerpo del evento. Para aquellos nuevos en Spark Streaming, un DSTREAM contiene un lote de registros para procesar. Estos registros se completan con el objeto receptor previamente definido y la función de mapa genera otro DSTREAM dentro de este microlote para almacenar los registros transformados para su procesamiento adicional.

Al observar el diagrama de Spark Streaming como el anterior, hay algunas cosas a tener en cuenta:

Cada microlote se destruye cuando llega al segundo conjunto cuando se construye StreamingContext

El receptor siempre se llena con RDD del siguiente microlote

Los RDD antiguos del microlote anterior se limpiarán y descartarán

Líneas 126. a 135: Generar sesiones Ahora que tenemos la dirección IP y la hora obtenidas de los registros de red, es hora de crear sesiones. El siguiente código establece sesiones a través del primer evento de agregación en microlotes y luego reduce estas sesiones en DSTREAM.

val lastSessionInfo = ipKeyLines.

map[(String, (Long, Long, Long))](a => {

//transformar a ( ipAddress, (hora, hora, contador))

(a._1, (a._2._1, a._2._2, 1))

}).

reduceByKey((a, b) => {

//transformar a (ipAddress, (lowestStartTime, MaxFinishTime, sumOfCounter))

(Math.min(a ._1, b._1), Math.max(a._2, b._2), a._3 + b._3)

}).

updateStateByKey(updateStatbyOfSessions)

Aquí hay un ejemplo de cómo se pueden reducir los registros en un microlote:

Para unirnos dentro de un microlote en el alcance de la sesión, podemos usar la excelente función updateStateByKey. (Realizar operaciones similares a unir/reducir) La siguiente figura ilustra cómo este proceso cambia con el tiempo en términos de DStreams.

Ahora, profundicemos en la función updateStatbyOfSessions, que está definida en la parte inferior del archivo. Este código (tenga en cuenta los comentarios detallados) contiene mucha magia para hacer que la sesión se realice en modo continuo en microlotes.

/**

* Esta función será llamada para la unión de claves del Reduce DStream

* con las sesiones activas del último micro lote con la dirección ip

* es la clave

*

* El objetivo es que esto produzca un RDD con estado que tenga todos los activos

* sesiones. Entonces agregamos nuevas sesiones y eliminamos sesiones que han expirado

* y ampliamos las sesiones que aún están activas

*/

def updateStatbyOfSessions(

//(sessionStartTime, sessionFinishTime, countOfEvents)

a: Seq[(Long, Long, Long)],

//(sessionStartTime, sessionFinishTime, countOfEvents , esNuevaSesión)

b: Opción[(Largo, Largo, Largo, Booleano)]

): Opción[(Largo, Largo, Largo, Booleano)] = {

//Esta función devolverá un valor Opcional.

//Si queremos eliminar el valor podemos devolver un "Ninguno" opcional.

//Este valor contiene cuatro partes

//(startTime, endTime, countOfEvents, isNewSession)

var resultado: Option[(Long, Long, Long, Boolean)] = null

// Estas declaraciones if dicen si no obtuvimos un nuevo evento para

//la dirección IP de esta sesión por más tiempo que la sesión

//tiempo de espera + el lote tiempo entonces es seguro eliminar este valor clave

//del futuro Stateful DStream

if (a.size == 0) {

if ( System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) {

resultado = Ninguno

} más {

if (b.get. _4 == falso) {

resultado = b

>

} else {

resultado = Algunos((b.get._1, b.get._2, b.get._3, false))

}

}

}

//Ahora, debido a que usamos la función de reducción antes de esta función,

//solo llegaremos a más un evento en la Secuencia.

a.foreach(c => {

if (b.isEmpty) {

//Si no había ningún valor en Stateful DStream, simplemente agréguelo

//nuevo, con un verdadero para ser una sesión nueva

resultado = Some((c._1, c._2, c._3 , verdadero))

} else {

if (c._1 - b.get._2 < SESSION_TIMEOUT) {

//Si la sesión del DStream con estado no ha agotado el tiempo de espera

//luego extiende la sesión

result = Some((

Math.min(c._1, b.get. _1),

//newStartTime

Math.max(c._2, b.get._2),

//newFinishTime

b.get._3 + c._3,

//newSumOfEvents

false

//Esta no es una sesión nueva

))

} else {

//De lo contrario, elimine la sesión anterior con una nueva

resultado = Some((

c._1,

//newStartTime

c._2,

//newFinishTime

b.get._3,

//newSumOfEvents

verdadero

//nueva sesión

))

}

}

})

resultado

}

}

Se ha hecho mucho en este Codificar y, en muchos sentidos, es la parte más complicada de todo el trabajo. En resumen, realiza un seguimiento de las sesiones activas, para que sepa si continúa una sesión existente o inicia una nueva.

Líneas 126 a 207: Conteo y HBase Esta parte realiza la mayor parte del trabajo de conteo. Hay muchas repeticiones aquí, veamos un ejemplo de conteo y luego, paso a paso, almacenamos los mismos conteos de registros generados en HBase.

val onlyActiveSessions = lastSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT)

val newSessionCount = onlyActiveSessions .filter(t => {

//es la sesión más nueva que el último micro lote

//y el booleano que dice que esta es una nueva sesión es verdadero

(System.currentTimeMillis() - t._2._2 > 11000 && t._2._4)

}).

recuento.

mapa [HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))

En resumen, el código anterior filtra todas las sesiones excepto las activas, las cuenta y pone El recuento final es grabado en una instancia de HashMap. Utiliza HashMap como contenedor, por lo que una vez realizados todos los recuentos, podemos llamar a la función de reducción a continuación para colocarlos todos en un solo registro. (Estoy seguro de que hay mejores maneras de lograr esto, pero este método funciona bien).

A continuación, el siguiente código procesa todos esos HashMap y coloca todos sus valores en un HashMap.

val allCounts = newSessionCount.

unión(totalSessionCount).

unión(totales).

unión(totalEventsCount).

p>

unión(deadSessionsCount).

unión(totalSessionEventCount).

reducir((a, b) => b ++ a)

El uso de HBaseContext hace que sea muy fácil interactuar con Spark Streaming y HBase. Todo lo que necesita hacer es usar un HashMap y una función para convertirlo en un objeto put para proporcionarlo a DSTREAM.

hbaseContext.streamBulkPut[HashMap[String, Long]](

allCounts,

//El RDD de entrada

hTableName,

//El nombre de la tabla que queremos poner también

(t) => {

//Aquí estamos convirtiendo nuestro registro de entrada en un put

//La clave de fila es C para Recuento y un tiempo de conteo hacia atrás, por lo que el recuento más nuevo

//se muestra primero en el orden de HBase

val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis())))

//Estamos iterando a través del HashMap para hacer todas las columnas con sus recuentos p>

t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString)))

put

},

false)

Ahora, esta información de HBase se puede empaquetar con la tabla de Apache Hive y luego ejecutar una consulta para obtener un gráfico como el que se muestra a continuación, que se actualizará en cada microlote.

Líneas 209 a 215: Escritura en HDFS La tarea final es agregar la información de la sesión activa que contiene los datos del evento y luego persistir el evento en HDFS con la hora de inicio de la sesión.

//Persistir en HDFS

ipKeyLines.join(onlyActiveSessions).

map(t => {

//Raíz de sesión hora de inicio | Mensaje del evento

dateFormat.format(new Date(t._2._2._1)) + "t" + t._2._1._3

}).

saveAsTextFiles(outputDir + "/session", "txt")