RocketMQ 生产者之 MessageQueueSelector 实战
生产消息使用 MessageQueueSelector 投递到 Topic 下指定的 queue
- 应用场景:顺序消息,分摊负载
- 默认 topic 下的 queue 数量是4,可以配置
- 支持同步,异步发送指定的 MessageQueue
- 选择的 queue 数量必须小于配置的,否则会出错
同步发送
shell
// 同步发送: 消息,MessageQueueSelector, ID 序列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int queueNum = Integer.parseInt(arg.toString());
return mqs.get(queueNum);
}
}, 3);
System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());异步发送
shell
// 异步发送到指定 queue, SendCallback 不能用 lambda 表达式,有两个函数需要被实现
producer.getProducer().send(message, (mqs, msg, arg) -> {
int queueNum = Integer.parseInt(arg.toString());
return mqs.get(queueNum);
}, 3, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
剑鸣秋朔