Study
1.1 Start kafka and zookeeper
kafka and zookeeper are associated
bin/ config/
and
bin/ config/
1.2 Create a topic
bin/ --create --topic hello --bootstrap-server Host Name:9092
1.3 Production news
bin/ --broker-list Host Name:9092 --topic hello
After running, you can send multiple messages, ctrl+c exit
1.4 News before consumption
bin/ --bootstrap-server Host Name:9092 --from-beginning --topic hello
1.5 Specify offset consumption
bin/ --bootstrap-server Host Name:9092 --partition 0 --offset 1 --topic hello
1.6 Latest consumption information
bin/ --bootstrap-server localhost:9092 --topic hello
2 go operations
2.1 Send a message
// Kafka configurationconst ( KafkaBroker = "u8sMaster:9092" // Replace with your Kafka Broker address KafkaTopic = "k8s-version" // Kafka theme) func main() { sendMesgKafka() } func sendMesgKafka() { w := ({ Brokers: []string{KafkaBroker}, Topic: KafkaTopic, Balancer: &{}, }) err := ((), { Key: []byte("Key-A"), Value: []byte("one!"), }, { Key: []byte("Key-B"), Value: []byte("two!"), }, { Key: []byte("Key-C"), Value: []byte("three!"), }, ) if err != nil { ("failed to write messages:", err) } if err := (); err != nil { ("failed to close writer:", err) } ("Message sent successfully") }
2.2 Consumption News
// to consume messages topic := "test" partition := 0 conn, err := ((), "tcp", "u8sMaster:9092", topic, partition) if err != nil { ("failed to dial leader:", err) } (().Add(10*)) batch := (10e3, 1e6) // fetch 10KB min, 1MB max b := make([]byte, 10e3) // 10KB max per message for { n, err := (b) if err != nil { break } (string(b[:n])) } if err := (); err != nil { ("failed to close batch:", err) } if err := (); err != nil { ("failed to close connection:", err) }
2.3 List all topics
func main() { conn, err := ("tcp", "u8sMaster:9092") if err != nil { panic(()) } defer () partitions, err := () if err != nil { panic(()) } m := map[string]struct{}{} for _, p := range partitions { m[] = struct{}{} } for k := range m { (k) } }
2.4 Creating a topic
func main() { conn, err := ((), "tcp", "u9sMaster:9092", "topic2", 0) if err != nil { panic(()) } }
Create topic accurately
func main() { conn, err := ("tcp", "u8sMaster:9092") if err != nil { panic(()) } defer () controller, err := () if err != nil { panic(()) } var connLeader * connLeader, err = ("tcp", (, ())) if err != nil { panic(()) } defer () }
The configuration of kafka cluster is omitted here, and there is a chance to supplement it in the future.
The above is a detailed explanation of the basic operations of Golang connecting kafka. For more information about Go connecting kafka, please follow my other related articles!