SoFunction
Updated on 2025-04-11

A detailed explanation of basic Kafka operations in Go

Study

1.1 Start kafka and zookeeper

Kafka depends on ZooKeeper, so start ZooKeeper first and then the Kafka broker:

bin/zookeeper-server-start.sh config/zookeeper.properties

and

bin/kafka-server-start.sh config/server.properties

1.2 Create a topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server <hostname>:9092

1.3 Produce messages

bin/kafka-console-producer.sh --broker-list <hostname>:9092 --topic hello

Once the producer is running, you can send multiple messages (one per line); press Ctrl+C to exit.

1.4 Consume messages from the beginning

bin/kafka-console-consumer.sh --bootstrap-server <hostname>:9092 --from-beginning --topic hello

1.5 Consume from a specified offset

bin/kafka-console-consumer.sh --bootstrap-server <hostname>:9092 --partition 0 --offset 1 --topic hello

1.6 Consume only the latest messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

2 Go operations

2.1 Send a message

// Kafka configuration.
const (
	KafkaBroker = "u8sMaster:9092" // Replace with your Kafka broker address.
	KafkaTopic  = "k8s-version"    // Kafka topic.
)

// main is the program entry point; it runs the producer example.
func main() {
	sendMesgKafka()
}

func sendMesgKafka() {
	w := ({
		Brokers:  []string{KafkaBroker},
		Topic:    KafkaTopic,
		Balancer: &{},
	})

	err := ((),
		{
			Key:   []byte("Key-A"),
			Value: []byte("one!"),
		},
		{
			Key:   []byte("Key-B"),
			Value: []byte("two!"),
		},
		{
			Key:   []byte("Key-C"),
			Value: []byte("three!"),
		},
	)

	if err != nil {
		("failed to write messages:", err)
	}

	if err := (); err != nil {
		("failed to close writer:", err)
	}

	("Message sent successfully")

}

2.2 Consume messages

// Consume messages from one partition of a topic.
//
// This is a fragment: place it inside a function. Required imports:
// "context", "fmt", "log", "time" and kafka "github.com/segmentio/kafka-go".
topic := "test"
partition := 0

// Connect directly to the broker that leads the given topic partition.
conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {
	log.Fatal("failed to dial leader:", err)
}

// Bound the read so the loop below ends once no data arrives for 10 seconds.
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
	n, err := batch.Read(b)
	if err != nil {
		// Any read error (including the deadline expiring) ends consumption.
		break
	}
	fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
	log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
	log.Fatal("failed to close connection:", err)
}

2.3 List all topics

func main() {
    conn, err := ("tcp", "u8sMaster:9092")
    if err != nil {
        panic(())
    }
    defer ()
    
    partitions, err := ()
    if err != nil {
        panic(())
    }
    
    m := map[string]struct{}{}
    
    for _, p := range partitions {
        m[] = struct{}{}
    }
    for k := range m {
        (k)
    }
}

2.4 Creating a topic

func main() {
        conn, err := ((), "tcp", "u9sMaster:9092", "topic2", 0)
        if err != nil {
            panic(())
        }
}

Create a topic explicitly (by connecting to the cluster controller)

func main() {
    conn, err := ("tcp", "u8sMaster:9092")
    if err != nil {
        panic(())
    }
    defer ()
    controller, err := ()
    if err != nil {
        panic(())
    }
    var connLeader *
    connLeader, err = ("tcp", (, ()))
    if err != nil {
        panic(())
    }
    defer ()
}

Kafka cluster configuration is omitted here and may be covered in a future update.

That concludes this walkthrough of basic Kafka operations in Go. For more on using Kafka from Go, see my other related articles.