Skip to content
章节导航

【案例实战】RocketMQ 顺序消息生产者投递

模拟顺序消息生产者投递

实体类

java
public class ProductOrder  implements Serializable {

    //订单id
    private long orderId;

    //操作类型
    private String type;


    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public  ProductOrder(){}

    public  ProductOrder(long orderId ,String type){
        this.orderId = orderId;
        this.type = type;

    }


    /***
    * 模拟订单信息
    *
    * @author 王大宸
    * @date 2025/11/4 10:39
    * @return java.util.List<com.github.itdachen.domain.ProductOrder>
    */
    public static List<ProductOrder> getOrderList(){
        List<ProductOrder> list = new ArrayList<>();
        list.add(new ProductOrder(111L,"创建订单"));
        list.add(new ProductOrder(222L,"创建订单"));
        list.add(new ProductOrder(111L,"支付订单"));
        list.add(new ProductOrder(222L,"支付订单"));
        list.add(new ProductOrder(111L,"完成订单"));
        list.add(new ProductOrder(333L,"创建订单"));
        list.add(new ProductOrder(222L,"完成订单"));
        list.add(new ProductOrder(333L,"支付订单"));
        list.add(new ProductOrder(333L,"完成订单"));
        return list;

    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderId=" + orderId +
                ", type='" + type + '\'' +
                '}';
    }

}

模拟投递

java
@RequestMapping("/api/v2/pay_cb")
public Object callback() throws Exception {
    List<ProductOrder> list  = ProductOrder.getOrderList();
    for(int i=0; i< list.size(); i++){
        ProductOrder order = list.get(i);

        Message message = new Message(JmsConfig.ORDERLY_TOPIC,"", order.getOrderId()+"", order.toString().getBytes());

        SendResult sendResult =  producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long id = (Long) arg;
                long index = id % mqs.size();
                return mqs.get((int)index);
            }
        },order.getOrderId());

        System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());
    }
    return new HashMap<>();
}

控制台打印结果

shell
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89A420000, offsetMsgId=C0A8C83200002A9F0000000000003E1C, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=3], queueOffset=0, recallHandle=null] ,orderid=111, type=创建订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89A930001, offsetMsgId=C0A8C83200002A9F0000000000003F3B, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=2], queueOffset=0, recallHandle=null] ,orderid=222, type=创建订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89AC80002, offsetMsgId=C0A8C83200002A9F000000000000405A, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=3], queueOffset=1, recallHandle=null] ,orderid=111, type=支付订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89ACD0003, offsetMsgId=C0A8C83200002A9F0000000000004179, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=2], queueOffset=1, recallHandle=null] ,orderid=222, type=支付订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89ADA0004, offsetMsgId=C0A8C83200002A9F0000000000004298, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=3], queueOffset=2, recallHandle=null] ,orderid=111, type=完成订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89AE60005, offsetMsgId=C0A8C83200002A9F00000000000043B7, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=1], queueOffset=0, recallHandle=null] ,orderid=333, type=创建订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89AEB0006, offsetMsgId=C0A8C83200002A9F00000000000044D6, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=2], queueOffset=2, recallHandle=null] ,orderid=222, type=完成订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89AEF0007, offsetMsgId=C0A8C83200002A9F00000000000045F5, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=1], queueOffset=1, recallHandle=null] ,orderid=333, type=支付订单
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=AC1B3001105436BAF30C11B89AF30008, offsetMsgId=C0A8C83200002A9F0000000000004714, messageQueue=MessageQueue [topic=uni_pay_order_topic_orderly, brokerName=broker-a, queueId=1], queueOffset=2, recallHandle=null] ,orderid=333, type=完成订单