Springboot integra Kafka
Recientemente, recibí comentarios de muchos amigos de que están planeando introducir el middleware Kafka en el proyecto Springboot. Encontré mucha información en Internet, pero está dispersa. Después de seguir las instrucciones, todavía es bastante difícil que el proyecto funcione normalmente. Escuché que conozco mejor a Kafka y espero compartirlo. Da la casualidad de que debido a la epidemia, las cosas están un poco más relajadas de lo habitual recientemente. También quería aprovechar esta oportunidad para escribir algo. En primer lugar, puedo utilizarlo como resumen. Orientación para amigos necesitados. Espero que este artículo sea útil para todos los amigos que tengan problemas.
1. Instale JDK. La versión específica se puede seleccionar de acuerdo con la situación real del proyecto. Actualmente, jdk8 es la más utilizada.
2. Instale Zookeeper. se puede seleccionar de acuerdo con la situación real del proyecto. Se utiliza este proyecto 3.5.8
3. Instalar Kafka? La versión específica se puede seleccionar de acuerdo con la situación real que se utiliza en este proyecto. 3.5.1
4. Instalar Kafka Manage ? (No esencial: la instalación es principalmente para proporcionar operaciones de interfaz gráfica para las operaciones del proyecto Kafka. La versión específica se puede seleccionar de acuerdo con la situación real del proyecto). Este proyecto utiliza 1.3.3.7
lt; parentgt;
lt;groupIdgt;org.springframework.bootlt;/groupIdgt;
lt;artifactIdgt;spring. -boot-starter-parentlt;/artifactIdgt;
lt;versiongt;2.3 .12.RELEASSelt;/versiongt;
lt;relativePath/gt;!-- padre de búsqueda desde el repositorio --gt;
lt;/parentgt;
lt;dependenciesgt;
lt;!--dependencias web de springboot--gt; p>
lt;dependencygt;
lt;groupIdgt;org. springframework.bootlt;/groupIdgt;
lt;artifactIdgt;spring-boot-starter-weblt;/artifactIdgt ;
lt;/dependencygt;
lt; !--dependencia kafka--gt;
lt;dependencygt;
lt ;groupIdgt;org.springframework.kafkalt;/groupIdgt;
lt;artifactIdgt ;spring-kafkalt;/artifactIdgt;
lt;/dependencygt;
lt ;/dependenciesgt;
spring:
? kafka :
bootstrap-servers: 127.0.0.1:9092
productor: #producer
? # El número de veces que se retransmite el mensaje después de que se produce un error.
Reintentar una vez. Este valor debe combinarse con el escenario empresarial. Ya sea volver a intentarlo o no, tiene sus propios méritos (reintentar, ventaja: asegúrese de que el productor escriba el bloque con éxito tanto como sea posible; desventaja: es posible que los datos con la escritura secuencial puede alterar el orden
? #Por ejemplo: escriba los datos 1/2/3 en secuencia, pero al escribir 1, debido a una anomalía de la red, se reescribió y los datos que cayeron en el bloque se convirtieron 2/3/1) p>
? reintentos: 1
? #Cuando es necesario enviar varios mensajes a la misma partición, el productor los colocará en el mismo lote. Este parámetro especifica el tamaño de memoria que puede utilizar un lote, calculado en bytes.
16k en modo
? tamaño de lote: 16384 #16k
? # Establecer el tamaño del buffer de memoria del productor
? p>
? acks: 1
? # Método de serialización de claves
? > ? # Método de serialización de valores
? value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumidor:
? : default-group
? # El intervalo de tiempo para el envío automático en la versión Spring Boot 2.X utiliza el tipo de valor Duración, que debe cumplir con un formato específico, como 1S, 1M, 2H, 5D. Este atributo solo tiene efecto cuando enable-auto-commit: true
? auto-commit-interval: 1S
? > ? # Este atributo especifica lo que debe hacer el consumidor al leer una partición sin un desplazamiento o con un desplazamiento no válido:
? # último (predeterminado) en En caso de un desplazamiento no válido, el consumidor comenzará leer datos del último registro (registros generados después de que se inicia el consumidor)
#earliest: en el caso de un desplazamiento no válido, el consumidor leerá el registro de la partición desde la posición inicial
? auto-offset-reset: más temprano
? #Método de deserialización de la clave
clave -deserializer: org.apache.kafka.common.serialization.StringSerializer
? #Método de deserialización del valor
? value-deserializer: org.apache.kafka.common.serialization .StringSerializer
oyente:
? # Enviar cuando el oyente consumidor procesa cada registro (ListenerConsumer)
? # RECORD
# ¿Cuándo el oyente consumidor procesa cada lote de datos de encuesta () (ListenerConsumer) y enviado
? # BATCH
? # Cuando cada lote de poll() Después de que el consumidor oyente (ListenerConsumer) procesa los datos, se envía cuando llega el momento. ya que el último envío es mayor que TIME
? # TIME
? # Cuando cada lote de datos de encuesta () Después de ser procesado por el oyente del consumidor (ListenerConsumer), se envía cuando ¿El número de registros procesados es mayor o igual a COUNT
? # COUNT
?
# TIME COUNT Enviar cuando se cumple una condición
? # COUNT_TIME
? # Después de que el oyente del consumidor (ListenerConsumer) procese cada lote de datos de encuesta (), enviar manualmente después. llamando a Acknowledgment.acknowledge()
? # MANUAL
? # Enviar inmediatamente después de llamar manualmente a Acknowledgment.acknowledge(), esto se usa generalmente
?
? ack-mode: manual_immediate
? # Número de subprocesos que se ejecutan en el contenedor de escucha
? kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --particiones 1 --prueba de tema
?-- 127.0.0.1:2181?
--?replication-factor?particiones número de réplicas
--particiones?número de particiones
Haga clic en Agregar clúster para agregar un clústergt; Interfaz de configuración del clúster:
Ingrese el nombre del clúster (como Kafka-Cluster-1) y la dirección del servidor Zookeeper (como localhost: 2181) y seleccione la versión de Kafka más cercana (como 0.10. 1.0)
paquete com.charlie.cloudconsumer.service.impl.kafka;
importar com.charlie.cloudconsumer.common.utils.JSON;
importar lombok.extern.slf4j Slf4j;
importar org.springframework.beans.factory.annotation.Autowired;
importar org.springframework.kafka.core.KafkaTemplate;
importar org.springframework.kafka.support.SendResult;
importar org.springframework.util.concurrent.ListenableFuture;
importar org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
/**
* @Author: charlie
* @CreateTime: 2022/4 /9
* @Descripción: final de la producción de mensajes de Kafka, que proporciona API para descubrir mensajes en el bloque de Kafka para fines comerciales
*/
@Component
@Slf4j
public class QueueProducer {
@Autowired
private KafkaTemplatekafkaTemplate;
public void sendQueue(String topic, Object msgContent) {
String obj2String =JSON.toJSONString(msgContent);
log.info("preparación del servicio kafka El mensaje a enviar es: {}", obj2String);
//Enviar mensaje
ListenableFuturegt; futuro =kafkaTemplate.send(topic, UUID.randomUUID().toString() , obj2String );
futuro.addCallback(new ListenableFutureCallbackgt; () {
//Mensaje enviado correctamente
@Override
public void onSuccess (SendResult stringObjectSendResult) {
log.info("[producción de servicio kafka exitosa] tema: {}, resultado {}", tema, stringObjectSendResult.toString());
}
// Error al enviar el mensaje
@Override
public void onFailure(Throwable throwable) {
//Manejo de falla de envío, este departamento solo registra registros de errores, que se pueden procesar en combinación con el negocio real
log.info("[falla de producción de servicio de Kafka] tema: {}, motivo de falla {}", tema , throwable.getMessage ());
}
}
}
}
paquete com.charlie.cloudconsumer.service.impl.kafka;
importar org.apache.commons.lang3.ObjectUtils;
importar org.springframework.stereotype.Component;
/ **
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description: el objeto de procesamiento comercial real en el lado del consumidor
p>*/
@Component // La razón para agregar esta anotación es porque el lado del consumidor se inicializará cuando comience el proyecto. use esta clase, por lo que también se permite su uso en el proyecto Regístrese al inicio
public class QueueDataProcess {
pu.
blic boolean doExec(Object obj) {
// toda la lógica empresarial específica
if (ObjectUtils.isNotEmpty(obj)) {
return true; p>
}else {
devuelve falso
}
}
}
paquete; com.charlie.cloudconsumer.service.impl.kafka;
importar com.charlie.cloudconsumer.common.utils.JSON;
importar com.charlie.cloudconsumer.model.Order;
importar lombok.extern.slf4j.Slf4j;
importar org.apache.commons.lang.exception.ExceptionUtils;
importar org.apache.kafka. client.consumer.ConsumerRecord;
importar org.springframework.beans.factory.annotation.Autowired;
importar org.springframework.kafka.annotation.KafkaListener;
importar org.springframework.kafka.support.Acknowledgement;
importar org.springframework.kafka.support.KafkaHeaders;
importar org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @ Autor : charlie
* @CreateTime: 2022/4/9
* @Description: consumidor de mensajes kafka, responsable de consumir mensajes de temas específicos
*/< / p>
@Component
@Slf4j
@SuppressWarnings("all")
public class QueueConsumer {
@ Autowired
QueueDataProcess privado queueDataProcess;
/**
*
*/
@KafkaListener(topics = "prueba", groupId ="consumidor")
public void doConsumer(registro ConsumerRecord,Ack
acuse de recibo de ahora, @Header(KafkaHeaders.RECEIVED_TOPIC)Tema de cadena) {
Mensaje opcional =Optional.ofNullable(record.value());
if (message.isPresent()) {
prueba {
Objeto msg =message.get();
log.info("[kafka-Consumption] doConsumer consumido: Tema:" tema ", Mensaje:" msg);
boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(), Order.class));
if (res) {
ack.acknowledge();
}
}catch (Excepción ex) {
log.error("[kafka - Excepción de consumo] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));
}
}
}
}
paquete com.charlie.cloudconsumer.controller;
importar com.alibaba.fastjson.JSON
importar com.charlie.cloudconsumer.common.utils. ;
importar com.charlie.cloudconsumer.common.utils.BuildResponseUtils;
importar com.charlie.cloudconsumer.model.Order;
importar charlie. .cloudconsumer.service.impl.kafka.QueueProducer;
importar org.apache.commons.lang3.ObjectUtils;
importar org.springframework.beans.factory.annotation.Autowired <; /p>
importar org.springframework.web.bind.annotation.RequestBody;
importar org.springframework.web.bind.annotation.RequestMapping
importar org. .web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author : charlie
* @
CreateTime: 2022/4/9
* @Descripción: controlador de envío de mensajes kafka, responsable de aceptar mensajes en cola enviados por los usuarios
*/
@RestController
@RequestMapping(value ="/kafka", produce =MediaType.APPLICATION_JSON_VALUE)
clase pública KafkaController {
@Autowired
privada QueueProducer queueProducer;
@RequestMapping(value = "/send", método = RequestMethod.POST)
public? AjaxResultlt;?gt; sendMsg(@RequestBody Order order) {< / p>
AjaxResultlt;?gt; ajaxResult= null;
if (ObjectUtils.isNotEmpty(order)) {
? queueProducer.sendQueue("test", orden) ;
ajaxResult = BuildResponseUtils.success(0, "Enviar mensaje: " JSON.toJSONString(order) "¡Éxito!"
} else {
ajaxResult = BuildResponseUtils.success(1, "Enviar mensaje:" JSON.toJSONString(order) "¡Error!"
}
return ajaxResult
}
}