SoFunction
Updated on 2025-03-05

golang gin listens to the case code of unlimited consumption of rabbitmq queue

golang gin listens for unlimited consumption of rabbitmq queue

Connect to rabbitmq

package database

import (
	"/streadway/amqp"
	"log"
	"reflect"
	"yy-data-processing/common/config"
)

var RabbitConn *
var RabbitChannel *

func InitRabbitmq() {
	var err error
	RabbitConn, err = ()
	if err != nil {
		("Connecting RabbitMQ failed")
		panic(err)
	}
	RabbitChannel, err = ()
	if err != nil {
		("Failed to get RabbitMQ channel")
		panic(err)
	}
}

// 0 means the channel is not closed, 1 means the channel is closedfunc CheckRabbitClosed(ch ) int64 {
	d := (ch)
	i := ("closed").Int()
	return i
}

Create a producer

package service

import (
	"encoding/json"
	"/streadway/amqp"
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Producer() {
	// Declare the queue, if not, create it	// Queue name, whether it is persistent, whether all consumers automatically delete the queue when they are disconnected from the queue, and whether it is exclusive (can the channel with different connections use this queue)	declare, err := (, true, false, false, false, nil)
	if err != nil {
		("Declaration queue %v failed, error: %v", , err)
		panic(err)
	}

	request := {}
	marshal, _ := (request )
	// exchange、routing key、mandatory、immediate
	err = ("", , false, false, {
		ContentType: "text/plain",
		Body:        []byte(marshal),
	})
	if err != nil {
		("Producer failed to send a message, error: %v", err)
	} else {
		("Producer sends the message successfully")
	}
}

Create a consumer

package service

import (
	"encoding/json"
	"log"
	"os"
	"strings"
	"sync"
	"time"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Consumer() {
	// Declare the queue, if not, create it	// Queue name, whether it is persistent, whether all consumers automatically delete the queue when they are disconnected from the queue, and whether it is exclusive (can the channel with different connections use this queue)	_, err := (, true, false, false, false, nil)
	if err != nil {
		("Declaration queue %v failed, error: %v", , err)
		panic(err)
	}
    
	// Queue name, consumer, auto-ack, whether it is exclusive	// Deliveries is a pipeline. If messages come to the queue, they will consume. Consumers' messages only need to be obtained from the deliverys pipeline	deliveries, err := (, "", true, false, false, false, nil)
	if err != nil {
		("Failed to get data from queue %v, error: %v", , err)
	} else {
		("Successfully retrieved from the consumption queue")
	}
    
    // Block	for {
		select {
		case message := <-deliveries:
			closed := (*)
			if closed == 1 { // The channel has been closed, reconnect it				()
			} else {
				msgData := string()
				request := {}
				err := ([]byte(msgData), &request)
				if err != nil {
					("Parse rabbitmq data %v failed, error: %v", msgData, err)
				} else {
					// TODO...
                    // Processing logic					
				}
			}
		}
	}
}

Main method coroutine call

package main

import (
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/router"
	"yy-data-processing/service"
)

func main() {
	// Initialize the routing	routers := ()

	// Initialize RabbitMQ	()
	go ()
	go ()

	port := 
	if err := (":" + port); err != nil {
		("Failed to start the service: ", err)
	}

}

This is the article about golang gin monitoring rabbitmq queue unlimited consumption. For more related golang monitoring rabbitmq content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!