目 录CONTENT

文章目录

使用Python构建简单的数据质量DSL

Administrator
2025-12-02 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

📢 转载信息

原文链接:https://www.kdnuggets.com/building-a-simple-data-quality-dsl-in-python

原文作者:Bala Priya C


Building a Simple Data Quality DSL in Python
Image by Author

 

构建一个简单的数据质量DSL

 
在Python中,数据验证代码往往难以维护。业务规则被深层嵌套的if语句掩盖,验证逻辑与错误处理混杂在一起,添加新的检查意味着需要在过程式函数中摸索合适的位置插入代码。是的,你可以使用数据验证框架,但我们将专注于使用Python构建一些非常简单但实用的东西。

我们将通过创建一个专门用于数据验证的词汇表,来编写一种简化的领域特定语言(DSL)。与其编写通用的Python代码,不如构建专门的函数和类,用与你思考问题方式相匹配的术语来表达验证规则。

对于数据验证而言,这意味着规则读起来就像业务需求:“客户年龄必须在18到120岁之间”,或者“电子邮件地址必须包含@符号,并且应具有有效的域名”。你希望DSL来处理检查数据和报告违规行为的底层机制,而你则专注于表达什么是有效数据。最终得到的验证逻辑是可读、易于维护和测试,且易于扩展的。那么,让我们开始编码吧!

🔗 在GitHub上查看代码链接

 

为什么构建DSL?

 
考虑使用Python验证客户数据:

def validate_customers(df): errors = [] if df['customer_id'].duplicated().any(): errors.append("Duplicate IDs") if (df['age'] < 0).any(): errors.append("Negative ages") if not df['email'].str.contains('@').all(): errors.append("Invalid emails") return errors

 

这种方法将验证逻辑硬编码,将业务规则与错误处理混合在一起,并且随着规则的增多,它将变得难以维护。相反,我们希望编写一个DSL来分离关注点并创建可重用的验证组件。

而不是编写过程式的验证函数,DSL允许你表达读起来像业务需求的规则:

# 传统方法 if df['age'].min() < 0 or df['age'].max() > 120: raise ValueError("Invalid ages found") # DSL方法 validator.add_rule(Rule("Valid ages", between('age', 0, 120), "Ages must be 0-120")) 

 

DSL方法将你正在验证的内容(业务规则)与如何处理违规行为(错误报告)分离开来。这使得验证逻辑可测试、可重用,并且非程序员也能读懂。

 

创建样本数据集

 
首先,启动一个包含常见质量问题的、真实的电子商务客户数据样本:

import pandas as pd customers = pd.DataFrame({ 'customer_id': [101, 102, 103, 103, 105], 'email': ['john@gmail.com', 'invalid-email', '', 'sarah@yahoo.com', 'mike@domain.co'], 'age': [25, -5, 35, 200, 28], 'total_spent': [250.50, 1200.00, 0.00, -50.00, 899.99], 'join_date': ['2023-01-15', '2023-13-45', '2023-02-20', '2023-02-20', ''] }) # Note: 2023-13-45 is an intentionally malformed date.

 

该数据集包含重复的客户ID、无效的电子邮件格式、不合常理的年龄、负数的消费金额以及格式错误的日期。这对于测试验证规则来说应该足够好了。

 

编写验证逻辑

 

// 创建 Rule 类

让我们开始编写一个简单的Rule类来封装验证逻辑:

class Rule:
    def __init__(self, name, condition, error_msg):
        self.name = name
        self.condition = condition
        self.error_msg = error_msg

    def check(self, df):
        # condition函数返回True表示有效行。
        # 我们使用~(按位非)来选择违反条件的行。
        violations = df[~self.condition(df)]
        if not violations.empty:
            return {
                'rule': self.name,
                'message': self.error_msg,
                'violations': len(violations),
                'sample_rows': violations.head(3).index.tolist()
            }
        return None

 

condition参数接受任何接受DataFrame并返回布尔Series的函数,以指示有效行。波浪号(~)操作符会反转这个布尔Series以识别违规行为。当存在违规时,check方法返回详细信息,包括规则名称、错误消息、违规计数以及用于调试的样本行索引。

这种设计将验证逻辑与错误报告分离开来。condition函数只关注业务规则,而Rule类则一致地处理错误详情。

 

// 添加多个规则

接下来,我们编写一个DataValidator类来管理规则集合:

class DataValidator:
    def __init__(self):
        self.rules = []

    def add_rule(self, rule):
        self.rules.append(rule)
        return self # 启用方法链式调用

    def validate(self, df):
        results = []
        for rule in self.rules:
            violation = rule.check(df)
            if violation:
                results.append(violation)
        return results

 

