SoFunction
Updated on 2025-03-02

Example code for producing and consuming messages with RabbitMQ in Spring Boot

Add the RabbitMQ dependency

<!-- Spring Boot integration with RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Add RabbitMQ configuration

# RabbitMQ configuration
spring:
  rabbitmq:
    host: ipaddress
    port: 5672
    username: account
    password: password
    virtual-host: /

Configure the RabbitMQ exchange and queue

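// Declare this class in your own package (the package name is omitted here).
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Declares the RabbitMQ exchange, the queue, and the binding between them
@Configuration
public class RabbitMQConfig {
    //======================== RabbitMQ Queue ========================//

    // Create a fanout exchange (durable = true, autoDelete = false)
    @Bean
    public FanoutExchange fanoutExchangeProcess() {
        return new FanoutExchange("process-data-change-exchange", true, false);
    }

    // Create a durable queue
    @Bean
    public Queue processDataChangeQueue() {
        return new Queue("process-data-change-queue", true);
    }

    // Bind the queue to the exchange
    @Bean
    public Binding chatBindExchange() {
        return BindingBuilder.bind(processDataChangeQueue()).to(fanoutExchangeProcess());
    }
}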

Write an endpoint to simulate producing messages

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/produceMessage")
@ApiOperation(value = "Production Message", tags = "Test interface")
public void updateTokenTime() {
    // The message is sent to the exchange, which routes it to the queue.
    // The listener on that queue then consumes it and runs the business logic.
    rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");
}

Write a message listener class to simulate consuming messages

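// Declare this class in your own package (the package name is omitted here).
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class RabbitMQDataSyncListenerProcess {
    // Listen on process-data-change-queue and consume its messages
    @RabbitListener(queues = "process-data-change-queue")
    public void orderDead(@Payload String productIdAndOrderId) {
        log.info("Current time:{},Received queue information:{}", new Date().toString(), productIdAndOrderId);
        // Execute your business logic (simulated here by a slow loop)
        for (int i = 0; i < 5; i++) {
            log.info("Number of loops: {}", i + 1);
            try {
                // Pause for 2000 milliseconds (2 seconds)
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the simulated work
                log.error("Thread interrupted: {}", e.getMessage());
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}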

The role of exchanges in RabbitMQ

An exchange is the core component of message routing in RabbitMQ. It receives the messages sent by producers and routes them to one or more queues according to specific routing rules.

Main functions and types of exchanges

1. Message routing:

  • The exchange decides which queues a message should be delivered to, based on its bindings and the message's routing key.

2. Types:

  • Direct Exchange: routes a message to the queues whose binding key exactly matches the message's routing key.
  • Topic Exchange: routes a message to the queues whose binding pattern matches the routing key; patterns support the wildcards * (exactly one word) and # (zero or more words).
  • Fanout Exchange: broadcasts a message to all bound queues, regardless of the routing key.
  • Headers Exchange: routes on the message's header attributes instead of the routing key.

Workflow

  • The producer sends a message to the exchange.
  • The exchange routes the message to the matching queues according to its type and the bindings between it and the queues.
  • The consumer gets messages from the queue for processing.
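The workflow above can be modelled in a few lines of plain Java. The following is only a toy in-memory sketch, not the RabbitMQ client or Spring AMQP API: the class ToyBroker and all of its methods are hypothetical names invented for illustration, and it covers just the direct and fanout rules.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of producer -> exchange -> queue -> consumer (hypothetical, for illustration only). */
final class ToyBroker {
    enum ExchangeType { DIRECT, FANOUT }

    /** One binding: the bound queue plus the binding key it was bound with. */
    private static final class Binding {
        final String queueName;
        final String bindingKey;

        Binding(String queueName, String bindingKey) {
            this.queueName = queueName;
            this.bindingKey = bindingKey;
        }
    }

    private final Map<String, ExchangeType> exchangeTypes = new HashMap<>();
    private final Map<String, List<Binding>> bindingsByExchange = new HashMap<>();
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareExchange(String name, ExchangeType type) {
        exchangeTypes.put(name, type);
        bindingsByExchange.putIfAbsent(name, new ArrayList<>());
    }

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Both the exchange and the queue must have been declared first. */
    void bind(String queueName, String exchangeName, String bindingKey) {
        bindingsByExchange.get(exchangeName).add(new Binding(queueName, bindingKey));
    }

    /** Producer side: copies the message into every matching queue and returns their names. */
    List<String> publish(String exchangeName, String routingKey, String body) {
        ExchangeType type = exchangeTypes.get(exchangeName);
        List<String> delivered = new ArrayList<>();
        for (Binding binding : bindingsByExchange.get(exchangeName)) {
            // FANOUT ignores the routing key; DIRECT requires an exact key match.
            boolean matches = type == ExchangeType.FANOUT || binding.bindingKey.equals(routingKey);
            if (matches) {
                queues.get(binding.queueName).add(body);
                delivered.add(binding.queueName);
            }
        }
        return delivered;
    }

    /** Consumer side: takes the next message from the queue, or null if it is empty. */
    String consume(String queueName) {
        return queues.get(queueName).poll();
    }
}
```

In this model, publishing to a fanout exchange with any routing key reaches every bound queue, while publishing to a direct exchange reaches only the queues bound with exactly that key.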

The statement that produces the message in my code:

convertAndSend(exchange, routing key (here the same as the queue name), message payload)

With a fanout exchange, no routing key is needed; any value you specify is ignored.

rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");

With a fanout exchange, this should be changed to:

rabbitTemplate.convertAndSend("process-data-change-exchange", "", "hhhhhhhhhhhhhh");

With a fanout exchange, the routing key can be set to the empty string "", because the exchange delivers the message to every bound queue without looking at the routing key.

  • In a fanout exchange, the routing key is ignored.
  • Messages are broadcast to all queues bound to the exchange.

The four exchange types

1. Direct Exchange

Direct exchange: delivers a message to the queues whose binding key exactly matches the routing key.

// Create a direct exchange
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct-exchange", true, false);
}

// Create a queue
@Bean
public Queue directQueue() {
    return new Queue("direct-queue", true);
}

// Bind the queue to the direct exchange and specify the routing key
@Bean
public Binding directBinding() {
    return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct-routing-key");
}

Producing a message:

Sending to a direct exchange requires a routing key.

// Send a message to the direct exchange
rabbitTemplate.convertAndSend("direct-exchange", "direct-routing-key", "Your message here");

2. Topic Exchange

Topic exchange: supports wildcard (pattern) matching on routing keys.

// Create a topic exchange
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic-exchange", true, false);
}

// Create a queue
@Bean
public Queue topicQueue() {
    return new Queue("topic-queue", true);
}

// Bind the queue to the topic exchange with a pattern:
// "topic.#" matches "topic" followed by zero or more words
@Bean
public Binding topicBinding() {
    return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#");
}

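Producing a message:

Sending to a topic exchange requires a routing key that matches the binding pattern.

// Send a message to the topic exchange.
// "topic.example" is only an illustrative key; any key matching "topic.#" is routed to topic-queue.
rabbitTemplate.convertAndSend("topic-exchange", "topic.example", "Your message here");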
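To make the wildcard rules concrete, here is a minimal plain-Java sketch of topic matching as the AMQP model describes it: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The class TopicPatternMatcher is a hypothetical helper written for this article, not part of RabbitMQ or Spring AMQP, and it makes the simplifying assumption that an empty routing key contains zero words.

```java
/** Hypothetical helper that mimics topic-exchange pattern matching (illustration only). */
final class TopicPatternMatcher {
    private TopicPatternMatcher() {
    }

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String bindingPattern, String routingKey) {
        String[] patternWords = splitWords(bindingPattern);
        String[] keyWords = splitWords(routingKey);
        return matchFrom(patternWords, 0, keyWords, 0);
    }

    // Simplifying assumption: an empty string is treated as zero words.
    private static String[] splitWords(String dotted) {
        return dotted.isEmpty() ? new String[0] : dotted.split("\\.", -1);
    }

    private static boolean matchFrom(String[] pattern, int pi, String[] key, int ki) {
        if (pi == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return ki == key.length;
        }
        if (pattern[pi].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = ki; next <= key.length; next++) {
                if (matchFrom(pattern, pi + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == key.length) {
            // A literal word or '*' needs a key word, but none is left.
            return false;
        }
        if (pattern[pi].equals("*") || pattern[pi].equals(key[ki])) {
            return matchFrom(pattern, pi + 1, key, ki + 1);
        }
        return false;
    }
}
```

With this sketch, the pattern "topic.#" accepts both "topic" and "topic.order.created", while "topic.*" accepts "topic.order" but rejects "topic.order.created".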

3. Fanout Exchange

Fanout exchange: broadcasts messages to all bound queues.

// Create a fanout exchange
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout-exchange", true, false);
}

// Create two queues
@Bean
public Queue fanoutQueue1() {
    return new Queue("fanout-queue-1", true);
}

@Bean
public Queue fanoutQueue2() {
    return new Queue("fanout-queue-2", true);
}

// Bind both queues to the fanout exchange (no routing key needed)
@Bean
public Binding fanoutBinding1() {
    return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

Producing a message:

Sending to a fanout exchange needs no routing key; just pass an empty string.

// Send a message to the fanout exchange
rabbitTemplate.convertAndSend("fanout-exchange", "", "Your message here");

4. Headers Exchange

Headers exchange: routes on matching attributes in the message headers.

// Create a headers exchange
@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers-exchange", true, false);
}

// Create a queue
@Bean
public Queue headersQueue() {
    return new Queue("headers-queue", true);
}

// Bind the queue to the headers exchange; whereAll requires every listed header to match
@Bean
public Binding headersBinding() {
    Map<String, Object> headers = new HashMap<>();
    headers.put("format", "pdf");
    headers.put("type", "report");
    return BindingBuilder.bind(headersQueue())
            .to(headersExchange())
            .whereAll(headers)
            .match();
}

Producing a message:

Sending to a headers exchange requires building a message that carries the header attributes.

// Send a message to the headers exchange
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("format", "pdf");
messageProperties.setHeader("type", "report");
Message message = new Message("Your message here".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("headers-exchange", "", message);
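The binding above uses whereAll, which corresponds to requiring every listed header to be present with the same value; Spring AMQP's BindingBuilder also offers whereAny, where one matching header is enough. The difference can be sketched in plain Java as follows. HeadersMatcher is a hypothetical class written for illustration and is not how the broker is implemented.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper that mimics the "all" and "any" header matching modes (illustration only). */
final class HeadersMatcher {
    private HeadersMatcher() {
    }

    /** "all" mode: every required header must be present with an equal value. */
    static boolean matchesAll(Map<String, Object> required, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (!Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** "any" mode: at least one required header must be present with an equal value. */
    static boolean matchesAny(Map<String, Object> required, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()))) {
                return true;
            }
        }
        return false;
    }
}
```

A message with format=pdf and type=log would fail the "all" check against the binding above but pass an "any" check, because only the format header agrees.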
