📢 转载信息
原文链接:https://www.kdnuggets.com/building-a-simple-data-quality-dsl-in-python
原文作者:Bala Priya C
Image by Author
构建一个简单的数据质量DSL
在Python中,数据验证代码往往难以维护。业务规则被深层嵌套的if语句掩盖,验证逻辑与错误处理混杂在一起,添加新的检查意味着需要在过程式函数中摸索合适的位置插入代码。是的,你可以使用数据验证框架,但我们将专注于使用Python构建一些非常简单但实用的东西。
我们将通过创建一个专门用于数据验证的词汇表,来编写一种简化的领域特定语言(DSL)。与其编写通用的Python代码,不如构建专门的函数和类,用与你思考问题方式相匹配的术语来表达验证规则。
对于数据验证而言,这意味着规则读起来就像业务需求:“客户年龄必须在18到120岁之间”,或者“电子邮件地址必须包含@符号,并且应具有有效的域名”。你希望DSL来处理检查数据和报告违规行为的底层机制,而你则专注于表达什么是有效数据。最终得到的验证逻辑是可读、易于维护和测试,且易于扩展的。那么,让我们开始编码吧!
为什么构建DSL?
考虑使用Python验证客户数据:
def validate_customers(df): errors = [] if df['customer_id'].duplicated().any(): errors.append("Duplicate IDs") if (df['age'] < 0).any(): errors.append("Negative ages") if not df['email'].str.contains('@').all(): errors.append("Invalid emails") return errors
这种方法将验证逻辑硬编码,将业务规则与错误处理混合在一起,并且随着规则的增多,它将变得难以维护。相反,我们希望编写一个DSL来分离关注点并创建可重用的验证组件。
而不是编写过程式的验证函数,DSL允许你表达读起来像业务需求的规则:
# 传统方法 if df['age'].min() < 0 or df['age'].max() > 120: raise ValueError("Invalid ages found") # DSL方法 validator.add_rule(Rule("Valid ages", between('age', 0, 120), "Ages must be 0-120"))
DSL方法将你正在验证的内容(业务规则)与如何处理违规行为(错误报告)分离开来。这使得验证逻辑可测试、可重用,并且非程序员也能读懂。
创建样本数据集
首先,启动一个包含常见质量问题的、真实的电子商务客户数据样本:
import pandas as pd customers = pd.DataFrame({ 'customer_id': [101, 102, 103, 103, 105], 'email': ['john@gmail.com', 'invalid-email', '', 'sarah@yahoo.com', 'mike@domain.co'], 'age': [25, -5, 35, 200, 28], 'total_spent': [250.50, 1200.00, 0.00, -50.00, 899.99], 'join_date': ['2023-01-15', '2023-13-45', '2023-02-20', '2023-02-20', ''] }) # Note: 2023-13-45 is an intentionally malformed date.
该数据集包含重复的客户ID、无效的电子邮件格式、不合常理的年龄、负数的消费金额以及格式错误的日期。这对于测试验证规则来说应该足够好了。
编写验证逻辑
// 创建 Rule 类
让我们开始编写一个简单的Rule类来封装验证逻辑:
class Rule:
def __init__(self, name, condition, error_msg):
self.name = name
self.condition = condition
self.error_msg = error_msg
def check(self, df):
# condition函数返回True表示有效行。
# 我们使用~(按位非)来选择违反条件的行。
violations = df[~self.condition(df)]
if not violations.empty:
return {
'rule': self.name,
'message': self.error_msg,
'violations': len(violations),
'sample_rows': violations.head(3).index.tolist()
}
return None
condition参数接受任何接受DataFrame并返回布尔Series的函数,以指示有效行。波浪号(~)操作符会反转这个布尔Series以识别违规行为。当存在违规时,check方法返回详细信息,包括规则名称、错误消息、违规计数以及用于调试的样本行索引。
这种设计将验证逻辑与错误报告分离开来。condition函数只关注业务规则,而Rule类则一致地处理错误详情。
// 添加多个规则
接下来,我们编写一个DataValidator类来管理规则集合:
class DataValidator:
def __init__(self):
self.rules = []
def add_rule(self, rule):
self.rules.append(rule)
return self # 启用方法链式调用
def validate(self, df):
results = []
for rule in self.rules:
violation = rule.check(df)
if violation:
results.append(violation)
return results
add_rule方法返回self以支持方法链式调用。validate方法独立执行所有规则并收集违规报告。这种方法确保了一条规则失败不会阻止其他规则的执行。
// 构建可读的条件
回想一下,在实例化Rule类对象时,我们也需要一个condition函数。它可以是任何接受DataFrame并返回布尔Series的函数。虽然简单的lambda函数可行,但可读性不高。因此,我们编写辅助函数来创建可读的验证词汇表:
def not_null(column):
return lambda df: df[column].notna()
def unique_values(column):
return lambda df: ~df.duplicated(subset=[column], keep=False)
def between(column, min_val, max_val):
return lambda df: df[column].between(min_val, max_val)
每个辅助函数都返回一个与pandas布尔操作配合使用的lambda函数。
not_null辅助函数使用pandas的notna()方法来识别非空值。unique_values辅助函数使用duplicated(..., keep=False)和subset参数来标记所有重复出现的记录,从而实现更准确的违规计数。between辅助函数使用pandas的between()方法来自动处理范围检查。
对于模式匹配,正则表达式会变得很直接:
import re
def matches_pattern(column, pattern):
return lambda df: df[column].str.match(pattern, na=False)
na=False参数确保缺失值被视为验证失败而不是匹配,这通常是必需字段所需行为。
为样本数据集构建数据验证器
现在让我们为客户数据集构建一个验证器,看看这个DSL是如何工作的:
validator = DataValidator()
validator.add_rule(Rule(
"Unique customer IDs",
unique_values('customer_id'),
"Customer IDs must be unique across all records"
))
validator.add_rule(Rule(
"Valid email format",
matches_pattern('email', r'^[^@\s]+@[^@\s]+\.[^@\s]+$'),
"Email addresses must contain @ symbol and domain"
))
validator.add_rule(Rule(
"Reasonable customer age",
between('age', 13, 120),
"Customer age must be between 13 and 120 years"
))
validator.add_rule(Rule(
"Non-negative spending",
lambda df: df['total_spent'] >= 0,
"Total spending amount cannot be negative"
))
每条规则都遵循相同的模式:描述性名称、验证条件和错误消息。
- 第一条规则使用
unique_values辅助函数来检查重复的客户ID。 - 第二条规则应用正则表达式模式匹配来验证电子邮件格式。该模式要求@符号前后至少有一个字符,以及一个域名扩展。
- 第三条规则使用
between辅助函数进行范围验证,为客户设置合理的年龄限制。 - 最后一条规则使用lambda函数进行内联条件检查,确保
total_spent值非负。
请注意,每条规则读起来几乎就像一条业务需求。验证器收集这些规则,并可以对任何具有匹配列名称的DataFrame执行它们:
issues = validator.validate(customers)
for issue in issues:
print(f"❌ Rule: {issue['rule']}")
print(f"Problem: {issue['message']}")
print(f"Affected rows: {issue['sample_rows']}")
print()
输出清晰地指出了数据集中的具体问题及其位置,使调试变得简单明了。对于样本数据,你将得到以下输出:
Validation Results:
❌ Rule: Unique customer IDs
Problem: Customer IDs must be unique across all records
Violations: 2
Affected rows: [2, 3]
❌ Rule: Valid email format
Problem: Email addresses must contain @ symbol and domain
Violations: 3
Affected rows: [1, 2, 4]
❌ Rule: Reasonable customer age
Problem: Customer age must be between 13 and 120 years
Violations: 2
Affected rows: [1, 3]
❌ Rule: Non-negative spending
Problem: Total spending amount cannot be negative
Violations: 1
Affected rows: [3]
添加跨列验证
真正的业务规则通常涉及列之间的关系。自定义lambda函数可以处理复杂的验证逻辑:
def high_spender_email_required(df):
high_spenders = df['total_spent'] > 500
has_valid_email = df['email'].str.contains('@', na=False)
# Passes if: (Not a high spender) OR (Has a valid email)
return ~high_spenders | has_valid_email
validator.add_rule(Rule(
"High Spenders Need Valid Email",
high_spender_email_required,
"Customers spending over $500 must have valid email addresses"
))
此规则使用布尔逻辑,要求高消费客户必须拥有有效的电子邮件,但低消费客户可以没有联系信息。表达式~high_spenders | has_valid_email意为“不是高消费者 或 有效电子邮件”,这允许低消费者无论电子邮件状态如何都能通过验证。
处理日期验证
日期验证需要仔细处理,因为日期解析可能会失败:
def valid_date_format(column, date_format='%Y-%m-%d'):
def check_dates(df):
# pd.to_datetime with errors='coerce' turns invalid dates into NaT (Not a Time)
parsed_dates = pd.to_datetime(df[column], format=date_format, errors='coerce')
# A row is valid if the original value is not null AND the parsed date is not NaT
return df[column].notna() & parsed_dates.notna()
return check_dates
validator.add_rule(Rule(
"Valid Join Dates",
valid_date_format('join_date'),
"Join dates must follow YYYY-MM-DD format"
))
仅当原始值不为空且解析后的日期有效(即不是NaT)时,验证才通过。我们依赖pd.to_datetime中的errors='coerce'来优雅地处理格式错误的字符串(将其转换为NaT),然后使用parsed_dates.notna()来捕获它,从而移除了不必要的try-except块。
编写装饰器集成模式
对于生产管道,你可以编写装饰器模式以实现干净的集成:
def validate_dataframe(validator):
def decorator(func):
def wrapper(df, *args, **kwargs):
issues = validator.validate(df)
if issues:
error_details = [f"{issue['rule']}: {issue['violations']} violations" for issue in issues]
raise ValueError(f"Data validation failed: {'; '.join(error_details)}")
return func(df, *args, **kwargs)
return wrapper
return decorator
# Note: 'customer_validator' needs to be defined globally or passed in a real implementation
# Assuming 'customer_validator' is the instance we built earlier
# @validate_dataframe(customer_validator)
# def process_customer_data(df):
# return df.groupby('age').agg({'total_spent': 'sum'})
该装饰器确保在处理开始前数据通过验证,从而防止损坏的数据在管道中传播。装饰器会引发包含具体验证失败信息的描述性错误。代码片段中添加了注释,以指出customer_validator需要在装饰器中是可访问的。
扩展模式
根据需要,你可以扩展DSL以包含其他验证规则:
# 统计异常值检测
def within_standard_deviations(column, std_devs=3):
# Valid if absolute difference from mean is within N standard deviations
return lambda df: abs(df[column] - df[column].mean()) <= std_devs * df[column].std()
# 数据集间的引用完整性
def foreign_key_exists(column, reference_df, reference_column):
# Valid if value in column is present in the reference_column of the reference_df
return lambda df: df[column].isin(reference_df[reference_column])
# 自定义业务逻辑
def profit_margin_reasonable(df):
# Ensures 0 <= margin <= 1
margin = (df['revenue'] - df['cost']) / df['revenue']
return (margin >= 0) & (margin <= 1)
这就是如何将验证逻辑构建为返回布尔序列的可组合函数。
这是一个在样本数据上使用我们构建的数据验证DSL的示例,假设辅助函数位于名为data_quality_dsl的模块中:
import pandas as pd
from data_quality_dsl import DataValidator, Rule, unique_values, between, matches_pattern
# Sample data
df = pd.DataFrame({
'user_id': [1, 2, 2, 3],
'email': ['user@test.com', 'invalid', 'user@real.com', ''],
'age': [25, -5, 30, 150]
})
# Build validator
validator = DataValidator()
validator.add_rule(Rule("Unique users", unique_values('user_id'), "User IDs must be unique"))
validator.add_rule(Rule("Valid emails", matches_pattern('email', r'^[^@]+@[^@]+\.[^@]+$'), "Invalid email format"))
validator.add_rule(Rule("Reasonable ages", between('age', 0, 120), "Age must be 0-120"))
# Run validation
issues = validator.validate(df)
for issue in issues:
print(f"❌ {issue['rule']}: {issue['violations']} violations")
结论
这个DSL虽然简单,但之所以有效,是因为它与数据专业人员思考验证的方式相吻合。规则以易于理解的需求形式表达业务逻辑,同时允许我们利用pandas的性能和灵活性。
关注点的分离使得验证逻辑可测试且易于维护。这种方法不需要pandas以外的外部依赖,对于已经熟悉pandas操作的人来说,也无需额外的学习曲线。
这是我在几次晚间编程冲刺和几杯咖啡(当然!)之后完成的工作。你可以将此版本用作起点,构建更酷的东西。祝你编码愉快!
Bala Priya C是来自印度的一名开发人员和技术作家。她喜欢在数学、编程、数据科学和内容创作的交叉点上工作。她的兴趣和专业领域包括DevOps、数据科学和自然语言处理。她喜欢阅读、写作、编码和咖啡!目前,她正致力于通过撰写教程、操作指南、观点文章等内容来学习并与开发者社区分享她的知识。Bala还创作引人入胜的资源概览和编码教程。
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区