📢 转载信息
原文链接:https://www.kdnuggets.com/pandas-advanced-groupby-techniques-for-complex-aggregations
原文作者:Josep Ferrer
Image by Author
构建清晰的层级结构
虽然groupby().sum()
和groupby().mean()
适用于快速检查,但生产级别的指标需要更强大的解决方案。现实世界中的表格通常涉及多个键、时间序列数据、权重以及促销、退货或异常值等各种条件。
这意味着您需要频繁计算总计和比率、在每个细分市场内对项目进行排名、按日历时间段汇总数据,然后将组统计信息合并回原始行以供建模使用。本文将指导您使用Pandas库进行高级分组技术,以有效处理这些复杂场景。
选择正确的模式
使用 agg
将组缩减为单行
当您希望每组仅返回一条记录时,请使用agg
,例如总计、平均值、中位数、最小值/最大值以及自定义的向量化归约。
out = ( df.groupby(['store', 'cat'], as_index=False, sort=False) .agg(sales=('rev', 'sum'), orders=('order_id', 'nunique'), avg_price=('price', 'mean')) )
这适用于关键绩效指标(KPI)表格、每周汇总和多指标摘要。
使用 transform
将统计信息广播回行
transform
方法返回与输入形状相同的(Broadcast)结果。它非常适合创建每行所需的特征,例如Z分数、组内份额或组级填充。
g = df.groupby('store')['rev'] df['rev_z'] = (df['rev'] - g.transform('mean')) / g.transform('std') df['rev_share'] = df['rev'] / g.transform('sum')
这适用于建模特征、质量保证比率和插补。
使用 apply
进行自定义的每组逻辑
仅当所需逻辑无法用内置函数表达时,才使用apply
。它的速度较慢且难以优化,因此您应首先尝试agg
或transform
。
def capped_mean(s): q1, q3 = s.quantile([.25, .75]) return s.clip(q1, q3).mean() df.groupby('store')['rev'].apply(capped_mean)
这适用于特殊的规则和较小的组。
使用 filter
保留或丢弃整个组
filter
方法允许整个组通过或失败某个条件。这对于数据质量规则和阈值设置非常方便。
big = df.groupby('store').filter(lambda g: g['order_id'].nunique() >= 100)
这适用于最小规模的用户群组以及在聚合前移除稀疏类别。
多键分组和命名聚合
按多个键分组
您可以控制输出的形状和顺序,以便结果可以直接导入商业智能(BI)工具。
g = df.groupby(['store', 'cat'], as_index=False, sort=False, observed=True)
as_index=False
返回一个扁平的DataFrame,更易于连接和导出sort=False
避免重新排序组,当顺序不重要时可以节省工作量observed=True
(与分类列一起使用时)会丢弃未使用的类别对
使用命名聚合
命名聚合会产生类似SQL中可读的列名。
out = ( df.groupby(['store', 'cat']) .agg(sales=('rev', 'sum'), orders=('order_id', 'nunique'), # 在此处使用您的ID列 avg_price=('price', 'mean')) )
整理列
如果堆叠多个聚合操作,您将得到一个MultiIndex。将其展平一次并标准化列顺序。
out = out.reset_index() out.columns = [ '_'.join(c) if isinstance(c, tuple) else c for c in out.columns ] # 可选:确保业务友好的列顺序 cols = ['store', 'cat', 'orders', 'sales', 'avg_price'] out = out[cols]
在不使用 apply 的情况下进行条件聚合
在 agg
中使用布尔掩码数学运算
当掩码依赖于其他列时,请按索引对齐数据。
# 按(store, cat)计算促销销售额和促销率 cond = df['is_promo'] out = df.groupby(['store', 'cat']).agg( promo_sales=('rev', lambda s: s[cond.loc[s.index]].sum()), promo_rate=('is_promo', 'mean') # 促销行所占的比例 )
计算比率和比例
比率是简单的sum(mask) / size
,相当于布尔列的平均值。
df['is_return'] = df['status'].eq('returned') rates = df.groupby('store').agg(return_rate=('is_return', 'mean'))
创建类似用户群组的窗口
首先,使用日期边界预计算掩码,然后聚合数据。
# 示例:每个客户在首次购买后的30天内重复购买 first_ts = df.groupby('customer_id')['ts'].transform('min') within_30 = (df['ts'] <= first_ts + pd.Timedelta('30D')) & (df['ts'] > first_ts) # 客户群组 = 首次购买的月份 df['cohort'] = first_ts.dt.to_period('M').astype(str) repeat_30_rate = ( df.groupby('cohort') .agg(repeat_30_rate=('within_30', 'mean')) .rename_axis(None) )
每组的加权指标
实现加权平均模式
向量化数学运算,并防止除以零权重。
import numpy as np tmp = df.assign(wx=df['price'] * df['qty']) agg = tmp.groupby(['store', 'cat']).agg(wx=('wx', 'sum'), w=('qty', 'sum')) # 每个(store, cat)的加权平均价格 agg['wavg_price'] = np.where(agg['w'] > 0, agg['wx'] / agg['w'], np.nan)
安全处理 NaN 值
决定对空组或全为NaN
的值返回什么。两种常见选择是:
# 1) 返回 NaN(透明,对下游统计数据最安全) agg['wavg_price'] = np.where(agg['w'] > 0, agg['wx'] / agg['w'], np.nan) # 2) 如果所有权重都为零,则回退到未加权平均值(明确的策略) mean_price = df.groupby(['store', 'cat'])['price'].mean() agg['wavg_price_safe'] = np.where( agg['w'] > 0, agg['wx'] / agg['w'], mean_price.reindex(agg.index).to_numpy() )
时间感知分组
使用 pd.Grouper 和频率
通过将时间序列数据分组到特定时间间隔中,来尊重KPI的日历边界。
weekly = df.groupby(['store', pd.Grouper(key='ts', freq='W')], observed=True).agg( sales=('rev', 'sum'), orders=('order_id', 'nunique') )
对每组应用滚动/扩展窗口
始终先对数据进行排序,并在时间戳列上对齐。
df = df.sort_values(['customer_id', 'ts']) df['rev_30d_mean'] = ( df.groupby('customer_id') .rolling('30D', on='ts')['rev'].mean() .reset_index(level=0, drop=True) )
避免数据泄露
保持时间顺序,并确保窗口仅“看到”过去的数据。不要打乱时间序列数据,也不要在将数据集拆分进行训练和测试之前,对完整数据集计算组统计信息。
组内排名和 Top-N
查找每组的 Top-k 行
以下是选择每组前 N 行的两个实用选项。
# 排序 + head top3 = (df.sort_values(['cat', 'rev'], ascending=[True, False]) .groupby('cat') .head(3)) # 对单个指标进行每组 nlargest top3_alt = (df.groupby('cat', group_keys=False) .apply(lambda g: g.nlargest(3, 'rev')))
使用辅助函数
Pandas提供了一些用于排名和选择的辅助函数。
rank
—控制如何处理平局(例如,method='dense'
或'first'
),并可以通过pct=True
计算百分位排名。
df['rev_rank_in_cat'] = df.groupby('cat')['rev'].rank(method='dense', ascending=False)
cumcount
—提供每行在其组内的0基位置。
df['pos_in_store'] = df.groupby('store').cumcount()
nth
—在不排序整个DataFrame的情况下获取每组的第 k 行。
second_row = df.groupby('store').nth(1) # 每家商店存在的第二行
使用 transform 广播特征
执行组级归一化
在每组内标准化一个指标,以便可以跨不同组比较行。
g = df.groupby('store')['rev'] df['rev_z'] = (df['rev'] - g.transform('mean')) / g.transform('std')
插补缺失值
使用组统计信息填充缺失值。这通常比使用全局填充值更能保持分布的真实性。
df['price'] = df['price'].fillna(df.groupby('cat')['price'].transform('median'))
创建组内份额特征
将原始数字转换为组内比例,以实现更清晰的比较。
df['rev_share_in_store'] = df['rev'] / df.groupby('store')['rev'].transform('sum')
处理类别、空组和缺失数据
使用分类类型提高速度
如果您的键来自固定集合(例如,商店、区域、产品类别),请先将它们转换为分类类型。这使得GroupBy操作更快、更节省内存。
from pandas.api.types import CategoricalDtype store_type = CategoricalDtype(categories=sorted(df['store'].dropna().unique()), ordered=False) df['store'] = df['store'].astype(store_type) cat_type = CategoricalDtype(categories=['Grocery', 'Electronics', 'Home', 'Clothing', 'Sports']) df['cat'] = df['cat'].astype(cat_type)
删除未使用的组合
当按分类列分组时,设置observed=True
会排除数据中未实际出现的类别对,从而产生更干净、噪声更少的输出。
out = df.groupby(['store', 'cat'], observed=True).size().reset_index(name='n')
使用 NaN
键分组
明确您如何处理缺失的键。默认情况下,Pandas会丢弃NaN
组;仅当它有助于质量保证流程时才保留它们。
# 默认值:NaN 键被丢弃 by_default = df.groupby('region').size() # 保留 NaN 作为其自己的组,当您需要审计缺失的键时 kept = df.groupby('region', dropna=False).size()
快速备忘单
计算条件比率/每组
# 布尔值的平均值即为比率 df.groupby(keys).agg(rate=('flag', 'mean')) # 或明确地:sum(mask)/size df.groupby(keys).agg(rate=('flag', lambda s: s.sum() / s.size))
计算加权平均值
df.assign(wx=df[x] * df[w]) .groupby(keys) .apply(lambda g: g['wx'].sum() / g[w].sum() if g[w].sum() else np.nan) .rename('wavg')
查找每组 Top-k 行
(df.sort_values([key, metric], ascending=[True, False]) .groupby(key) .head(k)) # 或 df.groupby(key, group_keys=False).apply(lambda g: g.nlargest(k, metric))
计算每周指标
df.groupby([key, pd.Grouper(key='ts', freq='W')], observed=True).agg(...)
执行组级填充
df[col] = df[col].fillna(df.groupby(keys)[col].transform('median'))
计算组内份额
df['share'] = df[val] / df.groupby(keys)[val].transform('sum')
总结
首先,为您要执行的任务选择正确的模式:使用agg
进行归约,使用transform
进行广播,仅在无法实现向量化时才保留apply
。利用pd.Grouper
处理基于时间的桶,并利用排名辅助函数进行 Top-N 选择。通过优先采用清晰的向量化模式,您可以确保输出是扁平的、命名清晰的且易于测试,从而保证指标的正确性并使您的Notebook运行得更快。
Josep Ferrer 是一位来自巴塞罗那的分析工程师。他拥有物理工程学位,目前从事应用于人类移动性领域的数据科学工作。他是一位兼职内容创作者,专注于数据科学和技术。Josep撰写所有与人工智能相关的内容,涵盖该领域爆炸性发展的应用。
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区