1. RabbitMQ basic concepts
1.1 Message processing flow and components
- Producer sends a message. Messages go to an Exchange first, not directly to a queue.
- Exchange receives the message and decides which queues to deliver it to, based on the Routing Key and the Binding rules.
- Queue stores messages until a Consumer consumes them.
- Consumer receives and processes messages from the queue.
Producer
Role: The entry point for messages into RabbitMQ. The producer sends each message and specifies its Exchange and Routing Key.
Key points:
- The producer only needs to know the Exchange and the Routing Key; it does not need to know about queues.
- The producer never interacts with a queue directly. Routing and storage are determined by the Exchange and its Bindings.
Code Example:
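    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class MessageProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Publish a message to the given exchange with the given routing key
        public void sendMessage(String exchange, String routingKey, String message) {
            rabbitTemplate.convertAndSend(exchange, routingKey, message);
            System.out.println("Sent message: " + message);
        }
    }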
Call Example (messageProducer is assumed to be an injected MessageProducer bean):
    messageProducer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
- direct-exchange: the target exchange.
- key1: the routing key of the message.
Exchange
Role: Receives messages from the Producer and decides which queues to deliver them to, based on the Routing Key and the configured Bindings.
An Exchange usually has to be registered manually as a bean.
- A RabbitMQ Exchange is identified by its name.
- In Spring Boot, registering an Exchange through a @Bean method declares the Exchange's name and type on the RabbitMQ server.
- When a message is sent, RabbitMQ looks up the Exchange by name and routes the message to queues according to the Routing Key.
Types:
- Direct Exchange: Exact Routing Key match. The message's Routing Key must be identical to the Binding's Routing Key.
- Topic Exchange: Supports wildcard matching. For example, a binding created with with("key.*") matches key.1, key.2, and so on.
- Fanout Exchange: Ignores the Routing Key and broadcasts the message to all bound queues.
- Headers Exchange: Ignores the Routing Key and matches on message header attributes.
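The wildcard rule for Topic exchanges can be sketched in plain Java. In AMQP topic matching, * stands for exactly one dot-separated word and # stands for zero or more words. TopicMatcher below is a hypothetical helper written only for illustration; it is not part of Spring AMQP or the RabbitMQ client, and the broker performs this matching itself.

```java
/** Hypothetical helper (not part of Spring AMQP): models topic-exchange key matching. */
final class TopicMatcher {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;           // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                   // words left in the pattern, none left in the key
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }
}
```

With this sketch, TopicMatcher.matches("key.*", "key.1") is true, while TopicMatcher.matches("key.*", "key.1.2") is false because * covers only one word; the pattern key.# would match both.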
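Code Example (defining the exchanges):
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ExchangeConfig {

        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("direct-exchange");
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout-exchange");
        }

        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topic-exchange");
        }

        @Bean
        public HeadersExchange headersExchange() {
            return new HeadersExchange("headers-exchange");
        }
    }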
Queue
Role: The container that stores messages until a consumer retrieves them for processing.
A Queue also has to be registered manually as a bean. Spring Boot does not register queues automatically, because a queue's name and attributes (whether it is durable, whether it is exclusive, and so on) depend on business needs.
Key points:
- Messages stay in the queue until they are consumed.
- A queue can be durable (the queue survives a RabbitMQ restart, and so do its persistent messages) or non-durable.
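Code Example (defining the queue):
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class QueueConfig {

        @Bean
        public Queue demoQueue() {
            return new Queue("demo-queue", true); // Durable queue
        }
    }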
Routing Key
Role: Determines how messages are routed from the exchange to queues.
Key points:
- The Routing Key is specified by the Producer.
- In Direct and Topic exchanges, the Routing Key determines whether a queue receives the message.
Binding
Role: Connects a queue to an exchange and defines the routing rule.
Key points:
- A Binding defines the condition under which a queue accepts messages.
- The Binding's Routing Key and the exchange type together determine how messages are routed.
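Code Example (defining the binding):
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class BindingConfig {

        // Bind demo-queue to direct-exchange with the routing key "key1"
        @Bean
        public Binding binding(Queue demoQueue, DirectExchange directExchange) {
            return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
        }
    }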
with("key1") specifies the Routing Key of the Binding. It means:
- When a message is sent to the Exchange, the Exchange compares the message's Routing Key with the Binding's Routing Key.
- If they match, the message is routed to the corresponding queue. If no binding matches, the message is unroutable and is discarded by default (unless the publisher sets the mandatory flag or the exchange has an alternate exchange configured).
Consumer
Role: Receives and processes messages from the queue.
Key points:
- A consumer is associated directly with a queue.
- Multiple consumers can listen to the same queue, which spreads the load across them.
Code Example:
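    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    @Service
    public class Consumer {

        // Invoked for each message that arrives in demo-queue
        @RabbitListener(queues = "demo-queue")
        public void receiveMessage(String message) {
            System.out.println("Received message: " + message);
        }
    }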
1.2 RabbitMQ message transmission model
Point-to-point model
Definition: The producer sends a message to a queue and a consumer receives it from that queue. Each message is consumed by only one consumer.
Implementation:
- Use the default exchange (the empty string "").
- Use the queue name as the Routing Key, so the message goes straight to that queue.
Code Example:
    rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");
Publish/subscribe model
Definition: The producer sends a message to a Fanout exchange, which broadcasts it to all bound queues.
Implementation:
- No Routing Key is required.
- Every queue bound to the Fanout exchange receives the message.
Code Example:
    rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");
Routing model
Definition: The producer sends a message to a Direct exchange, which delivers it to the queues whose Binding matches the Routing Key exactly.
Implementation:
- Specify the Routing Key when binding a queue to the exchange.
- The message's Routing Key must equal the Binding's Routing Key.
Code Example:
    rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");
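To make the difference between the three models concrete, here is a minimal in-memory sketch of how an exchange picks target queues. RoutingModel is a hypothetical class written for this article; it never contacts RabbitMQ, it models only Direct and Fanout exchanges, and it assumes that every queue is implicitly bound to the default exchange under its own name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of exchange routing; it never contacts RabbitMQ. */
final class RoutingModel {

    enum Type { DIRECT, FANOUT }

    private static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final Map<String, Type> exchanges = new HashMap<>();
    private final Map<String, List<Binding>> bindings = new HashMap<>();

    RoutingModel() {
        // The default exchange is a direct exchange whose name is the empty string
        declareExchange("", Type.DIRECT);
    }

    void declareExchange(String name, Type type) {
        exchanges.put(name, type);
    }

    /** Every queue is implicitly bound to the default exchange under its own name. */
    void declareQueue(String queue) {
        bind(queue, "", queue);
    }

    void bind(String queue, String exchange, String bindingKey) {
        bindings.computeIfAbsent(exchange, e -> new ArrayList<>())
                .add(new Binding(queue, bindingKey));
    }

    /** Returns the queues that would receive the message; an empty list means unroutable. */
    List<String> route(String exchange, String routingKey) {
        List<String> targets = new ArrayList<>();
        boolean fanout = exchanges.get(exchange) == Type.FANOUT;
        for (Binding b : bindings.getOrDefault(exchange, Collections.emptyList())) {
            boolean matches = fanout || b.bindingKey.equals(routingKey);
            if (matches && !targets.contains(b.queue)) {
                targets.add(b.queue);   // a queue gets at most one copy per publish
            }
        }
        return targets;
    }
}
```

For example, after declaring demo-queue and binding it to direct-exchange with key1, route("", "demo-queue") and route("direct-exchange", "key1") both return only demo-queue, route("direct-exchange", "key2") returns an empty list, and a Fanout exchange returns every bound queue regardless of the routing key.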
2. Environment setup
2.1 Installing and configuring RabbitMQ
Download Docker
Visit Docker's official website (Docker: Accelerated Container Application Development).
Download and install Docker Desktop for your operating system (Windows, macOS, or Linux).
Start Docker
- After the installation is complete, start Docker Desktop.
- Make sure Docker is running (the Docker icon can be seen in the taskbar or menu bar).
Rapid deployment of RabbitMQ with Docker
Docker is one of the easiest ways to deploy RabbitMQ. You can start a RabbitMQ container with the following command:
    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Parameter description:
- -d: run the container in the background.
- --name rabbitmq: assign a name to the container (rabbitmq).
- -p 5672:5672: map the container's port 5672 to the host's port 5672 (RabbitMQ's messaging port).
- -p 15672:15672: map the container's port 15672 to the host's port 15672 (the web interface port of the RabbitMQ management plugin).
- rabbitmq:management: use the RabbitMQ image that includes the management plugin.
Verify that RabbitMQ is running
Run the following command to check that the container is running normally:
    docker ps
If the rabbitmq container appears in the list as running, RabbitMQ has started successfully.
2.2 Using RabbitMQ Management Plug-in
RabbitMQ provides a web management interface for monitoring and managing the broker.
Access the management interface
- Open a browser and visit http://localhost:15672.
- Log in with the default username and password:
  - username: guest
  - password: guest
Management interface functions
- Overview: Check the overall status of RabbitMQ, such as the number of connections, queues, message rate, etc.
- Connections: View the client currently connected to RabbitMQ.
- Channels: View the currently open channel.
- Exchanges: View and manage Exchange.
- Queues: View and manage Queue.
- Admin: Manage users and permissions.
2.3 User and permission configuration
By default, RabbitMQ has a single user, guest, whose password is also guest. For security and permission management, create a new user and assign it permissions.
1. Create a new user
- In the RabbitMQ management interface:
  - Click Admin in the top navigation bar.
  - Below the user list, click Add a user.
  - Enter a username and password, for example:
    - username: admin
    - password: admin123
  - Click Add user to finish.
2. Assign permissions
- In the user list, find the user you just created (such as admin).
- Click Set permission to the right of the user.
- On the permission settings page:
  - Virtual Host: choose / (the default virtual host).
  - Configure: enter .* to allow the user to configure all resources.
  - Write: enter .* to allow the user to write to all resources.
  - Read: enter .* to allow the user to read from all resources.
- Click Set permission to finish assigning permissions.
3. Log in with the new user
- Log out of the current user (click guest in the upper right corner and choose Log out).
- Log in with the new user (such as admin).
2.4 Adding the RabbitMQ dependency in Spring Boot
Add the following dependency to pom.xml:
    <dependencies>
        <!-- RabbitMQ dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
spring-boot-starter-amqp is the RabbitMQ integration starter provided by Spring Boot. It includes:
RabbitMQ Client Library:
- Pulls in RabbitMQ's Java client library (amqp-client), which is used to communicate with the RabbitMQ server.
Spring AMQP Support:
- Provides Spring's support for AMQP (Advanced Message Queuing Protocol), including RabbitTemplate, @RabbitListener, and so on.
2.5 Configuring RabbitMQ in Spring Boot
In a Spring Boot project, configure the RabbitMQ connection information in application.properties or application.yml.
Sample configuration
    # RabbitMQ connection configuration
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin123
Configuration instructions:
- spring.rabbitmq.host: RabbitMQ server address (default localhost).
- spring.rabbitmq.port: RabbitMQ messaging port (default 5672).
- spring.rabbitmq.username: RabbitMQ username.
- spring.rabbitmq.password: RabbitMQ password.
3. Producing and consuming messages with Spring Boot and RabbitMQ
3.1 Message Producer
- In Spring Boot, messages are sent with RabbitTemplate. spring-boot-starter-amqp auto-configures it as a bean, so it can be injected directly with @Autowired.
- Handling messages that are not of type String: RabbitTemplate converts the payload with a MessageConverter. The default converter (SimpleMessageConverter) turns String payloads into bytes and uses Java serialization for Serializable objects; it does not produce JSON.
- If your business data is not a String, common practices are:
  - Serialize the object yourself before sending (for example, to a JSON string);
  - Or configure a custom MessageConverter so that Spring serializes and deserializes objects for you.
Typical practice: manually serialize to JSON and send
    @Service
    public class CustomObjectProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void sendCustomObject(String queueName, MyCustomObject obj) {
            // 1. Serialize the custom object into a JSON string
            String jsonString = new Gson().toJson(obj);
            // 2. Send the JSON string to RabbitMQ
            rabbitTemplate.convertAndSend(queueName, jsonString);
        }
    }
On the consumer side, deserialize the message (a JSON string) back into MyCustomObject.
Configure custom Converter (optional)
Spring AMQP provides Jackson2JsonMessageConverter, among others.
    @Configuration
    public class RabbitConfig {

        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        // Configure RabbitTemplate to use this converter
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(jsonMessageConverter());
            return template;
        }
    }
With this in place, rabbitTemplate.convertAndSend(queueName, myObject) automatically converts myObject to JSON before sending, and the consumer side automatically converts it back into the same Java object type.
1) Basic message sending
Scenario:
Send the message to a specific queue by name. The message goes through the default exchange, which routes it to the queue whose name equals the routing key, so no custom exchange or binding is needed.
Core code examples:
    @Service
    public class BasicProducer {

        // 1. RabbitTemplate is injected automatically
        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * 2. Send a basic message to the specified queue
         * @param queueName target queue name
         * @param message   message content
         */
        public void sendToQueue(String queueName, String message) {
            // 3. convertAndSend publishes to the default exchange, using the queue name as the routing key
            rabbitTemplate.convertAndSend(queueName, message);
            System.out.println("Message sent to queue: " + queueName + ", content: " + message);
        }
    }
Detailed code explanation:
- @Autowired private RabbitTemplate rabbitTemplate;
  - Spring Boot auto-configures RabbitTemplate, so there is no need to define the bean manually.
  - All interaction with RabbitMQ goes through this injected instance.
- public void sendToQueue(String queueName, String message)
  - Method parameters:
    - queueName: the name of the target queue.
    - message: the String message content to send.
- rabbitTemplate.convertAndSend(queueName, message)
  - convertAndSend converts the message (to bytes) and sends it to the specified queue.
  - If the queue does not exist, RabbitMQ does not create it on publish; the message is unroutable and is dropped by default. Declare the queue beforehand, for example as a Queue bean.
2) Send to the exchange
Scenario:
Send the message to an Exchange, which then uses the Routing Key to route it to the matching queues.
Core code examples:
    @Service
    public class ExchangeProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Send a message to the specified exchange
         * @param exchangeName exchange name
         * @param routingKey   routing key
         * @param message      message content
         */
        public void sendToExchange(String exchangeName, String routingKey, String message) {
            // Send the message to the exchange named exchangeName; routingKey is used to route it
            rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
            System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
        }
    }
Detailed code explanation:
- exchangeName
  - The name of the exchange to send to, e.g. "direct-exchange" or "fanout-exchange".
- routingKey
  - The routing key is matched against the bindings. With a DirectExchange, for example, the routing key used when binding the queue must equal the routing key used when sending for the message to reach the queue.
- rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
  - Sends the message to the exchange, which then delivers it to the target queue according to the routing key.
3) Send messages with message attributes
Scenario:
You need to set attributes such as TTL (expiration time) or priority on a message to control how it behaves in the queue.
Core code examples:
    @Service
    public class PropertyProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Send a message with message attributes (such as TTL and priority)
         */
        public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
            // 1. Create a MessageProperties object to hold the message attributes
            MessageProperties properties = new MessageProperties();
            properties.setExpiration("10000"); // Expiration time: 10 seconds (unit: milliseconds)
            properties.setPriority(5);         // Set priority to 5

            // 2. Build the Message from the body and the attributes
            Message message = new Message(messageContent.getBytes(), properties);

            // 3. Use send (rather than convertAndSend) to send the Message object as-is
            rabbitTemplate.send(exchange, routingKey, message);
            System.out.println("Message with properties sent: " + messageContent);
        }
    }
Detailed code explanation:
- MessageProperties properties = new MessageProperties();
  - MessageProperties holds the AMQP-level properties and headers of the message.
- properties.setExpiration("10000");
  - setExpiration sets the TTL (Time-To-Live) of the message in milliseconds. If the message has not been consumed when the TTL elapses, RabbitMQ removes it from the queue and sends it to the dead letter queue (if one is configured).
- properties.setPriority(5);
  - Sets the message priority to 5. This only takes effect if the queue itself is a priority queue (declared with the x-max-priority argument).
- new Message(messageContent.getBytes(), properties)
  - Wraps the plain-text payload in a Message object that combines the message body and the message attributes.
- rabbitTemplate.send(exchange, routingKey, message);
  - Unlike convertAndSend, send does not run a message converter (for JSON, strings, and so on); it sends the complete AMQP Message object as-is.
Message constructor
    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }
- body: the message body as a byte array.
- messageProperties: the AMQP message properties, including TTL, priority, headers, and so on.
If the message body is not a String
Convert it to bytes manually: turn the custom object into a byte array (for example via JSON serialization or Java serialization) and pass it as the first argument of new Message(...).
    MyCustomObject obj = new MyCustomObject();
    // Suppose you want to use JSON
    String jsonString = new Gson().toJson(obj);
    byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
    MessageProperties properties = new MessageProperties();
    // Set some properties
    Message message = new Message(body, properties);
- Why is the object not converted to JSON automatically? The new Message(...) constructor is a "pure" AMQP-level approach that does not invoke Spring's converters, so you have to handle serialization yourself.
- When you use the Message constructor, you must convert the object to byte[] yourself (whether as a string, JSON, or another format).
- If you want Spring AMQP to convert automatically, use the higher-level rabbitTemplate.convertAndSend(Object msg) API, or configure a custom MessageConverter.
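Because the Message constructor takes raw bytes, the producer and the consumer have to agree on the character encoding. The sketch below shows that round trip with an explicit UTF-8 charset on both sides. BodyCodec is a hypothetical helper name used only for illustration; in real code the byte array would be passed to new Message(...) and read back from the received message body.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the byte[] conversion that new Message(...) expects. */
final class BodyCodec {

    /** Producer side: turn the text payload (for example a JSON string) into the message body. */
    static byte[] encode(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: turn the message body back into text using the same charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Naming the charset explicitly on both sides avoids depending on the platform default encoding, which may differ between the producer's and the consumer's machines.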
3.2 Message Consumer
The core function of the consumer is to listen to messages in a specified queue and process or reject messages based on the configured acknowledgement mode (automatic acknowledgement or manual acknowledgement).
1) Listen to queues and consume messages
Core code example (automatic confirmation mode):
    @Service
    public class Consumer {

        /**
         * Use @RabbitListener to specify the queue to listen to.
         * No manual ack is needed: with Spring AMQP's default acknowledge mode (auto),
         * the listener container acknowledges the message after this method returns normally.
         * With acknowledge-mode=none, RabbitMQ treats the message as acknowledged
         * as soon as it is delivered and deletes it from the queue.
         */
        @RabbitListener(queues = "demo-queue")
        public void receiveMessage(String message) {
            // 1. Message content retrieved from demo-queue
            System.out.println("Received message: " + message);
            // 2. No manual ack is required here.
            // With acknowledge-mode=none, an exception thrown here means the message is lost:
            // RabbitMQ has already removed it and will not deliver it again.
        }
    }
Detailed code explanation (automatic confirmation mode):
- @RabbitListener(queues = "demo-queue")
  - Declares a listener on the queue named demo-queue.
  - The method is called back automatically whenever a new message arrives in the queue.
- public void receiveMessage(String message)
  - The parameter type is String, so when a message is received Spring AMQP converts the message body to a String and passes it in as message.
- Risks of auto-ack
  - With broker-side auto-ack (acknowledge-mode=none in Spring AMQP), RabbitMQ marks the message as "confirmed" as soon as it is delivered. If the consumer then throws an exception while processing it, the message is not redelivered and does not enter the dead letter queue, so it is lost.
2) Confirmation mechanism
Automatic confirmation (auto-ack)
- Behavior:
  - When the consumer gets a message from the queue, RabbitMQ immediately marks it as acknowledged and deletes it from the queue.
  - In Spring AMQP this corresponds to acknowledge-mode=none. Spring's default mode, auto, is different: the listener container acknowledges the message after the listener method returns normally and rejects it if the method throws.
- Problems:
  - If message processing fails (for example, the consumer throws an exception), the message has already been acknowledged and deleted from the queue and cannot be reprocessed.
  - If the consumer crashes or disconnects, unprocessed messages are lost.
- Applicable scenarios:
  - Scenarios with low reliability requirements for message processing.
Manual confirmation (manual-ack)
- Behavior:
  - After the consumer has processed the message, it must explicitly call basicAck to acknowledge it.
  - If message processing fails, it can call basicNack or basicReject to reject the message.
- Advantages:
  - Ensures reliable message processing.
  - Supports requeueing messages or sending them to a dead letter queue.
- Applicable scenarios:
  - Scenarios with high reliability requirements for message processing.
Core code example:
    @Service
    public class ManualAckConsumer {

        /**
         * Configure in application.properties:
         *   spring.rabbitmq.listener.simple.acknowledge-mode=manual
         * so that the listener uses manual confirmation mode
         */
        @RabbitListener(queues = "demo-queue")
        public void receiveMessage(Message message, Channel channel) throws IOException {
            try {
                // 1. Get the message body from the message
                String body = new String(message.getBody());
                System.out.println("Processing message: " + body);

                // 2. If business processing succeeds, acknowledge manually with basicAck
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                System.err.println("Message processing failed: " + e.getMessage());

                // 3. On failure, decide whether to requeue the message or to reject it
                // requeue = true  -> put the message back in the queue
                // requeue = false -> discard or dead-letter it (depending on queue configuration)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
Detailed code explanation:
Configuring manual confirmation:
Add the following to application.properties:
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
This tells Spring AMQP to use manual-ack.
public void receiveMessage(Message message, Channel channel):
Unlike the automatic confirmation example, the method receives not just a String but the Message and Channel objects.
- Message: contains the message body and the message properties (headers, etc.).
- Channel: provides the underlying AMQP operations such as basicAck, basicNack, and basicReject.
Manual confirmation on success:
channel.basicAck(deliveryTag, multiple):
- deliveryTag: the unique tag of this delivery, obtained from message.getMessageProperties().getDeliveryTag().
- multiple = false: acknowledge only the current message.
In basicAck(long deliveryTag, boolean multiple), deliveryTag is not generated when you construct the Message. It is an incrementing sequence number that the RabbitMQ broker assigns automatically, at the AMQP protocol level, when it delivers a message to a consumer.
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
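The effect of the multiple flag is easiest to see with a small model of one channel's outstanding deliveries. UnackedDeliveries is a hypothetical class written for illustration, not a RabbitMQ API. It assumes that delivery tags increase by one per delivery within a channel, and it mirrors the documented semantics of basicAck: with multiple = true, every outstanding delivery up to and including the given tag is acknowledged.

```java
import java.util.TreeSet;

/** Hypothetical model of one channel's unacknowledged deliveries. */
final class UnackedDeliveries {

    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;   // assumption of this model: tags count up from 1 per channel

    /** The broker delivers a message and assigns the next delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding delivery up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this one delivery
            unacked.remove(deliveryTag);
        }
    }

    /** Returns a copy of the tags that are still waiting for an acknowledgement. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }
}
```

After five deliveries, ack(3, true) leaves tags 4 and 5 outstanding, and a following ack(5, false) leaves only tag 4. This is why multiple = false is the safer choice when each message is processed and confirmed individually.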
Manual confirmation on failure:
- channel.basicNack(deliveryTag, multiple, requeue) or basicReject:
  - requeue = true: put the message back in the queue to be consumed again (this can cause an endless loop if processing keeps failing).
  - requeue = false: reject the message. If a dead letter queue is configured, the message goes there; otherwise it is discarded.
3) Dealing with consumption failure
- Handling in automatic confirmation mode
  - With broker-side auto-ack (acknowledge-mode=none), RabbitMQ does not redeliver a message whose processing failed, because the message has already been acknowledged and deleted from the queue.
  - Problem:
    - The message is lost and cannot be reprocessed.
- Handling in manual confirmation mode
  - In manual confirmation mode, a failed message can be handled in the following ways:
    - Requeue:
      - Call basicNack or basicReject with the requeue parameter set to true.
      - The message re-enters the queue and waits to be consumed again.
    - Send to the dead letter queue:
      - Call basicNack or basicReject with the requeue parameter set to false.
      - If the queue is configured with a dead letter exchange, the message is sent to the dead letter queue.
- Retry mechanism (simple retry provided by Spring AMQP; it is triggered when the listener method throws an exception, so exceptions caught inside the listener are not retried)
Retries happen first. Only after the retries are exhausted is the message rejected and then requeued or dead-lettered.
    # Enable retry
    spring.rabbitmq.listener.simple.retry.enabled=true
    # Maximum number of attempts
    spring.rabbitmq.listener.simple.retry.max-attempts=3
    # Initial retry interval (milliseconds)
    spring.rabbitmq.listener.simple.retry.initial-interval=1000
    # Interval multiplier
    spring.rabbitmq.listener.simple.retry.multiplier=2.0
    # Maximum retry interval (milliseconds)
    spring.rabbitmq.listener.simple.retry.max-interval=10000
- Spring AMQP provides a retry mechanism that retries automatically several times when the consumer fails to process a message, instead of requeueing the message right away.
Behavior
- When message processing fails, Spring AMQP retries locally (that is, without requeueing the message) until the maximum number of attempts is reached.
- Once the retries are exhausted, the message is rejected (basicNack or basicReject), and the configuration decides whether it is requeued or sent to the dead letter queue.
Dead Letter Queue (DLQ)
- When a message is rejected (with requeue=false) or expires, RabbitMQ sends it to the configured dead letter exchange (DLX), which routes it to the dead letter queue (DLQ).
- Configuration example:
    @Configuration
    public class RabbitConfig {

        @Bean
        public Queue normalQueue() {
            return QueueBuilder.durable("normal-queue")
                    // Specify the dead letter exchange
                    .withArgument("x-dead-letter-exchange", "dead-letter-exchange")
                    // Specify the dead letter routing key
                    .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key")
                    .build();
        }

        @Bean
        public DirectExchange deadLetterExchange() {
            return new DirectExchange("dead-letter-exchange");
        }

        @Bean
        public Queue deadLetterQueue() {
            return new Queue("dead-letter-queue");
        }

        @Bean
        public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
            return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
        }
    }
Principle:
- The normal queue specifies a dead letter exchange through x-dead-letter-exchange. Once a message is rejected (requeue=false) or times out (its TTL expires), RabbitMQ sends it to dead-letter-exchange.
- dead-letter-exchange is bound to dead-letter-queue (with routing key dead-letter-routing-key), which is how dead letters end up stored in the dead letter queue.
Requeue vs. send to dead letter queue
- Requeue: channel.basicNack(deliveryTag, false, true)
  - Suitable for temporary errors, such as database lock conflicts or network jitter, where a later attempt may succeed.
- Send to the dead letter queue: channel.basicNack(deliveryTag, false, false)
  - Suitable for permanent errors, such as a message whose format cannot be parsed, or cases where the business logic says the message should not be retried.
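The choice between requeueing and dead-lettering comes down to two inputs: the requeue flag passed to basicNack or basicReject, and whether the queue declares x-dead-letter-exchange. The following sketch encodes that decision as described in this section. RejectOutcome is a hypothetical class used for illustration only; the broker makes this decision itself.

```java
/** Hypothetical model of what happens to a message after basicNack/basicReject. */
final class RejectOutcome {

    enum Result { REQUEUED, DEAD_LETTERED, DISCARDED }

    /**
     * @param requeue            the requeue flag passed to basicNack/basicReject
     * @param deadLetterExchange value of x-dead-letter-exchange on the queue, or null if unset
     */
    static Result afterReject(boolean requeue, String deadLetterExchange) {
        if (requeue) {
            return Result.REQUEUED;          // back into the original queue
        }
        if (deadLetterExchange != null) {
            return Result.DEAD_LETTERED;     // republished to the dead letter exchange
        }
        return Result.DISCARDED;             // no dead letter exchange configured: dropped
    }
}
```

For the normal-queue configuration shown earlier, afterReject(false, "dead-letter-exchange") gives DEAD_LETTERED, while the same rejection on a queue without a dead letter exchange gives DISCARDED.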