add_rule方法返回self以支持方法链式调用。validate方法独立执行所有规则并收集违规报告。这种方法确保了一条规则失败不会阻止其他规则的执行。

 

// 构建可读的条件

回想一下,在实例化Rule类对象时,我们也需要一个condition函数。它可以是任何接受DataFrame并返回布尔Series的函数。虽然简单的lambda函数可行,但可读性不高。因此,我们编写辅助函数来创建可读的验证词汇表:

def not_null(column):
    return lambda df: df[column].notna()

def unique_values(column):
    return lambda df: ~df.duplicated(subset=[column], keep=False)

def between(column, min_val, max_val):
    return lambda df: df[column].between(min_val, max_val)

 

每个辅助函数都返回一个与pandas布尔操作配合使用的lambda函数。

  • not_null辅助函数使用pandas的notna()方法来识别非空值。
  • unique_values辅助函数使用duplicated(..., keep=False)subset参数来标记所有重复出现的记录,从而实现更准确的违规计数。
  • between辅助函数使用pandas的between()方法来自动处理范围检查。

对于模式匹配,正则表达式会变得很直接:

import re

def matches_pattern(column, pattern):
    return lambda df: df[column].str.match(pattern, na=False)

 

na=False参数确保缺失值被视为验证失败而不是匹配,这通常是必需字段所需行为。

 

为样本数据集构建数据验证器

 
现在让我们为客户数据集构建一个验证器,看看这个DSL是如何工作的:

validator = DataValidator()
validator.add_rule(Rule(
    "Unique customer IDs", 
    unique_values('customer_id'), 
    "Customer IDs must be unique across all records"
))
validator.add_rule(Rule(
    "Valid email format", 
    matches_pattern('email', r'^[^@\s]+@[^@\s]+\.[^@\s]+$'), 
    "Email addresses must contain @ symbol and domain"
))
validator.add_rule(Rule(
    "Reasonable customer age", 
    between('age', 13, 120), 
    "Customer age must be between 13 and 120 years"
))
validator.add_rule(Rule(
    "Non-negative spending", 
    lambda df: df['total_spent'] >= 0, 
    "Total spending amount cannot be negative"
))

 

每条规则都遵循相同的模式:描述性名称、验证条件和错误消息

  • 第一条规则使用unique_values辅助函数来检查重复的客户ID。
  • 第二条规则应用正则表达式模式匹配来验证电子邮件格式。该模式要求@符号前后至少有一个字符,以及一个域名扩展。
  • 第三条规则使用between辅助函数进行范围验证,为客户设置合理的年龄限制。
  • 最后一条规则使用lambda函数进行内联条件检查,确保total_spent值非负。

请注意,每条规则读起来几乎就像一条业务需求。验证器收集这些规则,并可以对任何具有匹配列名称的DataFrame执行它们:

issues = validator.validate(customers)
for issue in issues:
    print(f"❌ Rule: {issue['rule']}")
    print(f"Problem: {issue['message']}")
    print(f"Affected rows: {issue['sample_rows']}")
    print()

 

输出清晰地指出了数据集中的具体问题及其位置,使调试变得简单明了。对于样本数据,你将得到以下输出:

Validation Results: 
❌ Rule: Unique customer IDs
Problem: Customer IDs must be unique across all records
Violations: 2
Affected rows: [2, 3]
❌ Rule: Valid email format
Problem: Email addresses must contain @ symbol and domain
Violations: 3
Affected rows: [1, 2, 4]
❌ Rule: Reasonable customer age
Problem: Customer age must be between 13 and 120 years
Violations: 2
Affected rows: [1, 3]
❌ Rule: Non-negative spending
Problem: Total spending amount cannot be negative
Violations: 1
Affected rows: [3]

 

添加跨列验证

 

真正的业务规则通常涉及列之间的关系。自定义lambda函数可以处理复杂的验证逻辑:

def high_spender_email_required(df):
    high_spenders = df['total_spent'] > 500
    has_valid_email = df['email'].str.contains('@', na=False)
    # Passes if: (Not a high spender) OR (Has a valid email)
    return ~high_spenders | has_valid_email

validator.add_rule(Rule(
    "High Spenders Need Valid Email", 
    high_spender_email_required, 
    "Customers spending over $500 must have valid email addresses"
))

 

此规则使用布尔逻辑,要求高消费客户必须拥有有效的电子邮件,但低消费客户可以没有联系信息。表达式~high_spenders | has_valid_email意为“不是高消费者 或 有效电子邮件”,这允许低消费者无论电子邮件状态如何都能通过验证。

 

处理日期验证

 
日期验证需要仔细处理,因为日期解析可能会失败:

