本文最后更新于:2024年7月6日 早上
测试环境 由于是本地测试,我这里使用的是自己的专门用来测试的云服务器,参考我早前的博客Docker快速搭建Kafka开发环境 。
大概的情况如下:
负载情况:
服务器上暂时没有跑其他的服务,基本跑起来一个zookeeper和两个kafka的节点,已经占去了1.5G的内存。。。所以一般也是用完就给关了
aiokafka 安装:
注意:
1 aiokafka 需要 kafka-python 库.
快速开始 消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from aiokafka import AIOKafkaConsumerimport asyncio loop = asyncio.get_event_loop() KAFKAIP = "106.53.201.23" KAFKAPORT = 32775 async def consume (): consumer = AIOKafkaConsumer( 'my_topic' , 'my_other_topic' , loop=loop, bootstrap_servers=f'{KAFKAIP} :{KAFKAPORT} ' , group_id="my-group" ) await consumer.start() try : async for msg in consumer: print ("consumed: " , msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp) finally : await consumer.stop() loop.run_until_complete(consume())
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from aiokafka import AIOKafkaProducerimport asyncioimport time loop = asyncio.get_event_loop() KAFKAIP = "106.53.201.23" KAFKAPORT = 32775 async def sender (producer: AIOKafkaProducer( ) ): await producer.send("my_topic" , b"Super message" )async def send_one (): producer = AIOKafkaProducer( loop=loop, bootstrap_servers=f'{KAFKAIP} :{KAFKAPORT} ' ) await producer.start() try : task_list = [] s_time = time.time() for i in range (1000 ): task_list.append(loop.create_task(sender(producer), )) await asyncio.wait(task_list) c_time = time.time() print (c_time - s_time) finally : await producer.stop() loop.run_until_complete(send_one())