SoFunction
Updated on 2024-12-19

python real-time get kafka consumption queue information example details

Install pykafka

pip install pykafka

I. Consuming kafka messages

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from  import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
    """kafka consumer; dynamic parameter passing, not profile passing.
      kafka consumers should be kept on a different node from the producer as much as possible; otherwise it is easy to trap the program in a dead end loop.
     """
    _encode = "UTF-8"
    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
        """ initializationkafkaconsumers;
           1. Setting the default kafka thematic, node address, consumer group id(Use the default value when not passed in)
           2. When you need to set specific parameters you can do so directly in the kwargs pass directly into,Unwrap and pass in the original function;
         Args:
           topics: str; kafka Consumption Themes;
           bootstrap_server: list; kafka consumers地址;
           group_id: str; kafka consumers分组 id,The default is start_task 主要是接收并启动任务consumers,仅此一个consumer groupid;
         """
        if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
         = KafkaClient(hosts=bootstrap_server)
        # Select a topic to consume
        vpn_topic = [topics]
         = vpn_topic.get_simple_consumer(consumer_group=group_id,
                                                      consumer_timeout_ms=200,
                                                      auto_commit_enable=True,# Automatic submission of offsets
                                                      auto_offset_reset=)  #LATEST Get current offset latest news EARLIEST Get information from scratch
    def recv(self):
        """
         Receive data from consumption
         Returns.
         """
        return 
def main():
    """
    kafka consumption queue entry
    :param topic.
    :return.
    """
    obj = KConsumer(topics="topics_name")
    while True:
        for message in ():
            data = eval(('utf-8'))
            handler_data(data)
if __name__ == '__main__':
    main()

II. Producer push messages

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092")  # Accepts multiple clients
# View all topics
# print()
topic = ['test_78'] # Select a topic
message = "test message2 test message2"
with topic.get_sync_producer() as producer:
    (bytes(message, encoding='utf8')) #python3 needs to be coded
    print(message)

This article on python real-time access to kafka consumption queue information is introduced to this article, more related to python kafka consumption queue information please search my previous posts or continue to browse the following related articles I hope you will support me in the future more!