Golang + Gin: listening to a RabbitMQ queue and consuming it indefinitely
Connect to RabbitMQ
// Package database declares the package-level RabbitMQ handles RabbitConn and
// RabbitChannel, plus InitRabbitmq and CheckRabbitClosed.
//
// InitRabbitmq assigns RabbitConn first and RabbitChannel second. After each
// assignment it checks err; on a non-nil error a message string is passed to
// a (stripped) callee and the function panics with that error.
//
// CheckRabbitClosed takes ch and returns an int64 obtained by calling .Int()
// on the result of a lookup keyed by the string "closed". The comment in the
// listing says 0 means the channel is not closed and 1 means it is closed.
//
// NOTE(review): this listing does not compile as shown. Package-qualified
// identifiers appear to have been stripped during extraction: the import path
// "/streadway/amqp" has lost its host prefix, the pointer types after `*` are
// missing, the type of parameter ch is missing, and every call has lost its
// callee (only `()` / `(args)` remain). Presumably the missing names come from
// the imported amqp, log, reflect and config packages — confirm against the
// original article before reuse.
// NOTE(review): the `// 0 means ...` comment is fused with
// `func CheckRabbitClosed`, so on this single line the function header and
// body sit inside the line comment; a line break is missing before `func`.
// NOTE(review): the "closed" lookup looks like reflection over an unexported
// field of the channel type, which would depend on library internals that can
// change between versions — verify, and check whether ch is passed by value.
package database import ( "/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn * var RabbitChannel * func InitRabbitmq() { var err error RabbitConn, err = () if err != nil { ("Connecting RabbitMQ failed") panic(err) } RabbitChannel, err = () if err != nil { ("Failed to get RabbitMQ channel") panic(err) } } // 0 means the channel is not closed, 1 means the channel is closedfunc CheckRabbitClosed(ch ) int64 { d := (ch) i := ("closed").Int() return i }
Create a producer
// Package service: this listing defines Producer, which declares a queue and
// then sends a single message to it.
//
// Visible steps in Producer:
//   - a queue-declare call with arguments (<name>, true, false, false, false,
//     nil); on error a formatted message is emitted and the function panics;
//   - a `request` value is built from an empty composite literal and
//     marshalled; the marshal error is discarded (`marshal, _ :=`);
//   - a publish call with an empty first argument (the exchange, per the
//     inline comment), two false flags (mandatory, immediate), ContentType
//     "text/plain" and Body set to []byte(marshal); the outcome is reported
//     with a failure or success message and the function returns.
//
// NOTE(review): this listing does not compile as shown. Package-qualified
// identifiers appear to have been stripped during extraction: the callees, the
// queue-name arguments (left as bare `,`), the type of `request`, the type of
// the published struct literal, and the host prefix of "/streadway/amqp" are
// all missing. Presumably they come from the imported amqp, json, log, config,
// database and model packages — confirm against the original article.
// NOTE(review): `declare` is not referenced anywhere in the visible text; if
// it is unused in the original too, the Go compiler rejects it ("declared and
// not used") — verify, or replace it with `_`.
// NOTE(review): everything after the first `//` on this single line is, by Go
// syntax, part of a line comment; the original line breaks need restoring.
package service import ( "encoding/json" "/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // Declare the queue, if not, create it // Queue name, whether it is persistent, whether all consumers automatically delete the queue when they are disconnected from the queue, and whether it is exclusive (can the channel with different connections use this queue) declare, err := (, true, false, false, false, nil) if err != nil { ("Declaration queue %v failed, error: %v", , err) panic(err) } request := {} marshal, _ := (request ) // exchange、routing key、mandatory、immediate err = ("", , false, false, { ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { ("Producer failed to send a message, error: %v", err) } else { ("Producer sends the message successfully") } }
Create a consumer
// Package service: this listing defines Consumer, which declares a queue,
// obtains a `deliveries` channel for it, and then loops forever receiving
// from that channel.
//
// Visible steps in Consumer:
//   - a queue-declare call with arguments (<name>, true, false, false, false,
//     nil); on error a formatted message is emitted and the function panics;
//   - a consume call with arguments (<name>, "", true, false, false, false,
//     nil); per the inline comment the third argument is auto-ack. On error
//     only a message is emitted — execution still continues into the loop;
//   - an endless `for { select { ... } }` with a single receive case on
//     `deliveries`. For each received message a `closed` value is computed;
//     when it equals 1 a (stripped) reconnect call is made, otherwise the
//     message payload is converted to a string, unmarshalled into `request`,
//     and a parse failure is reported. The processing logic itself is a TODO.
//
// NOTE(review): this listing does not compile as shown. Package-qualified
// identifiers appear to have been stripped during extraction: the callees, the
// queue-name arguments, the operand of `*` in `(*)`, the argument of
// `string()`, and the type of `request` are all missing. Presumably they come
// from the imported json, log, config, database and model packages — confirm
// against the original article.
// NOTE(review): os, strings, sync and time are imported but not used in the
// visible text; Go rejects unused imports unless the elided processing logic
// uses them — verify.
// NOTE(review): in Go, receiving from a closed channel returns the zero value
// immediately and receiving from a nil channel blocks forever. The visible
// loop never re-obtains `deliveries` after the reconnect branch and does not
// use the `v, ok := <-ch` form, so once the delivery channel is closed this
// loop may spin, and after a failed consume call it may block permanently —
// confirm how the original handles reconnection.
// NOTE(review): everything after the first `//` on this single line is, by Go
// syntax, part of a line comment; the original line breaks need restoring.
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // Declare the queue, if not, create it // Queue name, whether it is persistent, whether all consumers automatically delete the queue when they are disconnected from the queue, and whether it is exclusive (can the channel with different connections use this queue) _, err := (, true, false, false, false, nil) if err != nil { ("Declaration queue %v failed, error: %v", , err) panic(err) } // Queue name, consumer, auto-ack, whether it is exclusive // Deliveries is a pipeline. If messages come to the queue, they will consume. Consumers' messages only need to be obtained from the deliverys pipeline deliveries, err := (, "", true, false, false, false, nil) if err != nil { ("Failed to get data from queue %v, error: %v", , err) } else { ("Successfully retrieved from the consumption queue") } // Block for { select { case message := <-deliveries: closed := (*) if closed == 1 { // The channel has been closed, reconnect it () } else { msgData := string() request := {} err := ([]byte(msgData), &request) if err != nil { ("Parse rabbitmq data %v failed, error: %v", msgData, err) } else { // TODO... // Processing logic } } } } }
Start them as goroutines from the main function
// Command main wires the pieces together. As far as the visible text shows,
// main creates `routers`, performs the RabbitMQ initialisation, starts two
// goroutines with `go`, reads a `port` value, and then makes a blocking call
// with the address ":" + port, emitting "Failed to start the service: " with
// the error if that call returns one.
//
// NOTE(review): this listing does not compile as shown. Package-qualified
// identifiers appear to have been stripped during extraction: every callee is
// missing (only `()` remains) and `port :=` has lost its right-hand side.
// Presumably the two goroutines are the producer and consumer from the
// imported service package, the initialisation comes from database, the
// router from router, and the port from config — confirm against the original
// article.
// NOTE(review): nothing in the visible text stops or waits for the two
// goroutines; they presumably live until the process exits — confirm this is
// intended.
// NOTE(review): everything after the first `//` on this single line is, by Go
// syntax, part of a line comment; the original line breaks need restoring.
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // Initialize the routing routers := () // Initialize RabbitMQ () go () go () port := if err := (":" + port); err != nil { ("Failed to start the service: ", err) } }
This concludes the article on using Golang and Gin to listen to a RabbitMQ queue and consume it indefinitely. For more on consuming RabbitMQ queues from Golang, please search my previous articles or continue browsing the related articles below. Thank you for your support!