SoFunction
Updated on 2025-03-05

Using RabbitMQ delayed message queues in Go: a detailed explanation

Delay queues are common in business scenarios such as order payment timeouts, automatic confirmation of an order some time after a takeout delivery is received, scheduled tasks, and promotion expiry. Using a delay queue can simplify system design and development, improve system reliability and availability, and improve system performance. This article describes how to implement a delayed message queue with RabbitMQ. Before using it, RabbitMQ must first be set up to support delayed messages.

Install a standalone RabbitMQ in Docker

The docker-compose configuration file is as follows:

version: '3'
 
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    hostname: rabbitmq-service
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - $PWD/data:/var/lib/rabbitmq
      - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins
      - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.:/plugins/rabbitmq_delayed_message_exchange-3.12.
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

RabbitMQ does not support delayed messages by default; an additional plugin is required:

  • enabled_plugins is the file that lists the plugins to enable at startup. Its content is [rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]. (the trailing period is part of the file).
  • rabbitmq_delayed_message_exchange-3.12. is the delayed message plugin file.

Start RabbitMQ:

docker-compose up -d

You can open the management console in a browser at http://localhost:15672; the username and password are both guest.

Click [Exchanges] --> [Add a new exchange] --> [Type]. If x-delayed-message appears in the drop-down list, delayed messages are supported.

A delayed exchange still needs an underlying routing type (direct, topic, fanout, or headers). The following example uses a direct-type delayed message queue.
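The routing type is passed to the broker as the x-delayed-type argument when the exchange is declared. As a minimal sketch, the helper below builds that argument map and rejects anything outside the four types named above. The function name delayedExchangeArgs is hypothetical, and restricting the value to these four types is a choice made for this sketch, not a rule taken from the plugin.

```go
package main

import "fmt"

// delayedExchangeArgs returns the declaration arguments for an
// x-delayed-message exchange. It accepts only the four routing types
// mentioned in this article (an assumption of this sketch).
func delayedExchangeArgs(routingType string) (map[string]interface{}, error) {
	switch routingType {
	case "direct", "topic", "fanout", "headers":
		return map[string]interface{}{"x-delayed-type": routingType}, nil
	default:
		return nil, fmt.Errorf("unsupported x-delayed-type %q", routingType)
	}
}

func main() {
	args, err := delayedExchangeArgs("direct")
	fmt.Println(args, err) // prints: map[x-delayed-type:direct] <nil>
}
```

The returned map has the same underlying type as amqp.Table, so it can be converted with amqp.Table(args) when calling ExchangeDeclare.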

Producer sample code

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	// Assumes the broker from the compose file above is reachable on localhost.
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	q, err := NewProducer(queueName, conn, exchange)
	checkErr(err)
	defer q.Close()
	for i := 1; i <= 5; i++ {
		body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i)
		err = q.Publish(ctx, time.Second*5, []byte(body)) // send a message delayed by 5 seconds
		checkErr(err)
		time.Sleep(time.Second) // pause between messages
	}
}

// Exchange describes the exchange to declare.
type Exchange struct {
	Name                string // exchange name
	Type                string // exchange type: direct, topic, fanout, headers, or x-delayed-message
	RoutingKey          string // routing key
	XDelayedMessageType string // routing type of the delayed exchange: direct, topic, fanout, or headers
}

// NewDelayedMessageExchange returns an x-delayed-message exchange. The
// delayedMessageType parameter is the routing type: direct, topic, fanout, or headers.
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Producer publishes messages to a delayed exchange.
type Producer struct {
	queueName string
	exchange  *Exchange
	conn      *amqp.Connection
	ch        *amqp.Channel
}

// NewProducer declares the exchange and queue, binds them, and returns a producer.
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
	// Open a channel.
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// Declare the exchange.
	err = ch.ExchangeDeclare(
		exchange.Name, // exchange name
		exchange.Type, // exchange type: x-delayed-message
		true,          // durable
		false,         // auto-delete
		false,         // internal; false means publishers can publish to it directly
		false,         // no-wait
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // routing type: direct, topic, fanout, or headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// Declare the queue. It is created if it does not exist; otherwise creation is skipped.
	q, err := ch.QueueDeclare(
		queueName, // queue name
		true,      // durable
		false,     // auto-delete
		false,     // exclusive (usable only by the connection that declared it)
		false,     // no-wait
		nil,       // extra arguments
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// Bind the queue to the exchange.
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Producer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		exchange:  exchange,
	}, nil
}

// Publish sends a message that the broker holds for delayTime before routing it.
func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error {
	err := p.ch.PublishWithContext(
		ctx,
		p.exchange.Name,       // exchange name
		p.exchange.RoutingKey, // routing key
		false,                 // mandatory: if true, a message that cannot be routed to any queue is returned to the sender
		false,                 // immediate: if true, a message routed to a queue with no consumer is returned to the sender
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // the queue is declared durable, so the message is persistent too
			ContentType:  "text/plain",
			Body:         body,
			Headers: amqp.Table{
				"x-delay": int(delayTime / time.Millisecond), // delay in milliseconds
			},
		},
	)
	if err != nil {
		return err
	}
	fmt.Printf("[send]: %s\n", body)
	return nil
}

// Close closes the producer's channel.
func (p *Producer) Close() {
	if p.ch != nil {
		_ = p.ch.Close()
	}
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

Consumer sample code

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	// Assumes the broker from the compose file above is reachable on localhost.
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	c, err := NewConsumer(ctx, queueName, exchange, conn)
	checkErr(err)
	c.Consume() // start consuming messages
	defer c.Close()

	fmt.Println("exit press CTRL+C")
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
	<-interrupt
	fmt.Println("exit consume messages")
}

// Exchange describes the exchange to declare.
type Exchange struct {
	Name                string // exchange name
	Type                string // exchange type: direct, topic, fanout, headers, or x-delayed-message
	RoutingKey          string // routing key
	XDelayedMessageType string // routing type of the delayed exchange: direct, topic, fanout, or headers
}

// NewDelayedMessageExchange returns an x-delayed-message exchange. The
// delayedMessageType parameter is the routing type: direct, topic, fanout, or headers.
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Consumer receives messages from a queue bound to a delayed exchange.
type Consumer struct {
	ctx       context.Context
	queueName string
	conn      *amqp.Connection
	ch        *amqp.Channel
	delivery  <-chan amqp.Delivery
	exchange  *Exchange
}

// NewConsumer declares the exchange and queue, binds them, and registers a consumer.
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
	// Open a channel.
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// Declare the exchange.
	err = ch.ExchangeDeclare(
		exchange.Name, // exchange name
		exchange.Type, // exchange type: x-delayed-message
		true,          // durable
		false,         // auto-delete
		false,         // internal; false means publishers can publish to it directly
		false,         // no-wait
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // routing type: direct, topic, fanout, or headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// Declare the queue. It is created if it does not exist; otherwise creation is skipped.
	q, err := ch.QueueDeclare(
		queueName, // queue name
		true,      // durable
		false,     // auto-delete
		false,     // exclusive (usable only by the connection that declared it)
		false,     // no-wait
		nil,       // extra arguments
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// Bind the queue to the exchange.
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// Register a consumer on the queue.
	delivery, err := ch.ConsumeWithContext(
		ctx,
		queueName, // queue name
		"",        // consumer tag, used to tell multiple consumers apart
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local: if true, messages published on this connection are not delivered to its own consumers
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Consumer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		delivery:  delivery,
		exchange:  exchange,
	}, nil
}

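// Consume starts a goroutine that prints each message as it arrives.
func (c *Consumer) Consume() {
	go func() {
		fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
		for d := range c.delivery {
			// Process the message.
			fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.queueName, d.Body)
			// _ = d.Ack(false) // if auto-ack is false, a manual ack is required
		}
	}()
}

// Close closes the consumer's channel.
func (c *Consumer) Close() {
	if c.ch != nil {
		_ = c.ch.Close()
	}
}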

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

Summary

The above is a simple example of using a RabbitMQ delayed message queue. In real use, the RabbitMQ connection should also handle network disconnection and reconnection.

RabbitMQ depends on the rabbitmq_delayed_message_exchange plugin for this feature. The plugin's current design is not well suited to scenarios with a large number of delayed messages (for example, more than hundreds of thousands). In addition, one source of variability is that the plugin relies on Erlang timers. Once a certain number of long-lived timers are in use, they start contending for scheduler resources, and time drift keeps accumulating.

If you use the delayed message plugin and need high message reliability, you can record each message in a database before sending it and mark it as consumed after consumption. A scheduled task in between can check for messages that were never consumed, which further improves reliability.

There is also a rabbitmq library built on top of github.com/rabbitmq/amqp091-go that supports the various message types out of the box (direct, topic, fanout, headers, delayed message, publisher/subscriber).
