SoFunction
Updated on 2025-03-03

How to access kafka using sarama

The following client code example accesses the kafka server to send and receive messages.

How to use

1. Command line parameters

$ ./kafkaclient -h
Usage of ./kafkaclient:
 -ca string
  CA Certificate (default "")
 -cert string
  Client Certificate (default "")
 -command string
  consumer|producer (default "consumer")
 -host string
  Common separated kafka hosts (default "localhost:9093")
 -key string
  Client Key (default "")
 -partition int
  Kafka topic partition
 -tls
  TLS enable
 -topic string
  Kafka topic (default "test--topic")

2. Start as a producer

$ ./kafkaclient -command producer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
 -tls -cert <client-cert.pem> -key <client-key.pem> -ca <ca-cert.pem> \
 -host kafka1:9093,kafka2:9093

The producer sends a message to kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit

3. Start as a consumer

$ ./kafkaclient -command consumer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
 -tls -cert <client-cert.pem> -key <client-key.pem> -ca <ca-cert.pem> \
 -host kafka1:9093,kafka2:9093

The consumer receives messages from kafka:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

The complete source code is as follows

This code uses the Shopify/sarama library; please download it yourself (for example with `go get github.com/Shopify/sarama`).

$ cat kafkaclient.go
package main

import (
 "flag"
 "fmt"
 "log"
 "os"
 "io/ioutil"
 "bufio"
 "strings"

 "crypto/tls"
 "crypto/x509"

 "github.com/Shopify/sarama"
)

// Command-line settings. main binds each of these to a flag and fills them
// in when the flags are parsed; until then they hold their zero values.
var (
	// Values of the -command, -host and -topic flags.
	command, hosts, topic string

	// Value of the -partition flag.
	partition int

	// Value of the -tls flag.
	tlsEnable bool

	// Values of the -cert, -key and -ca flags (file paths handed to
	// genTLSConfig when -tls is set).
	clientcert, clientkey, cacert string
)

func main() {
 (&command, "command",  "consumer",   "consumer|producer")
 (&tlsEnable, "tls",   false,    "TLS enable")
 (&hosts,  "host",   "localhost:9093", "Common separated kafka hosts")
 (&topic,  "topic",  "test--topic",  "Kafka topic")
 (&partition,  "partition", 0,     "Kafka topic partition")
 (&clientcert, "cert",   "",   "Client Certificate")
 (&clientkey, "key",   "",   "Client Key")
 (&cacert,  "ca",   "",   "CA Certificate")
 ()

 config := ()
 if tlsEnable {
  // = (, "[sarama] ", )
  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  if err != nil {
   (err)
  }

   = true
   = tlsConfig
 }
 client, err := ((hosts, ","), config)
 if err != nil {
  ("unable to create kafka client: %q", err)
 }

 if command == "consumer" {
  consumer, err := (client)
  if err != nil {
   (err)
  }
  defer ()
  loopConsumer(consumer, topic, partition)
 } else {
  producer, err := (client)
  if err != nil {
   (err)
  }
  defer ()
  loopProducer(producer, topic, partition)
 }
}

// genTLSConfig builds a TLS client configuration for mutual TLS.
//
// clientcertfile and clientkeyfile are paths to the PEM-encoded client
// certificate and its private key; cacertfile is the path to a PEM file with
// the CA certificate(s) used to verify the server.
//
// It returns a nil config and a non-nil error when the key pair cannot be
// loaded, the CA file cannot be read, or the CA file contains no usable PEM
// certificate.
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
	// Load the client certificate/key pair.
	clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}

	// Load the CA certificate pool. ioutil.ReadFile is kept because the file
	// imports io/ioutil and Go rejects unused imports.
	cacert, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		return nil, fmt.Errorf("reading CA certificate: %w", err)
	}
	cacertpool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing could be parsed; without
	// this check an empty pool would silently reject every server.
	if !cacertpool.AppendCertsFromPEM(cacert) {
		return nil, fmt.Errorf("no valid PEM certificates found in %q", cacertfile)
	}

	// BuildNameToCertificate is deprecated and no longer needed: certificate
	// selection by name happens automatically.
	tlsConfig := &tls.Config{
		RootCAs:      cacertpool,
		Certificates: []tls.Certificate{clientcert},
		// InsecureSkipVerify: true, // test servers only: use when the domain does not match the cert
	}
	return tlsConfig, nil
}

func loopProducer(producer , topic string, partition int) {
 scanner := ()
 ("> ")
 for () {
  text := ()
  if text == "" {
  } else if text == "exit" || text == "quit" {
   break
  } else {
   () <- &{Topic: topic, Key: nil, Value: (text)}
   ("Produced message: [%s]\n",text)
  }
  ("> ")
 }
}

func loopConsumer(consumer , topic string, partition int) {
 partitionConsumer, err := (topic, int32(partition), )
 if err != nil {
  (err)
  return
 }
 defer ()

 for {
  msg := <-()
  ("Consumed message: [%s], offset: [%d]\n", , )
 }
}

Compilation:

$ go build 

That is all for this article. I hope it helps with your studies, and thank you for your support.