Red de conocimiento informático - Aprendizaje de programación - La combinación de flink y kafka

La combinación de flink y kafka

Flink proporciona un conector Kafka único para leer y escribir datos de temas de Kafka. Cuando flink consume datos de Kafka, no implementa la semántica de una vez exacta mediante el seguimiento del desplazamiento del grupo de consumo de Kafka. En cambio, rastrea el desplazamiento internamente en flink y realiza un punto de control para implementar la semántica de una vez exacta.

Cuándo. flink está integrado con kafka, las dependencias de maven de las versiones correspondientes se muestran en la siguiente tabla

Ejemplos de dependencia de Maven

1.7.0

2.11

2.11.12

< dependencia>

?

? flink-streaming-scala_${scala.binary.version}

? version>$ {flink.version}

? proporcionado

Flink usa FlinkKafkaConsumer para lea y acceda a kafka, según la versión de kafka. El nombre de clase de FlinkKafkaConsumer se convierte en FlinkKafkaConsumer

[08,09,10...]. El siguiente número es el número de versión principal de Kafka.

La inicialización de FlinkKafkaConsumer requiere los siguientes parámetros

1. Nombre del tema, utilizado para especificar los datos que se consumirán de uno o más temas

2. Información de configuración de Kafka , Como dirección y puerto zk, dirección y puerto kafka, etc.

3. Deserializador (esquema), seleccione un deserializador para deserializar los datos consumidos.

El consumidor de flink kafka necesita saber cómo deserializar datos de mensajes en kafka en objetos en java o scala. Los usuarios logran esto usando DeserializationSchema, donde cada mensaje de Kafka usa el método eserialize(byte[] mensaje) de DeserializationSchema para convertir el mensaje de Kafka en la estructura que el usuario desea.

Los usuarios pueden personalizar el esquema implementando la interfaz KeyedDeserializationSchema o DeserializationSchema para convertir los datos a los que se accede en una estructura de datos personalizada.

Flink tiene soporte integrado para las siguientes implementaciones de DeserializationSchema:

clase pública SimpleStringSchema implementa DeserializationSchema

clase pública TypeInformationSerializationSchema implementa DeserializationSchema

Implementación de KeyedDeserializationSchema Sí

clase pública TypeInformationKeyValueSerializationSchema implementa KeyedDeserializationSchema>

clase pública JSONKeyValueDeserializationSchema implementa KeyedDeserializationSchema

Para ejemplo:

val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)

la clase pública MySchema implementa KeyedDeserializationSchema<.lt;KafkaMsgDTO> {

@Override

public KafkaMsgDTO deserialize(byte[] messageKey, byte[] mensaje, tema de cadena, partición int, desplazamiento largo) lanza IOExpression("IOExpression") desplazamiento largo) lanza IOException {

String msg = new String(message, StandardCharsets.UTF_8);

String key = null;

if(messageKey key = null;

if(messageKey ! = null ){

key = new String(messageKey, StandardCharsets.UTF_8);

}

return new KafkaMsgDTO(msg,key, tema,partición,desplazamiento) )

}

@Override

public boolean isEndOfStream(KafkaMsgDTO nextElement) {

return false;

}

@Override

@Override

public TypeInformation getProducedType() {

return getForClass (KafkaMsgDTO.clase);

}

}

& lt;

;dependencia>

? org.apache.flink

? flink-connector-kafka-base_2.11

? 1.7.0

clase pública KafkaMsgDTO {

tema de cadena privada;

partición int privada;

desplazamiento largo privado;

mensaje de cadena privada;

@Override

cadena pública a cadena () {

return "KafkaMsgDTO{" +

"topic='" />

"id=''".

+ tema + '\''+

", partición=""+ partición +

", desplazamiento=""+ desplazamiento +

", mensaje =''"+ mensaje + '\''+

p> ", tecla='" + tecla + '\'''+ tecla + '\''+

'}';

}

clave de cadena privada;

pública KafkaMsgDTO(){

}

public KafkaMsgDTO(mensaje de cadena, clave de cadena, tema de cadena, partición int, desplazamiento largo){

this.mesg = mesg;

this.key = clave;

this.topic = tema;

this.partition = partición;

this.offset = offset;

}

público String getKey() {

clave de retorno;

}

public void setKey(Clave de cadena) {

this.key = clave ;

}

public String getTopic() {

devolver tema;

}

public void setTopic (Tema de cadena) {

this.topic = topic;

}

this.region = región

this.region =; región;

esta.región = región

esta.región = región

esta.región =

esto; región = región.topic = tema;

}

public int getPartition() {

devolver partición;

}

public void setPartition (int partición) {

this.partition = partición;

}

public long getOffset() {

retorno de compensación;

}

public void setOffset (compensación larga) {

this.offset = compensación;

}

public String getMesg() {

return mesg;

}

public void setMesg(String mesg) {

this.mesg = mesg;

}

}

val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("tema",nuevo MySc

hema(),p)

// myConsumer.setStartFromEarliest() ?

// Comience a consumir desde el principio, los datos consumidos se consumirán repetidamente, el valor predeterminado de Kafka no es el desplazamiento de confirmación.

// myConsumer.setStartFromLatest()

// Comience a consumir desde el más nuevo, no consuma los datos que no se consumieron antes del inicio de la transmisión, el valor predeterminado de Kafka es No confirmar offset.

?

? myConsumer.setStartFromGroupOffsets()

// Comience a consumir desde el desplazamiento de consumo. Kafka tiene un desplazamiento de confirmación, que es el consumo predeterminado.

//Si no se realiza ningún punto de control, los datos que ingresan al receptor enviarán el desplazamiento, si la lógica dentro del receptor falla, el desplazamiento aún se enviará y el programa lo hará. exit, si se reinicia el flujo de datos, los datos fallidos no se volverán a consumir

//Si se realiza un punto de control, garantizará que los datos se consuman exactamente una vez, de un extremo a otro. addSource( miConsumidor)

stream.addSink(x=>{

? println(x)

? println(1/(x.getMesg.toInt% 2) )//Error si el mensaje es par, el denominador es 0

? println(x)

})

val stream = env.addSource(myConsumer) )

// Los experimentos muestran que si hay un subproceso ejecutándose en la lógica de procesamiento del receptor y el subproceso asíncrono falla, el desplazamiento también fallará.

stream.addSink(x=>{

? println(x)

? new Thread(new Runnable {

anular def run():Unit = {

? println(1/(x.getMesg.toInt%2))//Error si el mensaje es par, el denominador es 0

}

?}).start()

? println(x)

})

val específicoStartOffsets = new java.util.HashMap[ KafkaTopicPartition, java.lang.Long]()

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)

specificStartOffsets.put(new KafkaTopicPartition("myTopic" , 1), 31L)

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)