SoFunction
Updated on 2025-03-05

Example of implementation of Go+Kafka implementation of delayed messages

Preface

Delay queues are a very useful tool. We often encounter scenarios where we need to use delay queues, such as delay notifications, order closures, etc.

This article mainly uses Go+Kafka to implement delayed messages.

UsedsaramaClient.

principle

Kafka implements delayed messages into the following three steps:

  • The producer sends the message toDelay queue
  • Delayed serviceDelay queueWrite messages that exceed the delay timeReal queue
  • Consumer consumptionReal queueNews in

Simple implementation

Producer

The producer just sends the message toDelay queue

msg := &{
   Topic: kafka_delay_queue_test.DelayTopic,
   Value: ("test" + (i)),
}
if _, _, err := (msg); err != nil {
   (err)
}

Delayed Service

Delayed service will be subscribedDelay queueandTimeout messageSend toReal queue

if err = ((),
   []string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
   break
}
type Consumer struct {
   producer 
   delay    
}

func NewConsumer(producer , delay ) *Consumer {
   return &Consumer{
      producer: producer,
      delay:    delay,
   }
}

func (c *Consumer) ConsumeClaim(session , claim ) error {
   for message := range () {
      // If the message has timed out, send the message to the real queue      now := ()
      if () >=  {
         _, _, err := (&{
            Topic: kafka_delay_queue_test.RealTopic,
            Key:   (),
            Value: (),
         })
         if err == nil {
            (message, "")
         }
         continue
      }
      // Otherwise sleep for one second      ()
      return nil
   }
   return nil
}

consumer

Consumers only subscribeReal queueAnd consume news

if err = ((), 
   []string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
   break
}
type Consumer struct{}

func NewConsumer() *Consumer {
   return &Consumer{}
}

func (c *Consumer) ConsumeClaim(session , claim ) error {
   for message := range () {
      ("Received the message:", , )
      (message, "")
   }
   return nil
}

Improvement points

Universal delay service

The delay service can be encapsulated into a common service, so that the producer can directly send messages to the delay service, allowing the delay service to process the remaining logic.

Delay service can provide multiple delay levels, such as 5s, 10s, 30s, 1m, 5m, 10m, 1h, 2h, etc., similar to RocketMQ.

Producers are responsible for delayed services

It can also be held responsible for the delay service, allowing the producer to send messages in the delay queue to the real queue by themselves.

Here is a simple implementation:

// KafkaDelayQueueProducer Delay queue producer, including producer and delay servicetype KafkaDelayQueueProducer struct {
   producer    // Producer   delayTopic string              // Delay service topic}

// NewKafkaDelayQueueProducer Creates Delay Queue Producer// producer// delayServiceConsumerGroup Delayed service to consumers// delayTime delay time// delayTopic Delay Service Topic// realTopic real queue themefunc NewKafkaDelayQueueProducer(producer , delayServiceConsumerGroup ,
   delayTime , delayTopic, realTopic string) *KafkaDelayQueueProducer {
   // Start the delayed service   consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
   go func() {
      for {
         if err := ((),
            []string{delayTopic}, consumer); err != nil {
            break
         }
      }
   }()
   return &KafkaDelayQueueProducer{
      producer:   producer,
      delayTopic: delayTopic,
   }
}

// SendMessage Send Messagefunc (q *KafkaDelayQueueProducer) SendMessage(msg *) (partition int32, offset int64, err error) {
    = 
   return (msg)
}

// DelayServiceConsumer Delayed service to consumerstype DelayServiceConsumer struct {
   producer  
   delay     
   realTopic string
}

func NewDelayServiceConsumer(producer , delay ,
   realTopic string) *DelayServiceConsumer {
   return &DelayServiceConsumer{
      producer:  producer,
      delay:     delay,
      realTopic: realTopic,
   }
}

func (c *DelayServiceConsumer) ConsumeClaim(session ,
   claim ) error {
   for message := range () {
      // If the message has timed out, send the message to the real queue      now := ()
      if () >=  {
         _, _, err := (&{
            Topic: ,
            Key:   (),
            Value: (),
         })
         if err == nil {
            (message, "")
         }
         continue
      }
      // Otherwise sleep for one second      ()
      return nil
   }
   return nil
}

func (c *DelayServiceConsumer) Setup() error {
   return nil
}

func (c *DelayServiceConsumer) Cleanup() error {
   return nil
}

Summarize

useIntermediate queue+pollingDelayed messages can be easily implemented in Kafka. If a general delay queue is required, a general delay service can be implemented, and consumers can also be responsible for the functions of the delay service.

Complete code:

  • Simple implementation example:/jiaxwu/dq/blob/main/kafka_delay_queue_producer.go
  • Producers containing delayed services:/jiaxwu/dq/tree/main/kafka_delay_queue_example

This is the article about the implementation example of Go+Kafka delayed message implementation. For more related Go Kafka delayed message content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!