Skip to content
章节导航

【案例实战】RocketMQ 顺序消息消息者消费

  • MessageListenerConcurrently

  • MessageListenerOrderly
    1、Consumer 会平均分配 queue 的数量
    2、并不是简单禁止并发处理,而是为每个Consumer Queue 加个锁,消费每个 消息前,需要获得这个消息所在的 Queue 的锁,这样同个时间,同个 Queue 的消息不被并发消费,但是不同 Queue 的消息可以并发处理

扩展思维:为什么高并发情况下 ConcurrentHashMap 比 HashTable 和 HashMap 更高效且线程安全?
分段锁 Segment

模拟顺序消息消息者消费

顺序消息消息者消费类

java
@Component
public class PayOrderlyConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_orderly_consumer_group";

    public PayOrderlyConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //默认是集群方式,可以更改为广播,但是广播方式不支持重试
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");


        consumer.registerMessageListener( new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    //做业务逻辑操作 TODO

                    return ConsumeOrderlyStatus.SUCCESS;

                } catch (Exception e) {

                    e.printStackTrace();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

        });

        consumer.start();
        System.out.println("consumer start ...");
    }


}
shell
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='创建订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='创建订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='创建订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='支付订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='支付订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='支付订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_3 Receive New Messages: ProductOrder{orderId=333, type='完成订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_1 Receive New Messages: ProductOrder{orderId=222, type='完成订单'} 
ConsumeMessageThread_pay_orderly_consumer_group_2 Receive New Messages: ProductOrder{orderId=111, type='完成订单'}