如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道( 二 )


基于功能的分层定价
Azure Event Hubs
Apache Kafka的高吞吐量事件摄取服务 。与Azure数据服务的集成 。
基于吞吐量单位
托管服务抽象了Kafka操作的复杂性 , 可以让用户专注数据管道 。
接下来,将使用Python、Kafka和云平台构建一个实时管道 。也可以参考以下的指南作为另一个示例 。
构建实时数据管道Kafka的基本实时管道有两个主要组件:向Kafka发布消息的生产者和订阅主题并处理消息的消费者 。
其架构遵循以下流程:

如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道

文章插图
为了进行简化,将使用Confluent Kafka Python客户端库 。
1. Python生产者生产者应用程序从数据源收集数据并将其发布到Kafka主题 。作为一个例子,假设有一个Python服务从一个Web应用程序收集用户点击流事件 。
在Web应用程序中,当用户的行为像是页面浏览或产品评级时,可以捕获这些事件并将它们发送给Kafka 。
可以抽象出Web应用程序如何收集数据的实现细节 。
Pythonfrom confluent_kafka import Producer import json # User event data event = { "timestamp": "2022-01-01T12:22:25","userid": "user123", "page": "/product123","action": "view" } # Convert to JSON event_json = json.dumps(event) # Kafka producer configurationconf = { 'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092', 'client.id': 'clickstream-producer'} # Create producer instance producer = Producer(conf) # Publish eventproducer.produce(topic='clickstream', value=https://www.isolves.com/it/sjk/bk/2024-01-26/event_json) # Flush and close producer producer.flush() producer.close()这将事件发布到云托管Kafka集群上的clickstream主题 。
Confluent_Kafka Python客户端在将消息发送到Kafka之前使用内部缓冲区来批处理消息 。与单独发送每条消息相比,这提高了效率 。
在默认情况下,消息会在缓冲区中累积 , 直到:
(1)已达到缓冲区大小限制(默认为32MB) 。
(2)调用flush()方法 。
当调用flush()时,缓冲区中的任何消息都会立即发送到Kafka代理 。
如果不调用flush(),而是依赖于缓冲区大小限制,那么在下一次自动刷新之前,如果发生故障,就有丢失事件的风险 。调用flush()能够更好地控制最小化潜在的消息丢失 。
但是 , 在每次生产后调用flush()会带来额外的开销 。找到合适的缓冲配置取决于特定的可靠性需求和吞吐量需求 。
可以在事件发生时不断添加事件来构建实时流 。这为下游数据消费者提供了连续的事件提要 。
2.Python消费者接下来,有一个消费者应用程序来从Kafka摄取事件并处理它们 。
例如,可能想要解析事件,筛选特定的子类型,并验证模式 。
Pythonfrom confluent_kafka import Consumer import json # Kafka consumer configuration conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092','group.id': 'clickstream-processor', 'auto.offset.reset': 'earliest'} # Create consumer instanceconsumer = Consumer(conf) # Subscribe to 'clickstream' topic consumer.subscribe(['clickstream']) # Poll Kafka for messages infinitelywhile True: msg = consumer.poll(1.0) if msg is None: continue# Parse JSON from message value event = json.loads(msg.value())# Process event based on business logic if event['action'] == 'view': print('User viewed product page')elif event['action'] == 'rating': # Validate rating, insert to DB etcpassprint(event) # Print event# Close consumer consumer.close()这个轮询clickstream主题以获取新消息,使用它们,并根据事件类型采取行动——打印、更新数据库等 。
对于一个简单的管道来说,这很有效 。但如果每秒事件数增加100倍呢?消费者将无法跟上其增长 。这就是像PySpark这样的工具可以帮助扩展处理的地方 。
3.使用PySpark进行扩展PySpark为Apache Spark提供了一个Python API,Apache Spark是一个为大规模数据处理优化的分布式计算框架 。
使用PySpark,可以利用Spark的内存计算和并行执行来更快地使用Kafka流 。
首先,将Kafka数据加载到DataFrame中,DataFrame可以使用Spark SQL或Python进行操作 。
Pythonfrom pyspark.sql import SparkSession # Initialize Spark session spark = SparkSession.builder.AppName('clickstream-consumer').getOrCreate() # Read stream from Kafka 'clickstream'df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "broker1:9092,broker2:9092").option("subscribe", "clickstream").load() # Parse JSON from value df = df.selectExpr("CAST(value AS STRING)") df = df.select(from_json(col("value"), schema).alias("data")) Next, we can express whatever processing logic we need using DataFrame transformations: from pyspark.sql.functions import * # Filter for 'page view' eventsviews = df.filter(col("data.action") == "view") # Count views per page URLcounts = views.groupBy(col("data.page")) .count() .orderBy("count") # Print the streamquery = counts.writeStream.outputMode("complete").format("console").start()query.awAItTermination()


推荐阅读