SoFunction
Updated on 2025-03-04

Go language uses kafka-go to implement Kafka consumption messages

In this tutorial, we will introduce how to use itkafka-goKulai consumes Kafka news and focuses on explainingFetchMessageandReadMessageThe difference and the scenarios they apply to. With this tutorial, you will learn how to use it effectivelykafka-goLibrary to process messages and manage offsets.

Install kafka-go library

First, you need to install it in the projectkafka-goLibrary. The following commands can be used:

go get /segmentio/kafka-go

Initialize Kafka Reader

In order to consume messages from Kafka, we first need to configure and initialize the Kafka Reader. Here is a simple Kafka Reader initialization example:

package main

import (
    "context"
    "log"
    "/segmentio/kafka-go"
)

func main() {
    // Create Kafka Reader    kafkaReader := ({
        Brokers:   []string{"localhost:9092"}, // Kafka broker address        Topic:     "example-topic",            // Subscribed Kafka topic        GroupID:   "example-group",            // Consumer group ID        Partition: 0,                          // Partition number (optional)        MinBytes:  10e3,                       // 10KB
        MaxBytes:  10e6,                       // 10MB
    })

    defer ()
}

Use FetchMessage to consume messages

FetchMessageAllows you to consume messages from Kafka and submit offsets manually, which gives you more precise control over message processing. Here is how to useFetchMessageExample:

func consumeWithFetchMessage() {
    ctx := ()
    
    for {
        // Get the next message from Kafka        m, err := (ctx)
        if err != nil {
            ("Error getting message: %v", err)
            break
        }

        // Print message content        ("Message: %s, Offset: %d", string(), )

        // Process messages (you can do your business logic here)
        // Manually submit offset        if err := (ctx, m); err != nil {
            ("Error submitting offset: %v", err)
        }
    }
}

advantage

  • Precisely control offset: After processing the message, you can manually choose whether to submit the offset, which ensures that the message is only submitted after successful processing.
  • Retry mechanism: The failed messages can be processed flexibly, for example, when processing fails, no offset is submitted, thereby realizing re-consumption of messages.

shortcoming

  • Increased code complexity: The offset commit needs to be processed manually, which will add some additional code volume.

Use ReadMessage to consume messages

ReadMessageis an easier way to get messages from Kafka and automatically submit offsets. Suitable for scenarios that are not very sensitive to consumption logic. The following is usedReadMessageExample:

func consumeWithReadMessage() {
    ctx := ()
    
    for {
        // Read the next message from Kafka and automatically submit the offset        dataInfo, err := (ctx)
        if err != nil {
            ("Error reading message: %v", err)
            break
        }

        // Print message content        ("Message: %s, Offset: %d", string(), )

        // Process messages (you can do your business logic here)    }
}

advantage

  • Simple and easy to useReadMessageAutomatically submit offsets, the code is concise and easy to maintain.
  • Rapid development: Suitable for simple message processing logic and scenarios where message reliability is not high.

shortcoming

  • Lack of flexibility: The message cannot be re-consuming when processing fails because the offset has been automatically submitted.

Summary of choice

method advantage shortcoming Applicable scenarios
FetchMessage Need to manually submit offsets to accurately control message processing and commit logic High code complexity Scenarios where message processing is required, such as processing failure and retry
ReadMessage Simple and easy to use, automatic offset submission, simpler code Unable to reconsum processed failed messages Simple message processing, scenarios where the success rate of message processing is not high

Complete example

Here is a complete Kafka consumer example, includingFetchMessageandReadMessageTwo methods. You can choose the right method according to your needs:

package main

import (
    "context"
    "log"
    "/segmentio/kafka-go"
)

func main() {
    // Create Kafka Reader    kafkaReader := ({
        Brokers:   []string{"localhost:9092"},
        Topic:     "example-topic",
        GroupID:   "example-group",
        MinBytes:  10e3, // 10KB
        MaxBytes:  10e6, // 10MB
    })

    defer ()

    // Use FetchMessage to consume messages    ("Start consume Kafka messages using FetchMessage...")
    consumeWithFetchMessage(kafkaReader)

    // Use ReadMessage to consume messages    ("Start consume Kafka messages using ReadMessage...")
    consumeWithReadMessage(kafkaReader)
}

func consumeWithFetchMessage(kafkaReader *) {
    ctx := ()

    for {
        m, err := (ctx)
        if err != nil {
            ("Error getting message: %v", err)
            break
        }

        ("FetchMessage message: %s, offset: %d", string(), )

        // Manually submit offset        if err := (ctx, m); err != nil {
            ("FetchMessage Error submitting offset: %v", err)
        }
    }
}

func consumeWithReadMessage(kafkaReader *) {
    ctx := ()

    for {
        dataInfo, err := (ctx)
        if err != nil {
            ("ReadMessage error occurred while reading message: %v", err)
            break
        }

        ("ReadMessage message: %s, offset: %d", string(), )
    }
}

Conclusion

Through this tutorial, you have learned how to use itkafka-goofFetchMessageandReadMessageMethods to consume Kafka messages. Select the appropriate consumption method according to project needs and reasonably manage offsets to ensure the reliability and efficiency of message processing.

This is the article about using kafka-go to implement Kafka consumption messages in Go. For more information about using kafka-go to consume messages in Go, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!