📢 转载信息
原文链接:https://www.kdnuggets.com/working-with-billion-row-datasets-in-python-using-vaex
原文作者:Shittu Olumide
Image by Author
# 引言
处理包含数十亿行的海量数据集是数据科学和分析中的一项重大挑战。像Pandas这样的传统工具非常适合能装入系统内存的中小型数据集,但随着数据集大小的增加,它们会变得缓慢,需要大量的随机存取内存(RAM)才能运行,并且经常因内存不足(OOM)错误而崩溃。
这时,Vaex——一个用于out-of-core(超出内存)数据处理的高性能Python库——就派上用场了。Vaex 允许您在标准笔记本电脑上也能高效且内存友好地检查、修改、可视化和分析大型表格数据集。
# 什么是 Vaex?
Vaex 是一个用于惰性、out-of-core DataFrames(类似于 Pandas)的 Python 库,专为处理大于您系统 RAM 容量的数据集而设计。
关键特性:
Vaex 通过直接在磁盘上操作数据并仅读取所需部分来高效处理海量数据集,避免将整个文件加载到内存中。
Vaex 使用惰性求值,这意味着操作只有在实际请求结果时才会被计算。此外,它可以通过内存映射即时打开列式数据库(它们按列而非按行存储数据),如 HDF5、Apache Arrow 和 Parquet。
Vaex 基于优化的 C/C++ 后端构建,每秒可以对数十亿行数据计算统计信息并执行操作,即使在普通硬件上也能实现快速的大规模分析。
它拥有类似 Pandas 的应用程序编程接口(API),为已经熟悉 Pandas 的用户提供了更平滑的过渡,帮助他们利用大数据能力,而无需陡峭的学习曲线。
# 比较 Vaex 和 Dask
Vaex 在整体上与 Dask 不完全相同,但与 Dask DataFrames 相似,后者是基于 Pandas DataFrames 构建的。这意味着 Dask 继承了某些 Pandas 的问题,例如在某些情况下需要将数据完全加载到 RAM 中进行处理。Vaex 则没有这种情况。Vaex 不会创建 DataFrame 副本,因此可以在主内存较少的机器上处理更大的 DataFrames。Vaex 和 Dask 都使用惰性处理。主要区别在于,Vaex 仅在需要时才计算字段,而使用 Dask 时,我们需要显式调用 compute() 函数。数据需要采用 HDF5 或 Apache Arrow 格式才能充分发挥 Vaex 的优势。
# 为什么传统工具会遇到困难
像 Pandas 这样的工具会在处理前将整个数据集加载到 RAM 中。对于大于内存的数据集,这会导致:
- 性能缓慢
- 系统崩溃(OOM 错误)
- 交互性受限
Vaex 从不将整个数据集加载到内存中;相反,它会:
- 从磁盘流式传输数据
- 使用虚拟列和惰性求值来延迟计算
- 仅在明确需要结果时才具体化(materialize)
这使得即使在普通硬件上也能对大型数据集进行分析。
# Vaex 的底层工作原理
// Out-of-Core 执行
Vaex 使用内存映射按需从磁盘读取数据。这使其能够处理远大于 RAM 容量的数据文件。
// 惰性求值 (Lazy Evaluation)
Vaex 不会立即执行每一步操作,而是构建一个计算图。只有当您请求结果时(例如打印或绘图时),计算才会被执行。
// 虚拟列 (Virtual Columns)
虚拟列是定义在数据集上的表达式,在计算之前不占用内存。这节省了 RAM 并加速了工作流程。
# Vaex 入门
// 安装 Vaex
创建一个干净的虚拟环境:
conda create -n vaex_demo python=3.9 conda activate vaex_demo
使用 pip 安装 Vaex:
pip install vaex-core vaex-hdf5 vaex-viz
升级 Vaex:
pip install --upgrade vaex
安装支持库:
pip install pandas numpy matplotlib
// 打开大型数据集
Vaex 支持各种流行的存储格式来处理大型数据集。它可以直接处理 HDF5、Apache Arrow 和 Parquet 文件,所有这些文件都经过优化,可以实现高效的磁盘访问和快速分析。虽然 Vaex 也可以读取 CSV 文件,但它需要先将它们转换为更高效的格式,以提高处理大型数据集时的性能。
如何打开 Parquet 文件:
import vaex
df = vaex.open("your_huge_dataset.parquet")
print(df)
现在,您可以在不将数据集加载到内存中的情况下检查数据集结构。
// Vaex 中的核心操作
数据筛选:
filtered = df[df.sales > 1000]
这不会立即计算结果;相反,筛选条件被注册,并且只在需要时应用。
分组和聚合:
result = df.groupby("category", agg=vaex.agg.mean("sales"))
print(result)
Vaex 使用并行算法和最少内存高效地计算聚合。
计算统计数据:
mean_price = df["price"].mean()
print(mean_price)
Vaex 通过分块扫描数据集来动态计算此值。
// 使用出租车数据集演示
我们将创建一个包含 5000 万行真实出租车数据集来演示 Vaex 的功能:
import vaex
import numpy as np
import pandas as pd
import time
设置随机种子以确保可重现性:
np.random.seed(42)
print("Creating 50 million row dataset...")
n = 50_000_000
生成真实的出租车行程数据:
data = {
'passenger_count': np.random.randint(1, 7, n),
'trip_distance': np.random.exponential(3, n),
'fare_amount': np.random.gamma(10, 1.5, n),
'tip_amount': np.random.gamma(2, 1, n),
'total_amount': np.random.gamma(12, 1.8, n),
'payment_type': np.random.choice(['credit', 'cash', 'mobile'], n),
'pickup_hour': np.random.randint(0, 24, n),
'pickup_day': np.random.randint(1, 8, n),
}
创建 Vaex DataFrame:
df_vaex = vaex.from_dict(data)
导出为 HDF5 格式(对 Vaex 高效):
df_vaex.export_hdf5('taxi_50M.hdf5')
print(f"Created dataset with {n:,} rows")
输出:
Shape: (50000000, 8)
Created dataset with 50,000,000 rows
我们现在有了一个包含 8 列的 5000 万行数据集。
// Vaex 与 Pandas 的性能对比
使用 Vaex 的内存映射打开大文件:
start = time.time()
df_vaex = vaex.open('taxi_50M.hdf5')
vaex_time = time.time() - start
print(f"Vaex opened {df_vaex.shape[0]:,} rows in {vaex_time:.4f} seconds")
print(f"Memory usage: ~0 MB (memory-mapped)")
输出:
Vaex opened 50,000,000 rows in 0.0199 seconds
Memory usage: ~0 MB (memory-mapped)
Pandas:加载到内存(请勿尝试用 50M 行执行此操作!):
# This would fail on most machines
df_pandas = pd.read_hdf('taxi_50M.hdf5')
这将导致内存错误!Vaex 打开文件几乎是瞬时的,无论文件大小如何,因为它不会将数据加载到内存中。
基本聚合:计算 5000 万行的统计数据:
start = time.time()
stats = {
'mean_fare': df_vaex.fare_amount.mean(),
'mean_distance': df_vaex.trip_distance.mean(),
'total_revenue': df_vaex.total_amount.sum(),
'max_fare': df_vaex.fare_amount.max(),
'min_fare': df_vaex.fare_amount.min(),
}
agg_time = time.time() - start
print(f"\nComputed 5 aggregations in {agg_time:.4f} seconds:")
print(f" Mean fare: ${stats['mean_fare']:.2f}")
print(f" Mean distance: {stats['mean_distance']:.2f} miles")
print(f" Total revenue: ${stats['total_revenue']:,.2f}")
print(f" Fare range: ${stats['min_fare']:.2f} - ${stats['max_fare']:.2f}")
输出:
Computed 5 aggregations in 0.8771 seconds:
Mean fare: $15.00
Mean distance: 3.00 miles
Total revenue: $1,080,035,827.27
Fare range: $1.25 - $55.30
筛选操作:筛选长途行程:
start = time.time()
long_trips = df_vaex[df_vaex.trip_distance > 10]
filter_time = time.time() - start
print(f"\nFiltered for trips > 10 miles in {filter_time:.4f} seconds")
print(f" Found: {len(long_trips):,} long trips")
print(f" Percentage: {(len(long_trips)/len(df_vaex)*100):.2f}%")
输出:
Filtered for trips > 10 miles in 0.0486 seconds
Found: 1,784,122 long trips
Percentage: 3.57%
多条件筛选:
start = time.time()
premium_trips = df_vaex[(df_vaex.trip_distance > 5) & (df_vaex.fare_amount > 20) & (df_vaex.payment_type == 'credit')]
multi_filter_time = time.time() - start
print(f"\nMultiple condition filter in {multi_filter_time:.4f} seconds")
print(f" Premium trips (>5mi, >$20, credit): {len(premium_trips):,}")
输出:
Multiple condition filter in 0.0582 seconds
Premium trips (>5mi, >$20, credit): 457,191
分组操作:
start = time.time()
by_payment = df_vaex.groupby('payment_type', agg={
'mean_fare': vaex.agg.mean('fare_amount'),
'mean_tip': vaex.agg.mean('tip_amount'),
'total_trips': vaex.agg.count(),
'total_revenue': vaex.agg.sum('total_amount')
})
groupby_time = time.time() - start
print(f"\nGroupBy operation in {groupby_time:.4f} seconds")
print(by_payment.to_pandas_df())
输出:
GroupBy operation in 5.6362 seconds
payment_type mean_fare mean_tip total_trips total_revenue
0 credit 15.001817 2.000065 16663623 3.599456e+08
1 mobile 15.001200 1.999679 16667691 3.600165e+08
2 cash 14.999397 2.000115 16668686 3.600737e+08
更复杂的 Group-by:
start = time.time()
by_hour = df_vaex.groupby('pickup_hour', agg={
'avg_distance': vaex.agg.mean('trip_distance'),
'avg_fare': vaex.agg.mean('fare_amount'),
'trip_count': vaex.agg.count()
})
complex_groupby_time = time.time() - start
print(f"\nGroupBy by hour in {complex_groupby_time:.4f} seconds")
print(by_hour.to_pandas_df().head(10))
输出:
GroupBy by hour in 1.6910 seconds
pickup_hour avg_distance avg_fare trip_count
0 0 2.998120 14.997462 2083481
1 1 3.000969 14.998814 2084650
2 2 3.003834 15.001777 2081962
3 3 3.001263 14.998196 2081715
4 4 2.998343 14.999593 2083882
5 5 2.997586 15.003988 2083421
6 6 2.999887 15.011615 2083213
7 7 3.000240 14.996892 2085156
8 8 3.002640 15.000326 2082704
9 9 2.999857 14.997857 2082284
// 高级 Vaex 特性
虚拟列(计算列)允许在不复制数据的情况下添加列:
df_vaex['tip_percentage'] = (df_vaex.tip_amount / df_vaex.fare_amount) * 100
df_vaex['is_generous_tipper'] = df_vaex.tip_percentage > 20
df_vaex['rush_hour'] = (df_vaex.pickup_hour >= 7) & (df_vaex.pickup_hour <= 9) | \
(df_vaex.pickup_hour >= 17) & (df_vaex.pickup_hour <= 19)
这些是在动态计算的,没有内存开销:
print("Added 3 virtual columns with zero memory overhead")
generous_tippers = df_vaex[df_vaex.is_generous_tipper]
print(f"Generous tippers (>20% tip): {len(generous_tippers):,}")
rush_hour_trips = df_vaex[df_vaex.rush_hour]
print(f"Rush hour trips: {len(rush_hour_trips):,}")
输出:
VIRTUAL COLUMNS
Added 3 virtual columns with zero memory overhead
Generous tippers (>20% tip): 11,997,433
Rush hour trips: 12,498,848
相关性分析:
corr = df_vaex.correlation(df_vaex.trip_distance, df_vaex.fare_amount)
print(f"Correlation (distance vs fare): {corr:.4f}")
百分位数:
try:
percentiles = df_vaex.percentile_approx('fare_amount', [25, 50, 75, 90, 95, 99])
except AttributeError:
percentiles = [
df_vaex.fare_amount.quantile(0.25),
df_vaex.fare_amount.quantile(0.50),
df_vaex.fare_amount.quantile(0.75),
df_vaex.fare_amount.quantile(0.90),
df_vaex.fare_amount.quantile(0.95),
df_vaex.fare_amount.quantile(0.99),
]
print(f"\nFare percentiles:")
print(f"25th: ${percentiles[0]:.2f}")
print(f"50th (median): ${percentiles[1]:.2f}")
print(f"75th: ${percentiles[2]:.2f}")
print(f"90th: ${percentiles[3]:.2f}")
print(f"95th: ${percentiles[4]:.2f}")
print(f"99th: ${percentiles[5]:.2f}")
标准差:
std_fare = df_vaex.fare_amount.std()
print(f"\nStandard deviation of fares: ${std_fare:.2f}")
其他有用统计数据:
print(f"\nAdditional statistics:")
print(f"Mean: ${df_vaex.fare_amount.mean():.2f}")
print(f"Min: ${df_vaex.fare_amount.min():.2f}")
print(f"Max: ${df_vaex.fare_amount.max():.2f}")
输出:
Correlation (distance vs fare): -0.0001
Fare percentiles:
25th: $11.57
50th (median): $nan
75th: $nan
90th: $nan
95th: $nan
99th: $nan
Standard deviation of fares: $4.74
Additional statistics:
Mean: $15.00
Min: $1.25
Max: $55.30
// 数据导出
# Export filtered data
high_value_trips = df_vaex[df_vaex.total_amount > 50]
导出到不同格式:
start = time.time()
high_value_trips.export_hdf5('high_value_trips.hdf5')
export_time = time.time() - start
print(f"Exported {len(high_value_trips):,} rows to HDF5 in {export_time:.4f}s")
您也可以导出到 CSV、Parquet 等:
high_value_trips.export_csv('high_value_trips.csv')
high_value_trips.export_parquet('high_value_trips.parquet')
输出:
Exported 13,054 rows to HDF5 in 5.4508s
性能总结仪表板
print("VAEX PERFORMANCE SUMMARY")
print(f"Dataset size: {n:,} rows")
print(f"File size on disk: ~2.4 GB")
print(f"RAM usage: ~0 MB (memory-mapped)")
print()
print(f"Open time: {vaex_time:.4f} seconds")
print(f"Single aggregation: {agg_time:.4f} seconds")
print(f"Simple filter: {filter_time:.4f} seconds")
print(f"Complex filter: {multi_filter_time:.4f} seconds")
print(f"GroupBy operation: {groupby_time:.4f} seconds")
print()
print(f"Throughput: ~{n/groupby_time:,.0f} rows/second")
输出:
VAEX PERFORMANCE SUMMARY
Dataset size: 50,000,000 rows
File size on disk: ~2.4 GB
RAM usage: ~0 MB (memory-mapped)
Open time: 0.0199 seconds
Single aggregation: 0.8771 seconds
Simple filter: 0.0486 seconds
Complex filter: 0.0582 seconds
GroupBy operation: 5.6362 seconds
Throughput: ~8,871,262 rows/second
# 总结思考
当您处理大于 1GB 且无法装入 RAM 的大型数据集、探索大数据、对数百万行进行特征工程或构建数据预处理管道时,Vaex 是理想选择。
对于小于 100MB 的数据集,您不应使用 Vaex。对于这些数据集,使用 Pandas 会更简单。如果您需要处理涉及多个表的复杂连接,使用结构化查询语言(SQL)数据库可能会更好。当您需要完整的 Pandas API 功能时,请注意 Vaex 的兼容性是有限的。对于实时流数据,其他工具更为合适。
Vaex 填补了 Python 数据科学生态系统中的一个空白:能够在不将所有内容加载到内存中的情况下,高效且交互式地处理包含数十亿行的数据集。其 out-of-core 架构、lazy execution 模型和优化算法使其成为强大的大数据探索工具,即使在笔记本电脑上也能发挥作用。无论您是在探索海量日志、科学调查还是高频时间序列,Vaex 都能帮助您弥合易用性与大数据可伸缩性之间的差距。
Shittu Olumide 是一位软件工程师和技术作家,热衷于利用尖端技术来构建引人入胜的叙事,对细节有敏锐的洞察力,并擅长简化复杂概念。您也可以在Twitter上找到 Shittu。
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区