Skip to content
章节导航

SpringBoot 整合 RocketMQ 消费消息

编写消费者

java
/**
 * 支付消费者
 *
 * @author 王大宸
 * @date 2025-10-29 15:09
 */
@Component
public class PayConsumer {

    private DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(JmsConfig.consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(JmsConfig.TOPIC, "*");

//        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//            try {
//                Message msg = msgs.get(0);
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//                String topic = msg.getTopic();
//                String body = new String(msg.getBody(), "utf-8");
//                String tags = msg.getTags();
//                String keys = msg.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {

                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}

启动项目

启动项目时, 控制台打印 consumer start ... 启动成功

测试

启动项目,浏览器访问 http://localhost:8080/api/v1/pay_cb?text=123

控制台打印返回消息

shell
SendResult [sendStatus=SEND_OK, msgId=AC1690010FD036BAF30C97A79EEE0000, offsetMsgId=C0A8C83200002A9F000000000000050A, messageQueue=MessageQueue [topic=uni_pay_test_topic_666, brokerName=broker-a, queueId=0], queueOffset=0, recallHandle=null]
ConsumeMessageThread_pay_consumer_group_1 Receive New Messages: hello uni rocketmq = 123 
topic=uni_pay_test_topic_666, tags=taga, keys=null, msg=hello uni rocketmq = 123

因为网络或主机原因, 会有一定的延迟