【案例实战】RocketMQ 顺序消息消息者消费
MessageListenerConcurrently
MessageListenerOrderly
1、Consumer 会平均分配 queue 的数量
2、并不是简单禁止并发处理,而是为每个Consumer Queue 加个锁,消费每个 消息前,需要获得这个消息所在的 Queue 的锁,这样同个时间,同个 Queue 的消息不被并发消费,但是不同 Queue 的消息可以并发处理
扩展思维:为什么高并发情况下 ConcurrentHashMap 比 HashTable 和 HashMap 更高效且线程安全?
分段锁 Segment
模拟顺序消息消息者消费
顺序消息消息者消费类
java
@Component
public class PayOrderlyConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_orderly_consumer_group";
public PayOrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//默认是集群方式,可以更改为广播,但是广播方式不支持重试
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");
consumer.registerMessageListener( new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
MessageExt msg = msgs.get(0);
try {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//做业务逻辑操作 TODO
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}shell
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='创建订单'}
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='创建订单'}
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='创建订单'}
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='支付订单'}
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='支付订单'}
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='支付订单'}
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='完成订单'}
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='完成订单'}
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='完成订单'}
剑鸣秋朔