The following example client connects to a Kafka server to send and receive messages.
How to use
1. Command line parameters
$ ./kafkaclient -h
Usage of ./client:
  -ca string
        CA Certificate (default "")
  -cert string
        Client Certificate (default "")
  -command string
        consumer|producer (default "consumer")
  -host string
        Common separated kafka hosts (default "localhost:9093")
  -key string
        Client Key (default "")
  -partition int
        Kafka topic partition
  -tls
        TLS enable
  -topic string
        Kafka topic (default "test--topic")
2. Start as a producer
$ ./kafkaclient -command producer \
    -host kafka1:9092,kafka2:9092

## TLS-enabled (-cert, -key and -ca each take a file path)
$ ./kafkaclient -command producer \
    -tls -cert <client-cert-file> -key <client-key-file> -ca <ca-cert-file> \
    -host kafka1:9093,kafka2:9093
The producer sends each line you type to Kafka as a message (type "exit" or "quit" to stop):
> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit
3. Start as a consumer
$ ./kafkaclient -command consumer \
    -host kafka1:9092,kafka2:9092

## TLS-enabled (-cert, -key and -ca each take a file path)
$ ./kafkaclient -command consumer \
    -tls -cert <client-cert-file> -key <client-key-file> -ca <ca-cert-file> \
    -host kafka1:9093,kafka2:9093
The consumer receives messages from Kafka and prints them:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
The complete source code is as follows
This code uses the Shopify/sarama library, which you need to download yourself (for example with `go get github.com/Shopify/sarama`).
$ cat package main import ( "flag" "fmt" "log" "os" "io/ioutil" "bufio" "strings" "crypto/tls" "crypto/x509" "/Shopify/sarama" ) var ( command string tlsEnable bool hosts string topic string partition int clientcert string clientkey string cacert string ) func main() { (&command, "command", "consumer", "consumer|producer") (&tlsEnable, "tls", false, "TLS enable") (&hosts, "host", "localhost:9093", "Common separated kafka hosts") (&topic, "topic", "test--topic", "Kafka topic") (&partition, "partition", 0, "Kafka topic partition") (&clientcert, "cert", "", "Client Certificate") (&clientkey, "key", "", "Client Key") (&cacert, "ca", "", "CA Certificate") () config := () if tlsEnable { // = (, "[sarama] ", ) tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert) if err != nil { (err) } = true = tlsConfig } client, err := ((hosts, ","), config) if err != nil { ("unable to create kafka client: %q", err) } if command == "consumer" { consumer, err := (client) if err != nil { (err) } defer () loopConsumer(consumer, topic, partition) } else { producer, err := (client) if err != nil { (err) } defer () loopProducer(producer, topic, partition) } } func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil { return nil, err } // load ca cert pool cacert, err := (cacertfile) if err != nil { return nil, err } cacertpool := () (cacert) // generate tlcconfig tlsConfig := {} = cacertpool = []{clientcert} () // = true // This can be used on test server if domain does not match cert: return &tlsConfig, err } func loopProducer(producer , topic string, partition int) { scanner := () ("> ") for () { text := () if text == "" { } else if text == "exit" || text == "quit" { break } else { () <- &{Topic: topic, Key: nil, Value: (text)} ("Produced message: [%s]\n",text) } ("> ") } } func loopConsumer(consumer , topic string, partition int) { partitionConsumer, err 
:= (topic, int32(partition), ) if err != nil { (err) return } defer () for { msg := <-() ("Consumed message: [%s], offset: [%d]\n", , ) } }
Compilation:
$ go build
That is all for this article. I hope it is helpful for your studies, and thank you for your continued support.