SoFunction
Updated on 2025-03-10

Detailed explanation of RabbitMQ message middleware under the Spring Boot framework

1. RabbitMQ basic concepts

1.1 Message processing flow and components

  • The Producer sends a message. Messages go to an Exchange first, not directly to a queue.
  • The Exchange receives the message and decides which Queues to deliver it to, based on the Routing Key and the Binding rules.
  • The Queue stores messages until a Consumer consumes them.
  • The Consumer receives messages from the queue and processes them.

Producer

Role: The entry point for sending messages to RabbitMQ. The producer specifies the Exchange and the Routing Key of each message.

Key points

  • The producer only needs to know the Exchange and the Routing Key; it does not need to know about queues.
  • The producer never interacts with a queue directly. Routing and storage are determined by the Exchange and its Bindings.

Code Example

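import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String message) {
        // Publish the message to the given exchange with the given routing key
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Sent message: " + message);
    }
}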

Call Example

messageProducer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");

  • direct-exchange: The target exchange.
  • key1: The routing key of the message.

Exchange

Role: Receives messages from the Producer and decides which queues to deliver them to, based on the Routing Key and the configured Bindings.

An exchange usually has to be registered manually as a bean.

  • A RabbitMQ exchange is identified by its name.
  • In Spring Boot, registering an exchange through a @Bean method declares an exchange with that name and type on the RabbitMQ server.
  • When a message is published, the producer names the target exchange, and the broker routes the message from that exchange to queues according to the Routing Key.

Types

  • Direct Exchange: Matches the Routing Key exactly. The message's Routing Key must be identical to the Binding's Routing Key.
  • Topic Exchange: Supports wildcard matching on dot-separated words: * matches exactly one word and # matches zero or more words. For example, with("key.*") matches key.1, key.2, and so on.
  • Fanout Exchange: Ignores the Routing Key and broadcasts the message to all bound queues.
  • Headers Exchange: Ignores the Routing Key and matches on message header attributes.
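The wildcard rule of a topic exchange can be made concrete with a small sketch. The plain-Java class below is only a toy model of the matching rule (* for exactly one word, # for zero or more words). TopicPatternSketch and matches are hypothetical names, not part of RabbitMQ or Spring AMQP, and the broker's real implementation differs.

```java
// Toy matcher for topic-exchange binding patterns; not a RabbitMQ or Spring AMQP API.
class TopicPatternSketch {

    // Returns true if the dot-separated routing key matches the binding pattern.
    // "*" stands for exactly one word, "#" for zero or more words.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // key used up but the pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("key.*", "key.1"));   // true
        System.out.println(matches("key.*", "key.1.2")); // false: "*" covers one word only
        System.out.println(matches("key.#", "key.1.2")); // true
    }
}
```

In a real application the pattern is simply passed to with(...) when binding a queue to a TopicExchange; the broker performs the matching.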

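Code Example (define the exchanges):

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
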
@Configuration
public class ExchangeConfig {
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers-exchange");
    }
}

Queue

Role: The container that stores messages until a consumer retrieves and processes them.

A queue also has to be registered manually as a bean. Spring Boot does not register queues automatically, because a queue's name and attributes (durable or not, exclusive or not, and so on) depend on business needs.

Key points

  • Messages stay in the queue until they are consumed.
  • A queue can be durable (the queue survives a RabbitMQ restart, and so do its messages if they were published as persistent) or non-durable.

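Code Example (define the queue):

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public Queue demoQueue() {
        // Second argument: durable = true, so the queue survives a broker restart
        return new Queue("demo-queue", true);
    }
}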

Routing Key

Role: Determines how messages are routed from the exchange to queues.

Key points

  • The Routing Key is specified by the Producer.
  • For Direct and Topic exchanges, the Routing Key determines which queues receive the message.

Binding

  • Role: Connects a queue to an exchange and defines the routing rule.
  • Key points
    • A Binding defines the condition under which a queue accepts messages.
    • The Binding's Routing Key and the exchange type together determine how messages are routed.

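Code Example (define the binding):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BindingConfig {
    @Bean
    public Binding binding(Queue demoQueue, DirectExchange directExchange) {
        // Bind demoQueue to directExchange with the routing key "key1"
        return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
    }
}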

with("key1") specifies the Routing Key of the Binding. It means:

  • When a message is sent to the exchange, the exchange compares the message's Routing Key with the Binding's Routing Key.
  • If they match, the message is routed to the bound queue. If no binding matches, the message is unroutable and is discarded by default. It does not go to a dead letter queue, since dead-lettering applies to messages that have already reached a queue.
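The matching step can be pictured with a minimal in-memory sketch. It assumes a direct exchange and models it as a map from binding key to queue names. DirectExchangeSketch, bind, and route are illustrative names only and do not correspond to any RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange; names are illustrative only.
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    // An empty list means the message is unroutable.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("demo-queue", "key1");
        System.out.println(exchange.route("key1")); // [demo-queue]
        System.out.println(exchange.route("key2")); // []
    }
}
```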

Consumer

Role: Receives messages from a queue and processes them.

Key points

  • A consumer is associated directly with a queue.
  • Multiple consumers can listen to the same queue to share the load; each message is delivered to only one of them.

Code Example

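import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {
    // Called for every message that arrives in demo-queue
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}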
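When several consumers listen to the same queue, RabbitMQ by default hands messages out to them in turn (round-robin). The sketch below models only that idea in plain Java; RoundRobinSketch and dispatch are hypothetical names, and a real broker's distribution also depends on prefetch settings and on how quickly each consumer acknowledges.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch among competing consumers; not a RabbitMQ API.
class RoundRobinSketch {

    // Assigns each message to exactly one consumer, in turn,
    // and returns one "consumer <- message" entry per delivery.
    static List<String> dispatch(List<String> messages, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> deliveries = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            deliveries.add(consumer + " <- " + messages.get(i));
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // Prints [c1 <- m1, c2 <- m2, c1 <- m3]
        System.out.println(dispatch(List.of("m1", "m2", "m3"), List.of("c1", "c2")));
    }
}
```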

1.2 RabbitMQ message transmission model

Point-to-point model

Definition: The producer sends a message to a queue and a consumer receives it from that queue. Each message is consumed by only one consumer.

Implementation

  • Use the default exchange (the empty string "").
  • Use the queue name as the routing key, so the message goes straight to that queue.

Code Example

rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

Publish/subscribe model

Definition: The producer sends a message to a Fanout exchange, which broadcasts it to all bound queues.

Implementation

  • No Routing Key is required.
  • Every queue bound to the Fanout exchange receives the message.

Code Example

rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");
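To illustrate the broadcast behavior, here is a minimal in-memory sketch of a fanout exchange. It is a toy model under the assumption that every bound queue receives a copy of every message and that the routing key is ignored; FanoutExchangeSketch and its methods are illustrative names, not a real API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; names are illustrative only.
class FanoutExchangeSketch {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch fanout = new FanoutExchangeSketch();
        fanout.bind("queue-a");
        fanout.bind("queue-b");
        fanout.publish("", "Fanout Message");
        System.out.println(fanout.messagesIn("queue-a")); // [Fanout Message]
        System.out.println(fanout.messagesIn("queue-b")); // [Fanout Message]
    }
}
```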

Routing model

Definition: The producer sends messages to a Direct exchange, which delivers each message to the queues whose binding key exactly matches the Routing Key.

Implementation

  • Specify a Routing Key when binding a queue to the exchange.
  • The message's Routing Key must be identical to the Binding's Routing Key.

Code Example

rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. Environment setup

2.1 Installing and configuring RabbitMQ

Download Docker

  • Visit Docker's official website.
  • Download and install Docker Desktop for your operating system (Windows, macOS, or Linux).

Start Docker

  • After the installation is complete, start Docker Desktop.
  • Make sure Docker is running (the Docker icon is visible in the taskbar or menu bar).

Rapid deployment of RabbitMQ with Docker

Docker is the easiest way to deploy RabbitMQ. The following command starts a RabbitMQ container:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Parameter description

  • -d: Run the container in the background (detached mode).
  • --name rabbitmq: Name the container rabbitmq.
  • -p 5672:5672: Map the container's port 5672 to the host's port 5672 (RabbitMQ's messaging port).
  • -p 15672:15672: Map the container's port 15672 to the host's port 15672 (the web interface of the RabbitMQ management plugin).
  • rabbitmq:management: Use the RabbitMQ image that includes the management plugin.

Verify that RabbitMQ is running

Run the following command to check whether the container is running:

docker ps

If the rabbitmq container is listed as running, RabbitMQ has started successfully.
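As an additional check from the application side, a small plain-Java probe can confirm that the mapped ports accept TCP connections. This is only a sketch: PortCheckSketch and isPortOpen are hypothetical names, and an open port shows that something is listening, not that the broker is fully ready.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability probe; a sketch, not a full health check.
class PortCheckSketch {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 and 15672 are the ports mapped by the docker run command
        System.out.println("Messaging port open: " + isPortOpen("localhost", 5672, 1000));
        System.out.println("Management port open: " + isPortOpen("localhost", 15672, 1000));
    }
}
```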

2.2 Using the RabbitMQ management plugin

RabbitMQ provides a web management interface for monitoring and managing the broker.

Access the management interface

  • Open a browser and visit http://localhost:15672
  • Log in with the default username and password:
    • username: guest
    • password: guest

Management interface functions

  • Overview: Shows the overall status of RabbitMQ, such as the number of connections, the number of queues, and message rates.
  • Connections: Lists the clients currently connected to RabbitMQ.
  • Channels: Lists the currently open channels.
  • Exchanges: View and manage exchanges.
  • Queues: View and manage queues.
  • Admin: Manage users and permissions.

2.3 User and permission configuration

By default, RabbitMQ has a single user, guest, whose password is also guest. For security and permission management, create new users and assign permissions to them.

1. Create a new user

  • In the RabbitMQ management interface:
    • Click Admin in the top navigation bar.
    • Below the user list, click Add a user.
    • Enter a username and password, for example:
      • username: admin
      • password: admin123
    • Click Add user to finish.

2. Assign permissions

  • In the user list, find the user you just created (for example, admin).
  • Click Set permission for that user.
  • On the permission settings page (each value is a regular expression matched against resource names):
    • Virtual Host: Choose / (the default virtual host).
    • Configure: Enter .* to allow the user to configure all resources.
    • Write: Enter .* to allow the user to write to all resources.
    • Read: Enter .* to allow the user to read from all resources.
  • Click Set permission to complete the assignment.

3. Log in with a new user

  • Log out of the current user (click guest in the upper right corner and choose Log out).
  • Log in as the new user (for example, admin).

2.4 Adding the RabbitMQ dependency to Spring Boot

Add the following dependency to pom.xml:

<dependencies>
    <!-- RabbitMQ dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

spring-boot-starter-amqp is the RabbitMQ integration starter provided by Spring Boot. It brings in the following:

RabbitMQ Client Library

  • The RabbitMQ Java client library (amqp-client), which communicates with the RabbitMQ server.

Spring AMQP Support

  • Spring's support for AMQP (Advanced Message Queuing Protocol), including RabbitTemplate, @RabbitListener, and related classes.

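2.5 Configuring RabbitMQ in Spring Boot

In a Spring Boot project, configure the RabbitMQ connection information in application.properties or application.yml.

Sample configuration

# RabbitMQ connection configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin123

Configuration instructions

  • spring.rabbitmq.host: RabbitMQ server address (default localhost).
  • spring.rabbitmq.port: RabbitMQ messaging port (default 5672).
  • spring.rabbitmq.username: RabbitMQ username.
  • spring.rabbitmq.password: RabbitMQ password.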

3. Producing and consuming messages with Spring Boot and RabbitMQ

3.1 Message Producer

  • In Spring Boot, messages are sent with RabbitTemplate. spring-boot-starter-amqp auto-configures it as a bean, so it can be injected directly with @Autowired.
  • When the message is not a String: RabbitTemplate converts the payload with a MessageConverter. The default, SimpleMessageConverter, turns a String into bytes, passes byte[] through unchanged, and uses Java serialization for Serializable objects; it does not produce JSON.
  • If your business data is not a String, common practices are:
    • Serialize the object yourself before sending (for example, to a JSON string);
    • Or configure a custom MessageConverter so that Spring serializes and deserializes objects for you.

Typical practice: manually serialize to JSON and send

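import com.google.gson.Gson;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CustomObjectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCustomObject(String queueName, MyCustomObject obj) {
        // 1. Serialize the custom object into a JSON string
        String jsonString = new Gson().toJson(obj);
        // 2. Send the JSON string to RabbitMQ, using the queue name as the routing key
        rabbitTemplate.convertAndSend(queueName, jsonString);
    }
}

On the consumer side, deserialize the message (a JSON string) back into MyCustomObject.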

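Configure a custom converter (optional)

Spring AMQP provides Jackson2JsonMessageConverter and other converters.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Configure RabbitTemplate to use this converter
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

With this configuration, rabbitTemplate.convertAndSend(queueName, myObject) converts myObject to JSON before sending, and a consumer that uses the same converter receives it as the same Java type.

1) Basic message sending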

Scenario
Send a message directly to a named queue. The message goes through the default exchange, which routes it to the queue whose name equals the routing key.

Core code example

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class BasicProducer {
    // 1. RabbitTemplate injected by Spring
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 2. Send a basic message to the specified queue
     * @param queueName target queue name
     * @param message   message content
     */
    public void sendToQueue(String queueName, String message) {
        // 3. convertAndSend publishes to the default exchange with queueName as the routing key
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent to queue: " + queueName + ", content: " + message);
    }
}

Detailed code explanation

  • @Autowired private RabbitTemplate rabbitTemplate;
    • Spring Boot auto-configures RabbitTemplate, so there is no need to define the bean manually.
    • Inject it wherever the application needs to interact with RabbitMQ.
  • public void sendToQueue(String queueName, String message)
    • Method parameters:
      • queueName: The name of the target queue.
      • message: The message content to send, as a String.
  • rabbitTemplate.convertAndSend(queueName, message)
    • convertAndSend converts the payload (here, the String to bytes) and publishes it with queueName as the routing key.
    • If the queue does not exist, RabbitMQ does not create it on publish; the message is unroutable and is dropped. Declare the queue beforehand, for example as a Queue bean.

2) Send to an exchange

Scenario
Send a message to an exchange, which then uses the Routing Key to route the message to the matching queues.

Core code example

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ExchangeProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a message to the specified exchange
     * @param exchangeName exchange name
     * @param routingKey   routing key
     * @param message      message content
     */
    public void sendToExchange(String exchangeName, String routingKey, String message) {
        // Send the message to the exchange named exchangeName; routingKey decides the target queues
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
    }
}

Detailed code explanation

  • exchangeName
    • The name of the exchange to send to, for example "direct-exchange" or "fanout-exchange".
  • routingKey
    • The routing key is matched against the bindings. With a DirectExchange, for example, the routing key used when binding the queue must be the same as the routing key used when sending, otherwise the message does not reach the queue.
  • rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
    • Sends the message to the exchange first; the exchange then delivers it to the target queue according to the routing key.

3) Send messages with message attributes

Scenario
Set attributes such as TTL (expiration time) or priority on a message to control how it behaves in the queue.

Core code example

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PropertyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a message with message attributes (such as TTL and priority)
     */
    public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
        // 1. Create a MessageProperties object to hold the message attributes
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000"); // Expiration time: 10 seconds (unit: milliseconds)
        properties.setPriority(5);         // Set priority to 5
        // 2. Build the Message object from the message body and the attributes
        Message message = new Message(messageContent.getBytes(), properties);
        // 3. Use send (rather than convertAndSend) to send the Message object as-is
        rabbitTemplate.send(exchange, routingKey, message);
        System.out.println("Message with properties sent: " + messageContent);
    }
}

Detailed code explanation

  • MessageProperties properties = new MessageProperties();
    • MessageProperties holds the AMQP-level properties and headers of the message.
  • properties.setExpiration("10000");
    • setExpiration sets the message TTL (Time-To-Live) in milliseconds. If the message is not consumed before the TTL elapses, RabbitMQ removes it from the queue and, if a dead letter exchange is configured, dead-letters it.
  • properties.setPriority(5);
    • Sets the message priority to 5. This only takes effect if the queue is a priority queue (declared with the x-max-priority argument).
  • new Message(messageContent.getBytes(), properties)
    • Wraps the plain-text body and the message attributes in a Message object.
  • rabbitTemplate.send(exchange, routingKey, message);
    • Unlike convertAndSend, send performs no conversion (such as to JSON or from a String); it sends the complete AMQP Message object directly.
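The effect of the priority attribute can be pictured with a toy ordering model: higher priority first, and first-in-first-out among messages with the same priority. This is only a sketch in plain Java. PriorityOrderSketch is a hypothetical name, and a real RabbitMQ queue only behaves this way when it is declared with x-max-priority and messages are actually waiting in the queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of delivery order in a priority queue; names are illustrative, not a RabbitMQ API.
class PriorityOrderSketch {

    // One queued message: payload, priority, and arrival number (keeps FIFO order within a priority).
    static final class Entry {
        final String payload;
        final int priority;
        final long arrival;

        Entry(String payload, int priority, long arrival) {
            this.payload = payload;
            this.priority = priority;
            this.arrival = arrival;
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.<Entry>comparingInt(e -> -e.priority)  // higher priority first
                      .thenComparingLong(e -> e.arrival));    // then oldest first
    private long nextArrival = 0;

    void publish(String payload, int priority) {
        queue.add(new Entry(payload, priority, nextArrival++));
    }

    // Removes all messages and returns their payloads in delivery order.
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().payload);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityOrderSketch q = new PriorityOrderSketch();
        q.publish("low", 1);
        q.publish("high", 5);
        q.publish("also-high", 5);
        System.out.println(q.drain()); // [high, also-high, low]
    }
}
```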

The Message constructor

public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
}

body: The message body as a byte array.
messageProperties: The AMQP message properties, including TTL, priority, headers, and so on.

If the message body is not a String

Manually convert to bytes: Convert the custom object into a byte array (for example via JSON serialization or Java serialization) and pass it as the first argument of new Message(...).

MyCustomObject obj = new MyCustomObject();
// Suppose you want to use JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
MessageProperties properties = new MessageProperties();
// Set some properties
Message message = new Message(body, properties);

  • Why is the object not converted to JSON automatically? The new Message(...) constructor is a "pure" AMQP-level approach and does not invoke Spring's converters, so you have to handle serialization yourself.
  • When you use the Message constructor, you are responsible for converting the object to byte[] (as a string, JSON, or any other format).
  • If you want Spring AMQP to convert automatically, use the higher-level convertAndSend(Object msg) API, or configure a custom MessageConverter.
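Whatever format is chosen, producer and consumer must agree on how the body bytes are encoded. The following plain-Java sketch shows a UTF-8 round trip for a text body. BodyEncodingSketch, encode, and decode are hypothetical helper names, and the JSON string is a hand-written placeholder rather than the output of any particular serializer.

```java
import java.nio.charset.StandardCharsets;

// Sketch of encoding a text body to bytes and decoding it again with an explicit charset.
class BodyEncodingSketch {

    // Producer side: text -> bytes for the message body
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: body bytes -> text, using the same charset
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"id\":1,\"name\":\"demo\"}"; // placeholder JSON text
        byte[] body = encode(json);
        System.out.println(decode(body).equals(json)); // true
    }
}
```

Naming the charset explicitly on both sides avoids depending on the platform default.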

3.2 Message Consumer

The consumer listens to a specified queue and then acknowledges or rejects each message according to the configured acknowledgement mode (automatic or manual).

1) Listen to queues and consume messages

Core code example (automatic confirmation mode)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {
    /**
     * @RabbitListener specifies the queue to listen to.
     * No manual ack is written in this mode. With broker-level auto-ack
     * (spring.rabbitmq.listener.simple.acknowledge-mode=none), RabbitMQ treats the message
     * as acknowledged and removes it from the queue as soon as it is delivered.
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        // 1. Message content received from demo-queue
        System.out.println("Received message: " + message);
        // 2. No manual ack is required here.
        // With broker-level auto-ack, an exception thrown here does not cause redelivery,
        // so the message is lost.
    }
}

Detailed code explanation (automatic confirmation mode)

  • @RabbitListener(queues = "demo-queue")
    • Declares a listener on the queue named demo-queue.
    • The method is called back whenever a new message arrives in the queue.
  • public void receiveMessage(String message)
    • The parameter type is String, so Spring AMQP converts the received message body to a String and passes it in as message.
  • Risks of auto-ack
    • With broker-level auto-ack (acknowledge-mode=none in Spring AMQP), the message is already marked as acknowledged when the consumer throws an exception. It is neither redelivered nor dead-lettered, so it is lost.
    • Spring AMQP's default mode, auto, behaves differently: the listener container acknowledges the message after the method returns normally and rejects it if the method throws.

2) Confirmation mechanism

Automatic confirmation (auto-ack)

  • Behavior
    • As soon as the message is delivered to the consumer, RabbitMQ marks it as acknowledged and deletes it from the queue. In Spring AMQP this corresponds to acknowledge-mode=none.
  • Problems
    • If processing fails (for example, the consumer throws an exception), the message has already been acknowledged and deleted, so it cannot be reprocessed.
    • If the consumer crashes or disconnects, messages it has received but not yet processed are lost.
  • Applicable scenarios
    • Scenarios with low reliability requirements for message processing.

Manual confirmation (manual-ack)

  • Behavior
    • After processing a message, the consumer must explicitly call basicAck to acknowledge it.
    • If processing fails, the consumer can call basicNack or basicReject to reject the message.
  • Advantages
    • Ensures reliable message processing.
    • Supports requeueing messages or sending them to a dead letter queue.
  • Applicable scenarios
    • Scenarios with high reliability requirements for message processing.

Core code example:

import com.rabbitmq.client.Channel;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class ManualAckConsumer {
    /**
     * Configure in application.properties:
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * so that the listener uses manual confirmation mode
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. Get the message body from the message
            String body = new String(message.getBody());
            System.out.println("Processing message: " + body);
            // 2. Business processing succeeded: call basicAck to confirm manually
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // 3. Processing failed: decide whether to requeue or reject
            // requeue = true  -> put the message back in the queue
            // requeue = false -> discard or dead-letter it (depending on queue configuration)
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

Detailed code explanation

Enabling manual confirmation

Add the following to application.properties:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

This tells Spring AMQP to use manual-ack.

public void receiveMessage(Message message, Channel channel)

Unlike the automatic confirmation example, the method receives not just a String but a Message object and a Channel object.

  • Message: Contains the message body and the message properties (headers and so on).
  • Channel: Provides the underlying AMQP operations such as basicAck, basicNack, and basicReject.

Acknowledging success

channel.basicAck(deliveryTag, multiple)

  • deliveryTag: The tag that identifies this delivery on the channel, obtained from message.getMessageProperties().getDeliveryTag().
  • multiple = false: Acknowledge only the current message. With multiple = true, all unacknowledged messages up to and including deliveryTag are acknowledged.

basicAck(long deliveryTag, boolean multiple)

The deliveryTag is not generated when you construct a Message. The RabbitMQ broker assigns it when it delivers a message to a consumer; it is a sequence number that increases with each delivery on the channel.

long deliveryTag = message.getMessageProperties().getDeliveryTag();

Rejecting on failure

  • channel.basicNack(deliveryTag, multiple, requeue) or basicReject
    • requeue = true: Put the message back in the queue to wait for the next delivery (this can cause an endless loop if processing keeps failing).
    • requeue = false: Reject the message. If a dead letter exchange is configured for the queue, the message is dead-lettered; otherwise it is discarded.
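The interplay of delivery tags, the multiple flag, and the requeue flag can be sketched with a toy in-memory model of a single channel. Everything here is a simplification for illustration: ManualAckSketch, deliver, ack, and nack are hypothetical names (the real calls are the Channel methods discussed above), and the model assumes that a requeued message returns to the front of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

// Toy model of one channel consuming from one queue with manual acknowledgements.
class ManualAckSketch {
    final Deque<String> ready = new ArrayDeque<>();         // messages waiting in the queue
    final TreeMap<Long, String> unacked = new TreeMap<>();  // delivered but not yet acknowledged
    final List<String> deadLettered = new ArrayList<>();    // rejected with requeue = false
    private long nextDeliveryTag = 1;                       // increases with each delivery

    // Delivers the next ready message and returns its delivery tag, or -1 if the queue is empty.
    long deliver() {
        String message = ready.poll();
        if (message == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.put(tag, message);
        return tag;
    }

    // multiple = false acknowledges only this tag;
    // multiple = true acknowledges every outstanding tag up to and including it.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headMap(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    // requeue = true puts the message back in the queue;
    // requeue = false dead-letters it (a real broker discards it if no DLX is configured).
    void nack(long deliveryTag, boolean requeue) {
        String message = unacked.remove(deliveryTag);
        if (message == null) {
            return;
        }
        if (requeue) {
            ready.addFirst(message);
        } else {
            deadLettered.add(message);
        }
    }

    public static void main(String[] args) {
        ManualAckSketch channel = new ManualAckSketch();
        channel.ready.add("m1");
        channel.ready.add("m2");
        long first = channel.deliver();
        long second = channel.deliver();
        channel.ack(first, false);   // m1 is done
        channel.nack(second, false); // m2 is rejected without requeue
        System.out.println(channel.deadLettered); // [m2]
    }
}
```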

3) Dealing with consumption failure

  • Handling in automatic confirmation mode
    • With broker-level auto-ack, a failed message is not redelivered, because it has already been acknowledged and deleted from the queue.
    • Problem: The message is lost and cannot be reprocessed.
  • Handling in manual confirmation mode
    • When processing fails, the message can be handled in the following ways:
    • Requeue
      • Call basicNack or basicReject with the requeue parameter set to true.
      • The message re-enters the queue and waits for the next delivery.
    • Send to the dead letter queue
      • Call basicNack or basicReject with the requeue parameter set to false.
      • If the queue has a dead letter exchange configured, the message is sent to the dead letter queue.
  • Retry mechanism (the simple retry provided by Spring AMQP; retries are triggered when the listener method throws an exception)

Retries happen first, inside the consumer; the message is rejected (and then requeued or dead-lettered) only after all retries have failed.

# Enable retry
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of attempts
spring.rabbitmq.listener.simple.retry.max-attempts=3
# Initial retry interval (ms)
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# Interval multiplier
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# Maximum retry interval (ms)
spring.rabbitmq.listener.simple.retry.max-interval=10000

  • Spring AMQP provides a retry mechanism that retries several times when the consumer fails to process a message, instead of requeueing the message immediately.

Behavior

  • When message processing fails, Spring AMQP retries locally, inside the consumer (the message is not requeued), until the maximum number of attempts is reached.
  • Once the retries are exhausted, the message is rejected (basicNack or basicReject). Whether it is then requeued or sent to the dead letter queue depends on the configuration.
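To see what an exponential back-off configuration means in practice, the waits between attempts can be computed by hand. The sketch below assumes that max-attempts counts the first delivery (so three attempts mean two waits) and that each wait is the previous one times the multiplier, capped at the maximum interval. RetryBackoffSketch and waits are hypothetical names, not Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the waits (in ms) between consecutive attempts for an exponential back-off.
class RetryBackoffSketch {

    // maxAttempts attempts in total means maxAttempts - 1 waits between them.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min((long) interval, maxInterval)); // cap at the maximum interval
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // max-attempts=3, initial-interval=1000, multiplier=2.0, max-interval=10000
        System.out.println(waits(3, 1000, 2.0, 10000)); // [1000, 2000]
        // With more attempts the cap becomes visible
        System.out.println(waits(6, 1000, 2.0, 10000)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```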

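Dead Letter Queue (DLQ)

  • When a message is rejected (with requeue = false) or expires, RabbitMQ publishes it to the configured dead letter exchange (DLX), which routes it to the dead letter queue (DLQ).
  • Configuration example

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal-queue")
                // Specify the dead letter exchange
                .withArgument("x-dead-letter-exchange", "dead-letter-exchange")
                // Specify the dead letter routing key
                .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key")
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead-letter-exchange");
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead-letter-queue");
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
    }
}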

How it works

  • The normal queue names a dead letter exchange through x-dead-letter-exchange. Once a message is rejected (requeue=false) or times out (TTL expires), RabbitMQ sends it to dead-letter-exchange.
  • dead-letter-exchange is bound to dead-letter-queue (routing key dead-letter-routing-key), so dead-lettered messages are stored in the dead letter queue.

Requeue vs. send to the dead letter queue

  • Requeue: channel.basicNack(deliveryTag, false, true)
    • Suitable for temporary errors, such as database lock conflicts or network jitter, where a later attempt may succeed.
  • Send to the dead letter queue: channel.basicNack(deliveryTag, false, false)
    • Suitable for permanent errors, such as a message whose format cannot be parsed, or cases where the business logic says the message should not be retried.
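One way to apply this distinction in a consumer is to derive the requeue flag from the type of failure. The sketch below is a minimal illustration under the assumption that the application classifies its own exceptions. TransientFailureException, PermanentFailureException, and shouldRequeue are hypothetical names introduced here, not part of Spring AMQP or the RabbitMQ client.

```java
// Sketch: choose the requeue flag from the kind of failure; all names are hypothetical.
class RequeueDecisionSketch {

    // Stands in for temporary problems (lock conflict, network jitter).
    static class TransientFailureException extends RuntimeException {
        TransientFailureException(String message) {
            super(message);
        }
    }

    // Stands in for permanent problems (unparseable message, business rule says no retry).
    static class PermanentFailureException extends RuntimeException {
        PermanentFailureException(String message) {
            super(message);
        }
    }

    // true  -> reject with requeue = true (try again later)
    // false -> reject with requeue = false (dead-letter or discard)
    static boolean shouldRequeue(Exception e) {
        return e instanceof TransientFailureException;
    }

    public static void main(String[] args) {
        System.out.println(shouldRequeue(new TransientFailureException("lock conflict"))); // true
        System.out.println(shouldRequeue(new PermanentFailureException("bad format")));    // false
    }
}
```

In a manual-ack listener, the result would be passed as the last argument of basicNack. Unknown exception types are treated as permanent here, so that an unexpected bug cannot cause an endless requeue loop.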
