目 录CONTENT

文章目录

如何构建端到端机器学习血缘(Lineage):DVC、Evently AI 与 Prefect 实践指南

Administrator
2025-10-17 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

📢 转载信息

原文链接:https://www.freecodecamp.org/news/how-to-build-end-to-end-machine-learning-lineage/

原文作者:freeCodeCamp


How to Build End-to-End Machine Learning Lineage

在任何稳健的机器学习(ML)系统中,机器学习血缘(ML Lineage)都是至关重要的。它允许您跟踪数据和模型的版本,从而确保可复现性、可审计性和合规性。

尽管存在许多用于跟踪ML血缘的服务,但创建一个全面且易于管理的血缘系统往往非常复杂。

在本文中,我将引导您为部署在AWS Lambda无服务器架构上的ML应用程序集成一个全面的ML血缘解决方案,涵盖端到端的管道阶段:

  • ETL管道

  • 数据漂移检测

  • 预处理

  • 模型调优

  • 风险和公平性评估。

目录

  1. 什么是机器学习血缘?

  2. 我们将构建什么

  3. 工作流实战

  4. 第 1 步:初始化 DVC 项目

  5. 第 2 步:ML 血缘实现

  6. 第 3 步:部署 DVC 项目

  7. 第 4 步:使用 Prefect 配置定时运行

  8. 第 5 步:部署应用程序

  9. 结论

先决条件:

  • 了解关键的机器学习/深度学习概念,包括完整的生命周期:数据处理、模型训练、调优和验证。

  • 精通 Python,并有使用主流ML库的经验。

  • 对 DevOps 原理有基本了解。

我们将使用的工具:

以下是我们用来跟踪ML血缘的工具总结:

  • DVC (Data Version Control):一个用于数据的开源版本系统。用于跟踪ML血缘。

  • AWS S3:来自AWS的安全对象存储服务。用作远程存储。

  • Evently AI:一个开源的ML和LLM可观测性框架。用于检测数据漂移。

  • Prefect:一个工作流编排引擎。用于管理血缘的定时运行。

什么是机器学习血缘?

机器学习(ML)血缘是一个用于跟踪和理解机器学习模型完整生命周期的框架。

它包含不同层面的信息,例如:

  • 代码: 用于模型训练的脚本、库和配置。

  • 数据: 原始数据、转换和特征。

  • 实验: 训练运行、超参数调优结果。

  • 模型: 训练好的模型及其版本。

  • 预测: 已部署模型的输出。

ML血缘在多个方面至关重要:

  • 可复现性: 重新创建相同的模型和预测以进行验证。

  • 根本原因分析: 当模型在生产中出现故障时,追溯到数据、代码或配置的更改。

  • 合规性: 一些受监管的行业要求提供模型训练的证明,以确保公平性、透明度以及遵守GDPR和欧盟《人工智能法案》等法律。

我们将构建什么

在这个项目中,我将使用DVC(机器学习应用的开源版本控制系统),将ML血缘集成到这个基于AWS Lambda架构的价格预测系统中。

下图说明了我们将集成的系统架构和ML血缘:

Figure A. A comprehensive ML lineage for an ML application on serverless Lambda (Created by Kuriko IWAI)

图 A: 针对无服务器Lambda上ML应用的全面ML血缘(由Kuriko IWAI创建)

系统架构:零售商的AI定价

该系统作为一个容器化的无服务器微服务运行,旨在提供最佳价格建议以最大化零售商的销售额。

其核心智能来自基于历史购买数据训练的AI模型,用于预测在不同价格下产品的销售数量,从而帮助卖家确定最佳价格。

为了实现一致的部署,预测逻辑及其依赖项被打包成一个Docker容器镜像并存储在AWS ECR(Elastic Container Registry)中。

然后,预测服务由AWS Lambda函数提供,该函数从ECR检索并运行容器,并通过AWS API Gateway将结果暴露给Flask应用程序进行消费。

如果您想了解如何从头开始构建此系统,可以参考我的教程如何在无服务器架构上构建机器学习系统

ML血缘

在系统中,GitHub负责代码血缘,而DVC捕获以下血缘:

  • 数据(蓝色方框):ETL和预处理。

  • 实验(浅橙色):超参数调优和验证。

  • 模型预测(深橙色):最终模型工件和预测结果。

