Preface
Delay queues are a useful tool that comes up in many scenarios, such as sending delayed notifications or automatically closing orders.
This article mainly uses Go+Kafka to implement delayed messages.
It uses the sarama client.
Principle
Implementing delayed messages with Kafka takes the following three steps:
- The producer sends the message to the delay queue.
- The delay service consumes the delay queue and writes messages whose delay time has elapsed to the real queue.
- The consumer consumes the messages in the real queue.
Simple implementation
Producer
The producer simply sends the message to the delay queue:
    msg := &sarama.ProducerMessage{
        Topic: kafka_delay_queue_test.DelayTopic,
        Value: sarama.StringEncoder("test" + strconv.Itoa(i)),
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        log.Println(err)
    }
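Delay service
The delay service subscribes to the delay queue and sends messages whose delay time has elapsed to the real queue:
    // Consume returns whenever a session ends, so it is called in a loop.
    for {
        if err = consumerGroup.Consume(context.Background(), []string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
            break
        }
    }

    type Consumer struct {
        producer sarama.SyncProducer
        delay    time.Duration
    }

    func NewConsumer(producer sarama.SyncProducer, delay time.Duration) *Consumer {
        return &Consumer{
            producer: producer,
            delay:    delay,
        }
    }

    func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            // If the message's delay has elapsed, send it to the real queue
            now := time.Now()
            if now.Sub(message.Timestamp) >= c.delay {
                _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
                    Topic: kafka_delay_queue_test.RealTopic,
                    Key:   sarama.ByteEncoder(message.Key),
                    Value: sarama.ByteEncoder(message.Value),
                })
                if err != nil {
                    // Stop here so the message is read again later; marking a
                    // later message would otherwise skip this one.
                    return err
                }
                session.MarkMessage(message, "")
                continue
            }
            // Otherwise sleep for one second and end the claim, so the
            // unmarked message is fetched again on the next Consume call
            time.Sleep(time.Second)
            return nil
        }
        return nil
    }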
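Consumer
The consumer only subscribes to the real queue and consumes its messages:
    for {
        if err = consumerGroup.Consume(context.Background(), []string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
            break
        }
    }

    type Consumer struct{}

    func NewConsumer() *Consumer {
        return &Consumer{}
    }

    func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            fmt.Println("Received the message:", string(message.Value), message.Timestamp)
            session.MarkMessage(message, "")
        }
        return nil
    }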
Improvement points
Universal delay service
The delay service can be packaged as a shared service: producers send their messages straight to it, and it handles the rest of the logic.
The delay service can offer multiple delay levels, such as 5s, 10s, 30s, 1m, 5m, 10m, 1h, and 2h, similar to RocketMQ.
Producer takes on the delay service
Alternatively, the producer itself can take on the delay service's job and move messages from the delay queue to the real queue on its own.
Here is a simple implementation:
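    // KafkaDelayQueueProducer is a delay queue producer that bundles a
    // producer with the delay service.
    type KafkaDelayQueueProducer struct {
        producer   sarama.SyncProducer // Producer
        delayTopic string              // Delay service topic
    }

    // NewKafkaDelayQueueProducer creates a delay queue producer.
    // producer: the producer
    // delayServiceConsumerGroup: consumer group used by the delay service
    // delayTime: delay time
    // delayTopic: delay service topic
    // realTopic: real queue topic
    func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
        delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer {
        // Start the delay service
        consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
        go func() {
            for {
                if err := delayServiceConsumerGroup.Consume(context.Background(), []string{delayTopic}, consumer); err != nil {
                    break
                }
            }
        }()
        return &KafkaDelayQueueProducer{
            producer:   producer,
            delayTopic: delayTopic,
        }
    }

    // SendMessage sends a message to the delay topic.
    func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
        msg.Topic = q.delayTopic
        return q.producer.SendMessage(msg)
    }

    // DelayServiceConsumer is the delay service's consumer.
    type DelayServiceConsumer struct {
        producer  sarama.SyncProducer
        delay     time.Duration
        realTopic string
    }

    func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration, realTopic string) *DelayServiceConsumer {
        return &DelayServiceConsumer{
            producer:  producer,
            delay:     delay,
            realTopic: realTopic,
        }
    }

    func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            // If the message's delay has elapsed, send it to the real queue
            now := time.Now()
            if now.Sub(message.Timestamp) >= c.delay {
                _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
                    Topic: c.realTopic,
                    Key:   sarama.ByteEncoder(message.Key),
                    Value: sarama.ByteEncoder(message.Value),
                })
                if err != nil {
                    // Stop here so the message is read again later; marking a
                    // later message would otherwise skip this one.
                    return err
                }
                session.MarkMessage(message, "")
                continue
            }
            // Otherwise sleep for one second and end the claim, so the
            // unmarked message is fetched again on the next Consume call
            time.Sleep(time.Second)
            return nil
        }
        return nil
    }

    func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
        return nil
    }

    func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }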
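A detail worth noting in `SendMessage` is that the wrapper overwrites the message's topic, so a caller cannot bypass the delay queue by setting a topic of its own. The following broker-free sketch illustrates only that behavior; `record`, `sender`, `recordingSender`, and `delayProducer` are hypothetical stand-ins for the sarama types and are not part of the library.

```go
package main

// record is a hypothetical stand-in for a producer message.
type record struct {
	topic string
	value string
}

// sender is a hypothetical stand-in for a synchronous producer.
type sender interface {
	send(r *record) error
}

// recordingSender keeps every record it is asked to send, which makes
// the wrapper's behavior observable without a broker.
type recordingSender struct {
	sent []record
}

func (s *recordingSender) send(r *record) error {
	s.sent = append(s.sent, *r)
	return nil
}

// delayProducer wraps another sender and forces every record onto the
// delay topic, whatever topic the caller set.
type delayProducer struct {
	inner      sender
	delayTopic string
}

func (p *delayProducer) send(r *record) error {
	r.topic = p.delayTopic // overwrite the caller's topic
	return p.inner.send(r)
}
```

Because the wrapper mutates the record it receives, the caller's own record also ends up carrying the delay topic after the call, just as `msg.Topic` does in the original code.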
Summary
With an intermediate queue plus polling, delayed messages can be implemented in Kafka quite easily. If a general-purpose delay queue is required, the delay service can be built as a shared service, or the producer can take on the delay service's role itself.
Complete code:
- Simple implementation example: /jiaxwu/dq/blob/main/kafka_delay_queue_producer.go
- Producers containing delayed services: /jiaxwu/dq/tree/main/kafka_delay_queue_example