SoFunction
Updated on 2025-03-08

How RocketMQ ensures message ordering: a worked example

In distributed systems, message ordering is an important property of a message queue (MQ), especially in business scenarios where events must be processed in sequence. Apache RocketMQ is a widely used open-source messaging middleware that supports ordered message processing. This article discusses how RocketMQ preserves message order, covering the design principles and a simplified view of the implementation.

Basic principles

Basic concepts of ordered messages in RocketMQ

RocketMQ preserves order mainly through ordered (sequential) messages. Ordered messages come in two forms, global order and partition order:

  • Global order: all messages in a topic are consumed in exactly the order in which they were sent.
  • Partition order: messages within the same queue are ordered, but no order is guaranteed between messages in different queues.

Partition order is the form normally used. Messages under the same topic that belong together (for example, all updates for one order) are routed to the same queue, so they remain ordered within that queue.
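To make the partition-order guarantee concrete, here is a minimal in-memory sketch. It is not RocketMQ code: the class `PartitionOrderDemo`, the `key:payload` event format, and the queue count of 4 are all assumptions made for illustration. It routes events to queues by key and shows that the events for one key keep their relative order, while nothing is promised about how different keys interleave.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic with several queues (not RocketMQ code).
class PartitionOrderDemo {

    // Routes each "key:payload" event to a queue chosen from the hash of its key.
    static Map<Integer, List<String>> route(List<String> events, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (int i = 0; i < queueCount; i++) {
            queues.put(i, new ArrayList<>());
        }
        for (String event : events) {
            String key = event.substring(0, event.indexOf(':'));
            int queueId = Math.floorMod(key.hashCode(), queueCount);
            queues.get(queueId).add(event);
        }
        return queues;
    }

    // Collects the events for one key in the order they sit inside their queue.
    static List<String> eventsForKey(Map<Integer, List<String>> queues, String key) {
        List<String> result = new ArrayList<>();
        for (List<String> queue : queues.values()) {
            for (String event : queue) {
                if (event.startsWith(key + ":")) {
                    result.add(event);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sent = List.of(
                "order-1:created", "order-2:created",
                "order-1:paid", "order-2:paid",
                "order-1:shipped");
        Map<Integer, List<String>> queues = route(sent, 4);
        // Each queue lists its events in arrival order.
        queues.forEach((id, q) -> System.out.println("queue " + id + " -> " + q));
        System.out.println(eventsForKey(queues, "order-1"));
    }
}
```

Because every event with the same key hashes to the same queue, the per-key sequence survives regardless of how many queues the topic has. Global order corresponds to the special case of a single queue.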

How RocketMQ implements ordered messages

Message sending

On the sending side, RocketMQ preserves order by having the producer send related messages to the same queue. When sending, the producer can attach a key or another attribute to the message (for example, an order ID), and that value is used to calculate which queue the message should go to.

Source code example (pseudocode):

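// Pseudocode: calculateQueueId, setQueueId, send, and queueCount are illustrative names.
public class Producer {
    public void sendMessages(List<Message> messages) {
        for (Message msg : messages) {
            // Pick the target queue from the message key, then send to that queue
            int queueId = calculateQueueId(msg);
            msg.setQueueId(queueId);
            send(msg, queueId);
        }
    }

    private int calculateQueueId(Message msg) {
        // Hash the message key to a queue ID; floorMod keeps the result non-negative
        return Math.floorMod(msg.getKeys().hashCode(), queueCount);
    }
}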
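One detail deserves care when hashing a key to a queue ID: `hashCode()` can be negative, and `Math.abs(Integer.MIN_VALUE)` is still negative, so the common `Math.abs(hash) % n` idiom can produce an invalid index. The sketch below compares that idiom with `Math.floorMod`. The class `QueueIndex` and its method names are hypothetical helpers, not part of RocketMQ.

```java
// Hypothetical helper for mapping a key hash to a queue index (not a RocketMQ API).
class QueueIndex {

    // Naive variant: returns a negative index when hash == Integer.MIN_VALUE,
    // because Math.abs(Integer.MIN_VALUE) overflows and stays negative.
    static int naive(int hash, int queueCount) {
        return Math.abs(hash) % queueCount;
    }

    // Safer variant: for a positive queueCount, floorMod always returns
    // a value in the range [0, queueCount).
    static int safe(int hash, int queueCount) {
        return Math.floorMod(hash, queueCount);
    }

    // The same key always maps to the same queue as long as queueCount is unchanged.
    static int forKey(String key, int queueCount) {
        return safe(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        System.out.println(naive(Integer.MIN_VALUE, 3)); // a negative number
        System.out.println(safe(Integer.MIN_VALUE, 3));  // a valid index
        System.out.println(forKey("OrderId123", 4) == forKey("OrderId123", 4)); // true
    }
}
```

Note that the mapping is only stable while the number of queues stays the same. If queues are added or removed, the same key may start landing in a different queue.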

Message consumption

On the consumer side, RocketMQ ensures that the messages in a given queue are processed by only one thread at a time. Each queue is assigned to a single consumer in a fixed manner, and one thread pulls and processes that queue's messages one after another, which preserves their order.

Source code example (pseudocode):

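// Pseudocode: pullMessage and processMessage are illustrative names.
public class Consumer {
    public void consume() {
        while (true) {
            // A single thread pulls and handles one message at a time from its queue
            Message msg = pullMessage();
            processMessage(msg);
        }
    }
}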
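The "one thread per queue" idea can be sketched with the standard library alone. The following is an assumption-laden model rather than RocketMQ's actual consumer: the class `PerQueueWorkers` is hypothetical, and each queue is simply a list of strings handled by its own single-threaded executor. Different queues run in parallel, but within a queue the messages are handled strictly in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of per-queue single-threaded consumption (not RocketMQ code).
class PerQueueWorkers {

    // Handles each queue on its own single-threaded executor and returns,
    // per queue, the messages in the order they were actually handled.
    static List<List<String>> consume(List<List<String>> queues) {
        List<List<String>> handled = new ArrayList<>();
        List<ExecutorService> workers = new ArrayList<>();
        for (List<String> queue : queues) {
            List<String> log = Collections.synchronizedList(new ArrayList<>());
            handled.add(log);
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            for (String msg : queue) {
                // Tasks given to a single-threaded executor run one at a time, in order.
                worker.execute(() -> log.add(msg));
            }
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<List<String>> result = consume(List.of(
                List.of("order-1:created", "order-1:paid", "order-1:shipped"),
                List.of("order-2:created", "order-2:paid")));
        result.forEach(System.out::println);
    }
}
```

The trade-off is throughput: ordering within a queue limits parallelism to the number of queues, so the queue count bounds how much work can proceed concurrently.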

A simple example

This section shows how to use RocketMQ from Spring Boot so that messages are sent and consumed in order, which requires configuring the producer and the consumer for ordered messages. The scenario assumes an e-commerce system in which order status updates must be processed in sequence to avoid inconsistent state.

Step 1: Add dependencies

First, make sure the RocketMQ Spring Boot Starter dependency has been added to your pom.xml.

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

Step 2: Configure RocketMQ

Configure the basic RocketMQ properties in application.yml (or the equivalent application.properties):

rocketmq:
  name-server: 127.0.0.1:9876  # Change to your NameServer address
  producer:
    group: order_producer_group
    send-message-timeout: 3000
  consumer:
    group: order_consumer_group
    consume-thread-min: 1
    consume-thread-max: 1

Step 3: Producer Configuration

Create a producer service that sends order status updates as sequential messages.

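import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderStatusProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderStatusUpdate(String orderStatus, String orderId) {
        // Use the order ID as the hash key so that all updates for the same order go to the same queue
        SendResult result = rocketMQTemplate.syncSendOrderly("order-topic", orderStatus, orderId);
        System.out.println("Message sent, result: " + result.getSendStatus());
    }
}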

Step 4: Consumer Configuration

Create a consumer service that consumes the order status updates in order.

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order_consumer_group", consumeMode = ConsumeMode.ORDERLY)
public class OrderStatusConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received order update: " + message);
        // Handle the order update
        processOrderUpdate(message);
    }

    private void processOrderUpdate(String status) {
        // Implement the order update processing logic here
        System.out.println("Processing order status update: " + status);
    }
}

Step 5: Test the message order

Write a simple test that sends several messages for the same order, then check whether the consumer receives them in the order they were sent.

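import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class OrderStatusTestRunner implements CommandLineRunner {

    @Autowired
    private OrderStatusProducer producer;

    @Override
    public void run(String... args) throws Exception {
        // All four updates share the same order ID, so they are routed to the same queue
        producer.sendOrderStatusUpdate("Order Created", "OrderId123");
        producer.sendOrderStatusUpdate("Payment Received", "OrderId123");
        producer.sendOrderStatusUpdate("Shipped", "OrderId123");
        producer.sendOrderStatusUpdate("Delivered", "OrderId123");
    }
}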
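Reading console output by eye works for four messages, but a small checker makes the observation repeatable. The sketch below is a hypothetical helper, not part of RocketMQ or Spring: `OrderSequenceCheck` assumes the lifecycle is exactly the four statuses sent by the test runner, and it takes the list of statuses the consumer recorded for one order. It tolerates a repeated status, on the assumption that a message may be redelivered after a retry.

```java
import java.util.List;

// Hypothetical test helper: checks that recorded statuses follow the expected lifecycle.
class OrderSequenceCheck {

    // Expected lifecycle, taken from the statuses sent by the test runner.
    static final List<String> LIFECYCLE =
            List.of("Order Created", "Payment Received", "Shipped", "Delivered");

    // Returns true if no status appears earlier in the lifecycle than the one before it.
    // Unknown statuses are treated as a failure; an immediate repeat is allowed.
    static boolean inOrder(List<String> received) {
        int last = -1;
        for (String status : received) {
            int index = LIFECYCLE.indexOf(status);
            if (index < 0 || index < last) {
                return false;
            }
            last = index;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(inOrder(List.of("Order Created", "Payment Received", "Shipped", "Delivered"))); // true
        System.out.println(inOrder(List.of("Order Created", "Shipped", "Payment Received"))); // false
    }
}
```

In a real test, the consumer would append each received status to a list for its order ID, and the assertion would run once all expected messages have arrived.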

With this setup, RocketMQ and Spring Boot ensure that the status updates for the same order are processed in the order they were sent. This matters for any business logic that depends on sequential consistency.