DVC通过从数据提取到公平性测试(图A中的黄色行)的独立阶段来跟踪血缘。

对于每个阶段,DVC使用MD5SHA256哈希来跟踪元数据(如工件、指标和报告),并将它们推送到其在AWS S3上的远程存储。

该管道整合了Evently AI来处理数据漂移测试,这对于识别可能影响模型在生产中泛化能力的分布变化至关重要。

只有成功通过数据漂移和公平性测试的模型才能通过AWS API网关提供预测(图A中的红色框)。

最后,整个血缘过程由开源工作流调度器Prefect每周触发。

Prefect会提示DVC检查数据和脚本的更新,并在检测到更改时执行完整的血缘过程。

工作流实战

构建过程涉及五个主要步骤:

  1. 初始化 DVC 项目

  2. 使用 DVC 脚本 dvc.yaml 和相应的 Python 脚本定义血缘阶段

  3. 部署 DVC 项目

  4. 使用 Prefect 配置定时运行

  5. 部署应用程序

让我们一起遍历每一步。

第 1 步:初始化 DVC 项目

第一步是初始化一个DVC项目:

$dvc init

此命令会自动在项目文件夹的根目录创建一个.dvc目录:

.
.dvc/
│
└── cache/ # [.gitignore] 存储dvc缓存(缓存实际数据文件)
└── tmp/ # [.gitignore]
└── .gitignore # gitignore 缓存, tmp, 和 config.local
└── config # 生产环境的dvc配置
└── config.local # [.gitignore] 本地dvc配置

DVC通过将原始数据与大文件从存储库中分离出来,来维护一个快速、轻量级的Git存储库。

该过程涉及将原始数据缓存到本地.dvc/cache目录中,创建一个包含MD5哈希和指向原始文件路径的微小.dvc元数据文件,将小的元数据文件推送到Git,并将原始数据推送到DVC远程存储。

第 2 步:ML 血缘实现

接下来,我们将使用以下阶段配置ML血缘:

  1. etl_pipeline:提取、清洗、插补原始数据并执行特征工程。

  2. data_drift_check:运行数据漂移测试。如果失败,系统退出。

  3. preprocess:创建训练、验证和测试数据集。

  4. tune_primary_model:调优超参数并训练模型。

  5. inference_primary_model:在测试数据集上执行推理。

  6. assess_model_risk:运行风险和公平性测试。

每个阶段都需要定义DVC命令及其对应的Python脚本。

让我们开始吧。

阶段 1:ETL 管道

第一个阶段是提取、清洗、插补原始数据并执行特征工程。

DVC 配置

我们在项目目录的根目录创建dvc.yaml文件,并添加etl_pipeline阶段:

dvc.yaml

stages: etl_pipeline: # dvc在此阶段将运行的主要命令 cmd: python src/data_handling/etl_pipeline.py # 运行主命令所必需的依赖项 deps: - src/data_handling/etl_pipeline.py - src/data_handling/ - src/_utils/ # dvc跟踪的输出路径 outs: - data/original_df.parquet - data/processed_df.parquet

dvc.yaml文件定义了一个阶段序列,使用如下的段落:

  • cmd:为该阶段执行的shell命令

  • deps:运行cmd所需的依赖项

  • prams:在params.yaml文件中定义的cmd的默认参数

  • metrics:要跟踪的指标文件

  • reports:要跟踪的报告文件

  • plots:DVC绘图文件,用于可视化

  • outscmd产生的输出文件,DVC将对其进行跟踪

该配置通过明确列出每个阶段的依赖项、输出和命令,帮助DVC确保可复现性。它还通过建立工作流的有向无环图(DAG),将每个阶段链接到下一个阶段,从而帮助管理血缘。

Python 脚本

接下来,我们添加Python脚本,确保数据使用dvc.yamlouts部分指定的_文件路径进行存储:

src/data_handling/etl_pipeline.py

import os
import argparse
import src.data_handling.scripts as scripts
from src._utils import main_logger

