SoFunction
Updated on 2025-03-03

Example of implementation of Golang operating Kafka

1. User library instructions

You can use third-party library to connect kafka in Golang: /Shopify/sarama

2. Kafka Producer sends messages

package main 

import (
    "fmt"
    "/Shopify/sarama"
)

func main() {
    config := ()
     =  // After sending the data, both the leader and the follower need to confirm     =   //Write to random partitions, we set 32 ​​partitions by default     = true // The message successfully delivered will be returned in the success channel
    // Construct a message    msg := &{}
     = "task"
     = ("producer kafka messages...")

    // Connect to kafka    client, err := ([]string{"192.20.216.8:9092"}, config)
    if err != nil {
        ("Producer closed, err:", err)
        return
    }
    defer ()

    // Send a message    pid, offset, err := (msg)
    if err != nil {
        ("send msg failed, err:", err)
        return
    }
    ("pid:%v offset:%v\n", pid, offset)
}

3. Kafka Consumer Consumption News

package main

import (
    "fmt"
    "/Shopify/sarama"
    "sync"
)

func main() {
    var wg 
    consumer, err := ([]string{"192.20.216.8:9092"}, nil)
    if err != nil {
        ("Failed to start consumer: %s", err)
        return
    }
    partitionList, err := ("task-status-data") // Get all partitions through topic    if err != nil {
        ("Failed to get the list of partition: ", err)
        return
    }
    (partitionList)

    for partition := range partitionList{ // traverse all partitions        pc, err := ("task", int32(partition), ) // Create a partition consumer for each partition        if err != nil {
            ("Failed to start consumer for partition %d: %s\n", partition, err)
        }
        (1)
        go func() { // Open a go coroutine for each partition to get the value            for msg := range () { // Block until a value is sent, and then continue to wait                ("Partition:%d, Offset:%d, key:%s, value:%s\n", , , string(), string())
            }
            defer ()
            ()
        }(pc)
    }
    ()
    ()
}

This is the end of this article about the implementation example of Golang operation Kafka. For more related content on Golang operation Kafka, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!