1. Library usage notes
You can use the third-party library github.com/Shopify/sarama to connect to Kafka from Go.
2. Kafka Producer sends messages
// Command producer sends a single message to a Kafka topic using sarama's
// synchronous producer and prints the partition and offset it was written to.
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// main builds the producer configuration, connects to the broker, sends one
// message to the "task" topic and reports where it landed.
func main() {
	config := sarama.NewConfig()
	// After sending the data, both the leader and the followers need to confirm.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Write to random partitions (the article's setup uses 32 partitions by default).
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are returned on the success channel;
	// the synchronous producer requires this to be enabled.
	config.Producer.Return.Successes = true

	// Construct a message.
	msg := &sarama.ProducerMessage{}
	msg.Topic = "task"
	msg.Value = sarama.StringEncoder("producer kafka messages...")

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
	if err != nil {
		fmt.Println("Producer closed, err:", err)
		return
	}
	defer client.Close()

	// Send the message; pid is the partition it was written to.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
3. Kafka consumer: consuming messages
// Command consumer reads messages from every partition of a Kafka topic using
// sarama and prints each message as it arrives.
package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// topic is the Kafka topic to consume. The same name is used both to list the
// partitions and to open the partition consumers, so the two cannot drift.
const topic = "task"

// main connects to the broker, starts one goroutine per partition of topic and
// blocks until all of those goroutines have finished.
func main() {
	var wg sync.WaitGroup

	consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
	if err != nil {
		fmt.Printf("Failed to start consumer: %s", err)
		return
	}
	// Release the broker connection on every exit path.
	defer consumer.Close()

	// Get all partitions through the topic.
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Println("Failed to get the list of partition: ", err)
		return
	}
	fmt.Println(partitionList)

	// Traverse all partitions. Range over the values (the partition IDs),
	// not the slice indexes.
	for _, partition := range partitionList {
		// Create a partition consumer for each partition, starting at the
		// newest offset.
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			// pc is unusable on error; skip this partition instead of
			// dereferencing it.
			continue
		}

		wg.Add(1)
		// Open a goroutine for each partition to receive its messages.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			defer pc.AsyncClose()
			// Block until a value is sent, then continue to wait; the loop
			// ends only when the message channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n",
					msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}

	wg.Wait()
}
This concludes this article on example implementations of working with Kafka in Go. For more content on using Kafka from Go, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me in the future!