SoFunction
Updated on 2025-04-08

Spring Boot 3 Integration RabbitMQ Practical Guide (Principle Analysis)

Spring Boot 3 Integration RabbitMQ Practice Guide

1. RabbitMQ core principle

1.1 What is RabbitMQ

RabbitMQ is an open source message broker and queue server developed using the Erlang language and implemented based on the AMQP (Advanced Message Queuing Protocol) protocol. It supports multiple messaging modes, with high availability, scalability and reliability.

1.2 Core Concepts

1.2.1 Basic Components

Producer (Producer)

  • Sender of the message
  • Responsible for creating messages and publishing them to RabbitMQ

Consumer

  • Recipient of message
  • Connect to the RabbitMQ server and subscribe to the queue

Exchange (switch)

  • Receive messages sent by producers and forward them to the queue according to routing rules
  • type:
    • Direct Exchange: Exact match according to routing key
    • Topic Exchange: Matching according to routing key pattern
    • Fanout Exchange: Broadcast to all bound queues
    • Headers Exchange: Match according to message attributes

Queue (queue)

    • Where message storage
    • Supports persistence, temporary, automatic deletion and other features

Binding

  • Virtual connection between switch and queue
  • Define message routing rules

1.2.2 Advanced Features

Message persistence

  • Switch persistence: Set durable=true when creating
  • Queue persistence: Set durable=true when creating
  • Message persistence: Set delivery-mode=2

Message confirmation mechanism

  • Producer confirmation: Publisher Confirm and Return mechanisms
  • Consumer confirmation: automatic confirmation, manual confirmation, batch confirmation

Dead Letter Queue (DLX)

  • Message rejected and not rejoined
  • Message Expiration (TTL)
  • The queue reaches maximum length

1.3 Application scenarios

Asynchronous processing

  • Send emails and text messages notifications
  • Log processing, report generation
  • File processing, picture processing

Application decoupling

  • Inter-system communication
  • Service decoupling
  • Process separation

Flow control

  • Peak cutting and valley filling
  • Request buffering
  • Flow shaping

Timing tasks

  • Delay queue
  • Timed processing
  • Task Scheduling

2. Environment construction

2.1 Basic environment

  • Spring Boot:
  • Java: 17+
  • RabbitMQ: 3.12+
  • Maven/Gradle

2.2 Dependency configuration

<dependencies>
    <!-- Spring Boot Starter AMQP -->
    <dependency>
        <groupId></groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId></groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId></groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId></groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

2.3 Basic configuration

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Message confirmation configuration    publisher-confirm-type: correlated  # Enable release confirmation    publisher-returns: true             # Turn on publishing and return    template:
      mandatory: true                   # Return if the message routing fails    #Consumer configuration    listener:
      simple:
        acknowledge-mode: manual        # Manual confirmation        prefetch: 1                     # Number of messages obtained each time        retry:
          enabled: true                 # Turn on and try again          initial-interval: 1000        # Retry interval          max-attempts: 3               # Maximum number of retries          multiplier: 1.0              # Retry the time multiplier    # SSL configuration (optional)    ssl:
      enabled: false
      key-store: classpath:keystore.p12
      key-store-password: password
      trust-store: classpath:truststore.p12
      trust-store-password: password

3. Core configuration class

