优化 PySpark 中的数据处理性能

PySpark 技术和策略解决常见的性能挑战:实践演练Apache Spark 近年来凭借其强大的分布式数据处理能力成为领先的分析引擎之一。PySpark 是 Spark 的 Python API,通常用于个人和企业项目以解决数据挑战。例如,我们可以使用 PySpark 高效地实现时间序列数据的特征工程,包括提取、提取和可视化。然而,尽管它能够处理大型数据集,但在极端数据分布和复杂的数据转换工作流等各种情况下仍然会出现性能瓶颈。本文将研究使用 Databricks 上的 PySpark 进行数据处理时的各种常见性能问题,并介绍各种微调策略以实现更快的执行速度。照片由 Veri Ivanova 在 Unsplash 上拍摄假设您开了一家在线零售店,提供各种产品,主要针对美国客户。您计划分析当前交易中的购买习惯,以满足更多现有客户的需求并服务更多新客户。这激励您在准备步骤中投入大量精力来处理交易记录。#0 模拟数据我们首先在 CSV 文件中模拟 100 万条交易记录(在实际大数据场景中肯定会处理更大的数据集)。每条记录都包括客户 ID、购买的产品和交易详细信息(例如付款方式)

来源:走向数据科学

假设您开了一家在线零售店,提供各种产品,主要针对美国客户。您计划分析当前交易的购买习惯,以满足更多现有客户的需求并服务更多新客户。这促使您在准备步骤中投入大量精力来处理交易记录。

#0 模拟数据

我们首先在 CSV 文件中模拟 100 万条交易记录(在真实的大数据场景中,肯定会处理更大的数据集)。每条记录都包含客户 ID、购买的产品以及交易详细信息,例如付款方式和总金额。值得一提的是,客户 ID 为 #100 的产品代理拥有大量客户群,因此占据了您店铺中代发货购买的很大一部分。

以下是演示此场景的代码:

import csvimport datetimeimport numpy as npimport random# 删除现有的“retail_transactions.csv”文件(如果有)! rm -f /p/a/t/h retail_transactions.csv# 设置交易数量和其他配置no_of_iterations = 1000000data = []csvFile = 'retail_transactions.csv'# 以写入模式打开文件with open(csvFile, 'w', newline='') as f: fieldnames = ['orderID', 'customerID', 'productID', 'state', 'paymentMthd', 'totalAmt', 'invoiceTime'] writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for num in range(no_of_iterations): # 使用随机值创建交易记录 new_txn = { 'orderID': num, 'customerID': random.choice([100, random.randint(1, 100000)]), 'productID': np.random.randint(10000, size=random.randint(1, 5)).tolist(), 'state': random.choice(['CA', 'TX', 'FL', 'NY', 'PA', 'OTHERS']), 'paymentMthd': random.choice(['信用卡', '借记卡', '数字钱包', '货到付款', '加密货币']), 'totalAmt': round(random.random() * 5000, 2), 'invoiceTime': datetime.datetime.now().isoformat() } data.append(new_txn) writer.writerows(data)