from kafka import KafkaProducer, KafkaConsumer
import json
# 创建一个Kafka生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # Kafka服务器地址
value_serializer=lambda x: json.dumps(x).encode('utf-8') # 序列化消息值为JSON格式
)
# 发送消息到指定的主题
producer.send('my-topic', value={'key': 'value'})
producer.flush() # 确保所有消息发送完成
producer.close() # 关闭生产者连接
# 创建一个Kafka消费者
consumer = KafkaConsumer(
'my-topic', # 订阅的主题
bootstrap_servers=['localhost:9092'], # Kafka服务器地址
auto_offset_reset='earliest', # 从最早的消息开始消费
enable_auto_commit=True, # 自动提交偏移量
group_id='my-group', # 消费者组ID
value_deserializer=lambda x: json.loads(x.decode('utf-8')) # 反序列化消息值为JSON格式
)
# 消费消息
for message in consumer:
print(f"Received message: {message.value}")
# 关闭消费者连接
consumer.close()
KafkaProducer:
bootstrap_servers
:Kafka集群的地址。value_serializer
:用于将消息值序列化为JSON格式,以便通过网络传输。发送消息:
send
方法向指定主题发送消息。flush
方法确保所有消息已发送。close
方法关闭生产者连接。KafkaConsumer:
auto_offset_reset
:当没有初始偏移量或偏移量超出范围时,从最早或最晚的消息开始消费。enable_auto_commit
:是否自动提交偏移量。group_id
:消费者组ID,用于协调多个消费者的负载均衡。value_deserializer
:用于将消息值反序列化为JSON格式。消费消息:
for
循环遍历消费者接收到的消息,并打印出来。close
方法关闭消费者连接。下一篇:编写一个python程序
Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3
Laravel 中文站