Cómo implementa rocketmq el envío de mensajes en orden
1. El lado del productor garantiza que los mensajes se envíen a la misma cola en orden.
2. El consumidor deberá asegurar el consumo en la misma cola.
Veamos primero un ejemplo, la versión del código es la misma que la anterior.
Clase de productor:
importar Java.io.IOException;
importar java.text.SimpleDateFormat
importar java.util. Lista;
importar com.alibaba.rocketmq.client.exception.MQBrokerException;
importar com.alibaba.rocketmq.client.
importar com.alibaba.
/**
*Productor, envía mensajes secuenciales
*/
Productor de clase pública {
public static void main(String[] args) lanza IOException {
prueba {
DefaultMQProducer = new DefaultMQProducer("please_rename_unique_group_ name");
productor .setNamesrvAddr ("192.168.0.104:9876");
productor.start();
String[] etiquetas = new String[] { "EtiquetaA", "EtiquetaC". " };
Fecha fecha = nueva Fecha();
SimpleDateFormat sdf = nueva SimpleDateFormat("aaaa-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// Agregar un sufijo de hora p>
Cuerpo de cadena = dateStr + " Hola RocketMQ " + i;
Mensaje msg = nuevo Mensaje("TopicTestjjj", etiquetas[i % tags.length], "KEY" + i, cuerpo .getBytes ());
SendResult sendResult = productor.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List
ID entero = (Entero) arg;
return mqs.get(id);
} p>
}, 0);//0 es el subíndice de la cola
system.out.println(sendResult + ", body:" + body);
}
productor.shutdown();
} captura (MQClientException e) {
e.printStackTrace();
} captura (RemotingException e) {<
>
e.printStackTrace();
} captura (MQBrokerException e) {
e.printStackTrace();
} captura (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}
}
Lado del consumidor:
importar java.
importar java.util.List
importar java.util. Aleatorio
importar java.util.concurrent.TimeUnit;
importar com.alibaba.rocketmq.client.consumer.
importar com.alibaba.rocketmq. client.consumer.listener.ConsumeOrderlyContext;
importar com.alibaba.setNamesrvAddr(" 192.168.0.104:9876");
/**
* Configuración Cuando el consumidor inicia por primera vez, ya sea que comience a consumir desde el principio de la cola o desde el final de la cola
* Si no se inicia por primera vez, continuará consumiendo de acuerdo a la posición del último consumo
*/
consumidor. setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
Consumidor.
subscribe("TopicTestjjj", "TagA ||| TagC ||| TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Aleatorio aleatorio = new Aleatorio();
@Override
public ConsumeOrderlyStatus consumeMessage(List
context.setAutoCommit( true);
System.out.print(Thread.currentThread().getName() + " Recibir nuevos mensajes: " );
for (MessageExt msg: msgs) {
System .out .println(msg + ", content:" + new String(msg.getBody()));
}
prueba {
// Simula el lógica empresarial procesada en...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumidor iniciado.");
}
}
Inicio de NameServer y BrokerServer Finalmente, ejecute imprimir, elimine los que no sean importantes en el frente y mire solo las siguientes columnas:
contenido: 2015-12-06 17:03:21 Hola RocketMQ 0
content:2015- 12-06 17:03:21 Hola RocketMQ 1
content:2015-12-06 17:03:21 Hola RocketMQ 2
content:2015-12- 06 17:03 :21 Hola RocketMQ 3
contenido:2015-12-06 17:03:21 Hola RocketMQ 4
contenido:2015-12-06 17:03:21 Hola RocketMQ 5
contenido:2015-12-06 17:03:21 Hola RocketMQ 6
contenido:2015-12-06 17:03:21 Hola RocketMQ 7
content:2015-12-06 17:03:21 Hola RocketMQ 8
content:2015-12-06 17 :03:21 Hola RocketMQ 9
Como tú Como puede ver, los mensajes están ordenados.