📢 转载信息
原文链接:https://www.kdnuggets.com/5-powerful-python-decorators-for-high-performance-data-pipelines
原文作者:Iván Palomares Carrascosa
#
导言在数据科学和机器学习项目中,数据管道是一种非常实用且通用的自动化数据处理工作流的方法。但有时我们的代码可能会给核心逻辑带来额外的复杂性。Python 装饰器可以克服这个常见挑战。本文介绍了五种有用且有效的 Python 装饰器,用于构建和优化高性能数据管道。
以下是用于数据管道的代码示例的前导代码,我已将加州住房数据集的一个版本提供在一个公共 GitHub 存储库中供您使用:
import pandas as pd import numpy as np # 加载数据集 DATA_URL = "https://raw.githubusercontent.com/gakudo-ai/open-datasets/main/housing.csv" print("Downloading data pipeline source...") df_pipeline = pd.read_csv(DATA_URL) print(f"Loaded {df_pipeline.shape[0]} rows and {df_pipeline.shape[1]} columns.")
#
1. JIT 编译虽然 Python 循环以其显著的缓慢和在对数据集进行复杂运算(如数学变换)时引起瓶颈而声名狼藉,但有一个快速的解决方案。它叫做 @njit,它是 Numba 库中的一个装饰器,可以在运行时将 Python 函数转换为类 C 语言的优化机器码。对于大型数据集和复杂的数据管道,这可以带来巨大的速度提升。
from numba import njit import time # 提取一个数值列作为 NumPy 数组以进行快速处理 incomes = df_pipeline['median_income'].fillna(0).values @njit def compute_complex_metric(income_array): result = np.zeros_like(income_array) # 在纯 Python 中,这样的循环通常会拖慢速度 for i in range(len(income_array)): result[i] = np.log1p(income_array[i] * 2.5) ** 1.5 return result start = time.time() df_pipeline['income_metric'] = compute_complex_metric(incomes) print(f"Processed array in {time.time() - start:.5f} seconds!")
#
2. 中间缓存当数据管道包含计算密集型的聚合或可能需要数分钟至数小时才能运行的数据连接时,可以使用 memory.cache 来序列化函数输出。在脚本重新启动或从崩溃中恢复的情况下,此装饰器可以从磁盘重新加载序列化的数组数据,跳过繁重的计算,从而节省资源和时间。
from joblib import Memory import time # 为管道构件创建一个本地缓存目录 memory = Memory(".pipeline_cache", verbose=0) @memory.cache def expensive_aggregation(df): print("Running heavy grouping operation...") time.sleep(1.5) # 模拟长时间运行的管道步骤 # 按 ocean_proximity 对数据点进行分组并计算属性级均值 return df.groupby('ocean_proximity', as_index=False).mean(numeric_only=True) # 第一次运行执行代码;第二次从磁盘加载以实现即时加载 agg_df = expensive_aggregation(df_pipeline) agg_df_cached = expensive_aggregation(df_pipeline)
#
3. 模式验证Pandera 是一个统计类型(模式验证)库,旨在防止由于数据质量差而导致的分析模型(如机器学习预测器或仪表板)的渐进式、微妙的损坏。在下面的示例中,它只需要与并行处理 Dask 库结合使用,以检查初始管道是否符合指定的模式。如果不符合,将引发错误,以帮助及早发现潜在问题。
import pandera as pa import pandas as pd import numpy as np from dask import delayed, compute # 定义一个模式以强制执行数据类型和有效范围 housing_schema = pa.DataFrameSchema({ "median_income": pa.Column(float, pa.Check.greater_than(0)), "total_rooms": pa.Column(float, pa.Check.gt(0)), "ocean_proximity": pa.Column(str, pa.Check.isin(['NEAR BAY', '<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'ISLAND'])) }) @delayed @pa.check_types def validate_and_process(df: pa.typing.DataFrame) -> pa.typing.DataFrame: """ 验证数据帧块是否符合定义的模式。如果数据损坏,Pandera 会引发 SchemaError。 """ return housing_schema.validate(df) # 将管道数据分成 4 个块以进行并行验证 chunks = np.array_split(df_pipeline, 4) lazy_validations = [validate_and_process(chunk) for chunk in chunks] print("Starting parallel schema validation...") try: # 触发 Dask 图以并行验证块 validated_chunks = compute(*lazy_validations) df_parallel = pd.concat(validated_chunks) print(f"Validation successful. Processed {len(df_parallel)} rows.") except pa.errors.SchemaError as e: print(f"Data Integrity Error: {e}")
#
4. 懒惰并行化顺序运行独立的管道步骤可能无法最佳利用 CPU 等处理单元。在这些转换函数之上使用 @delayed 装饰器可以构建一个依赖图,以便稍后以优化的方式并行执行任务,这有助于减少总体运行时间。
from dask import delayed, compute @delayed def process_chunk(df_chunk): # 模拟独立的转换任务 df_chunk_copy = df_chunk.copy() df_chunk_copy['value_per_room'] = df_chunk_copy['median_house_value'] / df_chunk_copy['total_rooms'] return df_chunk_copy # 将数据集分成 4 个并行处理的块 chunks = np.array_split(df_pipeline, 4) # 懒惰计算图(Dask 的工作方式!) lazy_results = [process_chunk(chunk) for chunk in chunks] # 同时跨多个 CPU 触发执行 processed_chunks = compute(*lazy_results) df_parallel = pd.concat(processed_chunks) print(f"Parallelized output shape: {df_parallel.shape}")
#
5. 内存分析@profile 装饰器旨在帮助检测无声的内存泄漏——这些泄漏有时可能导致服务器在处理大量文件时崩溃。该模式包括逐步监视包装的函数,观察每个步骤的 RAM 消耗水平或释放的内存。最终,这是轻松识别代码中的低效并以清晰的方向优化内存使用的好方法。
from memory_profiler import profile # 一个装饰函数,它会向控制台打印逐行内存细分 @profile(precision=2) def memory_intensive_step(df): print("Running memory diagnostics...") # 创建一个巨大的临时副本以引起故意内存峰值 df_temp = df.copy() df_temp['new_col'] = df_temp['total_bedrooms'] * 100 # 删除临时数据帧会释放 RAM del df_temp return df.dropna(subset=['total_bedrooms']) # 运行管道步骤:您可能会在终端中看到内存报告 final_df = memory_intensive_step(df_pipeline)
#
总结本文介绍了五种有用且强大的 Python 装饰器,用于优化计算成本高昂的数据管道。借助并行计算和 Dask、Numba 等高效处理库,这些装饰器不仅可以加速繁重的数据转换过程,还可以使其更具容错性和抗故障能力。
Iván Palomares Carrascosa 是人工智能、机器学习、深度学习和 LLM 领域的领导者、作家、演讲者和顾问。他培训和指导他人利用现实世界中的 AI。
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区