Red de conocimiento informático - Problemas con los teléfonos móviles - Cómo implementa rocketmq el envío de mensajes en orden

Cómo implementa rocketmq el envío de mensajes en orden

La mensajería secuencial de Rocketmq debe lograr dos cosas:

1. El lado del productor garantiza que los mensajes se envíen a la misma cola en orden.

2. El consumidor deberá asegurar el consumo en la misma cola.

Veamos primero un ejemplo, la versión del código es la misma que la anterior.

Clase de productor:

importar Java.io.IOException;

importar java.text.SimpleDateFormat

importar java.util. Lista;

importar com.alibaba.rocketmq.client.exception.MQBrokerException;

importar com.alibaba.rocketmq.client.

importar com.alibaba.

/**

*Productor, envía mensajes secuenciales

*/

Productor de clase pública {

public static void main(String[] args) lanza IOException {

prueba {

DefaultMQProducer = new DefaultMQProducer("please_rename_unique_group_ name");

productor .setNamesrvAddr ("192.168.0.104:9876");

productor.start();

String[] etiquetas = new String[] { "EtiquetaA", "EtiquetaC". " };

Fecha fecha = nueva Fecha();

SimpleDateFormat sdf = nueva SimpleDateFormat("aaaa-MM-dd HH:mm:ss");

String dateStr = sdf.format(date);

for (int i = 0; i < 10; i++) {

// Agregar un sufijo de hora

Cuerpo de cadena = dateStr + " Hola RocketMQ " + i;

Mensaje msg = nuevo Mensaje("TopicTestjjj", etiquetas[i % tags.length], "KEY" + i, cuerpo .getBytes ());

SendResult sendResult = productor.send(msg, new MessageQueueSelector() {

@Override

public MessageQueue select(List mqs , Mensaje mensaje, Objeto arg) {

ID entero = (Entero) arg;

return mqs.get(id);

}

}, 0);//0 es el subíndice de la cola

system.out.println(sendResult + ", body:" + body);

}

productor.shutdown();

} captura (MQClientException e) {

e.printStackTrace();

} captura (RemotingException e) {<

>

e.printStackTrace();

} captura (MQBrokerException e) {

e.printStackTrace();

} captura (InterruptedException e) {

e.printStackTrace();

}

System.in.read();

}

}

Lado del consumidor:

importar java.

importar java.util.List

importar java.util. Aleatorio

importar java.util.concurrent.TimeUnit;

importar com.alibaba.rocketmq.client.consumer.

importar com.alibaba.rocketmq. client.consumer.listener.ConsumeOrderlyContext;

importar com.alibaba.setNamesrvAddr(" 192.168.0.104:9876");

/**

* Configuración Cuando el consumidor inicia por primera vez, ya sea que comience a consumir desde el principio de la cola o desde el final de la cola

* Si no se inicia por primera vez, continuará consumiendo de acuerdo a la posición del último consumo

*/

consumidor. setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

Consumidor.

subscribe("TopicTestjjj", "TagA ||| TagC ||| TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Aleatorio aleatorio = new Aleatorio();

@Override

public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {

context.setAutoCommit( true);

System.out.print(Thread.currentThread().getName() + " Recibir nuevos mensajes: " );

for (MessageExt msg: msgs) {

System .out .println(msg + ", content:" + new String(msg.getBody()));

}

prueba {

// Simula el lógica empresarial procesada en...

TimeUnit.SECONDS.sleep(random.nextInt(10));

} catch (Exception e) {

e.printStackTrace();

}

return ConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumidor iniciado.");

}

}

Inicio de NameServer y BrokerServer Finalmente, ejecute imprimir, elimine los que no sean importantes en el frente y mire solo las siguientes columnas:

contenido: 2015-12-06 17:03:21 Hola RocketMQ 0

content:2015- 12-06 17:03:21 Hola RocketMQ 1

content:2015-12-06 17:03:21 Hola RocketMQ 2

content:2015-12- 06 17:03 :21 Hola RocketMQ 3

contenido:2015-12-06 17:03:21 Hola RocketMQ 4

contenido:2015-12-06 17:03:21 Hola RocketMQ 5

contenido:2015-12-06 17:03:21 Hola RocketMQ 6

contenido:2015-12-06 17:03:21 Hola RocketMQ 7

content:2015-12-06 17:03:21 Hola RocketMQ 8

content:2015-12-06 17 :03:21 Hola RocketMQ 9

Como tú Como puede ver, los mensajes están ordenados.