def valid_date_format(column, date_format='%Y-%m-%d'):
    def check_dates(df):
        # pd.to_datetime with errors='coerce' turns invalid dates into NaT (Not a Time)
        parsed_dates = pd.to_datetime(df[column], format=date_format, errors='coerce')
        # A row is valid if the original value is not null AND the parsed date is not NaT
        return df[column].notna() & parsed_dates.notna()
    return check_dates

validator.add_rule(Rule(
    "Valid Join Dates", 
    valid_date_format('join_date'), 
    "Join dates must follow YYYY-MM-DD format"
))

 

仅当原始值不为空且解析后的日期有效(即不是NaT)时,验证才通过。我们依赖pd.to_datetime中的errors='coerce'来优雅地处理格式错误的字符串(将其转换为NaT),然后使用parsed_dates.notna()来捕获它,从而移除了不必要的try-except块。

 

编写装饰器集成模式

 
对于生产管道,你可以编写装饰器模式以实现干净的集成:

def validate_dataframe(validator):
    def decorator(func):
        def wrapper(df, *args, **kwargs):
            issues = validator.validate(df)
            if issues:
                error_details = [f"{issue['rule']}: {issue['violations']} violations" for issue in issues]
                raise ValueError(f"Data validation failed: {'; '.join(error_details)}")
            return func(df, *args, **kwargs)
        return wrapper
    return decorator

# Note: 'customer_validator' needs to be defined globally or passed in a real implementation
# Assuming 'customer_validator' is the instance we built earlier
# @validate_dataframe(customer_validator)
# def process_customer_data(df):
#     return df.groupby('age').agg({'total_spent': 'sum'})

 

该装饰器确保在处理开始前数据通过验证,从而防止损坏的数据在管道中传播。装饰器会引发包含具体验证失败信息的描述性错误。代码片段中添加了注释,以指出customer_validator需要在装饰器中是可访问的。

 

扩展模式

 
根据需要,你可以扩展DSL以包含其他验证规则:

# 统计异常值检测

def within_standard_deviations(column, std_devs=3):
    # Valid if absolute difference from mean is within N standard deviations
    return lambda df: abs(df[column] - df[column].mean()) <= std_devs * df[column].std()

# 数据集间的引用完整性

def foreign_key_exists(column, reference_df, reference_column):
    # Valid if value in column is present in the reference_column of the reference_df
    return lambda df: df[column].isin(reference_df[reference_column])

# 自定义业务逻辑

def profit_margin_reasonable(df):
    # Ensures 0 <= margin <= 1
    margin = (df['revenue'] - df['cost']) / df['revenue']
    return (margin >= 0) & (margin <= 1)

 

这就是如何将验证逻辑构建为返回布尔序列的可组合函数。

这是一个在样本数据上使用我们构建的数据验证DSL的示例,假设辅助函数位于名为data_quality_dsl的模块中:

import pandas as pd
from data_quality_dsl import DataValidator, Rule, unique_values, between, matches_pattern

# Sample data
df = pd.DataFrame({
    'user_id': [1, 2, 2, 3],
    'email': ['user@test.com', 'invalid', 'user@real.com', ''],
    'age': [25, -5, 30, 150]
})

# Build validator
validator = DataValidator()
validator.add_rule(Rule("Unique users", unique_values('user_id'), "User IDs must be unique"))
validator.add_rule(Rule("Valid emails", matches_pattern('email', r'^[^@]+@[^@]+\.[^@]+$'), "Invalid email format"))
validator.add_rule(Rule("Reasonable ages", between('age', 0, 120), "Age must be 0-120"))

# Run validation
issues = validator.validate(df)
for issue in issues:
    print(f"❌ {issue['rule']}: {issue['violations']} violations")

 

结论

 
这个DSL虽然简单,但之所以有效,是因为它与数据专业人员思考验证的方式相吻合。规则以易于理解的需求形式表达业务逻辑,同时允许我们利用pandas的性能和灵活性。

关注点的分离使得验证逻辑可测试且易于维护。这种方法不需要pandas以外的外部依赖,对于已经熟悉pandas操作的人来说,也无需额外的学习曲线。

这是我在几次晚间编程冲刺和几杯咖啡(当然!)之后完成的工作。你可以将此版本用作起点,构建更酷的东西。祝你编码愉快!
 
 

Bala Priya C是来自印度的一名开发人员和技术作家。她喜欢在数学、编程、数据科学和内容创作的交叉点上工作。她的兴趣和专业领域包括DevOps、数据科学和自然语言处理。她喜欢阅读、写作、编码和咖啡!目前,她正致力于通过撰写教程、操作指南、观点文章等内容来学习并与开发者社区分享她的知识。Bala还创作引人入胜的资源概览和编码教程。




🚀 想要体验更好更全面的AI调用?

欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。

0

评论区