python kafka 多线程消费者&手动提交实例

  • Post category:Python

下面是详细介绍Python Kafka多线程消费者和手动提交的实例教程。

一、准备工作

在开始示例前,需要先安装kafka-python,可以通过pip命令进行安装:

pip install kafka-python

除此之外还要准备好kafka的环境,以及建立kafka主题,确保程序能够连接到正确的Kafka服务器,topic和分区。

二、Python Kafka多线程消费者实例

在这个示例中,我们将创建一个多线程消费者,它将消息批量处理到数据库中。

from kafka import KafkaConsumer
from threading import Thread
import logging

class KafkaMultiThreadConsumer:
    def __init__(self, topic, group_id, servers, threads_num):
        self.topic = topic
        self.group_id = group_id
        self.servers = servers
        self.threads_num = threads_num
        self.consumer = KafkaConsumer(topic, group_id=group_id, bootstrap_servers=servers,enable_auto_commit=False)

    def process_message(self, records):
        for record in records:
            print("%-40s %-10d %-10d %150s" % (record.topic, record.partition, record.offset, record.value))

            #在此处加入你的数据处理逻辑

        self.consumer.commit()

    def start_consume_message(self):
        threads = []
        for i in range(self.threads_num):
            threads.append(Thread(target=self.consume_message))

        for t in threads:
            t.start()

        for t in threads:
            t.join()

    def consume_message(self):
        while True:
            try:
                records = self.consumer.poll(1000)
                if records:
                    self.process_message(records)
            except Exception as e:
                logging.error(e)

if __name__ == '__main__':
    multi_thread_consumer = KafkaMultiThreadConsumer('test', 'group1', 'localhost:9092', 2)
    multi_thread_consumer.start_consume_message()
  • KafkaMultiThreadConsumer通过构造器初始化,传入consumer需要的参数,具体的参数可以参考KafkaConsumer的文档
  • Thread类通过target参数来传入consume_message方法,最后启动所有的线程,调用start_consume_message方法,最终进入consume_message循环中,开始不断消费kafka消息。

三、Python Kafka手动提交实例

在这个示例中,我们将创建一个手动提交的消费者,它将在取出整个消息列表并成功处理了所有消息后,手动提交偏移量列表。这个需求是非常常见的,因为如果您的消费者没有处理完所有的消息并提交了偏移量,那么它将会在重启后重新处理已经处理过、但没有被提交偏移量的消息,这可能会导致重复消息的情况出现,甚至导致系统出现故障。

from kafka import KafkaConsumer
import time

if __name__ == "__main__":
    consumer = KafkaConsumer('test', 
                            bootstrap_servers=['localhost:9092'], 
                            group_id='group1',
                            enable_auto_commit=False,
                            auto_offset_reset='earliest')

    while True:
        try:
            message_lst = []
            for message in consumer:

                print("%-40s %-10d %-10d %150s" % (message.topic, message.partition, message.offset, message.value))
                # 在这里面处理您的消息列表

                message_lst.append(message)

            if message_lst:
                consumer.commit()
        except Exception as e:
            print(e)
            # sleep一段时间之后继续消费
            time.sleep(5)

  • 每当您接收一个消息时,将其添加到一个消息列表中,并在处理所有消息之后将其提交到kafka服务器。
  • 在捕获到异常时(如kafka服务器崩溃导致连接中断),程序将等待5秒后重试。

以上是关于Python Kafka多线程消费者和手动提交的示例,如果您有任何问题,请随时向我提问。