Install pykafka
pip install pykafka
I. Consuming Kafka messages
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Consume messages from a Kafka topic with pykafka and pass them to a handler."""
import ast

from pykafka import KafkaClient
from pykafka.common import OffsetType

from vpn_data_handler import handler_data

# Defaults used when the caller does not supply a broker address / group id.
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'


class KConsumer(object):
    """Kafka consumer configured through constructor arguments, not a profile.

    Kafka consumers should be kept on a different node from the producer as
    much as possible; otherwise it is easy to trap the program in an endless
    loop.
    """

    _encode = "UTF-8"

    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
        """Connect to Kafka and open a simple consumer on one topic.

        Args:
            topics: str; name of the Kafka topic to consume.
            bootstrap_server: broker address(es) passed unchanged to
                ``KafkaClient(hosts=...)``; the module-level
                ``bootstrap_servers`` is used when this is None.
            group_id: str; consumer group id; defaults to the module-level
                ``group_id``.
            partitions: accepted for interface compatibility; currently
                unused by this constructor.
        """
        if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
        self.client = KafkaClient(hosts=bootstrap_server)
        # Select the topic to consume.
        vpn_topic = self.client.topics[topics]
        self.consumer = vpn_topic.get_simple_consumer(
            consumer_group=group_id,
            # Iteration over the consumer ends after 200 ms without a message.
            consumer_timeout_ms=200,
            # Commit offsets automatically.
            auto_commit_enable=True,
            # LATEST: start from the newest offset; EARLIEST: read from the
            # beginning. NOTE(review): the value was lost in the original
            # text; LATEST is assumed here -- confirm.
            auto_offset_reset=OffsetType.LATEST,
        )

    def recv(self):
        """Return the underlying consumer, which can be iterated for messages."""
        return self.consumer


def main():
    """Entry point: consume the topic forever and hand each message to the handler."""
    obj = KConsumer(topics="topics_name")
    # The inner loop ends whenever the consumer times out; the outer loop
    # immediately starts polling again.
    while True:
        for message in obj.recv():
            # SECURITY: the original called eval() on the raw message, which
            # executes arbitrary code sent through the broker.
            # ast.literal_eval only accepts Python literals (dict, list, str,
            # numbers, ...). Switch to json.loads if producers send JSON.
            data = ast.literal_eval(message.value.decode('utf-8'))
            handler_data(data)


if __name__ == '__main__':
    main()
II. Pushing messages with a producer
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""Send a single test message to a Kafka topic with pykafka's sync producer."""
from pykafka import KafkaClient

# NOTE(review): the original comment said this "accepts multiple clients";
# presumably several brokers can be listed in ``hosts`` -- confirm.
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092")

# View all topics
# print(client.topics)

topic = client.topics['test_78']  # Select a topic
message = "test message2 test message2"

# The context manager stops the producer when the block exits.
with topic.get_sync_producer() as producer:
    # On Python 3 the payload must be encoded to bytes before sending.
    producer.produce(bytes(message, encoding='utf8'))
print(message)
This concludes the article on reading Kafka consumption-queue messages in real time with Python. For more on consuming Kafka queues from Python, please search my previous posts or continue browsing the related articles below. I hope you will keep supporting me in the future!