📢 转载信息
原文链接:https://www.freecodecamp.org/news/how-to-build-end-to-end-machine-learning-lineage/
原文作者:freeCodeCamp
在任何稳健的机器学习(ML)系统中,机器学习血缘(ML Lineage)都是至关重要的。它允许您跟踪数据和模型的版本,从而确保可复现性、可审计性和合规性。
尽管存在许多用于跟踪ML血缘的服务,但创建一个全面且易于管理的血缘系统往往非常复杂。
在本文中,我将引导您为部署在AWS Lambda无服务器架构上的ML应用程序集成一个全面的ML血缘解决方案,涵盖端到端的管道阶段:
-
ETL管道
-
数据漂移检测
-
预处理
-
模型调优
-
风险和公平性评估。
目录
先决条件:
-
了解关键的机器学习/深度学习概念,包括完整的生命周期:数据处理、模型训练、调优和验证。
-
精通 Python,并有使用主流ML库的经验。
-
对 DevOps 原理有基本了解。
我们将使用的工具:
以下是我们用来跟踪ML血缘的工具总结:
-
DVC (Data Version Control):一个用于数据的开源版本系统。用于跟踪ML血缘。
-
AWS S3:来自AWS的安全对象存储服务。用作远程存储。
-
Evently AI:一个开源的ML和LLM可观测性框架。用于检测数据漂移。
-
Prefect:一个工作流编排引擎。用于管理血缘的定时运行。
什么是机器学习血缘?
机器学习(ML)血缘是一个用于跟踪和理解机器学习模型完整生命周期的框架。
它包含不同层面的信息,例如:
-
代码: 用于模型训练的脚本、库和配置。
-
数据: 原始数据、转换和特征。
-
实验: 训练运行、超参数调优结果。
-
模型: 训练好的模型及其版本。
-
预测: 已部署模型的输出。
ML血缘在多个方面至关重要:
-
可复现性: 重新创建相同的模型和预测以进行验证。
-
根本原因分析: 当模型在生产中出现故障时,追溯到数据、代码或配置的更改。
-
合规性: 一些受监管的行业要求提供模型训练的证明,以确保公平性、透明度以及遵守GDPR和欧盟《人工智能法案》等法律。
我们将构建什么
在这个项目中,我将使用DVC(机器学习应用的开源版本控制系统),将ML血缘集成到这个基于AWS Lambda架构的价格预测系统中。
下图说明了我们将集成的系统架构和ML血缘:
图 A: 针对无服务器Lambda上ML应用的全面ML血缘(由Kuriko IWAI创建)
系统架构:零售商的AI定价
该系统作为一个容器化的无服务器微服务运行,旨在提供最佳价格建议以最大化零售商的销售额。
其核心智能来自基于历史购买数据训练的AI模型,用于预测在不同价格下产品的销售数量,从而帮助卖家确定最佳价格。
为了实现一致的部署,预测逻辑及其依赖项被打包成一个Docker容器镜像并存储在AWS ECR(Elastic Container Registry)中。
然后,预测服务由AWS Lambda函数提供,该函数从ECR检索并运行容器,并通过AWS API Gateway将结果暴露给Flask应用程序进行消费。
如果您想了解如何从头开始构建此系统,可以参考我的教程如何在无服务器架构上构建机器学习系统。
ML血缘
在系统中,GitHub负责代码血缘,而DVC捕获以下血缘:
-
数据(蓝色方框):ETL和预处理。
-
实验(浅橙色):超参数调优和验证。
-
模型和预测(深橙色):最终模型工件和预测结果。
DVC通过从数据提取到公平性测试(图A中的黄色行)的独立阶段来跟踪血缘。
对于每个阶段,DVC使用MD5或SHA256哈希来跟踪元数据(如工件、指标和报告),并将它们推送到其在AWS S3上的远程存储。
该管道整合了Evently AI来处理数据漂移测试,这对于识别可能影响模型在生产中泛化能力的分布变化至关重要。
只有成功通过数据漂移和公平性测试的模型才能通过AWS API网关提供预测(图A中的红色框)。
最后,整个血缘过程由开源工作流调度器Prefect每周触发。
Prefect会提示DVC检查数据和脚本的更新,并在检测到更改时执行完整的血缘过程。
工作流实战
构建过程涉及五个主要步骤:
-
初始化 DVC 项目
-
使用 DVC 脚本
dvc.yaml
和相应的 Python 脚本定义血缘阶段 -
部署 DVC 项目
-
使用 Prefect 配置定时运行
-
部署应用程序
让我们一起遍历每一步。
第 1 步:初始化 DVC 项目
第一步是初始化一个DVC项目:
$dvc init
此命令会自动在项目文件夹的根目录创建一个.dvc
目录:
.
.dvc/
│
└── cache/ # [.gitignore] 存储dvc缓存(缓存实际数据文件)
└── tmp/ # [.gitignore]
└── .gitignore # gitignore 缓存, tmp, 和 config.local
└── config # 生产环境的dvc配置
└── config.local # [.gitignore] 本地dvc配置
DVC通过将原始数据与大文件从存储库中分离出来,来维护一个快速、轻量级的Git存储库。
该过程涉及将原始数据缓存到本地.dvc/cache
目录中,创建一个包含MD5哈希和指向原始文件路径的微小.dvc
元数据文件,将仅小的元数据文件推送到Git,并将原始数据推送到DVC远程存储。
第 2 步:ML 血缘实现
接下来,我们将使用以下阶段配置ML血缘:
-
etl_pipeline
:提取、清洗、插补原始数据并执行特征工程。 -
data_drift_check
:运行数据漂移测试。如果失败,系统退出。 -
preprocess
:创建训练、验证和测试数据集。 -
tune_primary_model
:调优超参数并训练模型。 -
inference_primary_model
:在测试数据集上执行推理。 -
assess_model_risk
:运行风险和公平性测试。
每个阶段都需要定义DVC命令及其对应的Python脚本。
让我们开始吧。
阶段 1:ETL 管道
第一个阶段是提取、清洗、插补原始数据并执行特征工程。
DVC 配置
我们在项目目录的根目录创建dvc.yaml
文件,并添加etl_pipeline
阶段:
dvc.yaml
stages: etl_pipeline: # dvc在此阶段将运行的主要命令 cmd: python src/data_handling/etl_pipeline.py # 运行主命令所必需的依赖项 deps: - src/data_handling/etl_pipeline.py - src/data_handling/ - src/_utils/ # dvc跟踪的输出路径 outs: - data/original_df.parquet - data/processed_df.parquet
dvc.yaml
文件定义了一个阶段序列,使用如下的段落:
-
cmd
:为该阶段执行的shell命令 -
deps
:运行cmd
所需的依赖项 -
prams
:在params.yaml
文件中定义的cmd
的默认参数 -
metrics
:要跟踪的指标文件 -
reports
:要跟踪的报告文件 -
plots
:DVC绘图文件,用于可视化 -
outs
:cmd
产生的输出文件,DVC将对其进行跟踪
该配置通过明确列出每个阶段的依赖项、输出和命令,帮助DVC确保可复现性。它还通过建立工作流的有向无环图(DAG),将每个阶段链接到下一个阶段,从而帮助管理血缘。
Python 脚本
接下来,我们添加Python脚本,确保数据使用dvc.yaml
中outs
部分指定的_文件路径进行存储:
src/data_handling/etl_pipeline.py
:
import os
import argparse
import src.data_handling.scripts as scripts
from src._utils import main_logger
def etl_pipeline():
# 提取所有数据
df = scripts.extract_original_dataframe()
# 保存原始parquet文件
ORIGINAL_DF_PATH = os.path.join('data', 'original_df.parquet')
df.to_parquet(ORIGINAL_DF_PATH, index=False)
# dvc 跟踪
# 转换
df = scripts.structure_missing_values(df=df)
df = scripts.handle_feature_engineering(df=df)
PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
df.to_parquet(PROCESSED_DF_PATH, index=False)
# dvc 跟踪
return df
# 用于dvc执行
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="run etl pipeline")
parser.add_argument('--stockcode', type=str, default='', help="specific stockcode to process. empty runs full pipeline.")
parser.add_argument('--impute', action='store_true', help="flag to create imputation values")
args = parser.parse_args()
etl_pipeline(stockcode=args.stockcode, impute_stockcode=args.impute)
输出
Pandas DataFrame中的原始数据和结构化数据存储在DVC缓存中:
-
data/original_df.parquet
-
data/processed_df.parquet
阶段 2:数据漂移检查
在进行预处理之前,我们将运行数据漂移测试,以确保数据中没有明显的漂移。为此,我们将使用Evently AI,一个开源的ML和LLM可观测性框架。
什么是数据漂移?
数据漂移指的是模型训练时所依据的数据的统计特性(如均值、方差或分布)发生的任何变化。
有三种主要的数据漂移类型:
-
协变量漂移(特征漂移):输入特征分布的变化。
-
先验概率漂移(标签漂移):目标变量分布的变化。
-
概念漂移: 输入数据与目标变量之间关系的变化。
数据漂移会随着时间的推移损害模型的泛化能力,因此在部署后进行检测至关重要。
DVC 配置
我们在etl_pipeline
阶段之后添加data_drift_check
阶段:
dvc.yaml
:
stages:
etl_pipeline:
###
data_drift_check:
# dvc在此阶段将运行的主要命令
cmd: > python src/data_handling/report_data_drift.py data/processed/processed_df.csv data/processed_df_${params.stockcode}.parquet reports/data_drift_report_${params.stockcode}.html metrics/data_drift_${params.stockcode}.json ${params.stockcode}
# 传递给参数的默认值(在param.yaml文件中定义)
params:
- params.stockcode
# 运行主命令所必需的依赖项
deps:
- src/data_handling/report_data_drift.py
- src/
# dvc跟踪的输出文件路径
plots:
- reports/data_drift_report_${params.stockcode}.html:
metrics:
- metrics/data_drift_${params.stockcode}.json:
type: json
然后,向传递给DVC命令的参数添加默认值:
params.yaml
:
params:
stockcode: <STOCKCODE OF CHOICE>
Python 脚本
在从EventlyAI工作区生成API令牌后,我们将添加一个Python脚本来检测数据漂移并将结果存储在metrics
变量中:
src/data_handling/report_data_drift.py
:
import os
import sys
import json
import pandas as pd
import datetime
from dotenv import load_dotenv
from evidently import Dataset, DataDefinition, Report
from evidently.presets import DataDriftPreset
from evidently.ui.workspace import CloudWorkspace
import src.data_handling.scripts as scripts
from src._utils import main_logger
if __name__ == '__main__':
# 初始化evently云工作区
load_dotenv(override=True)
ws = CloudWorkspace(token=os.getenv('EVENTLY_API_TOKEN'), url='https://app.evidently.cloud')
# 检索evently项目
project = ws.get_project('EVENTLY AI PROJECT ID')
# 从命令行参数中检索路径
REFERENCE_DATA_PATH = sys.argv[1]
CURRENT_DATA_PATH = sys.argv[2]
REPORT_OUTPUT_PATH = sys.argv[3]
METRICS_OUTPUT_PATH = sys.argv[4]
STOCKCODE = sys.argv[5]
# 如果不存在则创建文件夹
os.makedirs(os.path.dirname(REPORT_OUTPUT_PATH), exist_ok=True)
os.makedirs(os.path.dirname(METRICS_OUTPUT_PATH), exist_ok=True)
# 提取数据集
reference_data_full = pd.read_csv(REFERENCE_DATA_PATH)
reference_data_stockcode = reference_data_full[reference_data_full['stockcode'] == STOCKCODE]
current_data_stockcode = pd.read_parquet(CURRENT_DATA_PATH)
# 定义数据模式
nums, cats = scripts.categorize_num_cat_cols(df=reference_data_stockcode)
for col in nums:
current_data_stockcode[col] = pd.to_numeric(current_data_stockcode[col], errors='coerce')
schema = DataDefinition(numerical_columns=nums, categorical_columns=cats)
# 使用数据模式定义evently数据集
eval_data_1 = Dataset.from_pandas(reference_data_stockcode, data_definition=schema)
eval_data_2 = Dataset.from_pandas(current_data_stockcode, data_definition=schema)
# 执行漂移检测
report = Report(metrics=[DataDriftPreset()])
data_eval = report.run(reference_data=eval_data_1, current_data=eval_data_2)
data_eval.save_html(REPORT_OUTPUT_PATH)
# 为dvc跟踪创建指标
report_dict = json.loads(data_eval.json())
num_drifts = report_dict['metrics'][0]['value']['count']
shared_drifts = report_dict['metrics'][0]['value']['share']
metrics = dict(
drift_detected=bool(num_drifts > 0.0),
num_drifts=num_drifts,
shared_drifts=shared_drifts,
num_cols=nums,
cat_cols=cats,
stockcode=STOCKCODE,
timestamp=datetime.datetime.now().isoformat(),
)
# 加载指标文件
with open(METRICS_OUTPUT_PATH, 'w') as f:
json.dump(metrics, f, indent=4)
main_logger.info(f'... drift metrics saved to {METRICS_OUTPUT_PATH}... ')
# 如果发现数据漂移,则停止系统
if num_drifts > 0.0:
sys.exit('❌ FATAL: data drift detected. stopping pipeline')
如果发现数据漂移,脚本将使用最终的sys.exit
命令立即退出。
输出
该脚本生成DVC将跟踪的两个文件:
-
reports/data_drift_report.html
:HTML格式的数据漂移报告。 -
metrics/data_drift.json
:包含漂移结果以及特征列和时间戳的JSON格式的数据漂移指标:
metrics/data_drift.json
:
{
"drift_detected": false,
"num_drifts": 0.0,
"shared_drifts": 0.0,
"num_cols": [
"invoiceno",
"invoicedate",
"unitprice",
"product_avg_quantity_last_month",
"product_max_price_all_time",
"unitprice_vs_max",
"unitprice_to_avg",
"unitprice_squared",
"unitprice_log"
],
"cat_cols": [
"stockcode",
"customerid",
"country",
"year",
"year_month",
"day_of_week",
"is_registered"
],
"timestamp": "2025-10-07T00:24:29.899495"
}
漂移测试的结果也可以在Evently工作区仪表板上查看,以供进一步分析:
图 B. Evently 工作区仪表板截图
阶段 3:预处理
如果没有检测到数据漂移,血缘将继续进行预处理阶段。
DVC 配置
我们在data_drift_check
阶段之后添加preprocess
阶段:
dvc.yaml
:
stages:
etl_pipeline:
###
data_drift_check:
###
preprocess:
cmd: > python src/data_handling/preprocess.py --target_col ${params.target_col} --should_scale ${params.should_scale} --verbose ${params.verbose}
deps:
- src/data_handling/preprocess.py
- src/data_handling/
- src/_utils
# params from params.yaml
params:
- params.target_col
- params.should_scale
- params.verbose
outs:
# train, val, test datasets
- data/x_train_df.parquet
- data/x_val_df.parquet
- data/x_test_df.parquet
- data/y_train_df.parquet
- data/y_val_df.parquet
- data/y_test_df.parquet
# preprocessed input datasets
- data/x_train_processed.parquet
- data/x_val_processed.parquet
- data/x_test_processed.parquet
# trained preprocessor and human readable feature names for shap analysis
- preprocessors/column_transformer.pkl
- preprocessors/feature_names.json
然后在cmd
中使用的参数添加默认值:
params.yaml
:
params:
target_col: "quantity"
should_scale: True
verbose: False
Python 脚本
接下来,我们将添加一个Python脚本,用于创建训练、验证和测试数据集,并预处理输入数据:
import os
import argparse
import json
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import src.data_handling.scripts as scripts
from src._utils import main_logger
def preprocess(stockcode: str = '', target_col: str = 'quantity', should_scale: bool = True, verbose: bool = False):
# 初始化要跟踪的指标 (dvc)
DATA_DRIFT_METRICS_PATH = os.path.join('metrics', f'data_drift_{args.stockcode}.json')
if os.path.exists(DATA_DRIFT_METRICS_PATH):
with open(DATA_DRIFT_METRICS_PATH, 'r') as f:
metrics = json.load(f)
else:
metrics = dict()
# 从dvc缓存加载处理后的df
PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
df = pd.read_parquet(PROCESSED_DF_PATH)
# 分类数值和分类列
num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
if verbose:
main_logger.info(f'num_cols: {num_cols}
cat_cols: {cat_cols}')
# 结构化分类列
if cat_cols:
for col in cat_cols:
df[col] = df[col].astype('string')
# 初始化预处理器(要么从dvc缓存加载,要么从头创建)
PREPROCESSOR_PATH = os.path.join('preprocessors', 'column_transformer.pkl')
try:
preprocessor = joblib.load(PREPROCESSOR_PATH)
except:
preprocessor = scripts.create_preprocessor(num_cols=num_cols if should_scale else [], cat_cols=cat_cols)
# 创建训练、验证、测试数据集
y = df[target_col]
X = df.copy().drop(target_col, axis='columns')
# 分割
test_size, random_state = 50000, 42
X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)
X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=test_size, random_state=random_state, shuffle=False)
# 存储训练、验证、测试数据集 (dvc 跟踪)
X_train.to_parquet('data/x_train_df.parquet', index=False)
X_val.to_parquet('data/x_val_df.parquet', index=False)
X_test.to_parquet('data/x_test_df.parquet', index=False)
y_train.to_frame(name=target_col).to_parquet('data/y_train_df.parquet', index=False)
y_val.to_frame(name=target_col).to_parquet('data/y_val_df.parquet', index=False)
y_test.to_frame(name=target_col).to_parquet('data/y_test_df.parquet', index=False)
# 预处理
X_train = preprocessor.fit_transform(X_train)
X_val = preprocessor.transform(X_val)
X_test = preprocessor.transform(X_test)
# 存储预处理后的输入数据 (dvc 跟踪)
pd.DataFrame(X_train).to_parquet(f'data/x_train_processed.parquet', index=False)
pd.DataFrame(X_val).to_parquet(f'data/x_val_processed.parquet', index=False)
pd.DataFrame(X_test).to_parquet(f'data/x_test_processed.parquet', index=False)
# 保存特征名称 (dvc 跟踪) 以用于shap分析
with open('preprocessors/feature_names.json', 'w') as f:
feature_names = preprocessor.get_feature_names_out()
json.dump(feature_names.tolist(), f)
return X_train, X_val, X_test, y_train, y_val, y_test, preprocessor
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='run data preprocessing')
parser.add_argument('--stockcode', type=str, default='', help='specific stockcode')
parser.add_argument('--target_col', type=str, default='quantity', help='the target column name')
parser.add_argument('--should_scale', type=bool, default=True, help='flag to scale numerical features')
parser.add_argument('--verbose', type=bool, default=False, help='flag for verbose logging')
args = parser.parse_args()
X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = preprocess(
target_col=args.target_col,
should_scale=args.should_scale,
verbose=args.verbose,
stockcode=args.stockcode,
)
输出
此阶段为模型训练和推理生成必要的_数据集:
输入特征:
-
data/x_train_df.parquet
-
data/x_val_df.parquet
-
data/x_test_df.parquet
预处理后的输入特征:
-
data/x_train_processed_df.parquet
-
data/x_val_processed_df.parquet
-
data/x_test_processed_df.parquet
目标变量:
-
data/y_train_df.parquet
-
data/y_val_df.parquet
-
data/y_test_df.parquet
可复用预处理器和特征名称:
-
preprocessors/column_transformer.pkl
-
preprocessors/feature_names.json
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区