📢 转载信息
原文链接:https://www.kdnuggets.com/building-your-modern-data-analytics-stack-with-python-parquet-and-duckdb
原文作者:Bala Priya C
Image by Author
二级标题 # 引言
近年来,数据分析的方式发生了变化。将所有数据加载到关系型数据库中并运行 SQL 查询的传统方法仍然有效,但对于某些分析工作负载来说,它通常是“杀鸡用牛刀”。将数据存储在 Parquet 文件中,并使用 DuckDB 直接查询它们,速度更快、更简单、更有效。
在本文中,我将向您展示如何在 Python 中构建一个数据分析堆栈,该堆栈使用 DuckDB 来查询存储在 Parquet 文件中的数据。我们将使用一个示例数据集,探讨每个组件的工作原理,并了解为什么这种方法对您的数据科学项目可能很有用。
二级标题 # 先决条件
在开始之前,请确保您已安装:
- Python 3.10 或更高版本
- 对 SQL 基础知识和 pandas DataFrame 操作的理解
- 熟悉数据分析概念
另外,安装所需的库:
pip install duckdb pandas pyarrow numpy faker
二级标题 # 理解推荐的数据分析堆栈
让我们从理解每个组件的作用以及它们为何能很好地协同工作开始。
Parquet 是一种列式存储格式,最初是为 Hadoop 生态系统创建的。与 CSV 等基于行的格式(其中每一行是一个完整的记录)不同,Parquet 按列组织数据。这似乎是一个微小的差异,但它对分析有着巨大的影响。
当您运行一个查询,而该查询只需要一个包含五十列的表中的三列时,Parquet 允许您只读取这三列。使用 CSV,您必须完整地读取每一行,然后丢弃您不需要的 47 列。这使得 Parquet 对于典型的分析查询更快。此外,列式存储具有良好的压缩性,因为同一列中的值倾向于相似。
DuckDB 是一个嵌入式分析数据库。虽然 SQLite 针对需要大量小规模读写的事务性工作负载进行了优化,但 DuckDB 专为需要扫描大量数据、聚合和连接的分析查询而设计。嵌入式意味着它在您的 Python 进程内部运行,因此无需安装或管理单独的数据库服务器。
DuckDB 在分析方面的独特之处在于它可以直接查询 Parquet 文件。您无需首先将数据导入数据库。将 DuckDB 指向一个 Parquet 文件,编写 SQL,它只会读取需要的部分。这种“原地查询”功能是使整个堆栈有用的关键。
您可以在 Python 开发环境中使用它。您将数据存储在 Parquet 文件中,pandas 处理数据操作,DuckDB 执行分析查询,并且整个 Python 生态系统可用于可视化、机器学习和自动化。
二级标题 # 创建示例数据集
我们将使用一个电子商务数据集。您可以使用 data_generator.py 脚本来生成示例数据集,或者 遵循此 notebook。
该数据集包括下订单的客户、包含多个项目的订单以及具有类别和定价的产品。
数据具有 参照完整性。每笔订单都引用一个有效的客户,每笔订单项都引用一个有效的订单和一个产品。这使我们能够执行有意义的连接和聚合。
二级标题 # 将数据保存到 Parquet 文件
在保存数据之前,让我们了解为什么 Parquet 对分析有效。我们已经讨论了 Parquet 等列式存储格式的优势,但让我们再详细回顾一下。
在 CSV 文件中,数据是逐行存储的。如果您有一百万行,每行有 50 列,而您只想分析其中一列,那么您仍然需要读取所有五千万个值才能跳过不需要的列。这很浪费。
如我们所知,Parquet 按列存储数据。一列的所有值都存储在一起。当您查询一列时,您只读取该列,不读取任何其他内容。对于通常只涉及少量列的分析查询,这要快得多。
列式存储的压缩效果也更好。同一列中的值通常相似——它们通常都是整数、日期,或者都来自同一分类集。压缩算法对相似数据的处理效果比对随机数据的处理效果要好得多。
让我们将数据保存为 Parquet 并查看优势:
# Save tables as Parquet files customers_df.to_parquet('customers.parquet', engine='pyarrow', compression='snappy') products_df.to_parquet('products.parquet', engine='pyarrow', compression='snappy') orders_df.to_parquet('orders.parquet', engine='pyarrow', compression='snappy') order_items_df.to_parquet('order_items.parquet', engine='pyarrow', compression='snappy') # Compare with CSV to see the difference customers_df.to_csv('customers.csv', index=False) orders_df.to_csv('orders.csv', index=False) import os def get_size_mb(filename): return os.path.getsize(filename) / (1024 * 1024) print("Storage Comparison:") print(f"customers.csv: {get_size_mb('customers.csv'):.2f} MB") print(f"customers.parquet: {get_size_mb('customers.parquet'):.2f} MB") print(f"Savings: {(1 - get_size_mb('customers.parquet')/get_size_mb('customers.csv'))*100:.1f}%\n") print(f"orders.csv: {get_size_mb('orders.csv'):.2f} MB") print(f"orders.parquet: {get_size_mb('orders.parquet'):.2f} MB") print(f"Savings: {(1 - get_size_mb('orders.parquet')/get_size_mb('orders.csv'))*100:.1f}%")
输出:
Storage Comparison: customers.csv: 0.73 MB customers.parquet: 0.38 MB Savings: 48.5% orders.csv: 3.01 MB orders.parquet: 1.25 MB Savings: 58.5%
这些压缩率是典型的。与 CSV 相比,Parquet 通常能实现更好的压缩。我们在这里使用的压缩是 Snappy,它优先考虑速度而不是最大压缩率。
注意:Parquet 支持其他编解码器,例如 Gzip,它能提供更好的压缩但速度较慢,以及 Zstd,它在压缩和速度之间取得了良好的平衡。
二级标题 # 使用 DuckDB 查询 Parquet 文件
现在是最有趣的部分。我们可以直接使用 SQL 查询这些 Parquet 文件,而无需首先将它们加载到数据库中。
import duckdb # Create a DuckDB connection con = duckdb.connect(database=':memory:') # Query the Parquet file directly query = """ SELECT customer_segment, COUNT(*) as num_customers, COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage FROM 'customers.parquet' GROUP BY customer_segment ORDER BY num_customers DESC """ result = con.execute(query).fetchdf() print("Customer Distribution:") print(result)
输出:
Customer Distribution: customer_segment num_customers percentage 0 Standard 5070 50.70 1 Basic 2887 28.87 2 Premium 2043 20.43
请看查询语法:FROM 'customers.parquet'。DuckDB 直接读取文件。没有导入步骤,没有 CREATE TABLE 语句,也不需要等待数据加载。您编写 SQL,DuckDB 找出它需要从文件中读取的内容,然后返回结果。
在传统的工作流程中,您需要创建数据库、定义模式、导入数据、创建索引,然后才能查询。使用 DuckDB 和 Parquet,您可以跳过所有这些步骤。在底层,DuckDB 读取 Parquet 文件元数据以了解架构,然后使用谓词下推(predicate pushdown)来跳过读取不符合您 WHERE 子句的数据。它只读取查询实际使用的列。对于大文件,这使得查询超级快。
二级标题 # 执行复杂分析
让我们运行一个稍微复杂一点的分析查询。我们将按客户细分来分析每月的收入趋势。
query = """ SELECT strftime(o.order_date, '%Y-%m') as month, c.customer_segment, COUNT(DISTINCT o.order_id) as num_orders, COUNT(DISTINCT o.customer_id) as unique_customers, ROUND(SUM(o.order_total), 2) as total_revenue, ROUND(AVG(o.order_total), 2) as avg_order_value FROM 'orders.parquet' AS o JOIN 'customers.parquet' AS c ON o.customer_id = c.customer_id WHERE o.payment_status = 'completed' GROUP BY month, c.customer_segment ORDER BY month DESC, total_revenue DESC LIMIT 15 """ monthly_revenue = con.execute(query).fetchdf() print("Recent Monthly Revenue by Segment:") print(monthly_revenue.to_string(index=False))
输出:
Recent Monthly Revenue by Segment: month customer_segment num_orders unique_customers total_revenue avg_order_value 2026-01 Standard 2600 1468 1683223.68 647.39 2026-01 Basic 1585 857 1031126.44 650.55 2026-01 Premium 970 560 914105.61 942.38 2025-12 Standard 2254 1571 1533076.22 680.16 2025-12 Premium 885 613 921775.85 1041.55 2025-12 Basic 1297 876 889270.86 685.64 2025-11 Standard 1795 1359 1241006.08 691.37 2025-11 Premium 725 554 717625.75 989.83 2025-11 Basic 1012 767 682270.44 674.18 2025-10 Standard 1646 1296 1118400.61 679.47 2025-10 Premium 702 550 695913.24 991.33 2025-10 Basic 988 769 688428.86 696.79 2025-09 Standard 1446 1181 970017.17 670.83 2025-09 Premium 594 485 577486.81 972.20 2025-09 Basic 750 618 495726.69 660.97
此查询按两个维度(月份和细分市场)分组,聚合多个指标,并按付款状态进行过滤。这是分析工作中经常需要编写的查询类型。strftime 函数直接在 SQL 中格式化日期。ROUND 函数用于清理小数位。多个聚合高效运行并给出预期的结果。
二级标题 # 连接多个表
真正的分析很少只涉及一个表。让我们连接这些表来回答一个业务问题:哪些产品类别的收入最高,以及这在客户细分市场中如何变化?
query = """ SELECT p.category, c.customer_segment, COUNT(DISTINCT oi.order_id) as num_orders, SUM(oi.quantity) as units_sold, ROUND(SUM(oi.item_total), 2) as total_revenue, ROUND(AVG(oi.item_total), 2) as avg_item_value FROM 'order_items.parquet' oi JOIN 'orders.parquet' o ON oi.order_id = o.order_id JOIN 'products.parquet' p ON oi.product_id = p.product_id JOIN 'customers.parquet' c ON o.customer_id = c.customer_id WHERE o.payment_status = 'completed' GROUP BY p.category, c.customer_segment ORDER BY total_revenue DESC LIMIT 20 """ category_analysis = con.execute(query).fetchdf() print("Revenue by Category and Customer Segment:") print(category_analysis.to_string(index=False))
截断的输出:
Revenue by Category and Customer Segment: category customer_segment num_orders units_sold total_revenue avg_item_value Electronics Standard 4729 6431.0 6638814.75 1299.18 Electronics Premium 2597 3723.0 3816429.62 1292.39 Electronics Basic 2685 3566.0 3585652.92 1240.28 Automotive Standard 4506 5926.0 3050679.12 633.18 Sports Standard 5049 6898.0 2745487.54 497.55 ... ... Clothing Premium 3028 4342.0 400704.25 114.55 Clothing Basic 3102 4285.0 400391.18 117.49 Books Standard 6196 8511.0 252357.39 36.74
此查询连接了三个表。DuckDB 自动确定最佳的连接顺序和执行策略。请注意 SQL 的可读性与等效的 pandas 代码相比如何。对于复杂的分析逻辑,SQL 通常比 DataFrame 操作更能清晰地表达意图。
二级标题 # 理解查询性能
让我们将 DuckDB 与 pandas 在一项常见的分析任务中进行比较。
三级标题 // 方法 1:使用 Pandas
import time # Analytical task: Calculate customer purchase patterns print("Performance Comparison: Customer Purchase Analysis\n") start_time = time.time() # Merge dataframes merged = order_items_df.merge(orders_df, on='order_id') merged = merged.merge(products_df, on='product_id') # Filter completed orders completed = merged[merged['payment_status'] == 'completed'] # Group and aggregate customer_patterns = completed.groupby('customer_id').agg({ 'order_id': 'nunique', 'product_id': 'nunique', 'item_total': ['sum', 'mean'], 'category': lambda x: x.mode()[0] if len(x) > 0 else None }) customer_patterns.columns = ['num_orders', 'unique_products', 'total_spent', 'avg_spent', 'favorite_category'] customer_patterns = customer_patterns.sort_values('total_spent', ascending=False).head(100) pandas_time = time.time() - start_time
三级标题 // 方法 2:使用 DuckDB
start_time = time.time() query = """ SELECT o.customer_id, COUNT(DISTINCT oi.order_id) as num_orders, COUNT(DISTINCT oi.product_id) as unique_products, ROUND(SUM(oi.item_total), 2) as total_spent, ROUND(AVG(oi.item_total), 2) as avg_spent, MODE(p.category) as favorite_category FROM 'order_items.parquet' oi JOIN 'orders.parquet' o ON oi.order_id = o.order_id JOIN 'products.parquet' p ON oi.product_id = p.product_id WHERE o.payment_status = 'completed' GROUP BY o.customer_id ORDER BY total_spent DESC LIMIT 100 """ duckdb_result = con.execute(query).fetchdf() duckdb_time = time.time() - start_time print(f"Pandas execution time: {pandas_time:.4f} seconds") print(f"DuckDB execution time: {duckdb_time:.4f} seconds") print(f"Speedup: {pandas_time/duckdb_time:.1f}x faster with DuckDB\n") print("Top 5 customers by total spent:") print(duckdb_result.head().to_string(index=False))
输出:
Performance Comparison: Customer Purchase Analysis Pandas execution time: 1.9872 seconds DuckDB execution time: 0.1171 seconds Speedup: 17.0x faster with DuckDB
Top 5 customers by total spent: customer_id num_orders unique_products total_spent avg_spent favorite_category 8747 8 24 21103.21 879.30 Electronics 617 9 27 19596.22 725.79 Electronics 2579 9 18 17011.30 895.33 Sports 6242 7 23 16781.11 729.61 Electronics 5443 8 22 16697.02 758.96 Automotive
DuckDB 的速度快了大约 17 倍。随着数据集的增大,性能差距会更加明显。Pandas 方法将所有数据加载到内存中,执行多次合并操作(这会创建副本),然后进行聚合。DuckDB 直接从 Parquet 文件读取,下推过滤器以避免读取不必要的数据,并使用优化的连接算法。
二级标题 # 构建可重用的分析查询
在生产分析中,您会反复运行类似的查询,只是参数不同。让我们构建一个遵循此工作流程最佳实践的可重用函数。
def analyze_product_performance(con, category=None, min_revenue=None, date_from=None, top_n=20): """ 分析具有灵活过滤的产品性能。这展示了如何构建可参数化以适应不同用例的可重用分析查询。在生产环境中,您会为此类常见分析问题构建一个函数库。 """ # 根据参数动态构建 WHERE 子句 where_clauses = ["o.payment_status = 'completed'"] if category: where_clauses.append(f"p.category = '{category}'") if date_from: where_clauses.append(f"o.order_date >= '{date_from}'") where_clause = " AND ".join(where_clauses) # 主要分析查询 query = f""" WITH product_metrics AS ( SELECT p.product_id, p.product_name, p.category, p.base_price, COUNT(DISTINCT oi.order_id) as times_ordered, SUM(oi.quantity) as units_sold, ROUND(SUM(oi.item_total), 2) as total_revenue, ROUND(AVG(oi.unit_price), 2) as avg_selling_price, ROUND(SUM(oi.item_total) - (p.cost * SUM(oi.quantity)), 2) as profit FROM 'order_items.parquet' oi JOIN 'orders.parquet' o ON oi.order_id = o.order_id JOIN 'products.parquet' p ON oi.product_id = p.product_id WHERE {where_clause} GROUP BY p.product_id, p.product_name, p.category, p.base_price, p.cost ) SELECT *, ROUND(100.0 * profit / total_revenue, 2) as profit_margin_pct, ROUND(avg_selling_price / base_price, 2) as price_realization FROM product_metrics """ # 如果指定了收入过滤器,则添加 if min_revenue: query += f" WHERE total_revenue >= {min_revenue}" query += f""" ORDER BY total_revenue DESC LIMIT {top_n} """ return con.execute(query).fetchdf()
此函数执行以下操作:首先,它根据参数动态构建 SQL,允许灵活过滤,而无需为每种情况编写单独的查询。其次,它使用 公用表表达式 (CTE) 将复杂逻辑组织成易于阅读的步骤。第三,它计算派生指标,如利润率和价格实现率,这些指标需要多个源列。
利润计算通过同时使用订单项和产品表中的数据来减去成本。这种跨表的计算在 SQL 中很直接,但在使用多个 pandas 操作时会很麻烦。DuckDB 在单个查询中高效地处理了它。
这是一个使用上述函数的示例:
# Example 1: Top electronics products electronics = analyze_product_performance(con, category='Electronics', top_n=10) print("Top 10 Electronics Products:") print(electronics[['product_name', 'units_sold', 'total_revenue', 'profit_margin_pct']].to_string(index=False))
输出:
Top 10 Electronics Products: product_name units_sold total_revenue profit_margin_pct Electronics Item 113 262.0 510331.81 38.57 Electronics Item 154 289.0 486307.74 38.28 Electronics Item 122 229.0 448680.64 38.88 Electronics Item 472 251.0 444680.20 38.51 Electronics Item 368 222.0 424057.14 38.96 Electronics Item 241 219.0 407648.10 38.75 Electronics Item 410 243.0 400078.65 38.31 Electronics Item 104 233.0 400036.84 38.73 Electronics Item 2 213.0 382583.85 38.76 Electronics Item 341 240.0 376722.94 38.94
这是另一个示例:
# Example 2: High-revenue products across all categories print("\n\nHigh-Revenue Products (>$50k revenue):") high_revenue = analyze_product_performance(con, min_revenue=50000, top_n=10) print(high_revenue[['product_name', 'category', 'total_revenue', 'profit']].to_string(index=False))
输出:
High-Revenue Products (>$50k revenue): product_name category total_revenue profit Electronics Item 113 Electronics 510331.81 196846.19 Electronics Item 154 Electronics 486307.74 186140.78 Electronics Item 122 Electronics 448680.64 174439.40 Electronics Item 472 Electronics 444680.20 171240.80 Electronics Item 368 Electronics 424057.14 165194.04 Electronics Item 241 Electronics 407648.10 157955.25 Electronics Item 410 Electronics 400078.65 153270.84 Electronics Item 104 Electronics 400036.84 154953.46 Electronics Item 2 Electronics 382583.85 148305.15 Electronics Item 341 Electronics 376722.94 146682.94
二级标题 # 总结
在本文中,我们分析了电子商务数据。我们生成了关系型数据,将其存储为 Parquet,并使用 DuckDB 进行查询。性能比较显示与传统的 pandas 方法相比,速度得到了显著提升。
当您执行分析工作负载处理结构化数据时,请使用此堆栈。如果您正在聚合、过滤、连接和计算指标,这将非常有用。它适用于以批处理而不是持续更新方式变化的数据。如果您正在分析昨天的销售额、处理月度报告或探索历史趋势,定期更新的 Parquet 文件效果很好。您不需要一个持续接受写入的实时数据库。
然而,此堆栈并非适用于所有场景:
- 如果您需要具有许多并发写入者的实时更新,则需要一个具有 ACID 事务的传统数据库
- 如果您正在构建需要毫秒级响应的用户查询应用程序,则索引数据库更好
- 如果多个用户需要同时以不同的访问权限进行查询,数据库服务器可以提供更好的控制
最佳应用场景是处理大型数据集的分析工作,其中数据更新是定期进行的,并且您需要快速、灵活的查询和分析能力。
祝您分析愉快!
Bala Priya C 是来自印度的一名开发人员和技术作家。她喜欢在数学、编程、数据科学和内容创作的交叉点工作。她的兴趣和专业领域包括 DevOps、数据科学和自然语言处理。她喜欢阅读、写作、编码和咖啡!目前,她正在通过撰写教程、操作指南、观点文章等方式学习并将自己的知识与开发者社区分享。Bala 还创建了引人入胜的资源概览和编码教程。
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区