Skip to content
章节导航

RocketMQ 生产者之 MessageQueueSelector 实战

生产消息使用 MessageQueueSelector 投递到 Topic 下指定的 queue

  • 应用场景:顺序消息,分摊负载
  • 默认 topic 下的 queue 数量是4,可以配置
  • 支持同步,异步发送指定的 MessageQueue
  • 选择的 queue 数量必须小于配置的,否则会出错

同步发送

shell
// 同步发送: 消息,MessageQueueSelector, ID 序列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
     @Override
     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
         int queueNum = Integer.parseInt(arg.toString());
                return mqs.get(queueNum);
     }
  }, 3);
System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());

异步发送

shell
// 异步发送到指定 queue, SendCallback 不能用 lambda 表达式,有两个函数需要被实现
producer.getProducer().send(message, (mqs, msg, arg) -> {
        int queueNum = Integer.parseInt(arg.toString());
        return mqs.get(queueNum);
    }, 3, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
        }
        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    });