La combinación de flink y kafka
Cuándo. flink está integrado con kafka, las dependencias de maven de las versiones correspondientes se muestran en la siguiente tabla
Ejemplos de dependencia de Maven
< dependencia>
?
?
? version>$ {flink.version}
?
Flink usa FlinkKafkaConsumer para lea y acceda a kafka, según la versión de kafka. El nombre de clase de FlinkKafkaConsumer se convierte en FlinkKafkaConsumer
[08,09,10...]. El siguiente número es el número de versión principal de Kafka.
La inicialización de FlinkKafkaConsumer requiere los siguientes parámetros
1. Nombre del tema, utilizado para especificar los datos que se consumirán de uno o más temas
2. Información de configuración de Kafka , Como dirección y puerto zk, dirección y puerto kafka, etc.
3. Deserializador (esquema), seleccione un deserializador para deserializar los datos consumidos.
El consumidor de flink kafka necesita saber cómo deserializar datos de mensajes en kafka en objetos en java o scala. Los usuarios logran esto usando DeserializationSchema, donde cada mensaje de Kafka usa el método eserialize(byte[] mensaje) de DeserializationSchema para convertir el mensaje de Kafka en la estructura que el usuario desea.
Los usuarios pueden personalizar el esquema implementando la interfaz KeyedDeserializationSchema o DeserializationSchema para convertir los datos a los que se accede en una estructura de datos personalizada.
Flink tiene soporte integrado para las siguientes implementaciones de DeserializationSchema:
clase pública SimpleStringSchema implementa DeserializationSchema
clase pública TypeInformationSerializationSchema Implementación de KeyedDeserializationSchema Sí clase pública TypeInformationKeyValueSerializationSchema clase pública JSONKeyValueDeserializationSchema implementa KeyedDeserializationSchema Para ejemplo: val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p) la clase pública MySchema implementa KeyedDeserializationSchema<.lt;KafkaMsgDTO> { @Override p> public KafkaMsgDTO deserialize(byte[] messageKey, byte[] mensaje, tema de cadena, partición int, desplazamiento largo) lanza IOExpression("IOExpression") desplazamiento largo) lanza IOException { String msg = new String(message, StandardCharsets.UTF_8); String key = null; if(messageKey key = null; if(messageKey ! = null ){ key = new String(messageKey, StandardCharsets.UTF_8); } return new KafkaMsgDTO(msg,key, tema,partición,desplazamiento) ) } @Override public boolean isEndOfStream(KafkaMsgDTO nextElement) { return false; } @Override @Override public TypeInformation return getForClass (KafkaMsgDTO.clase); } } & lt; ;dependencia> ? ? ? clase pública KafkaMsgDTO { tema de cadena privada; p> partición int privada; desplazamiento largo privado; mensaje de cadena privada; @Override cadena pública a cadena () { return "KafkaMsgDTO{" + "topic='" /> "id=''". + tema + '\''+ ", partición=""+ partición + ", desplazamiento=""+ desplazamiento + ", mensaje =''"+ mensaje + '\''+ p> ", tecla='" + tecla + '\'''+ tecla + '\''+ '}'; } clave de cadena privada; pública KafkaMsgDTO(){ } public KafkaMsgDTO(mensaje de cadena, clave de cadena, tema de cadena, partición int, desplazamiento largo){ this.mesg = mesg; this.key = clave; this.topic = tema; this.partition = partición; this.offset = offset; } público String getKey() { clave de retorno; } public void setKey(Clave de cadena) { this.key = clave ; } public String getTopic() { devolver tema; } public void setTopic (Tema de cadena) { this.topic = topic; } this.region = región this.region =; región; esta.región = región esta.región = región esta.región = esto; región = región.topic = tema; } public int getPartition() { devolver partición; } public void setPartition (int partición) { this.partition = partición; } public long getOffset() { retorno de compensación; } public void setOffset (compensación larga) { this.offset = compensación; } public String getMesg() { return mesg; } public void setMesg(String mesg) { this.mesg = mesg; } } val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("tema",nuevo MySc hema(),p) // myConsumer.setStartFromEarliest() ? // Comience a consumir desde el principio, los datos consumidos se consumirán repetidamente, el valor predeterminado de Kafka no es el desplazamiento de confirmación. // myConsumer.setStartFromLatest() // Comience a consumir desde el más nuevo, no consuma los datos que no se consumieron antes del inicio de la transmisión, el valor predeterminado de Kafka es No confirmar offset. ? ? myConsumer.setStartFromGroupOffsets() // Comience a consumir desde el desplazamiento de consumo. Kafka tiene un desplazamiento de confirmación, que es el consumo predeterminado. //Si no se realiza ningún punto de control, los datos que ingresan al receptor enviarán el desplazamiento, si la lógica dentro del receptor falla, el desplazamiento aún se enviará y el programa lo hará. exit, si se reinicia el flujo de datos, los datos fallidos no se volverán a consumir //Si se realiza un punto de control, garantizará que los datos se consuman exactamente una vez, de un extremo a otro. addSource( miConsumidor) stream.addSink(x=>{ ? println(x) ? println(1/(x.getMesg.toInt% 2) )//Error si el mensaje es par, el denominador es 0 ? println(x) }) val stream = env.addSource(myConsumer) ) // Los experimentos muestran que si hay un subproceso ejecutándose en la lógica de procesamiento del receptor y el subproceso asíncrono falla, el desplazamiento también fallará. stream.addSink(x=>{ ? println(x) ? new Thread(new Runnable { anular def run():Unit = { ? println(1/(x.getMesg.toInt%2))//Error si el mensaje es par, el denominador es 0 } p> ?}).start() ? println(x) }) val específicoStartOffsets = new java.util.HashMap[ KafkaTopicPartition, java.lang.Long]() specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) specificStartOffsets.put(new KafkaTopicPartition("myTopic" , 1), 31L) specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) myConsumer.setStartFromSpecificOffsets(specificStartOffsets)