RocketMQ 分布式事务消息实战
TransactionMQProducer基础介绍和使用
自定义线程池和消息生产者结合
java
// Listener that executes the local transaction.
TransactionListener transactionListener = new TransactionListenerImpl();
// Create the transactional message producer for this producer group.
TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");
// Create a custom thread pool. ThreadPoolExecutor arguments, in order:
//   corePoolSize    - number of core threads kept in the pool (2)
//   maximumPoolSize - maximum number of threads allowed in the pool (5)
//   keepAliveTime   - longest time an idle non-core thread waits for new work (100)
//   unit            - time unit of keepAliveTime (seconds)
//   workQueue       - task queue (bounded, capacity 2000)
//   threadFactory   - gives every pool thread a recognizable name
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
// Configure the producer's basic properties.
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
分布式事务消息生产者
java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
 * Spring-managed holder for a transactional message producer.
 *
 * <p>The constructor builds a {@link TransactionMQProducer} for group
 * {@code trac_producer_group}, attaches a {@code TransactionListenerImpl} and a dedicated
 * thread pool, points it at {@code JmsConfig.NAME_SERVER}, and starts it.
 *
 * @author 王大宸
 * @date 2025-11-05 10:07
 */
@Component
public class TransactionProducer {
    private final String producerGroup = "trac_producer_group";
    // Transaction listener.
    private final TransactionListener transactionListener = new TransactionListenerImpl();
    private final TransactionMQProducer producer;
    // Threads of a custom pool should be named so they are easy to spot in thread dumps.
    private final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    // True once the underlying producer has been started successfully; guarded by "this".
    private boolean started;

    /**
     * Creates, configures and starts the underlying producer.
     *
     * @throws IllegalStateException if the underlying producer fails to start
     */
    public TransactionProducer() {
        producer = new TransactionMQProducer(producerGroup);
        producer.setTransactionListener(transactionListener);
        producer.setExecutorService(executorService);
        // NameServer address; multiple addresses are separated by ';'.
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        // Call the private helper rather than the public, overridable start().
        startOnce();
    }

    /**
     * Returns the underlying producer.
     *
     * @return the producer created by the constructor
     */
    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    /**
     * Starts the underlying producer. It must be started once before use and may only be
     * initialized once; the constructor already does so, and repeated calls are no-ops.
     *
     * @throws IllegalStateException if the underlying producer fails to start
     */
    public void start() {
        startOnce();
    }

    /**
     * Shuts the underlying producer down. Typically invoked from an application-context
     * listener when the application stops.
     */
    public void shutdown() {
        this.producer.shutdown();
    }

    /** Starts the producer on the first call only, surfacing a startup failure to the caller. */
    private synchronized void startOnce() {
        if (started) {
            return;
        }
        try {
            this.producer.start();
            started = true;
        } catch (MQClientException e) {
            // Fail fast with the cause attached instead of printing the stack trace and
            // leaving behind a producer that was never started.
            throw new IllegalStateException(
                    "Failed to start transaction producer for group " + producerGroup, e);
        }
    }
}
class TransactionListenerImpl implements TransactionListener {
/**
 * Runs the local transaction for a message and reports its outcome.
 *
 * <p>In this demo the outcome is driven by {@code arg}: its string form is parsed as an
 * integer, where 1 commits and 2 rolls back. Every other case (3, any other number,
 * {@code null}, or a non-numeric value) yields {@link LocalTransactionState#UNKNOW}.
 *
 * @param msg the message being sent in the transaction
 * @param arg caller-supplied argument selecting the demo outcome; may be {@code null}
 * @return the local transaction state; never {@code null}
 */
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    System.out.println("==== executeLocalTransaction =======");
    String body = new String(msg.getBody());
    String key = msg.getKeys();
    String transactionId = msg.getTransactionId();
    System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body);
    // TODO: local transaction begins here.
    if (arg == null) {
        // Without a status the outcome is undecided; report UNKNOW rather than throwing.
        return LocalTransactionState.UNKNOW;
    }
    int status;
    try {
        status = Integer.parseInt(arg.toString());
    } catch (NumberFormatException e) {
        System.out.println("unparseable transaction status: " + arg);
        return LocalTransactionState.UNKNOW;
    }
    // TODO: local transaction ends here.
    switch (status) {
        case 1:
            // Second-phase confirmation: the message becomes consumable.
            return LocalTransactionState.COMMIT_MESSAGE;
        case 2:
            // Roll back: the broker deletes the half message.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        default:
            // Status 3 and any unrecognized value: the broker will check back later.
            // Returning an explicit state avoids handing null to the caller.
            return LocalTransactionState.UNKNOW;
    }
}
/**
 * Re-checks the state of a local transaction for the given message.
 *
 * <p>This demo logs the message details and always answers
 * {@link LocalTransactionState#COMMIT_MESSAGE}. A real implementation should answer either
 * commit or rollback, for example by using the message key to look up whether the local
 * transaction completed.
 *
 * @param msg the message whose local transaction is being checked
 * @return always {@link LocalTransactionState#COMMIT_MESSAGE}
 */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    System.out.println("==== checkLocalTransaction =======");
    final String payload = new String(msg.getBody());
    final String messageKey = msg.getKeys();
    final StringBuilder line = new StringBuilder()
            .append("transactionId=").append(msg.getTransactionId())
            .append(", key=").append(messageKey)
            .append(", body=").append(payload);
    System.out.println(line);
    return LocalTransactionState.COMMIT_MESSAGE;
}
}
发送分布式事务消息
java
// Injected TransactionProducer bean; the underlying producer is reached via getProducer().
@Autowired
private TransactionProducer transactionMQProducer;
/**
 * Payment callback endpoint that publishes a transactional message.
 *
 * <p>The message goes to {@code JmsConfig.TOPIC} with {@code tag} as its tag,
 * {@code tag + "_key"} as its key, and the bytes of {@code tag} as its body.
 * {@code otherParam} is handed to {@code sendMessageInTransaction} as the argument.
 *
 * @param tag        message tag; must not be {@code null}
 * @param otherParam argument passed along with the transactional send
 * @return an empty map
 * @throws IllegalArgumentException if {@code tag} is {@code null}
 * @throws Exception if sending the message fails
 */
@RequestMapping("/api/v1/pay/cb")
public Object callback( String tag, String otherParam ) throws Exception {
    if (tag == null) {
        // Reject early: otherwise the key would silently become "null_key" and the
        // body lookup below would fail with a bare NullPointerException.
        throw new IllegalArgumentException("tag must not be null");
    }
    Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes());
    /* Note: this must be sent with sendMessageInTransaction, not a plain send. */
    SendResult sendResult = transactionMQProducer.getProducer().sendMessageInTransaction(message, otherParam);
    System.out.printf("发送结果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString());
    return new HashMap<>();
}
剑鸣秋朔