3.1 RabbitMQ configuration class

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    // Switch name    public static final String BUSINESS_EXCHANGE = "";
    public static final String DEAD_LETTER_EXCHANGE = "";
    // Queue name    public static final String BUSINESS_QUEUE = "";
    public static final String DEAD_LETTER_QUEUE = "";
    // Routing key    public static final String BUSINESS_KEY = "";
    public static final String DEAD_LETTER_KEY = "";
    // Service switch    @Bean
    public DirectExchange businessExchange() {
        return (BUSINESS_EXCHANGE)
                .durable(true)
                .build();
    }
    // Dead letter switch    @Bean
    public DirectExchange deadLetterExchange() {
        return (DEAD_LETTER_EXCHANGE)
                .durable(true)
                .build();
    }
    // Business queue    @Bean
    public Queue businessQueue() {
        Map&lt;String, Object&gt; args = new HashMap&lt;&gt;(3);
        // Message expiration time        ("x-message-ttl", 60000);
        // Maximum queue length        ("x-max-length", 1000);
        // Dead letter switch        ("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        ("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        return (BUSINESS_QUEUE)
                .withArguments(args)
                .build();
    }
    // Dead letter queue    @Bean
    public Queue deadLetterQueue() {
        return (DEAD_LETTER_QUEUE).build();
    }
    // Business binding    @Bean
    public Binding businessBinding() {
        return (businessQueue())
                .to(businessExchange())
                .with(BUSINESS_KEY);
    }
    // Dead letter binding    @Bean
    public Binding deadLetterBinding() {
        return (deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_KEY);
    }
    // Message converter    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // RabbitTemplate configuration    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        (messageConverter());
        return rabbitTemplate;
    }
}

3.2 Message confirmation configuration

@Configuration
@Slf4j
public class RabbitConfirmConfig implements ,  {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        (this);
        (this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            ("The message was sent to the switch successfully: correlationData={}", correlationData);
        } else {
            ("Message sent to the switch failed: correlationData={}, cause={}", correlationData, cause);
            // Processing failure logic, such as retry, alarm, etc.        }
    }
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        ("Message routing to queue failed: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
                (),
                (),
                (),
                (),
                new String(().getBody()));
        // Processing failure logic, such as retry, alarm, etc.    }
}

4. Message Producer

4.1 Message Send Service

@Service
@Slf4j
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(Object message, String exchange, String routingKey) {
        CorrelationData correlationData = new CorrelationData(().toString());
        try {
            (exchange, routingKey, message, correlationData);
            ("The message was sent successfully: message={}, exchange={}, routingKey={}, correlationData={}",
                    message, exchange, routingKey, correlationData);
        } catch (Exception e) {
            ("Message sending exception: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
                    message, exchange, routingKey, correlationData, ());
            throw new RuntimeException("Message sending failed", e);
        }
    }
    public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
        CorrelationData correlationData = new CorrelationData(().toString());
        MessagePostProcessor messagePostProcessor = msg -&gt; {
            ().setDelay((int) delayMillis);
            return msg;
        };
        try {
            (exchange, routingKey, message, messagePostProcessor, correlationData);
            ("延迟The message was sent successfully: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
                    message, exchange, routingKey, delayMillis, correlationData);
        } catch (Exception e) {
            ("延迟Message sending exception: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
                    message, exchange, routingKey, delayMillis, correlationData, ());
            throw new RuntimeException("Delayed message sending failed", e);
        }
    }
}

5. Message Consumers

5.1 Message Processing Service

@Service
@Slf4j
public class MessageConsumer {
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = ().getDeliveryTag();
        try {
            // Get the message content            String messageBody = new String(());
            ("Received a message: message={}, deliveryTag={}", messageBody, deliveryTag);
            // Business processing            processMessage(messageBody);
            // Manual confirmation of the message            (deliveryTag, false);
            ("Message processing succeeded: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            ("Message processing exception: deliveryTag={}, error={}", deliveryTag, ());
            // Determine whether to re-delivery            if (().getRedelivered()) {
                ("The message has been tried again,Reject message: deliveryTag={}", deliveryTag);
                (deliveryTag, false);
            } else {
                ("The first message processing failed,Re-delivery: deliveryTag={}", deliveryTag);
                (deliveryTag, false, true);
            }
        }
    }
    private void processMessage(String message) {
        // Implement specific business logic        ("Processing messages: {}", message);
    }
}

5.2 Dead letter message processing

@Service
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = ().getDeliveryTag();
        try {
            String messageBody = new String(());
            ("Received a dead letter message: message={}, deliveryTag={}", messageBody, deliveryTag);
            // Dead letter message processing logic            processDeadLetter(messageBody);
            (deliveryTag, false);
            ("Dead letter message processing was successful: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            ("Dead letter message handling exception: deliveryTag={}, error={}", deliveryTag, ());
            (deliveryTag, false);
        }
    }
    private void processDeadLetter(String message) {
        // Implement dead letter message processing logic        ("Handle dead letter messages: {}", message);
    }
}

