Laravel  
laravel
文档
数据库
架构
入门
php技术
    
Laravelphp
laravel / php / java / vue / mysql / linux / python / javascript / html / css / c++ / c#

python kafka

作者:断念已残   发布日期:2025-09-08   浏览:60

from kafka import KafkaProducer, KafkaConsumer
import json

# 创建一个Kafka生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Kafka服务器地址
    value_serializer=lambda x: json.dumps(x).encode('utf-8')  # 序列化消息值为JSON格式
)

# 发送消息到指定的主题
producer.send('my-topic', value={'key': 'value'})
producer.flush()  # 确保所有消息发送完成
producer.close()  # 关闭生产者连接

# 创建一个Kafka消费者
consumer = KafkaConsumer(
    'my-topic',  # 订阅的主题
    bootstrap_servers=['localhost:9092'],  # Kafka服务器地址
    auto_offset_reset='earliest',  # 从最早的消息开始消费
    enable_auto_commit=True,  # 自动提交偏移量
    group_id='my-group',  # 消费者组ID
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # 反序列化消息值为JSON格式
)

# 消费消息
for message in consumer:
    print(f"Received message: {message.value}")

# 关闭消费者连接
consumer.close()

解释说明:

  1. KafkaProducer:

    • bootstrap_servers:Kafka集群的地址。
    • value_serializer:用于将消息值序列化为JSON格式,以便通过网络传输。
  2. 发送消息:

    • 使用 send 方法向指定主题发送消息。
    • flush 方法确保所有消息已发送。
    • close 方法关闭生产者连接。
  3. KafkaConsumer:

    • auto_offset_reset:当没有初始偏移量或偏移量超出范围时,从最早或最晚的消息开始消费。
    • enable_auto_commit:是否自动提交偏移量。
    • group_id:消费者组ID,用于协调多个消费者的负载均衡。
    • value_deserializer:用于将消息值反序列化为JSON格式。
  4. 消费消息:

    • 使用 for 循环遍历消费者接收到的消息,并打印出来。
    • 最后使用 close 方法关闭消费者连接。

上一篇:reverse=true在python 中

下一篇:编写一个python程序

大家都在看

python时间格式

python读取文件路径

staticmethod在python中有

python开发windows应用程序

python中len是什么意思

python ord和chr

python中的yield

python自定义异常

python判断路径是否存在

python list.pop

Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3

Laravel 中文站