1. Introduction
Today we will talk about how to send messages in batches, mainly how to use themRocketMQTemplateofsyncSendmethod.
1.1. Features
The main differences between batch sending and single sending messages are as follows:
- Network overheadWhen sending a single message, each message needs to establish a separate network connection, send data packets, wait for responses, etc., which has a large network overhead. Batch sending can package multiple messages together to send, reducing the number of network connections established and reducing network overhead
- ThroughputSince batch sending reduces network overhead, more messages can be sent per unit time, improving throughput. In high concurrency and high traffic scenarios, batch transmission can achieve better performance
- Message orderThe order of a single message is sent is ordered, and the later sent are ranked behind the first and the first sent in the queue. For batch sending, the order of messages within a batch is fixed, but the order of messages between different batches is unordered and will be stored in the queue in the order of arrival. If strict message order is required, a single message is more suitable
- Retry the messageIf some messages fail to be sent in a batch sent in batches, the entire batch needs to be resented, and there is no function to resend some of the messages (involving idempotence issues). When a single message fails, just resend the single message
- Programming complexityBatch sending requires constructing MessageBatch or Message list objects, and the programming is slightly more complicated. A single message object is required to construct a single message object
2. Maven dependency
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="/POM/4.0.0" xmlns:xsi="http:///2001/XMLSchema-instance" xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0."> <parent> <artifactId>rocketmq</artifactId> <groupId></groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>06-send-batched-message</artifactId> <properties> <>8</> <>8</> </properties> <dependencies> <dependency> <groupId></groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
The parent project has been described in my previous article, and the general public package is also explained in my previous article, including consumers. Specific reference:SpringBoot integrates RocketMQ to send synchronous messages_java_me ()
III. Application configuration
=8005 # rocketmq address-server=192.168.0.234:9876 # Default producer group=batched_group # Send synchronous message timeout-message-timeout=3000 # Used to set whether the producer tries to switch to the next server after a message has failed. Set to true to enable, try to switch to the next server when sending fails-next-server=true # Used to specify the number of retries when message sending fails-times-when-send-failed=3 # Set the threshold for message compression, to 0 means to disable the compression of the message body-message-body-threshold=0
4. Bulk send
In RocketMQ,RocketMQTemplateofsyncSendMethod, it allows you to send synchronous messages in batches, with main parameters:
- topic: (Ordinary messages are sent to topic=string_message_topic)
- Collection<T>: Message collection
All test classes introduce dependencies
@Autowired private RocketMQTemplate rocketMQTemplate;
4.1. Synchronize messages
@Test public void syncSendBatchStringMessagesWithBuilder() { String topic = "string_message_topic"; String message = "I love Golang language very much:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = (message + i) // Set message type .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // Add to the list (rocketMessage); } // Send batch messages using syncSend SendResult sendResult = (topic, messageList); ("Send ordinary message results in batches in synchronous batches:{}",sendResult); }
Running results:
[_GROUP_STRING_1] : Message received by string consumer: Send normal messages in synchronous batches: 0
[_GROUP_STRING_2] : Message received by string consumers: Send normal messages in batches in synchronous batches: 1
[_GROUP_STRING_4] : Messages received by string consumers: Send normal messages in batches in synchronous batches: 3
[_GROUP_STRING_5] : Messages received by string consumers: Send normal messages in synchronous batches: 4
[_GROUP_STRING_3] : Messages received by string consumers: Send normal messages in synchronous batches: 2
[_GROUP_STRING_6] : Messages received by string consumers: Send normal messages in synchronous batches: 5
[_GROUP_STRING_7] : Messages received by string consumers: Send normal messages in synchronous batches: 6
[_GROUP_STRING_8] : Messages received by string consumers: Send normal messages in synchronous batches: 7
[_GROUP_STRING_9] : Messages received by string consumers: Send normal messages in synchronous batches: 8
[GROUP_STRING_10] : Messages received by string consumers: Send normal messages in batches in synchronous batches: 9
4.2. Asynchronous Message
@Test public void asyncSendBatchStringMessageWithBuilder() { String topic = "string_message_topic"; String message = "Alian loves Golang language very much:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = (message + i) // Set message type .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // Add to the list (rocketMessage); } // Send batch messages using asyncSend (topic, messageList, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // Asynchronously send successful callback logic ("Asynchronous batch sending normal messages successfully: " + sendResult); } @Override public void onException(Throwable e) { // Asynchronous sending callback logic ("Asynchronous batch sending normal messages failed: " + ()); } }); }
Running results:
[_GROUP_STRING_1] : Message received by string consumer: Asynchronous batch sending of normal messages: 0
[_GROUP_STRING_8] : Message received by string consumer: Asynchronous batch sending of normal messages: 1
[_GROUP_STRING_3] : Message received by string consumer: Asynchronous batch sending of normal messages: 7
[_GROUP_STRING_6] : Message received by string consumer: Asynchronous batch sending of normal messages: 4
[_GROUP_STRING_9] : Message received by string consumer: Asynchronous batch sending of normal messages: 2
[_GROUP_STRING_5] : Message received by string consumer: Asynchronous batch sending of normal messages: 6
[GROUP_STRING_10] : Message received by string consumer: Asynchronous batch sending of normal messages: 3
[_GROUP_STRING_2] : Message received by string consumer: Asynchronous batch sending of normal messages: 8
[_GROUP_STRING_4] : Message received by string consumer: Asynchronous batch sending of normal messages: 9
[_GROUP_STRING_7] : Message received by string consumer: Asynchronous batch sending of normal messages: 5
4.3. Sequential message
In RocketMQ,RocketMQTemplateofsyncSendOrderlyMethod, it allows you to send synchronous messages in batches, with main parameters:
- topic: (It is different from before, ordinary messages are sent to topic=ordered_string_message_topic)
- Collection<T>: Message collection
- hashKey: Send to the same queue via hashKey
@Test public void syncSendBatchOrderlyStringMessagesWithBuilder() { String topic = "ordered_string_message_topic"; String message = "Simultaneous batch sending sequential messages, I really like Go language:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = (message + i) // Set message type .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // Add to the list (rocketMessage); } // Use syncSendOrderly to send batch order messages, the consumer thread is set to 1 SendResult sendResult = (topic, messageList, "alian_sync_ordered"); ("Batch sending sequential message sending results:{}",sendResult); }
Running results:
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 0
[GROUP_STRING_10] : Messages received by string consumers: Simultaneous batch sending sequential messages, I like Go language very much: 1
[GROUP_STRING_10] : Messages received by string consumers: Simultaneous batch sending sequential messages, I like Go language very much: 2
[GROUP_STRING_10] : Messages received by string consumers: Simultaneous batch sending sequential messages, I like Go language very much: 3
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 4
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 5
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 6
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 7
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 8
[GROUP_STRING_10] : Messages received by string consumers: Synchronous batch sending sequential messages, I really like Go language: 9
So I said before that the topics for sending messages in batches are different, because
@Slf4j @Service @RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "ORDERED_GROUP_STRING", consumeMode = ) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { ("Message received by string consumer: {}", message); // Business logic for processing messages } }
Sequential messages should be consumed in sequence, that is, each time a thread consumes, which is equivalent to a single thread, and it will be ordered. The key is to configure: consumeMode =
Of course, we can also set the number of consumer threads to consumeThreadNumber = 1, which means that the single thread consumes, thus ensuring the sequential consumption of messages (referring to single instance):
@RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "CONCURRENT_GROUP_STRING", consumeThreadNumber = 1) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { ("Message received by string consumer: {}", message); // Business logic for processing messages } }
4.4. About asynchronous batch sending
It is possible that you will write the following asynchronous batch sending sequential message
@Test public void asyncSendBatchOrderlyStringMessageWithBuilder2() { String topic = "ordered_string_message_topic"; String message = "Alian loves Golang language very much:"; List<String> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { // Add to the list (message + i); } // Use asyncSendOrderly to send batch messages (topic, messageList, "alian_async_ordered", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // Asynchronously send successful callback logic ("Async message sends string message successfully: " + sendResult); } @Override public void onException(Throwable e) { // Asynchronous sending callback logic ("Async message failed to send string message: " + ()); } }); }
Actually, this is wrong. The final result is that you receive the messageList here as a message list, as follows:
[GROUP_STRING_18] : Message received by string consumer: ["Alian loves Golang language very much: 0","Alian loves Golang language very much: 1","Alian loves Golang language very much: 2","Alian loves Golang language very much: 3","Alian loves Golang language very much: 4","Alian loves Golang language very much: 5","Alian loves Golang language very much: 6","Alian loves Golang language very much: 7","Alian loves Golang language very much: 8","Alian loves Golang language very much: 9"]
How is RocketMQ processed in a queue for single messages and batch messages?
- For a single message sent, RocketMQ will distribute each message to a consumer thread in the order in the queue. Therefore, even if there are multiple consumer threads, since each message is processed separately, the order of consumption will still be consistent with the order of sending.
- For messages sent in batches, the situation is different. Batch messages are sent as a whole, so in the queue they are treated as a separate entity. When RocketMQ fetches batch messages from the queue, it distributes the entire batch message as a whole to a consumer thread. If there are multiple consumer threads, due to the thread scheduling policy of the operating system, the thread processing batch messages may be scheduled during the process of processing the messages, allowing other threads to process the subsequent messages. This may lead to inconsistent order of consumption and the order of sending.
4.5. Conclusion
I tested this several times and came to the conclusion:
- Send a single message to the same queue and consume the queue using multiple consumer threads. Since the message itself is ordered, the consumption order is also ordered.
- Send messages in batches to the same queue in a single batch, and consume the queue using a single consumer thread. Since the consumer thread is single, the consumption order is also ordered.
- Send messages in batches to the same queue in a single batch. When using multiple consumer threads to consume, the consumption order is not orderly.
V. Others
Since you know that batch messages are as a whole, there will definitely be a limit on the message size. In Apache RocketMQ, the default limit for batch messages is 4MB. This means that you cannot send batch messages with a total size of more than 4MB.
If you want to modify this limit, you need to modify the configuration of RocketMQ. The specific modification methods are as follows:
- Find the RocketMQ configuration file, which is usually located in the conf directory of the RocketMQ installation directory.
- In the file, find the maxMessageSize configuration item. This configuration item determines the maximum size of batch messages.
- Modify the value of maxMessageSize to the size you want. Note that this value is in bytes, so if you want to set the maximum size of batch messages to 8MB, you should set maxMessageSize=8388608.
- Save and close the file.
- Restart RocketMQ's Broker service to make the new configuration take effect.
While you can increase the maximum size of batch messages by modifying the configuration, you should consider this decision with caution. Increasing the maximum size of batch messages may increase the memory usage of the Broker and may affect the sending and receiving performance of messages. Therefore, before modifying this configuration, you should first consider the requirements of your application and the performance of Broker.
Because the priority is@RocketMQMessageListenerSettings in the annotationconsumerGroupandmessageModelParameters.
The above is the detailed content of SpringBoot's implementation of integrating RocketMQ batch sending messages. For more information about SpringBoot RocketMQ batch sending messages, please pay attention to my other related articles!