Consuming Messages with RocketMQ in Spring Boot
Writing the Consumer
java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
// JmsConfig is the project's own configuration class; import it if it lives in another package.

/**
 * Payment consumer.
 *
 * @author 王大宸
 * @date 2025-10-29 15:09
 */
@Component
public class PayConsumer {

    private final DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(JmsConfig.consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        // A consumer group seen for the first time starts from the latest offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // "*" subscribes to every tag of the topic.
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // The listener can also be registered as a lambda:
        // consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { ... });
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // Handle every message in the batch, not only the first one.
                    for (MessageExt msg : msgs) {
                        // Decode with an explicit charset instead of the platform default.
                        String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), body);
                        String topic = msg.getTopic();
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    // Ask the broker to redeliver the batch later.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}
Starting the Project
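When the project starts, the console prints "consumer start ...", which shows that the consumer started successfully.
Testing
Start the project and open http://localhost:8080/api/v1/pay_cb?text=123 in a browser.
The console prints the send result, followed by the message the consumer received: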
shell
SendResult [sendStatus=SEND_OK, msgId=AC1690010FD036BAF30C97A79EEE0000, offsetMsgId=C0A8C83200002A9F000000000000050A, messageQueue=MessageQueue [topic=uni_pay_test_topic_666, brokerName=broker-a, queueId=0], queueOffset=0, recallHandle=null]
ConsumeMessageThread_pay_consumer_group_1 Receive New Messages: hello uni rocketmq = 123
topic=uni_pay_test_topic_666, tags=taga, keys=null, msg=hello uni rocketmq = 123
Depending on network and host conditions, there may be a short delay before the consumer output appears.
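The offsetMsgId in the SendResult line above can be read by hand, which helps when checking which broker stored a message. The following is a minimal sketch, assuming the IPv4 layout in which the 32 hex characters encode the broker IP (4 bytes), the broker port (4 bytes), and the commit log offset (8 bytes). The class OffsetMsgIdDecoder and its Decoded holder are hypothetical names used only for this illustration; they are not part of the RocketMQ client API.

```java
/** Sketch: splits a 32-hex-character (IPv4 layout) offsetMsgId into its parts. */
final class OffsetMsgIdDecoder {

    /** Hypothetical holder for the decoded parts. */
    static final class Decoded {
        final String host;
        final int port;
        final long commitLogOffset;

        Decoded(String host, int port, long commitLogOffset) {
            this.host = host;
            this.port = port;
            this.commitLogOffset = commitLogOffset;
        }
    }

    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId == null || offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (assumed IPv4 layout)");
        }
        // Bytes 0-3: broker IPv4 address, one octet per byte.
        StringBuilder host = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                host.append('.');
            }
            host.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        // Bytes 4-7: broker port stored as a 32-bit integer.
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        // Bytes 8-15: physical offset of the message in the broker's commit log.
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(host.toString(), port, offset);
    }

    public static void main(String[] args) {
        Decoded d = decode("C0A8C83200002A9F000000000000050A");
        // prints: 192.168.200.50:10911 offset=1290
        System.out.println(d.host + ":" + d.port + " offset=" + d.commitLogOffset);
    }
}
```

For the value logged above, this yields broker 192.168.200.50 on port 10911 with commit log offset 1290.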
剑鸣秋朔