SoFunction
Updated on 2025-04-22

Sample code for implementing message queues based on Redis

Message queues are very important in distributed systems and can effectively decouple various modules of the system and provide asynchronous processing capabilities and buffering capabilities. As a high-performance in-memory database, Redis can also act as a lightweight message queue in addition to caching and persistent storage. Using Redis to process message queues helps improve system throughput and scalability.

1. Use scenarios

The application scenarios of message queues are very wide, including:

  • Asynchronous task processing: For example, time-consuming operations such as sending emails, SMS, and push notifications can be performed asynchronously through message queues to improve user experience.
  • System decoupling: Decoupling producers and consumers so that the two systems do not need to communicate directly and are independent of each other.
  • Flow peak cutting: In high concurrency scenarios, requests are queued through message queues to alleviate the pressure peak of the system.
  • Log processing: Log messages can be pushed to the queue for centralized processing and storage.

2. Principle analysis

Redis provides several different mechanisms to implement message queues, includingListandPub/Sub

1. List-based message queue

Redis's List data structure is the basis for implementing queues. Common operations include:

  • LPUSH: Push the message to the left end of the queue.
  • RPUSH: Push the message to the right end of the queue.
  • RPOP: A message pops up from the right end of the queue (equivalent to first-in-first-out, i.e. FIFO).
  • BLPOP: Blocking pop-up message, waiting until a new message is available when the queue is empty.

2. Pub/Sub-based publish subscription

Redis's **pub/sub** is a different message queue implementation that supports message broadcasting. Its mechanism is as follows:

  • The publisher publishes a message to a channel.
  • All consumers who have subscribed to the channel can receive messages.

But Pub/Sub featuresMessage not persistent, it is more suitable for real-time messaging, and messages will be lost without subscribers.

III. Implementation process

1. Project structure

Our project is based on Spring Boot and includes the following modules:

  • Producer: Message producer, used to push tasks or messages into queues.
  • Consumer: Message consumer, responsible for reading tasks from the queue and processing.

2. Environmental preparation

existAdd Redis and Web dependencies:

<dependencies>
    <dependency>
        <groupId></groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId></groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

existConfigure Redis in:

spring:
  redis:
    host: localhost
    port: 6379

3. Redis configuration class

ConfigurationRedisTemplateUsed to interact with Redis:

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        (redisConnectionFactory);
        return template;
    }
}

4. List-based message queue implementation

Producer (Message Producer)

Producer pushes messages into queue, usingLPUSHorRPUSHoperate:

@Service
public class MessageProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String MESSAGE_QUEUE = "message:queue";

    public void produce(String message) {
        ().leftPush(MESSAGE_QUEUE, message);
    }
}

Consumer (Message Consumer)

The consumer blocks the message from the queue and processes it:

@Service
public class MessageConsumer {

    @Autowired
    private RedisTemplate&lt;String, Object&gt; redisTemplate;

    private static final String MESSAGE_QUEUE = "message:queue";

    @Scheduled(fixedRate = 5000) // Check the queue every 5 seconds    public void consume() {
        String message = (String) ().rightPop(MESSAGE_QUEUE);
        if (message != null) {
            ("Consumed message: " + message);
            // Simulate the message processing        }
    }
}

pass@ScheduledAnnotation: Consumers can periodically pull messages from the Redis queue for processing.

5. Pub/Sub-based message queue implementation

Producer (Publisher)

The publisher publishes the message to the specified channel:

@Service
public class PubSubProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void publishMessage(String channel, String message) {
        (channel, message);
    }
}

Consumer (subscriber)

Subscribers listen to the channel's messages and process them:

@Service
public class PubSubConsumer implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        ("Received message: " + new String(()));
    }
}

Redis configuration subscription listener

Configure the subscriber and register the channel:

@Configuration
public class RedisPubSubConfig {

    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new PubSubConsumer());
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
                                                        MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        (connectionFactory);
        (listenerAdapter, new PatternTopic("pubsub:channel"));
        return container;
    }
}

6. Controller layer

Provide API interface for producers:

@RestController
@RequestMapping("/queue")
public class QueueController {

    @Autowired
    private MessageProducer messageProducer;

    @Autowired
    private PubSubProducer pubSubProducer;

    // Put messages into queue    @PostMapping("/produce")
    public ResponseEntity&lt;String&gt; produceMessage(@RequestParam String message) {
        (message);
        return ("Message produced");
    }

    // Post a message    @PostMapping("/publish")
    public ResponseEntity&lt;String&gt; publishMessage(@RequestParam String message) {
        ("pubsub:channel", message);
        return ("Message published");
    }
}

4. Test results

  • List-based message queue

    • After starting the Spring Boot application, send a message through the API interface:
      • POST request:/queue/produce
      • parameter:message=HelloQueue
    • The consumer will fetch the message from the queue and print it every time the schedule is scheduled.
  • Pub/Sub-based message queue

    • Post a message:
      • POST request:/queue/publish
      • parameter:message=HelloPubSub
    • Subscribers will receive the message immediately and process it.

V. Summary and Optimization

Although Redis is not a special message queue tool, it is very suitable for use in lightweight and high real-time scenarios. A simple task queue is implemented through List, and message broadcast can be implemented through Pub/Sub. In production environment, the following optimization measures are recommended:

  • Message persistence: Ensure that important messages are not lost, and can be combined with the RDB/AOF mechanism.
  • Queue monitoring and alarm: Monitor queue length, processing delay and other indicators to prevent queue backlog.
  • High availability and disaster recovery: Consider using Redis clusters to ensure high availability.

This is the end of this article about the sample code for implementing message queues based on Redis. For more related Redis message queue content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!