In this tutorial, we introduce how to use the kafka-go library to consume Kafka messages, focusing on the difference between FetchMessage and ReadMessage and the scenarios each one suits. By the end, you will know how to use kafka-go effectively to process messages and manage offsets.
Install the kafka-go library
First, install the kafka-go library in your project with the following command:
go get github.com/segmentio/kafka-go
Initialize Kafka Reader
In order to consume messages from Kafka, we first need to configure and initialize the Kafka Reader. Here is a simple Kafka Reader initialization example:
package main

import (
	"github.com/segmentio/kafka-go"
)

func main() {
	// Create the Kafka Reader
	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // Kafka broker address
		Topic:   "example-topic",            // Kafka topic to subscribe to
		GroupID: "example-group",            // Consumer group ID
		// Partition is left unset: it should not be specified together with
		// GroupID, because the consumer group assigns partitions itself.
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	// Close the reader when main returns
	defer kafkaReader.Close()
}
Use FetchMessage to consume messages
FetchMessage lets you consume messages from Kafka and commit offsets manually, which gives you more precise control over message processing. Here is an example of how to use FetchMessage:
func consumeWithFetchMessage(kafkaReader *kafka.Reader) {
	ctx := context.Background()
	for {
		// Fetch the next message from Kafka (the offset is not committed yet)
		m, err := kafkaReader.FetchMessage(ctx)
		if err != nil {
			log.Printf("Error fetching message: %v", err)
			break
		}
		// Print the message content
		log.Printf("Message: %s, Offset: %d", string(m.Value), m.Offset)
		// Process the message (your business logic goes here)

		// Manually commit the offset
		if err := kafkaReader.CommitMessages(ctx, m); err != nil {
			log.Printf("Error committing offset: %v", err)
		}
	}
}
Advantages
- Precise offset control: After processing a message, you decide whether to commit its offset, so the offset is committed only after processing has succeeded.
- Retry mechanism: Failed messages can be handled flexibly. For example, if processing fails you can skip the commit, so the message is consumed again once the consumer restarts or the group rebalances.
Disadvantages
- Increased code complexity: Offset commits have to be handled manually, which adds some extra code.
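The "commit only after success" idea above can be combined with a bounded retry. The following is a minimal sketch using only the standard library; `processWithRetry` and `errTransient` are hypothetical names introduced for illustration and are not part of kafka-go.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used only to simulate a failing handler.
var errTransient = errors.New("transient failure")

// processWithRetry is a hypothetical helper (not part of kafka-go). It calls
// handle up to maxAttempts times and returns nil on the first success, or the
// last error wrapped with the attempt count if every attempt fails.
func processWithRetry(maxAttempts int, handle func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = handle(); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	err := processWithRetry(3, func() error {
		calls++
		if calls < 3 {
			return errTransient // the first two attempts fail
		}
		return nil
	})
	// Only a nil error here would justify committing the offset.
	fmt.Println(calls, err) // prints "3 <nil>"
}
```

In a FetchMessage loop, one option would be to wrap the business logic in such a helper and call CommitMessages only when it returns nil. What to do when all attempts fail (stop, skip, or forward the message elsewhere) is a design decision this sketch leaves open.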
Use ReadMessage to consume messages
ReadMessage is a simpler way to get messages from Kafka: when the reader uses a consumer group, it commits the offset automatically. It suits scenarios where losing an occasional message on a processing failure is acceptable. Here is an example of how to use ReadMessage:
func consumeWithReadMessage(kafkaReader *kafka.Reader) {
	ctx := context.Background()
	for {
		// Read the next message from Kafka; the offset is committed automatically
		dataInfo, err := kafkaReader.ReadMessage(ctx)
		if err != nil {
			log.Printf("Error reading message: %v", err)
			break
		}
		// Print the message content
		log.Printf("Message: %s, Offset: %d", string(dataInfo.Value), dataInfo.Offset)
		// Process the message (your business logic goes here)
	}
}
Advantages
- Simple and easy to use: ReadMessage commits offsets automatically, so the code is concise and easy to maintain.
- Rapid development: Suitable for simple message processing logic and scenarios where reliability requirements are not high.
Disadvantages
- Lack of flexibility: A message cannot be re-consumed when its processing fails, because the offset has already been committed automatically.
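To make the trade-off concrete, here is a small simulation of the two commit orders. It is a simplified model under stated assumptions, not kafka-go code: `partitionLog`, `consume`, `seenAfterRestart`, and `errProcessing` are hypothetical names, and a single in-memory slice stands in for one partition read by one consumer group.

```go
package main

import (
	"errors"
	"fmt"
)

// errProcessing is a hypothetical error used to simulate a failed handler.
var errProcessing = errors.New("processing failed")

// partitionLog is a hypothetical in-memory stand-in for one partition read by
// one consumer group. It is not part of kafka-go.
type partitionLog struct {
	messages  []string
	committed int // index of the next message delivered after a restart
}

// consume delivers messages starting at the committed position and stops at
// the first handler error. With commitFirst the offset advances before the
// handler runs (ReadMessage-like); otherwise only after it succeeds
// (FetchMessage followed by a commit).
func (p *partitionLog) consume(commitFirst bool, handle func(string) error) error {
	for p.committed < len(p.messages) {
		msg := p.messages[p.committed]
		if commitFirst {
			p.committed++ // offset moves before the handler runs
		}
		if err := handle(msg); err != nil {
			return err
		}
		if !commitFirst {
			p.committed++ // offset moves only after success
		}
	}
	return nil
}

// seenAfterRestart fails once on message "b", then simulates a restart and
// returns the messages the second run receives.
func seenAfterRestart(commitFirst bool) []string {
	p := &partitionLog{messages: []string{"a", "b", "c"}}
	_ = p.consume(commitFirst, func(m string) error {
		if m == "b" {
			return errProcessing
		}
		return nil
	})
	var seen []string
	_ = p.consume(commitFirst, func(m string) error {
		seen = append(seen, m)
		return nil
	})
	return seen
}

func main() {
	fmt.Println("commit before processing:", seenAfterRestart(true))  // [c]
	fmt.Println("commit after processing: ", seenAfterRestart(false)) // [b c]
}
```

In this model the failed message "b" is lost when the offset is committed first and is delivered again when the commit follows successful processing, which mirrors the difference described above.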
Summary of choices
| Method | Advantages | Disadvantages | Applicable scenarios |
|---|---|---|---|
| FetchMessage | Offsets are committed manually, giving precise control over message processing and commit logic | Higher code complexity | Scenarios that need reliable message processing, such as retrying after a processing failure |
| ReadMessage | Simple and easy to use; offsets are committed automatically, so the code is shorter | Messages whose processing failed cannot be re-consumed | Simple message processing where reliability requirements are not high |
Complete example
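Here is a complete Kafka consumer example that includes both the FetchMessage and ReadMessage methods. Each loop runs until the reader returns an error, so the second function only starts after the first one has stopped; in practice, choose the one method that fits your needs: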
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Create the Kafka Reader
	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    "example-topic",
		GroupID:  "example-group",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer kafkaReader.Close()

	// Use FetchMessage to consume messages
	log.Println("Start consuming Kafka messages using FetchMessage...")
	consumeWithFetchMessage(kafkaReader)

	// Use ReadMessage to consume messages
	// (reached only after the FetchMessage loop has exited on an error)
	log.Println("Start consuming Kafka messages using ReadMessage...")
	consumeWithReadMessage(kafkaReader)
}

func consumeWithFetchMessage(kafkaReader *kafka.Reader) {
	ctx := context.Background()
	for {
		m, err := kafkaReader.FetchMessage(ctx)
		if err != nil {
			log.Printf("Error fetching message: %v", err)
			break
		}
		log.Printf("FetchMessage message: %s, offset: %d", string(m.Value), m.Offset)
		// Manually commit the offset
		if err := kafkaReader.CommitMessages(ctx, m); err != nil {
			log.Printf("FetchMessage error committing offset: %v", err)
		}
	}
}

func consumeWithReadMessage(kafkaReader *kafka.Reader) {
	ctx := context.Background()
	for {
		dataInfo, err := kafkaReader.ReadMessage(ctx)
		if err != nil {
			log.Printf("ReadMessage error occurred while reading message: %v", err)
			break
		}
		log.Printf("ReadMessage message: %s, offset: %d", string(dataInfo.Value), dataInfo.Offset)
	}
}
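The loops in this example only exit when the reader returns an error. One way to stop such a loop deliberately is to cancel the context passed to the blocking call. The sketch below shows the pattern with the standard library only; `fetchFunc`, `consumeUntilDone`, `sliceFetch`, and `runDemo` are hypothetical stand-ins rather than kafka-go APIs, and it assumes the blocking fetch returns the context's error once the context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fetchFunc is a hypothetical stand-in for a blocking fetch call that honours
// context cancellation. It is not a kafka-go type.
type fetchFunc func(ctx context.Context) (string, error)

// consumeUntilDone calls fetch in a loop and returns the number of handled
// messages together with the error that ended the loop.
func consumeUntilDone(ctx context.Context, fetch fetchFunc, handle func(string)) (int, error) {
	handled := 0
	for {
		msg, err := fetch(ctx)
		if err != nil {
			return handled, err
		}
		handle(msg)
		handled++
	}
}

// sliceFetch yields the given messages in order, then blocks until the
// context is cancelled, imitating a reader waiting for new data.
func sliceFetch(msgs []string) fetchFunc {
	next := 0
	return func(ctx context.Context) (string, error) {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		if next < len(msgs) {
			msg := msgs[next]
			next++
			return msg, nil
		}
		<-ctx.Done()
		return "", ctx.Err()
	}
}

// runDemo consumes three messages and cancels the context after the last one.
// It reports how many messages were handled and whether the loop ended
// because of the cancellation.
func runDemo() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	handled, err := consumeUntilDone(ctx, sliceFetch([]string{"a", "b", "c"}), func(m string) {
		if m == "c" {
			cancel() // simulate a shutdown request
		}
	})
	return handled, errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(runDemo()) // prints "3 true"
}
```

In a real program the cancellation would more likely come from something like `signal.NotifyContext` in `os/signal`, so that an interrupt signal ends the consume loop and lets the deferred Close run.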
Conclusion
In this tutorial, you have learned how to use the FetchMessage and ReadMessage methods of kafka-go to consume Kafka messages. Choose the consumption method that fits your project's needs and manage offsets carefully to keep message processing reliable and efficient.