SoFunction
Updated on 2025-03-08

How to avoid repeated messages consumption issues

How to avoid repeated messages consumption

In Internet applications, especially when the network is unstable, messages in the message queue RocketMQ may be duplicated, causing repeated consumption.

This repetition can be summarized as follows:

  • The message is repeated when the produce is sent to the Broker

When a message has been successfully sent to the server and completed persistence, a network flash occurs or a producer crash occurs, causing the server Broker to fail to respond to the producer.

If the producer realizes that the message has failed and tries to send the message again, the consumer will receive two messages with the same content and the Message ID.

  • The message is repeated when the Broker delivers a message to the Consumer

In the scenario of message consumption, the message has been delivered to the consumer and the business processing is completed. When the client feedbacks the server, the network breaks down.

In order to ensure that the message is consumed at least once, the server of the message queue RocketMQ will try to deliver the previously processed messages again after the network is restored. The consumer will receive two messages with the same content and the same Message ID in the future.

  • Message duplication during load balancing (including but not limited to network jitter, Broker restart and subscriber application restart)

When the Broker or client of the message queue RocketMQ is restarted, expanded or reduced, the Rebalance rebalancing mechanism will be triggered, and the consumer may receive duplicate messages.

Idepotency solutions

Impotence: The result of a request or multiple requests initiated by the user for the same operation is the same, and there will be no side effects due to multiple clicks.

From the above analysis, we know that in RocketMQ, it is impossible to guarantee that each message will be delivered only once, so we must ensure the idempotence of message consumption in our business. To deal with this problem, each message in RocketMQ has a unique MessageId, and this parameter will not change during multiple delivery, so this MessageId can be used as the key basis for judging idempotence in business.

However, this MessageId cannot be guaranteed to be globally unique and there will be conflicts. Therefore, in some scenarios where idempotence is strictly required, it is better to use a unique identifier in the business. This id can be generated using distributed middleware redis, zookeeper, etc. For example, the order ID. This service identifier can be passed using the Message Key.

At the same time, create a message table and get this message to do the database insert operation. Make a unique primary key or unique constraint for this message, even if there is repeated consumption, it will lead to primary key conflicts, and the message will no longer be processed.

The strategy for message persistence layer to do message uniqueness (this solution has not been proven)

1. Service unique identity verification during the persistence process. Each message has a service unique identity. Before the message is finally persisted, the uniqueness of the message is guaranteed by verification uniqueness. If the same message appears at the persistence location of the message, the system will not process it, and there will be no delivery process during this period to ensure the uniqueness of the message.

2. The business unique identification verification is performed during use. If the same message appears during use, the system will perform corresponding exception processing.

RocketMQ message repetitive consumption scenario

Answer

The deduplication operation is placed directly on the consumer side, and the business logic of the consumer side to process messages remains idempotent. After receiving the message, the consumer obtains the message identifier from the message and writes it to Redis (distributed lock) or database (the identity is inserted into a record as the table's unique index), and will not be processed when the message is received again.

On the broker side, the Queue is locked (synchronized). The Queue listened to by the Consumer has a message that has been delivered but has not received an ack and has not timed out. The lock is not allowed to be acquired. Until all the messages delivered by the Queue are ack or consumption timed out, the new Consumer is allowed to acquire the lock and pull the message.

Think about problems

  • 1. Why not remove heavy weights from producers?
  • 2. Why do consumers do heavy lifting?

1. Network fluctuations cause system A to send message to RocketMQ without receiving a message sending timeout, system A retry causes message to be repeated  

1. RocketMQ supports the function of message query. Before sending the message, go to RocketMQ to check whether the message has been sent. If it exists, it will not be sent. If it does not exist, it will be sent to RocketMQ. In high concurrency scenarios, each message is queried when sent to RocketMQ, which will affect the performance of the interface.

2. Redis distributed lock. After sending a message to RocketMQ successfully, insert a piece of data into redis. If a retry occurs, go to redis to query whether it exists. If it exists, no longer send a message. The redis cluster is down at this time. I checked again to determine whether the message has been sent and could not get the correct result.

The above two methods only guarantee that they are sent only once, but cannot guarantee that they are consumed only once.

2. MQ must ensure the reliability of message delivery, and will be delivered repeatedly for messages that are not acked.

  • Scenario 1: Broker sends the Consumer timeout

The consumer side must ensure the idempotence of consumption, obtain the message identifier from the message and write it to Redis or the database, and will not be processed when the message is received again.

  • Scenario 2: In the load balancing stage, the previous consumer instance listening to the Queue pulls the message not all acknowledges, and the new consumer instance listening to the Queue pulls the message again.

A transition state is added during the process of changing load balancing results. When the transition state is, the Consumer will continue to retain the results of the previous load balancing until all the messages pulled by the original consumer acknowledge the old results.

On the broker side, the Queue is locked (synchronized). The Queue listened to by the Consumer has a message that has been delivered but has not received an ack and has not timed out. The lock is not allowed to be acquired. Until all the messages delivered by the Queue are ack or consumption timed out, the new Consumer is allowed to acquire the lock and pull the message.

Summarize

The above is personal experience. I hope you can give you a reference and I hope you can support me more.