Skip to content
章节导航

SpringBoot 整合 RocketMQ 发送消息

SpringBoot 整合 RocketMQ 项目结构目录如下(包括 SpringBoot 整合 RocketMQ 消费消息)

依赖版本
JDK 21
IDEA 2025.2.3
Spring Boot 3.4.2
RocketMQ 5.3.3

pom.xml 依赖

xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + RocketMQ sample (producer side). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.itdachen</groupId>
    <artifactId>uni-rocket-mq</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Single source of truth for the Java level; reused by the compiler settings below. -->
        <java.version>21</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <spring.boot.version>3.4.2</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Import the Spring Boot BOM so the starters below can omit their versions. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Pin the RocketMQ client version here so the dependency below can omit it. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>5.3.3</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                    <!-- Emit method parameter names into class files (javac -parameters). -->
                    <parameters>true</parameters>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

基础配置常量

java
/**
 * Constants describing the RocketMQ connection used by this sample.
 *
 * <p>This class only holds constants; it is {@code final} and has a private
 * constructor so it cannot be instantiated or subclassed.
 *
 * @author 王大宸
 * @date 2025-10-29 15:10
 */
public final class JmsConfig {

    /** RocketMQ NameServer address in {@code host:port} form. */
    public static final String NAME_SERVER = "192.168.200.50:9876";

    /** Topic name used when building messages. */
    public static final String TOPIC = "uni_pay_test_topic_666";

    /**
     * Consumer group name.
     *
     * <p>The lowerCamelCase name is kept as-is so existing references keep compiling.
     */
    public static final String consumerGroup = "pay_consumer_group";

    /**
     * Producer group name.
     *
     * <p>The lowerCamelCase name is kept as-is so existing references keep compiling.
     */
    public static final String producerGroup = "pay_group";

    /** Not instantiable: constants holder only. */
    private JmsConfig() {
    }

}

消息生产者

java
/**
 * Payment message producer.
 *
 * <p>Creates one {@code DefaultMQProducer} for the group
 * {@code JmsConfig.producerGroup}, points it at {@code JmsConfig.NAME_SERVER}
 * and starts it while the bean is being constructed.
 *
 * <p>The class implements {@link AutoCloseable} so that the Spring container
 * can use {@link #close()} as the bean's destroy callback and release the
 * producer when the application context shuts down.
 *
 * @author 王大宸
 * @date 2025-10-29 14:47
 */
@Component
public class PayProducer implements AutoCloseable {

    /** The underlying RocketMQ producer; created once and never reassigned. */
    private final DefaultMQProducer producer;

    /**
     * Builds and starts the producer.
     *
     * @throws IllegalStateException if the producer cannot be started
     */
    public PayProducer() {
        this.producer = new DefaultMQProducer(JmsConfig.producerGroup);

        // The NameServer address may list several nodes separated by ';', e.g.
        // "192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876".
        this.producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        // Call the private helper rather than the overridable start(), so a
        // subclass override can never run against a half-constructed object.
        startProducer();
    }

    /**
     * Returns the underlying producer used to send messages.
     *
     * @return the producer created by this bean
     */
    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * Starts the underlying producer. It must be started once before use and
     * may only be initialised once; the constructor already does this.
     *
     * @author 王大宸
     * @date 2025/10/29 14:57
     * @throws IllegalStateException if the producer cannot be started
     */
    public void start() {
        startProducer();
    }

    /**
     * Shuts the underlying producer down. Typically invoked from an
     * application-context listener when the application stops.
     *
     * @author 王大宸
     * @date 2025/10/29 14:57
     * @throws MQClientException declared for compatibility with existing callers
     */
    public void shutdown() throws MQClientException {
        this.producer.shutdown();
    }

    /**
     * Destroy callback: shuts the underlying producer down so it is released
     * when the bean is disposed.
     */
    @Override
    public void close() {
        this.producer.shutdown();
    }

    /**
     * Starts the producer and converts a start-up failure into an unchecked
     * exception that keeps the original cause, instead of printing the stack
     * trace and continuing with a producer that never started.
     */
    private void startProducer() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException(
                    "Failed to start RocketMQ producer for group '" + JmsConfig.producerGroup
                            + "' (NameServer " + JmsConfig.NAME_SERVER + ")", e);
        }
    }

}

测试接口

java
/**
 * Simulated payment endpoint that publishes a message to RocketMQ.
 *
 * @author 王大宸
 * @date 2025-10-29 15:00
 */
@RestController
public class PayController {

    @Autowired
    private PayProducer producer;

    /**
     * Sends one message built from {@code text} to {@code JmsConfig.TOPIC}
     * with the tag {@code taga}, prints the broker's send result to the
     * console and returns a fixed acknowledgement.
     *
     * @param text request parameter appended to the message body
     * @return the literal string {@code "SEND_OK"}
     * @throws MQBrokerException    propagated from the producer's {@code send} call
     * @throws RemotingException    propagated from the producer's {@code send} call
     * @throws InterruptedException propagated from the producer's {@code send} call
     * @throws MQClientException    propagated from the producer's {@code send} call
     */
    @GetMapping("/api/v1/pay_cb")
    public Object callback(@RequestParam String text) throws MQBrokerException, RemotingException,
            InterruptedException, MQClientException {

        // Encode explicitly as UTF-8 so the payload does not depend on the
        // JVM's default charset setting.
        byte[] body = ("hello uni rocketmq = " + text)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);

        Message message = new Message(JmsConfig.TOPIC, "taga", body);

        SendResult sendResult = producer.getProducer().send(message);
        // Print the result returned by the send call.
        System.out.println(sendResult);

        return "SEND_OK";
    }

}

测试

启动项目,浏览器访问 http://localhost:8080/api/v1/pay_cb?text=123

控制台打印返回消息

shell
SendResult [sendStatus=SEND_OK, msgId=AC169001235036BAF30C9797495C0000, offsetMsgId=C0A8C83200002A9F0000000000000102, messageQueue=MessageQueue [topic=uni_pay_test_topic_666, brokerName=broker-a, queueId=3], queueOffset=0, recallHandle=null]

常见问题