def etl_pipeline():
    # 提取所有数据
    df = scripts.extract_original_dataframe()
    
    # 保存原始parquet文件
    ORIGINAL_DF_PATH = os.path.join('data', 'original_df.parquet')
    df.to_parquet(ORIGINAL_DF_PATH, index=False)
    # dvc 跟踪
    
    # 转换
    df = scripts.structure_missing_values(df=df)
    df = scripts.handle_feature_engineering(df=df)
    
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df.to_parquet(PROCESSED_DF_PATH, index=False)
    # dvc 跟踪
    
    return df

# 用于dvc执行
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="run etl pipeline")
    parser.add_argument('--stockcode', type=str, default='', help="specific stockcode to process. empty runs full pipeline.")
    parser.add_argument('--impute', action='store_true', help="flag to create imputation values")
    args = parser.parse_args()
    etl_pipeline(stockcode=args.stockcode, impute_stockcode=args.impute)

输出

Pandas DataFrame中的原始数据和结构化数据存储在DVC缓存中:

  • data/original_df.parquet

  • data/processed_df.parquet

阶段 2:数据漂移检查

在进行预处理之前,我们将运行数据漂移测试,以确保数据中没有明显的漂移。为此,我们将使用Evently AI,一个开源的ML和LLM可观测性框架。

什么是数据漂移?

数据漂移指的是模型训练时所依据的数据的统计特性(如均值、方差或分布)发生的任何变化。

有三种主要的数据漂移类型:

  • 协变量漂移(特征漂移):输入特征分布的变化。

  • 先验概率漂移(标签漂移):目标变量分布的变化。

  • 概念漂移: 输入数据与目标变量之间关系的变化。

数据漂移会随着时间的推移损害模型的泛化能力,因此在部署后进行检测至关重要。

DVC 配置

我们在etl_pipeline阶段之后添加data_drift_check阶段:

dvc.yaml

stages: 
    etl_pipeline:
        ###
    data_drift_check:
        # dvc在此阶段将运行的主要命令
        cmd: > python src/data_handling/report_data_drift.py data/processed/processed_df.csv data/processed_df_${params.stockcode}.parquet reports/data_drift_report_${params.stockcode}.html metrics/data_drift_${params.stockcode}.json ${params.stockcode}

        # 传递给参数的默认值(在param.yaml文件中定义)
        params:
        - params.stockcode
        # 运行主命令所必需的依赖项
        deps:
        - src/data_handling/report_data_drift.py
        - src/
        # dvc跟踪的输出文件路径
        plots:
        - reports/data_drift_report_${params.stockcode}.html:
        metrics:
        - metrics/data_drift_${params.stockcode}.json:
        type: json

然后,向传递给DVC命令的参数添加默认值:

params.yaml

params:
    stockcode: <STOCKCODE OF CHOICE>

Python 脚本

从EventlyAI工作区生成API令牌后,我们将添加一个Python脚本来检测数据漂移并将结果存储在metrics变量中:

src/data_handling/report_data_drift.py

import os
import sys
import json
import pandas as pd
import datetime
from dotenv import load_dotenv
from evidently import Dataset, DataDefinition, Report
from evidently.presets import DataDriftPreset
from evidently.ui.workspace import CloudWorkspace

import src.data_handling.scripts as scripts
from src._utils import main_logger

