Skip to content
章节导航

RocketMQ 分布式事务消息实战

TransactionMQProducer基础介绍和使用

自定义线程池和消息生产者结合

java
/监听器  ,执行本地事务
TransactionListener transactionListener = new TransactionListenerImpl();

//创建事务消息发送者
TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");

//创建自定义线程池
//@param corePoolSize    池中所保存的核心线程数
//@param maximumPoolSize    池中允许的最大线程数
//@param keepActiveTime    非核心线程空闲等待新任务的最长时间
//@param timeunit    keepActiveTime参数的时间单位
//@param blockingqueue    任务队列
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, 
new ArrayBlockingQueue<Runnable>(2000), 
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});

//设置producer基本属性
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

分布式事务消息生产者

shell
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
 * 分布式事务消息
 *
 * @author 王大宸
 * @date 2025-11-05 10:07
 */
@Component
public class TransactionProducer {
    private String producerGroup = "trac_producer_group";

    //事务监听器
    private TransactionListener transactionListener = new TransactionListenerImpl();

    private TransactionMQProducer producer = null;


    //一般自定义线程池的时候,需要给线程加个名称

    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }

    });

    public TransactionProducer() {
        producer = new TransactionMQProducer(producerGroup);
        producer.setTransactionListener(transactionListener);
        producer.setExecutorService(executorService);

        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        start();
    }

    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }


}


class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        System.out.println("==== executeLocalTransaction =======");
        String body = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body);


        // 执行本地事务begin TODO

        int status = Integer.parseInt(arg.toString());

        // 二次确认消息,然后消费者可以消费
        if (status == 1) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // 回滚消息,broker 端会删除半消息
        if (status == 2) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // broker 端会进行回查消息,再或者什么都不响应
        if (status == 3) {
            return LocalTransactionState.UNKNOW;
        }
        
       // 执行本地事务end TODO   

        return null;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        System.out.println("==== checkLocalTransaction =======");
        String body = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body);

        // 要么 commit 要么 rollback

        // 可以根据 key 去检查本地事务消息是否完成

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

发送分布式事务消息

java
@Autowired
    private TransactionProducer transactionMQProducer;

    @RequestMapping("/api/v1/pay/cb")
    public Object callback( String tag, String otherParam ) throws Exception {
        Message message = new Message(JmsConfig.TOPIC, tag, tag+"_key",tag.getBytes());
      
        /* 注意:这里发送的是 sendMessageInTransaction 消息 */
        SendResult sendResult =  transactionMQProducer.getProducer().sendMessageInTransaction(message, otherParam);
       
        System.out.printf("发送结果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString());

        return new HashMap<>();
    }