6. Interface controller

@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {
    @Autowired
    private MessageProducer messageProducer;
    @PostMapping("/send")
    public ResponseEntity&lt;String&gt; sendMessage(@RequestBody MessageDTO message) {
        try {
            ((),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY);
            return ("Message sent successfully");
        } catch (Exception e) {
            ("Message sending failed: {}", ());
            return (HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Message sending failed: " + ());
        }
    }
    @PostMapping("/send/delay")
    public ResponseEntity&lt;String&gt; sendDelayMessage(
            @RequestBody MessageDTO message,
            @RequestParam long delayMillis) {
        try {
            ((),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY,
                    delayMillis);
            return ("Delayed message sent successfully");
        } catch (Exception e) {
            ("延迟Message sending failed: {}", ());
            return (HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Delayed message sending failed: " + ());
        }
    }
}

7. Monitoring and Operation and Maintenance

7.1 RabbitMQ Management Interface

  • Access address: http://localhost:15672
  • Default account: guest/guest
  • Main functions:
    • Queue monitoring
    • Switch management
    • Connection status
    • Message tracking

7.2 Prometheus + Grafana Monitoring

# 
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']

7.3 Log configuration

logging:
  level:
    : INFO
    : DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7.4 Alarm configuration

@Configuration
public class RabbitMQAlertConfig {
    @Value("${}")
    private String webhookUrl;
    @Bean
    public AlertService alertService() {
        return new DingTalkAlertService(webhookUrl);
    }
}

8. Best Practices

8.1 Message Impotence Processing

@Service
public class MessageIdempotentHandler {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    public boolean isProcessed(String messageId) {
        String key = "mq:processed:" + messageId;
        return (().setIfAbsent(key, "1", 24, ));
    }
}

8.2 Message Retry Policy

@Configuration
public class RetryConfig {
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        (1000);
        (backOffPolicy);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        (3);
        (retryPolicy);
        return retryTemplate;
    }
}

8.3 Message Serialization

@Configuration
public class MessageConverterConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        (true);
        return converter;
    }
}

8.4 Message Tracking

@Aspect
@Component
@Slf4j
public class MessageTraceAspect {
    @Around("@annotation()")
    public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        String messageId = ("messageId");
        ("Start processing messages: messageId={}", messageId);
        try {
            Object result = ();
            ("Message processing is completed: messageId={}", messageId);
            return result;
        } catch (Exception e) {
            ("Message processing exception: messageId={}, error={}", messageId, ());
            throw e;
        }
    }
}

9. FAQs and Solutions

9.1 Message Loss Issues

  • Producer confirmation mechanism
  • Message persistence
  • Manual confirmation mode
  • High cluster availability

9.2 Repeated consumption of messages

  • Idepotency processing
  • Repeat the news
  • Business inspection

9.3 Message accumulation problem

  • Increase the number of consumers
  • Improve processing efficiency
  • Queue sharding
  • Dead letter queue processing

9.4 Performance optimization

  • Set the prefetch quantity reasonably
  • Batch confirmation message
  • Message compression
  • Connection pool optimization

10. High availability deployment

10.1 Cluster Configuration

spring:
  rabbitmq:
    addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
    username: admin
    password: password
    virtual-host: /

10.2 Mirror Queue

# Set mirroring policyrabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

10.3 Load balancing

# 
upstream rabbitmq_cluster {
    server rabbit1:15672;
    server rabbit2:15672;
    server rabbit3:15672;
}

11. Reference resources

  • Spring AMQP official documentation
  • RabbitMQ official documentation
  • Spring Boot official documentation

This is the article about Spring Boot 3 Integration RabbitMQ Practical Guide. For more related Spring Boot 3 Integration RabbitMQ content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!