if __name__ == '__main__':
    # 初始化evently云工作区
    load_dotenv(override=True)
    ws = CloudWorkspace(token=os.getenv('EVENTLY_API_TOKEN'), url='https://app.evidently.cloud')
    
    # 检索evently项目
    project = ws.get_project('EVENTLY AI PROJECT ID')
    
    # 从命令行参数中检索路径
    REFERENCE_DATA_PATH = sys.argv[1]
    CURRENT_DATA_PATH = sys.argv[2]
    REPORT_OUTPUT_PATH = sys.argv[3]
    METRICS_OUTPUT_PATH = sys.argv[4]
    STOCKCODE = sys.argv[5]

    # 如果不存在则创建文件夹
    os.makedirs(os.path.dirname(REPORT_OUTPUT_PATH), exist_ok=True)
    os.makedirs(os.path.dirname(METRICS_OUTPUT_PATH), exist_ok=True)

    # 提取数据集
    reference_data_full = pd.read_csv(REFERENCE_DATA_PATH)
    reference_data_stockcode = reference_data_full[reference_data_full['stockcode'] == STOCKCODE]
    current_data_stockcode = pd.read_parquet(CURRENT_DATA_PATH)

    # 定义数据模式
    nums, cats = scripts.categorize_num_cat_cols(df=reference_data_stockcode)
    for col in nums:
        current_data_stockcode[col] = pd.to_numeric(current_data_stockcode[col], errors='coerce')
    schema = DataDefinition(numerical_columns=nums, categorical_columns=cats)

    # 使用数据模式定义evently数据集
    eval_data_1 = Dataset.from_pandas(reference_data_stockcode, data_definition=schema)
    eval_data_2 = Dataset.from_pandas(current_data_stockcode, data_definition=schema)

    # 执行漂移检测
    report = Report(metrics=[DataDriftPreset()])
    data_eval = report.run(reference_data=eval_data_1, current_data=eval_data_2)
    data_eval.save_html(REPORT_OUTPUT_PATH)

    # 为dvc跟踪创建指标
    report_dict = json.loads(data_eval.json())
    num_drifts = report_dict['metrics'][0]['value']['count']
    shared_drifts = report_dict['metrics'][0]['value']['share']
    metrics = dict(
        drift_detected=bool(num_drifts > 0.0),
        num_drifts=num_drifts,
        shared_drifts=shared_drifts,
        num_cols=nums,
        cat_cols=cats,
        stockcode=STOCKCODE,
        timestamp=datetime.datetime.now().isoformat(),
    )

    # 加载指标文件
    with open(METRICS_OUTPUT_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)

    main_logger.info(f'... drift metrics saved to {METRICS_OUTPUT_PATH}... ')

    # 如果发现数据漂移,则停止系统
    if num_drifts > 0.0:
        sys.exit('❌ FATAL: data drift detected. stopping pipeline')

如果发现数据漂移,脚本将使用最终的sys.exit命令立即退出。

输出

该脚本生成DVC将跟踪的两个文件:

  • reports/data_drift_report.html:HTML格式的数据漂移报告。

  • metrics/data_drift.json:包含漂移结果以及特征列和时间戳的JSON格式的数据漂移指标:

metrics/data_drift.json

{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0,
    "num_cols": [
        "invoiceno",
        "invoicedate",
        "unitprice",
        "product_avg_quantity_last_month",
        "product_max_price_all_time",
        "unitprice_vs_max",
        "unitprice_to_avg",
        "unitprice_squared",
        "unitprice_log"
    ],
    "cat_cols": [
        "stockcode",
        "customerid",
        "country",
        "year",
        "year_month",
        "day_of_week",
        "is_registered"
    ],
    "timestamp": "2025-10-07T00:24:29.899495"
}

漂移测试的结果也可以在Evently工作区仪表板上查看,以供进一步分析:

Figure B. Screenshot of the Evently workspace dashboard

图 B. Evently 工作区仪表板截图

阶段 3:预处理

如果没有检测到数据漂移,血缘将继续进行预处理阶段。

DVC 配置

我们在data_drift_check阶段之后添加preprocess阶段:

dvc.yaml

stages:
    etl_pipeline:
        ###
    data_drift_check:
        ### 
    preprocess:
        cmd: > python src/data_handling/preprocess.py --target_col ${params.target_col} --should_scale ${params.should_scale} --verbose ${params.verbose}

        deps:
        - src/data_handling/preprocess.py
        - src/data_handling/
        - src/_utils
        # params from params.yaml
        params:
        - params.target_col
        - params.should_scale
        - params.verbose
        outs:
        # train, val, test datasets
        - data/x_train_df.parquet
        - data/x_val_df.parquet
        - data/x_test_df.parquet
        - data/y_train_df.parquet
        - data/y_val_df.parquet
        - data/y_test_df.parquet
        # preprocessed input datasets
        - data/x_train_processed.parquet
        - data/x_val_processed.parquet
        - data/x_test_processed.parquet
        # trained preprocessor and human readable feature names for shap analysis
        - preprocessors/column_transformer.pkl
        - preprocessors/feature_names.json

然后在cmd中使用的参数添加默认值:

params.yaml

params:
    target_col: "quantity"
    should_scale: True
    verbose: False

Python 脚本

接下来,我们将添加一个Python脚本,用于创建训练、验证和测试数据集,并预处理输入数据:

