Delay queues are often used in certain business scenarios, such as order payment timeout, automatic confirmation of order completion after receiving takeout, timing tasks, promotion expiration, etc., using delay queues canSimplify system design and development、Improve system reliability and usability、Improve system performance. The following describes the delay message queue using RabbitMQ. Before using it, you must first make RabbitMQ support delay queues.
Install standalone version of rabbitMQ in docker
The configuration file content is as follows:
version: '3' services: rabbitmq: image: rabbitmq:3.12-management container_name: rabbitmq hostname: rabbitmq-service restart: always ports: - 5672:5672 - 15672:15672 volumes: - $PWD/data:/var/lib/rabbitmq - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.:/plugins/rabbitmq_delayed_message_exchange-3.12. environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest RABBITMQ_DEFAULT_VHOST: /
rabbitMQ does not support delayed message queue types by default, and additional plug-ins are required to implement:
- enabled_plugins is a plugin that is enabled by default, with the content of
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]
- rabbitmq_delayed_message_exchange-3.12.It is a delay queue plug-in.
Start rabbitmq:
docker-compose up -d
You can access the management background in the browser http://localhost:15672, the username and password are bothguest
。
Click the menu [exchange]--> [Add a new exchange]-->[Type] and see it in the drop-down listx-delayed-message
For type, it means that the delay queue has been supported.
Using a delay queue requires specifying a specific message type (direct, topic, fanout, headers). The following is a direct type delay message queue as an example.
Production side sample code
package main import ( "context" "fmt" "strconv" "time" amqp "/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:[email protected]:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := (url) checkErr(err) defer () ctx := () queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) q, err := NewProducer(queueName, conn, exchange) checkErr(err) defer () for i := 1; i <= 5; i++ { body := ().Format("2006-01-02 15:04:05.000") + " hello world " + (i) err = (ctx, *5, []byte(body)) // Send a message checkErr(err) () } } // Exchange switchtype Exchange struct { Name string // Exchange name Type string // Exchange type, supports direct, topic, fanout, headers, x-delayed-message RoutingKey string //Route key XDelayedMessageType string // Delay message type, support direct, topic, fanout, headers} // NewDelayedMessageExchange Instantiate a delayed-message type switch, parameter delayedMessageType message type direct, topic, fanout, headersfunc NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Producer Producer Objecttype Producer struct { queueName string exchange *Exchange conn * ch * } // NewProducer instantiates a producerfunc NewProducer(queueName string, conn *, exchange *Exchange) (*Producer, error) { // Create a pipeline ch, err := () if err != nil { return nil, err } // Declare the switch type err = ( , // Switch name , // x-delayed-message true, // Whether it is persistent false, // Whether to delete automatically false, // Whether it is public or not, false means public false, // Wait or not { "x-delayed-type": , // Types of delay message direct, topic, fanout, headers }, ) if err != nil { _ = () return nil, err } // Declare the queue. If the queue does not exist, it will be created automatically. If it exists, it will be created skipped. q, err := ( queueName, // Message queue name true, // Whether it is persistent false, // Whether to delete automatically false, // Is it exclusive (only the program that created it is available) false, // Whether to block or not nil, // Extra attributes ) if err != nil { _ = () return nil, err } // Bind queues and switches err = ( , , , false, nil, ) if err != nil { _ = () return nil, err } return &Producer{ queueName: queueName, conn: conn, ch: ch, exchange: exchange, }, nil } // Publish Send Messagefunc (p *Producer) Publish(ctx , delayTime , body []byte) error { err := ( ctx, , // exchange name , // key false, // If mandatory is true, the queue that meets the criteria cannot be found according to its own exchange type and routingKey rules will return the message to the sender. false, // immediate If true, when exchange sends a message to the queue and finds that there is no consumer on the queue, the message will be returned to the sender. { DeliveryMode: , // If the queue declaration is persistent, the message is also set to persistent ContentType: "text/plain", Body: body, Headers: { "x-delay": int(delayTime / ), // Delay time: milliseconds }, }, ) if err != nil { return err } ("[send]: %s\n", body) return nil } // Close Close Producerfunc (p *Producer) Close() { if != nil { _ = () } } func checkErr(err error) { if err != nil { panic(err) } }
Consumer side sample code
package main import ( "context" "fmt" "os" "os/signal" "syscall" "time" amqp "/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:[email protected]:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := (url) checkErr(err) defer () ctx := () queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) c, err := NewConsumer(ctx, queueName, exchange, conn) checkErr(err) () // Consumption news defer () ("exit press CTRL+C") interrupt := make(chan , 1) (interrupt, , , , ) <-interrupt ("exit consume messages") } // Exchange switchtype Exchange struct { Name string // Exchange name Type string // Exchange type, supports direct, topic, fanout, headers, x-delayed-message RoutingKey string //Route key XDelayedMessageType string // Delay message type, support direct, topic, fanout, headers} // NewDelayedMessageExchange Instantiate a delayed-message type switch, parameter delayedMessageType message type direct, topic, fanout, headersfunc NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Consumertype Consumer struct { ctx queueName string conn * ch * delivery <-chan exchange *Exchange } // NewConsumer instantiates a consumerfunc NewConsumer(ctx , queueName string, exchange *Exchange, conn *) (*Consumer, error) { // Create a pipeline ch, err := () if err != nil { return nil, err } // Declare the switch type err = ( , // Switch name , // The type of switch, supports direct, topic, fanout, headers true, // Whether it is persistent false, // Whether to delete automatically false, // Whether it is public or not, false means public false, // Wait or not { "x-delayed-type": , // Types of delay message direct, topic, fanout, headers }, ) if err != nil { _ = () return nil, err } // Declare the queue. If the queue does not exist, it will be created automatically. If it exists, it will be created skipped. q, err := ( queueName, // Message queue name true, // Whether it is persistent false, // Whether to delete automatically false, // Is it exclusive (only the program that created it is available) false, // Whether to block or not nil, // Extra attributes ) if err != nil { _ = () return nil, err } // Bind queues and switches err = ( , , , false, nil, ) if err != nil { _ = () return nil, err } // Register a consumer for the message queue delivery, err := ( ctx, queueName, // queue name "", // consumer is used to distinguish multiple consumers true, // Whether auto-ack answers automatically false, // Is exclusive? false, // no-local If set to true, it cannot pass messages sent by the producer in the same Connection to consumers in this Connection false, // No-wait blocks nil, // args ) if err != nil { _ = () return nil, err } return &Consumer{ queueName: queueName, conn: conn, ch: ch, delivery: delivery, exchange: exchange, }, nil } // Consume receives messagesfunc (c *Consumer) Consume() { go func() { ("waiting for messages, type=%s, queue=%s, key=%s\n", , , ) for d := range { // Process messages ("%s %s [received]: %s\n", ().Format("2006-01-02 15:04:05.000"), , ) // _ = (false) // If auto-ack is false, manual ack is required } }() } // Closefunc (c *Consumer) Close() { if != nil { _ = () } } func checkErr(err error) { if err != nil { panic(err) } }
Summarize
The above introduces a simple example of using rabbitMQ delay message queue. In actual use, connecting rabbitMQ should have a network disconnection and reconnection function.
rabbitMQ needs to rely on the plugin rabbitmq_delayed_message_exchange. The current design of this plugin is not really suitable for scenarios that contain a large number of delayed messages (such as more than hundreds of thousands). In addition, one source of variability of this plugin is based on Erlang timers. After using a certain number of long-term timers in the system, they begin to compete for scheduler resources, and time drift continues to accumulate.
If you use the Delayed Message plug-in to implement it, the reliability of messages is very high. You can save it to DB for marking before sending the message. After consumption, the message is marked as consumed. A timed task can be added in the middle for detection, which can further ensure the reliability of your message.
This is/rabbitmq/amqp091-go
Encapsulated on the basisrabbitmqlibrary, use various message types out of the box (direct
, topic
, fanout
, headers
, delayed message
, publisher subscriber
)。
This is the introduction to this article about the detailed explanation of the use of RabbitMQ delay message queue in golang. For more related go RabbitMQ delay queue content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!