Spring Boot 3 and RabbitMQ Integration: A Practical Guide
1. RabbitMQ Core Principles
1.1 What is RabbitMQ
RabbitMQ is an open-source message broker written in Erlang that implements AMQP (Advanced Message Queuing Protocol). It supports multiple messaging patterns and provides high availability, scalability, and reliability.
1.2 Core Concepts
1.2.1 Basic Components
Producer
- Sends messages
- Creates messages and publishes them to RabbitMQ
Consumer
- Receives messages
- Connects to the RabbitMQ server and subscribes to a queue
Exchange
- Receives messages from producers and routes them to queues according to routing rules
- Types:
  - Direct exchange: routes on an exact routing-key match
  - Topic exchange: routes on a routing-key pattern match
  - Fanout exchange: broadcasts to all bound queues
  - Headers exchange: routes on message header attributes
Queue
- Stores messages
- Supports durable, temporary, and auto-delete queues
Binding
- A virtual connection between an exchange and a queue
- Defines the routing rule for messages
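The difference between direct and topic matching can be sketched in plain Java. This is an illustrative model only: `RoutingSketch` and its methods are hypothetical names and not part of any RabbitMQ client library. It assumes the usual AMQP topic rules, where `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
// Hypothetical helper that models exchange matching; it does not talk to a broker.
final class RoutingSketch {

    /** Direct exchange: the binding key must equal the routing key. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Topic exchange: '*' matches exactly one word, '#' matches zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either consumes nothing, or consumes one word and stays active.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

For example, a binding key of `order.*` accepts `order.created` but not `order.created.eu`, while `order.#` accepts both.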
1.2.2 Advanced Features
Message persistence
- Exchange durability: set durable=true when declaring the exchange
- Queue durability: set durable=true when declaring the queue
- Message persistence: set delivery-mode=2 on the message
Message confirmation mechanisms
- Producer side: publisher confirms and publisher returns
- Consumer side: automatic, manual, and batch acknowledgement
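On the producer side, confirms are only useful if the application remembers which messages are still waiting for one. A minimal sketch of that bookkeeping follows, assuming each message carries a unique correlation ID. `PendingConfirms` and its method names are hypothetical and only model the idea.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping class; it models the idea and is not a Spring AMQP API.
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Record the payload before handing the message to the broker. */
    void onPublish(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Handle a broker confirm. On ack the entry is dropped and null is returned;
     * on nack the payload is returned so the caller can retry or raise an alert.
     */
    String onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        return ack ? null : payload;
    }

    /** Number of messages still waiting for a confirm. */
    int outstanding() {
        return pending.size();
    }
}
```

Entries that stay in the map for a long time point at confirms that never arrived, which is a natural place to hook in a timeout check.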
Dead letter exchange (DLX)
A message is dead-lettered when:
- It is rejected and not requeued
- It expires (TTL)
- The queue reaches its maximum length
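The three dead-lettering triggers can be illustrated with a small in-memory model. It is a sketch under stated assumptions, not broker code: `DeadLetterSketch` is a hypothetical class, time is passed in explicitly to keep the example deterministic, and overflow is assumed to drop from the head of the queue (RabbitMQ's default overflow behaviour).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model only; it does not talk to a broker.
final class DeadLetterSketch {
    record Entry(String body, long expiresAtMillis) {}

    private final Deque<Entry> queue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>(); // entries look like "reason:body"
    private final int maxLength;
    private final long ttlMillis;

    DeadLetterSketch(int maxLength, long ttlMillis) {
        this.maxLength = maxLength;
        this.ttlMillis = ttlMillis;
    }

    /** Enqueue a message; if the queue is full, the oldest message is dead-lettered. */
    void publish(String body, long nowMillis) {
        if (queue.size() == maxLength) {
            deadLetters.add("maxlen:" + queue.pollFirst().body());
        }
        queue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Return the next live message, dead-lettering expired ones on the way. */
    String poll(long nowMillis) {
        Entry e;
        while ((e = queue.pollFirst()) != null) {
            if (e.expiresAtMillis() > nowMillis) {
                return e.body();
            }
            deadLetters.add("expired:" + e.body());
        }
        return null;
    }

    /** The consumer rejected the message with requeue=false. */
    void reject(String body) {
        deadLetters.add("rejected:" + body);
    }
}
```

The reason labels mirror the values RabbitMQ records for dead-lettered messages (`rejected`, `expired`, `maxlen`).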
1.3 Application scenarios
Asynchronous processing
- Sending email and SMS notifications
- Log processing and report generation
- File and image processing
Application decoupling
- Communication between systems
- Decoupling services
- Separating process steps
Traffic control
- Peak shaving (smoothing traffic spikes)
- Request buffering
- Traffic shaping
Scheduled tasks
- Delay queues
- Timed processing
- Task scheduling
2. Environment Setup
2.1 Basic environment
- Spring Boot: 3.x
- Java: 17+
- RabbitMQ: 3.12+
- Maven/Gradle
2.2 Dependency configuration
<dependencies>
    <!-- Spring Boot Starter AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
2.3 Basic configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Publisher confirmation
    publisher-confirm-type: correlated  # Enable publisher confirms
    publisher-returns: true             # Enable publisher returns
    template:
      mandatory: true                   # Return the message if it cannot be routed
    # Consumer configuration
    listener:
      simple:
        acknowledge-mode: manual        # Manual acknowledgement
        prefetch: 1                     # Unacknowledged messages delivered per consumer
        retry:
          enabled: true                 # Enable retry
          initial-interval: 1000        # Initial retry interval (ms)
          max-attempts: 3               # Maximum number of attempts
          multiplier: 1.0               # Interval multiplier
    # SSL configuration (optional)
    ssl:
      enabled: false
      key-store: classpath:keystore.p12
      key-store-password: password
      trust-store: classpath:truststore.p12
      trust-store-password: password
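To see what the retry settings mean in practice, the waits between attempts can be computed by hand. The sketch below is a plain-Java approximation of an exponential back-off schedule, not Spring's implementation. `RetrySchedule` is a hypothetical name, and the `maxIntervalMs` cap is an assumption modelled on the `max-interval` retry property, which the configuration above does not set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that approximates the back-off schedule.
final class RetrySchedule {

    /** Waits in milliseconds between attempts: N attempts means N - 1 waits. */
    static List<Long> waits(long initialIntervalMs, double multiplier,
                            long maxIntervalMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) next, maxIntervalMs));
            next *= multiplier;
        }
        return waits;
    }
}
```

With the values above (1000 ms, multiplier 1.0, three attempts) this gives two waits of 1000 ms each. A multiplier of 2.0 would give 1000 ms and then 2000 ms.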
3. Core configuration class
3.1 RabbitMQ configuration class
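import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    // The string values below are blank in this listing; supply your own names.
    // Exchange names
    public static final String BUSINESS_EXCHANGE = "";
    public static final String DEAD_LETTER_EXCHANGE = "";
    // Queue names
    public static final String BUSINESS_QUEUE = "";
    public static final String DEAD_LETTER_QUEUE = "";
    // Routing keys
    public static final String BUSINESS_KEY = "";
    public static final String DEAD_LETTER_KEY = "";

    // Business exchange
    @Bean
    public DirectExchange businessExchange() {
        return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
                .durable(true)
                .build();
    }

    // Dead letter exchange
    @Bean
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true)
                .build();
    }

    // Business queue
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>();
        // Message time-to-live in milliseconds
        args.put("x-message-ttl", 60000);
        // Maximum queue length
        args.put("x-max-length", 1000);
        // Where dead-lettered messages are sent
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE)
                .withArguments(args)
                .build();
    }

    // Dead letter queue
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    // Business binding
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
                .to(businessExchange())
                .with(BUSINESS_KEY);
    }

    // Dead letter binding
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_KEY);
    }

    // Message converter
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // RabbitTemplate configuration
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        // A hand-built template does not pick up spring.rabbitmq.template.mandatory,
        // so set it here; publisher returns depend on it.
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}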
3.2 Message confirmation configuration
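import java.nio.charset.StandardCharsets;

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Register this class as both the confirm and the returns callback
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    // Called when the broker acks or nacks a published message
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Message reached the exchange: correlationData={}", correlationData);
        } else {
            log.error("Message failed to reach the exchange: correlationData={}, cause={}",
                    correlationData, cause);
            // Handle the failure here, for example retry or raise an alert
        }
    }

    // Called when a mandatory message could not be routed to any queue
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("Message could not be routed to a queue: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
                returned.getExchange(), returned.getRoutingKey(),
                returned.getReplyCode(), returned.getReplyText(),
                new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
        // Handle the failure here, for example retry or raise an alert
    }
}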
4. Message Producer
4.1 Message Send Service
import java.util.UUID;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(Object message, String exchange, String routingKey) {
        // A unique ID lets the confirm callback identify this message
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            log.info("Message sent: message={}, exchange={}, routingKey={}, correlationData={}",
                    message, exchange, routingKey, correlationData);
        } catch (Exception e) {
            log.error("Message sending failed: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
                    message, exchange, routingKey, correlationData, e.getMessage());
            throw new RuntimeException("Message sending failed", e);
        }
    }

    public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Sets the x-delay header. It is honored only by an exchange of type
        // x-delayed-message (delayed message exchange plugin); a plain direct
        // exchange ignores it.
        MessagePostProcessor messagePostProcessor = msg -> {
            msg.getMessageProperties().setDelay((int) delayMillis);
            return msg;
        };
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
            log.info("Delayed message sent: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
                    message, exchange, routingKey, delayMillis, correlationData);
        } catch (Exception e) {
            log.error("Delayed message sending failed: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
                    message, exchange, routingKey, delayMillis, correlationData, e.getMessage());
            throw new RuntimeException("Delayed message sending failed", e);
        }
    }
}
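One detail in the delayed send is the cast from `long` to `int`: a delay larger than `Integer.MAX_VALUE` milliseconds (about 24.8 days) silently wraps to a different value. A small guard, sketched below, makes that failure explicit. `DelaySupport` is a hypothetical helper name, and rejecting the value (rather than clamping it) is just one possible policy.

```java
// Hypothetical helper; validates a delay before it is cast to int.
final class DelaySupport {

    /** Converts a delay in milliseconds to int, rejecting values a plain cast would corrupt. */
    static int toDelayHeader(long delayMillis) {
        if (delayMillis < 0 || delayMillis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay out of range: " + delayMillis);
        }
        return (int) delayMillis;
    }
}
```

In the producer, the cast could then be written as `DelaySupport.toDelayHeader(delayMillis)`.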
5. Message Consumers
5.1 Message Processing Service
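import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageConsumer {

    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Get the message content
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Received message: message={}, deliveryTag={}", messageBody, deliveryTag);
            // Business processing
            processMessage(messageBody);
            // Acknowledge this message only (multiple=false)
            channel.basicAck(deliveryTag, false);
            log.info("Message processed: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            log.error("Message processing failed: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            // Decide whether to redeliver
            if (Boolean.TRUE.equals(message.getMessageProperties().getRedelivered())) {
                // Already retried once: reject without requeue, so the message is dead-lettered
                log.error("Message was already redelivered, rejecting: deliveryTag={}", deliveryTag);
                channel.basicReject(deliveryTag, false);
            } else {
                // First failure: return the message to the queue
                log.info("First processing attempt failed, requeueing: deliveryTag={}", deliveryTag);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }

    private void processMessage(String message) {
        // Implement specific business logic
        log.info("Processing message: {}", message);
    }
}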
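The acknowledgement logic in the consumer boils down to a small decision table, which can be pulled out and checked without a broker. The sketch below is illustrative: `AckPolicy` and `Action` are hypothetical names, and the mapping to channel calls in the comments reflects the consumer in this section.

```java
// Hypothetical helper that captures the consumer's ack decision as a pure function.
final class AckPolicy {

    enum Action {
        ACK,      // basicAck(tag, false)
        REQUEUE,  // basicNack(tag, false, true): back to the queue for one more try
        REJECT    // basicReject(tag, false): dead-lettered if a DLX is configured
    }

    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;
        }
        return redelivered ? Action.REJECT : Action.REQUEUE;
    }
}
```

Because the redelivered flag is a single boolean, this policy allows exactly one retry. Allowing more attempts would need a counter kept elsewhere, for example in a message header.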
5.2 Dead letter message processing
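import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class DeadLetterConsumer {

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Received dead letter message: message={}, deliveryTag={}", messageBody, deliveryTag);
            // Dead letter message processing logic
            processDeadLetter(messageBody);
            channel.basicAck(deliveryTag, false);
            log.info("Dead letter message processed: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            log.error("Dead letter message processing failed: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            // Reject without requeue; the message is discarded unless this queue has its own DLX
            channel.basicReject(deliveryTag, false);
        }
    }

    private void processDeadLetter(String message) {
        // Implement dead letter message processing logic
        log.info("Handling dead letter message: {}", message);
    }
}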
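When handling dead letters it is often useful to know why a message ended up there. RabbitMQ records this in the `x-death` header, a list of entries with fields such as `reason`, `queue`, and `count`. The sketch below reads the first entry from a header map. `XDeath` is a hypothetical helper, and it assumes the header values have already been converted to plain Java types (`List`, `Map`, `String`, `Long`).

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper; summarizes the first x-death entry of a message.
final class XDeath {

    /** Returns "reason@queue xcount" for the first x-death entry, or "unknown" if absent. */
    static String summarize(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List<?> deaths) || deaths.isEmpty()) {
            return "unknown";
        }
        if (!(deaths.get(0) instanceof Map<?, ?> first)) {
            return "unknown";
        }
        return first.get("reason") + "@" + first.get("queue") + " x" + first.get("count");
    }
}
```

In the listener above, the map would come from `message.getMessageProperties().getHeaders()`.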
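6. REST Controller
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {

    @Autowired
    private MessageProducer messageProducer;

    // MessageDTO is not shown in this guide; getContent() is an assumed accessor.
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {
        try {
            messageProducer.sendMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.BUSINESS_KEY);
            return ResponseEntity.ok("Message sent successfully");
        } catch (Exception e) {
            log.error("Message sending failed: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Message sending failed: " + e.getMessage());
        }
    }

    @PostMapping("/send/delay")
    public ResponseEntity<String> sendDelayMessage(
            @RequestBody MessageDTO message,
            @RequestParam long delayMillis) {
        try {
            messageProducer.sendDelayMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.BUSINESS_KEY, delayMillis);
            return ResponseEntity.ok("Delayed message sent successfully");
        } catch (Exception e) {
            log.error("Delayed message sending failed: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Delayed message sending failed: " + e.getMessage());
        }
    }
}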
7. Monitoring and Operations
7.1 RabbitMQ Management Interface
- Access address: http://localhost:15672
- Default account: guest/guest (by default usable only from localhost)
- Main functions:
  - Queue monitoring
  - Exchange management
  - Connection status
  - Message tracing
7.2 Prometheus + Grafana Monitoring
# Prometheus scrape configuration (15692 is the default port of the rabbitmq_prometheus plugin)
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']
7.3 Log configuration
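logging:
  level:
    # The two logger names are missing from this listing; substitute your own.
    <logger-name>: INFO
    <logger-name>: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"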
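7.4 Alert configuration
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// AlertService and DingTalkAlertService are application classes not shown in this guide.
@Configuration
public class RabbitMQAlertConfig {

    // The property key is missing from this listing; put your own webhook property here.
    @Value("${}")
    private String webhookUrl;

    @Bean
    public AlertService alertService() {
        return new DingTalkAlertService(webhookUrl);
    }
}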
8. Best Practices
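8.1 Message Idempotency
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageIdempotentHandler {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Marks the message as processed and reports whether it had been seen before.
     * setIfAbsent returns true only when the key was newly created, so a FALSE
     * result means the message ID was already recorded.
     */
    public boolean isProcessed(String messageId) {
        String key = "mq:processed:" + messageId;
        // The marker expires after 24 (assumed: hours; the unit is missing from this listing)
        return Boolean.FALSE.equals(
                redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
    }
}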
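The key property of the Redis call is that checking and marking happen in one atomic step, so two consumers racing on the same message ID cannot both win. The same idea can be shown in memory with a concurrent set. This is a single-JVM sketch with no expiry, so it illustrates the semantics rather than replacing Redis, and `InMemoryIdempotency` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-JVM stand-in for the Redis-based check; entries never expire.
final class InMemoryIdempotency {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller with a given ID; check and mark are one atomic step. */
    boolean tryMarkProcessed(String messageId) {
        return seen.add(messageId);
    }
}
```

A consumer would process the message only when `tryMarkProcessed` returns true and acknowledge without processing otherwise.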
8.2 Message Retry Policy
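import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // Wait a fixed 1000 ms between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        // Give up after 3 attempts in total
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }
}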
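What a fixed back-off retry does can be written out as a plain loop. The sketch below is a simplified stand-in that shows the control flow, not Spring Retry's implementation. `FixedRetry` is a hypothetical name, and it retries on any exception, which a real policy would usually narrow down.

```java
import java.util.concurrent.Callable;

// Hypothetical helper showing the control flow of a fixed back-off retry.
final class FixedRetry {

    /** Runs the task up to maxAttempts times, sleeping backOffMillis between failed attempts. */
    static <T> T execute(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis); // no sleep after the final attempt
                }
            }
        }
        throw last; // every attempt failed; surface the last error
    }
}
```

With `maxAttempts = 3` and `backOffMillis = 1000`, a task that keeps failing is tried three times with two one-second pauses before the last exception is rethrown.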
8.3 Message Serialization
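import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverterConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        // The setter name is missing from this listing; setCreateMessageIds is assumed here.
        // It makes the converter generate a message ID for each outgoing message.
        converter.setCreateMessageIds(true);
        return converter;
    }
}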
8.4 Message Tracking
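import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

@Aspect
@Component
@Slf4j
public class MessageTraceAspect {

    // The annotation name is missing from this listing; @RabbitListener is assumed here.
    @Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
    public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        // Read the message ID from the logging context (it must be put there elsewhere)
        String messageId = MDC.get("messageId");
        log.info("Start processing message: messageId={}", messageId);
        try {
            Object result = joinPoint.proceed();
            log.info("Message processing completed: messageId={}", messageId);
            return result;
        } catch (Exception e) {
            log.error("Message processing failed: messageId={}, error={}", messageId, e.getMessage());
            throw e;
        }
    }
}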
9. FAQs and Solutions
9.1 Message loss
- Producer confirmation mechanism
- Message persistence
- Manual acknowledgement mode
- Highly available cluster
9.2 Duplicate message consumption
- Idempotent processing
- Message deduplication
- Business-level checks
9.3 Message backlog
- Increase the number of consumers
- Improve processing efficiency
- Queue sharding
- Dead letter queue processing
9.4 Performance optimization
- Tune the prefetch count
- Acknowledge messages in batches
- Message compression
- Connection pool optimization
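Batch acknowledgement relies on the `multiple` flag of `basicAck`: acknowledging delivery tag N with `multiple=true` also acknowledges every earlier unacknowledged tag on the same channel. The sketch below shows the counting logic only. `BatchAcker` is a hypothetical helper, and the `LongConsumer` stands in for a call such as `channel.basicAck(tag, true)`.

```java
import java.util.function.LongConsumer;

// Hypothetical helper; ackUpTo stands in for channel.basicAck(tag, true).
final class BatchAcker {
    private final int batchSize;
    private final LongConsumer ackUpTo;
    private int unacked;
    private long lastTag;

    BatchAcker(int batchSize, LongConsumer ackUpTo) {
        this.batchSize = batchSize;
        this.ackUpTo = ackUpTo;
    }

    /** Call after each message is processed, in delivery order. */
    void onProcessed(long deliveryTag) {
        lastTag = deliveryTag;
        if (++unacked >= batchSize) {
            flush();
        }
    }

    /** Acknowledge whatever is pending, for example when idle or shutting down. */
    void flush() {
        if (unacked > 0) {
            ackUpTo.accept(lastTag);
            unacked = 0;
        }
    }
}
```

The prefetch count has to be at least as large as the batch size. With a prefetch of 1 the broker waits for an acknowledgement after every message, so a batch would never fill.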
10. High availability deployment
10.1 Cluster Configuration
spring:
  rabbitmq:
    addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
    username: admin
    password: password
    virtual-host: /
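10.2 Mirrored Queues
# Note: classic mirrored queues are deprecated and were removed in RabbitMQ 4.0;
# quorum queues are the recommended replacement.
# Set the mirroring policy
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'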
10.3 Load balancing
# Nginx upstream for the management interface (port 15672)
upstream rabbitmq_cluster {
    server rabbit1:15672;
    server rabbit2:15672;
    server rabbit3:15672;
}