import os
import argparse
import json
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import src.data_handling.scripts as scripts
from src._utils import main_logger

def preprocess(stockcode: str = '', target_col: str = 'quantity', should_scale: bool = True, verbose: bool = False):
    # 初始化要跟踪的指标 (dvc)
    DATA_DRIFT_METRICS_PATH = os.path.join('metrics', f'data_drift_{args.stockcode}.json')

    if os.path.exists(DATA_DRIFT_METRICS_PATH):
        with open(DATA_DRIFT_METRICS_PATH, 'r') as f:
            metrics = json.load(f)
    else:
        metrics = dict()

    # 从dvc缓存加载处理后的df
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df = pd.read_parquet(PROCESSED_DF_PATH)

    # 分类数值和分类列
    num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)

    if verbose:
        main_logger.info(f'num_cols: {num_cols} 
cat_cols: {cat_cols}')

    # 结构化分类列
    if cat_cols:
        for col in cat_cols:
            df[col] = df[col].astype('string')

    # 初始化预处理器(要么从dvc缓存加载,要么从头创建)
    PREPROCESSOR_PATH = os.path.join('preprocessors', 'column_transformer.pkl')
    try:
        preprocessor = joblib.load(PREPROCESSOR_PATH)
    except:
        preprocessor = scripts.create_preprocessor(num_cols=num_cols if should_scale else [], cat_cols=cat_cols)

    # 创建训练、验证、测试数据集
    y = df[target_col]
    X = df.copy().drop(target_col, axis='columns')

    # 分割
    test_size, random_state = 50000, 42
    X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=test_size, random_state=random_state, shuffle=False)

    # 存储训练、验证、测试数据集 (dvc 跟踪)
    X_train.to_parquet('data/x_train_df.parquet', index=False)
    X_val.to_parquet('data/x_val_df.parquet', index=False)
    X_test.to_parquet('data/x_test_df.parquet', index=False)
    y_train.to_frame(name=target_col).to_parquet('data/y_train_df.parquet', index=False)
    y_val.to_frame(name=target_col).to_parquet('data/y_val_df.parquet', index=False)
    y_test.to_frame(name=target_col).to_parquet('data/y_test_df.parquet', index=False)

    # 预处理
    X_train = preprocessor.fit_transform(X_train)
    X_val = preprocessor.transform(X_val)
    X_test = preprocessor.transform(X_test)

    # 存储预处理后的输入数据 (dvc 跟踪)
    pd.DataFrame(X_train).to_parquet(f'data/x_train_processed.parquet', index=False)
    pd.DataFrame(X_val).to_parquet(f'data/x_val_processed.parquet', index=False)
    pd.DataFrame(X_test).to_parquet(f'data/x_test_processed.parquet', index=False)

    # 保存特征名称 (dvc 跟踪) 以用于shap分析
    with open('preprocessors/feature_names.json', 'w') as f:
        feature_names = preprocessor.get_feature_names_out()
        json.dump(feature_names.tolist(), f)

    return X_train, X_val, X_test, y_train, y_val, y_test, preprocessor

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='run data preprocessing')
    parser.add_argument('--stockcode', type=str, default='', help='specific stockcode')
    parser.add_argument('--target_col', type=str, default='quantity', help='the target column name')
    parser.add_argument('--should_scale', type=bool, default=True, help='flag to scale numerical features')
    parser.add_argument('--verbose', type=bool, default=False, help='flag for verbose logging')
    args = parser.parse_args()

    X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = preprocess(
        target_col=args.target_col,
        should_scale=args.should_scale,
        verbose=args.verbose,
        stockcode=args.stockcode,
    )

输出

此阶段为模型训练和推理生成必要的_数据集:

输入特征:

  • data/x_train_df.parquet

  • data/x_val_df.parquet

  • data/x_test_df.parquet

预处理后的输入特征:

  • data/x_train_processed_df.parquet

  • data/x_val_processed_df.parquet

  • data/x_test_processed_df.parquet

目标变量:

  • data/y_train_df.parquet

  • data/y_val_df.parquet

  • data/y_test_df.parquet

可复用预处理器和特征名称:

  • preprocessors/column_transformer.pkl

  • preprocessors/feature_names.json




🚀 想要体验更好更全面的AI调用?

欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。

0

评论区