SoFunction
Updated on 2025-03-02

How to integrate RabbitMQ with Spring Boot (code examples)

1. Use of RabbitTemplate

1. [Import Dependencies]

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.1</version>
</dependency>

2. [Add configuration]

spring:
  rabbitmq:
    host:   # IP address
    port: 5672 # port
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # Fetch one message at a time; the next one is delivered only after the current one has been acknowledged
        acknowledge-mode: manual # Consumers acknowledge manually (each listener must call channel.basicAck)
        retry:
          enabled: true # Whether failed deliveries are retried
    publisher-confirm-type: correlated # Confirm that the message reached the exchange
    publisher-returns: true # Return messages that the exchange could not route to any queue

3. [Point-to-Point Messaging (Queue Mode)]

How to use:

This approach is also called the queue model. The producer sends a message to a queue, and a consumer takes the message from the queue and processes it. Each message is delivered to exactly one consumer, and the queue normally hands messages out in first-in, first-out order.

Code example: Producer

    /**
     * Model 1: simple model.
     * One producer, one queue, one consumer.
     */
    @GetMapping("hello/world")
    public void helloWorld() {
        // Send a message.
        // First parameter: String routingKey, the routing rule (the exchange-to-queue binding rule); here, the queue name.
        // Second parameter: Object, the message content.
        // rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");
        // Third parameter: MessagePostProcessor, used when the message needs extra wrapping.
        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {
            // Set a unique identifier.
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            return message;
        });
    }

Code example: Consumer

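import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Log4j2
public class HelloWorldConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Listens to hello_world_queue and consumes its messages.
     * queues: names of the queues to listen to; these queues must already exist.
     * queuesToDeclare: queues to listen to; if a queue does not exist, RabbitAdmin declares it.
     */
    @RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))
    public void helloWorldConsumer(String msg, Message message, Channel channel) throws IOException {
        // Get the unique identifier of the message.
        String messageId = message.getMessageProperties().getMessageId();
        // Add the ID to a Redis set. A set holds no duplicates; add() returns the number of elements actually added.
        Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);
        if (count != null && count == 1) {
            // Not consumed before.
            log.info("hello_world_queue consumer received a message, content: {}", message);
        }
        // acknowledge-mode is manual in the configuration, so acknowledge explicitly (duplicates included).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}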
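
The consumer above relies on a Redis set to skip messages whose ID has already been seen. The following is a minimal sketch of that idea on its own, in plain Java, with an in-memory set standing in for Redis. The class name `DedupFilter` and the method `firstDelivery` are hypothetical names chosen for illustration; they are not part of Spring AMQP or Spring Data Redis.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Redis set used by the consumer above.
final class DedupFilter {
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is seen and false on every repeat,
    // mirroring "add returned 1" versus "add returned 0" on a Redis set.
    boolean firstDelivery(String messageId) {
        return seenIds.add(messageId);
    }

    public static void main(String[] args) {
        DedupFilter filter = new DedupFilter();
        for (String id : new String[] {"id-1", "id-2", "id-1"}) {
            if (filter.firstDelivery(id)) {
                System.out.println("processing " + id);
            } else {
                System.out.println("skipping duplicate " + id);
            }
        }
    }
}
```

An in-memory set only protects a single process and grows without bound, which is presumably why the article keeps the IDs in Redis; a real deployment would likely also want the stored IDs to expire.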

4. [Work Queues Mode]

How to use:

The work queue pattern, also called task queues, is used to process tasks asynchronously. Several workers listen to the same queue at the same time. When a task message arrives in the queue, an idle worker takes and processes it, so tasks run in parallel and each task is handled by only one worker.

Code example: Producer

/**
 * Work queue.
 * One producer, one queue, multiple consumers.
 */
@GetMapping("work/queue")
public void workQueue() {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");
    }
}

Code example: Consumer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class WorkQueueConsumer {
    /**
     * Consumer 1
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer(String message) throws InterruptedException {
        Thread.sleep(200);
        log.info("work_queue consumer 1 received a message, content: {}", message);
    }

    /**
     * Consumer 2
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer2(String message) throws InterruptedException {
        Thread.sleep(400);
        log.info("work_queue consumer 2 received a message, content: {}", message);
    }
}
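
The two consumers above share one queue and take roughly 200 ms and 400 ms per message. With a prefetch of 1, a consumer gets its next message only after finishing the current one, so the faster consumer ends up handling more of them. The sketch below is a simplified model of that behavior, not RabbitMQ itself: it assumes all messages are already queued at time zero, and the class `FairDispatchDemo` and its `dispatch` method are hypothetical.

```java
// Simplified model of prefetch = 1: each task goes to whichever worker becomes free first.
final class FairDispatchDemo {

    // Returns how many of the tasks each worker handles.
    // millisPerTask[w] is the processing time of worker w; ties go to the lower index.
    static int[] dispatch(int messages, long[] millisPerTask) {
        long[] freeAt = new long[millisPerTask.length];
        int[] handled = new int[millisPerTask.length];
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int w = 1; w < freeAt.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;
                }
            }
            freeAt[next] += millisPerTask[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = dispatch(10, new long[] {200, 400});
        System.out.println("consumer 1: " + handled[0] + ", consumer 2: " + handled[1]);
    }
}
```

In this model the ten messages split 7 to 3 in favor of the faster consumer. Against a real broker the exact split depends on timing, so the numbers should be read as an illustration only.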

5. [Publish/Subscribe Messaging]

How to use:

In publish/subscribe mode, the producer publishes a message to an exchange instead of sending it directly to a queue. The exchange is responsible for routing the message to the queues bound to it; a fanout exchange delivers it to all of them. Each subscriber listens on the queue it is interested in and receives its own copy of the message.

Code example: Producer

/**
 * Publish/subscribe.
 * One producer, multiple queues, multiple consumers; uses a fanout exchange.
 */
@GetMapping("publish/subscribe")
public void publishSubscribe() {
    // First parameter: exchange name (any name may be used).
    // Second parameter: routing key (the exchange-to-queue binding rule). In publish/subscribe mode
    // no rule is needed; binding the queue to the exchange is enough, so it is left empty.
    // Third parameter: message content.
    rabbitTemplate.convertAndSend("publish_subscribe_exchange", "",
            "hello publisher subscribe!!");
}

Code example: Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class PublisherSubscribeConsumer {
    private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);

    /**
     * Publish/subscribe consumer 1
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),
            exchange = @Exchange(name = "publish_subscribe_exchange",
                    type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe(String message) {
        log.info("Publish/subscribe consumer 1 received a message, content: {}", message);
    }

    /**
     * Publish/subscribe consumer 2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),
            exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe2(String message) {
        log.info("Publish/subscribe consumer 2 received a message, content: {}", message);
    }
}
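
A fanout exchange copies every published message into each queue bound to it and ignores the routing key, which is why the producer above can pass an empty string. The sketch below models just that rule in plain Java. `FanoutExchangeModel` and its methods are hypothetical names for illustration and have nothing to do with the Spring AMQP API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a fanout exchange: every bound queue gets its own copy of each message.
final class FanoutExchangeModel {
    private final Map<String, Deque<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // The routing key is accepted for symmetry with other exchange types but is ignored.
    void publish(String routingKey, String body) {
        for (Deque<String> queue : boundQueues.values()) {
            queue.addLast(body);
        }
    }

    // Returns the oldest message in the queue, or null if the queue is empty or unknown.
    String poll(String queueName) {
        Deque<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        exchange.bind("pb_sb_queue_01");
        exchange.bind("pb_sb_queue_02");
        exchange.publish("", "hello publisher subscribe!!");
        System.out.println(exchange.poll("pb_sb_queue_01"));
        System.out.println(exchange.poll("pb_sb_queue_02"));
    }
}
```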

6. [Routing mode (Routing)]

How to use:

Routing mode lets the sender direct a message to specific queues by means of the message's routing key. The sender publishes the message to a direct exchange, and the exchange delivers it to every queue whose binding key equals the routing key, so different routing keys reach different queues. Consumers listen on whichever queues they need.

Code example: Producer

/**
 * Routing model.
 * One producer, multiple queues, multiple consumers; uses a direct exchange.
 */
@GetMapping("routing")
public void routing() {
    // First parameter: exchange name (any name may be used).
    // Second parameter: routing key (the exchange-to-queue binding rule), an arbitrary string.
    // Third parameter: message content.
    rabbitTemplate.convertAndSend("routing_exchange", "aaa",
            "hello routing!!");
}

Code example: Consumer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class RoutingConsumer {
    /**
     * Routing model consumer 1
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "abc", "error", "info" }))
    public void routingConsumer(String message) {
        log.info("Routing model consumer 1 received a message, content: {}", message);
    }

    /**
     * Routing model consumer 2
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "aaa", "ccc", "waadaffas" }))
    public void routingConsumer2(String message) {
        log.info("Routing model consumer 2 received a message, content: {}", message);
    }

    /**
     * Routing model consumer 3
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "bbbb", "asdfasd", "asdfasdf" }))
    public void routingConsumer3(String message) {
        log.info("Routing model consumer 3 received a message, content: {}", message);
    }
}
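
With a direct exchange, a queue receives a message only when one of its binding keys equals the routing key exactly. The sketch below reproduces the three bindings from the consumer above in a small plain-Java model to show where the producer's "aaa" message would go. `DirectExchangeModel`, `bind` and `route` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a direct exchange: exact match between routing key and binding key.
final class DirectExchangeModel {
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String... bindingKeys) {
        bindings.put(queueName, new HashSet<>(Arrays.asList(bindingKeys)));
    }

    // Returns the names of all queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("routing_queue_01", "abc", "error", "info");
        exchange.bind("routing_queue_02", "aaa", "ccc", "waadaffas");
        exchange.bind("routing_queue_03", "bbbb", "asdfasd", "asdfasdf");
        System.out.println(exchange.route("aaa"));   // [routing_queue_02]
        System.out.println(exchange.route("error")); // [routing_queue_01]
        System.out.println(exchange.route("zzz"));   // []
    }
}
```

A routing key that matches no binding, such as "zzz" here, reaches no queue at all.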

7. [Topics]

How to use:

Topic mode is an extension of routing mode. Routing keys are made up of words separated by ".", and binding keys may contain wildcards, so a single message can be routed to one or more queues. A topic exchange matches the routing key against the binding key pattern of each queue, which allows more flexible message routing and filtering.

Code example: Producer

/**
 * Topic model.
 * One producer, multiple queues, multiple consumers; uses a topic exchange.
 */
@GetMapping("topic")
public void topic() {
    // First parameter: exchange name (any name may be used).
    // Second parameter: routing key (the exchange-to-queue binding rule), one or more words separated by ".".
    // Third parameter: message content.
    rabbitTemplate.convertAndSend("topic_exchange", "",
            "hello topic!!");
}

Code example: Consumer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class TopicConsumer {
    /**
     * "*" matches exactly one word
     * "#" matches zero or more words
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "error.*.info", "#.name" }))
    public void topicConsumer(String message) {
        log.info("xxxxxxxxx1");
    }

    /**
     * "*" matches exactly one word
     * "#" matches zero or more words
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "username" }))
    public void topicConsumer2(String message) {
        log.info("xxxxxxxxx2");
    }

    /**
     * "*" matches exactly one word
     * "#" matches zero or more words
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "bwie.*", "error.*.info" }))
    public void topicConsumer3(String message) {
        log.info("xxxxxxxxx3");
    }
}
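
The wildcard rules used in the binding keys above are easier to follow with a small matcher: "*" stands for exactly one word and "#" for zero or more words, where words are separated by ".". The sketch below implements those two rules in plain Java. It is an illustration of the matching logic only, not the broker's actual implementation, and `TopicMatcher` is a hypothetical class name.

```java
// Illustrative matcher for topic binding keys: "*" = exactly one word, "#" = zero or more words.
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: succeed only if all words are consumed
        }
        if (pattern[i].equals("#")) {
            // "#" either absorbs no word (move on in the pattern) or one more word (stay on "#").
            return match(pattern, i + 1, words, j)
                    || (j < words.length && match(pattern, i, words, j + 1));
        }
        if (j == words.length) {
            return false; // pattern still expects a word but none is left
        }
        return (pattern[i].equals("*") || pattern[i].equals(words[j]))
                && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("abc.*", "abc.name"));             // true
        System.out.println(matches("abc.*", "abc.x.y"));              // false
        System.out.println(matches("error.*.info", "error.db.info")); // true
        System.out.println(matches("#.name", "user.first.name"));     // true
    }
}
```

Under these rules a routing key such as "abc.name" would match "abc.*" and "#.name", so it would reach topic_queue_01 and topic_queue_02 but not topic_queue_03. The routing keys used here are made-up examples.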
