在机器学习系统中,特征工程是连接原始数据与模型训练的关键环节。特征生产架构需要同时支持两类截然不同的计算模式:批式特征生产(Batch Feature)基于历史数据离线计算,为模型训练和回测提供高质量特征;流式特征生产(Streaming Feature)基于实时事件流在线计算,为模型推理提供低延迟特征。本文从架构设计出发,深入解析批流特征生产的核心挑战、主流架构方案、一致性保障机制,以及美团、字节、阿里等大厂的工程实践。
特征生产的核心挑战
特征生产架构面临的挑战远比看起来复杂。理解这些挑战,是设计合理架构的前提。
批流二义性问题
同一个业务特征,在训练时用历史数据批量计算,在线上推理时用实时数据流式计算,两套计算逻辑需要保持严格一致。这就是著名的训练-服务偏差(Training-Serving Skew)问题。
以"用户最近7天点击次数"为例:
- 批式计算:从 Hive 读取 7 天日志,按 user_id 分组 COUNT,结果写入特征存储
- 流式计算:消费 Kafka 点击事件流,维护滑动窗口计数,实时更新特征存储
如果批式和流式的计算逻辑存在细微差异(如时区处理、去重逻辑、NULL 值处理),训练数据和在线特征就会产生系统性偏差,导致模型在线下评估效果好但上线后效果差。
时间语义一致性
特征计算中的时间处理是另一个核心难题:
- 事件时间 vs 处理时间:用户在 23:59:50 发生的行为,可能因网络延迟在 00:00:10 才到达处理系统。批式计算可以用事件时间(Event Time)精确还原历史,流式计算则面临乱序数据和 Watermark 设置的权衡
- 特征时间点(Point-in-Time):训练样本的特征值必须是样本发生时刻的特征值,而不是当前最新值。例如,预测用户是否会购买时,特征应该是用户下单前一刻的历史行为,而非下单后的行为
- 窗口语义:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)在批式和流式中的语义必须严格对齐
特征新鲜度与计算成本
不同特征对新鲜度的要求差异巨大,而更高的新鲜度意味着更高的计算成本:
| 特征类型 | 示例 | 新鲜度要求 | 计算方式 |
|---|---|---|---|
| 静态特征 | 用户性别、注册城市 | 天级 | 批式,T+1 |
| 慢变特征 | 用户30天购买次数 | 小时级 | 批式,每小时 |
| 快变特征 | 用户当日浏览次数 | 分钟级 | 流式,近实时 |
| 实时特征 | 用户当前会话行为 | 秒级 | 流式,毫秒延迟 |
批式特征生产架构
批式特征生产是特征工程的基础,处理历史数据的大规模离线计算,为模型训练提供高质量特征。
批式特征生产流程
典型的批式特征生产流程分为以下几个阶段:
# 批式特征生产典型流程
数据源层:
- ODS(原始日志):用户行为日志、订单数据、商品数据
- DWD(明细层):清洗后的事实表,按事件时间分区
- DWS(汇总层):按业务主题聚合的宽表
特征计算层:
- 特征 SQL/DSL 定义:声明式描述特征计算逻辑
- 调度引擎:Airflow/DolphinScheduler 管理 DAG 依赖
- 计算引擎:Spark/Hive 执行大规模特征计算
特征存储层:
- 离线特征存储:Hive/Iceberg 存储历史特征快照
- 在线特征存储:Redis/HBase 存储最新特征供在线推理
- 特征向量存储:Faiss/Milvus 存储 Embedding 特征
训练数据生成:
- 样本拼接:将训练样本与 Point-in-Time 特征关联
- 特征归一化:标准化、归一化、分桶等预处理
- 训练集/验证集分割:按时间切分,避免数据泄漏
Point-in-Time Join
Point-in-Time(PIT)Join 是批式特征生产中最重要也最容易出错的环节。它解决的问题是:对于每一条训练样本,如何获取样本发生时刻的正确特征值,而不引入"未来数据泄漏"。
-- 错误做法:直接 JOIN 最新特征(引入数据泄漏)
SELECT
s.user_id,
s.item_id,
s.label,
f.user_30d_click_cnt -- 这是当前最新值,不是样本时刻的值!
FROM training_samples s
JOIN user_features f ON s.user_id = f.user_id;
-- 正确做法:Point-in-Time Join(使用样本时刻的历史特征快照)
SELECT
s.user_id,
s.item_id,
s.label,
s.event_time,
-- 找到样本时刻之前最近的特征快照
f.user_30d_click_cnt
FROM training_samples s
ASOF JOIN user_feature_history f
ON s.user_id = f.user_id
AND f.feature_time <= s.event_time -- 只取样本时刻之前的特征
QUALIFY ROW_NUMBER() OVER (
PARTITION BY s.user_id, s.event_time
ORDER BY f.feature_time DESC -- 取最近的一个快照
) = 1;
实现 PIT Join 的核心挑战在于特征历史快照的存储策略:
- 完整快照:每天存储所有特征的完整快照,存储成本高但查询简单
- 增量快照:只存储特征变化记录(Slowly Changing Dimension Type 2),存储高效但查询需要 ASOF JOIN
- Iceberg Time Travel:利用 Apache Iceberg 的时间旅行(Time Travel)能力,直接查询历史版本的特征表,无需维护快照
-- 使用 Iceberg Time Travel 实现 PIT Join
-- 查询 2024-01-15 10:00:00 时刻的用户特征
SELECT * FROM user_features
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-15 10:00:00';
-- 在 Spark 中使用 Iceberg Time Travel
spark.read
.option("as-of-timestamp", "1705312800000") -- Unix timestamp in ms
.table("user_features")
大规模批式特征计算优化
当特征数量达到千级、用户规模达到亿级时,批式特征计算的性能优化至关重要:
特征计算复用:多个特征可能共享相同的中间计算结果(如用户行为序列的聚合),通过 DAG 优化将公共子表达式提取为临时表,避免重复扫描数据。
# 特征计算 DAG 示例(使用 Feathr/Feast 风格的 DSL)
from feathr import Feature, WindowAggTransformation, TypedKey
# 定义用户行为聚合特征
user_key = TypedKey(key_column="user_id", key_column_type=ValueType.INT64)
# 这些特征共享相同的数据源,计算引擎会自动合并扫描
user_click_7d = Feature(
name="user_click_7d",
key=user_key,
feature_type=INT32,
transform=WindowAggTransformation(
agg_expr="count(item_id)",
agg_func="COUNT",
window="7d"
)
)
user_click_30d = Feature(
name="user_click_30d",
key=user_key,
feature_type=INT32,
transform=WindowAggTransformation(
agg_expr="count(item_id)",
agg_func="COUNT",
window="30d"
)
)
user_gmv_7d = Feature(
name="user_gmv_7d",
key=user_key,
feature_type=FLOAT,
transform=WindowAggTransformation(
agg_expr="sum(order_amount)",
agg_func="SUM",
window="7d"
)
)
分区裁剪与谓词下推:特征计算 SQL 应充分利用 Hive/Iceberg 的分区裁剪,只扫描必要的数据分区。确保 WHERE 条件中的时间过滤能够下推到存储层。
增量特征计算:对于窗口聚合特征(如30天点击次数),每天只需要增量更新,而非全量重算:
- 新增今天的行为数据
- 减去31天前(窗口外)的行为数据
- 维护一个累积计数表,每天做增量更新
-- 增量更新30天点击计数
-- 第一步:加上今天新增的点击
INSERT INTO user_feature_incremental
SELECT
user_id,
COUNT(*) AS click_delta,
'add' AS op_type,
current_date AS dt
FROM user_click_log
WHERE dt = current_date
GROUP BY user_id;
-- 第二步:减去31天前(已出窗口)的点击
INSERT INTO user_feature_incremental
SELECT
user_id,
-COUNT(*) AS click_delta, -- 负数表示减少
'remove' AS op_type,
current_date AS dt
FROM user_click_log
WHERE dt = date_sub(current_date, 31) -- 31天前的数据
GROUP BY user_id;
-- 第三步:更新特征表
UPDATE user_features f
SET user_30d_click_cnt = f.user_30d_click_cnt + delta.total_delta
FROM (
SELECT user_id, SUM(click_delta) AS total_delta
FROM user_feature_incremental
WHERE dt = current_date
GROUP BY user_id
) delta
WHERE f.user_id = delta.user_id;
流式特征生产架构
流式特征生产基于实时事件流,在线计算低延迟特征,是推荐、广告、风控等实时决策系统的核心基础设施。
流式特征生产架构设计
流式特征生产的典型架构由三层组成:
# 流式特征生产架构
事件采集层:
数据源:
- 用户行为日志(埋点事件)→ Kafka Topic: user_behavior
- 订单事件 → Kafka Topic: order_events
- 商品变更 → Kafka Topic: item_updates
采集方式:
- 客户端 SDK 直接上报
- 服务端日志通过 Filebeat/Fluentd 收集
- 数据库 CDC(Change Data Capture)通过 Canal/Debezium
流式计算层:
计算引擎:Flink(主流)/ Spark Structured Streaming
计算类型:
- 无状态转换:字段提取、类型转换、特征组合
- 有状态聚合:窗口计数、求和、去重、TopN
- 跨流 Join:行为流 JOIN 商品属性流
状态管理:
- 状态后端:RocksDB(大状态)/ HashMapStateBackend(小状态)
- Checkpoint:定期持久化状态到 HDFS/S3
特征存储层:
在线存储(低延迟读取):
- Redis:毫秒级读取,适合简单 KV 特征
- Tair(阿里云)/ Cellar(美团):Redis 兼容,支持大容量
- HBase:适合宽表特征,支持列族
特征服务层:
- Feature Server:统一特征读取接口,支持批量查询
- 特征缓存:本地 Cache 降低特征存储压力
Flink 流式特征计算
Apache Flink 是流式特征生产的主流选择,其精确一次(Exactly-Once)语义和强大的状态管理能力是核心优势。
// Flink 流式特征计算示例:用户实时行为窗口聚合
public class UserBehaviorFeatureJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 Checkpoint(精确一次语义)
env.enableCheckpointing(60000); // 每 60 秒 Checkpoint 一次
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// 配置 RocksDB 状态后端(适合大状态)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://namenode/flink/checkpoints");
// 读取 Kafka 用户行为事件
KafkaSource<UserBehaviorEvent> source = KafkaSource.<UserBehaviorEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("user_behavior")
.setGroupId("feature-job")
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setValueOnlyDeserializer(new UserBehaviorDeserializer())
.build();
DataStream<UserBehaviorEvent> behaviorStream = env
.fromSource(source, WatermarkStrategy
// 允许 30 秒的乱序延迟
.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((event, ts) -> event.getEventTime()),
"Kafka User Behavior Source");
// 计算用户最近 1 小时点击次数(滑动窗口,每 5 分钟更新)
DataStream<UserFeature> clickFeature = behaviorStream
.filter(e -> "click".equals(e.getBehaviorType()))
.keyBy(UserBehaviorEvent::getUserId)
.window(SlidingEventTimeWindows.of(
Time.hours(1), // 窗口大小:1 小时
Time.minutes(5) // 滑动步长:5 分钟
))
.aggregate(new CountAggregator(), new FeatureWindowFunction())
.name("User 1h Click Count");
// 计算用户最近 24 小时 GMV(滚动窗口)
DataStream<UserFeature> gmvFeature = behaviorStream
.filter(e -> "order".equals(e.getBehaviorType()))
.keyBy(UserBehaviorEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(24)))
.aggregate(new SumAggregator("order_amount"), new FeatureWindowFunction())
.name("User 24h GMV");
// 将特征写入 Redis
clickFeature.addSink(new RedisSink<>(redisConfig, new FeatureRedisMapper()));
gmvFeature.addSink(new RedisSink<>(redisConfig, new FeatureRedisMapper()));
env.execute("User Behavior Feature Job");
}
}
// 特征聚合函数
public class CountAggregator implements AggregateFunction<UserBehaviorEvent, Long, Long> {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(UserBehaviorEvent event, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) { return accumulator; }
@Override
public Long merge(Long a, Long b) { return a + b; }
}
流式特征的状态管理
流式特征计算中,状态管理是性能和正确性的关键:
状态类型选择:
- ValueState:存储单个值,适合累计计数、最近一次行为
- MapState:存储 Key-Value 映射,适合用户多维度特征聚合
- ListState:存储列表,适合用户行为序列(需要控制长度上限)
- AggregatingState:内置聚合逻辑,适合窗口聚合特征
// 使用 KeyedProcessFunction 实现自定义状态特征
public class UserSessionFeatureFunction
extends KeyedProcessFunction<String, UserBehaviorEvent, UserFeature> {
// 用户当前会话内的点击序列(最多保留50个)
private ListState<String> sessionClickSequence;
// 用户当前会话开始时间
private ValueState<Long> sessionStartTime;
// 会话超时定时器(30分钟无行为视为会话结束)
private ValueState<Long> sessionTimer;
@Override
public void open(Configuration config) {
sessionClickSequence = getRuntimeContext().getListState(
new ListStateDescriptor<>("session_clicks", String.class));
sessionStartTime = getRuntimeContext().getState(
new ValueStateDescriptor<>("session_start", Long.class));
sessionTimer = getRuntimeContext().getState(
new ValueStateDescriptor<>("session_timer", Long.class));
}
@Override
public void processElement(UserBehaviorEvent event, Context ctx,
Collector<UserFeature> out) throws Exception {
long currentTime = event.getEventTime();
// 初始化会话
if (sessionStartTime.value() == null) {
sessionStartTime.update(currentTime);
}
// 更新点击序列(保留最近50个)
List<String> clicks = new ArrayList<>();
sessionClickSequence.get().forEach(clicks::add);
clicks.add(event.getItemId());
if (clicks.size() > 50) clicks = clicks.subList(clicks.size() - 50, clicks.size());
sessionClickSequence.update(clicks);
// 注册/更新会话超时定时器(30分钟)
Long oldTimer = sessionTimer.value();
if (oldTimer != null) ctx.timerService().deleteEventTimeTimer(oldTimer);
long newTimer = currentTime + 30 * 60 * 1000L;
ctx.timerService().registerEventTimeTimer(newTimer);
sessionTimer.update(newTimer);
// 输出实时特征
out.collect(UserFeature.builder()
.userId(event.getUserId())
.featureName("session_click_count")
.featureValue(String.valueOf(clicks.size()))
.updateTime(currentTime)
.build());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<UserFeature> out) throws Exception {
// 会话超时,清空状态
sessionClickSequence.clear();
sessionStartTime.clear();
sessionTimer.clear();
}
}
流式特征写入优化
流式特征的写入需要在延迟、吞吐量和一致性之间权衡:
// 异步批量写入 Redis(提升吞吐,降低延迟影响)
public class AsyncRedisSink extends RichAsyncFunction<UserFeature, Void> {
private transient RedisClusterAsyncCommands<String, String> redisCommands;
@Override
public void open(Configuration config) {
RedisClusterClient client = RedisClusterClient.create("redis://redis:6379");
StatefulRedisClusterConnection<String, String> connection = client.connect();
redisCommands = connection.async();
}
@Override
public void asyncInvoke(UserFeature feature,
ResultFuture<Void> resultFuture) {
String key = "feature:" + feature.getUserId();
String field = feature.getFeatureName();
String value = feature.getFeatureValue();
// 使用 HSET 写入 Hash 结构,一个 Key 存储用户所有特征
redisCommands.hset(key, field, value)
.thenAccept(result -> {
// 设置 TTL(避免内存无限增长)
redisCommands.expire(key, 86400); // 24小时过期
resultFuture.complete(Collections.emptyList());
})
.exceptionally(ex -> {
resultFuture.completeExceptionally(ex);
return null;
});
}
}
批流一体特征架构
批流一体(Lambda Architecture 的演进)是解决训练-服务偏差的根本方案,其核心思想是用同一套代码逻辑同时驱动批式和流式计算。
Lambda 架构的问题
传统 Lambda 架构将批式层(Batch Layer)和速度层(Speed Layer)分开维护,导致:
- 双份代码:同一个业务逻辑需要用 Hive SQL 和 Flink SQL 各写一遍,维护成本高
- 语义不一致:两套代码难以保证完全相同的计算语义,是训练-服务偏差的根源
- 数据合并复杂:查询时需要合并批式层的历史数据和速度层的增量数据,逻辑复杂
Kappa 架构与批流融合
Kappa 架构彻底抛弃批式层,所有计算都通过流式引擎完成,历史数据通过重放 Kafka 消息来处理。但 Kappa 在处理超长历史窗口特征时成本较高。
更实用的方案是批流融合架构,用统一的 SQL/DSL 描述特征计算逻辑,由框架自动生成批式和流式两套执行计划:
-- 统一特征 SQL(同时支持批式和流式执行)
-- 在 Flink SQL 中,同一段 SQL 可以处理有界流(历史数据)和无界流(实时数据)
-- 定义 Kafka 数据源(流式)
CREATE TABLE user_behavior_stream (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 定义 Hive 数据源(批式,用于历史回填)
CREATE TABLE user_behavior_hive (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'hive',
'hive-database' = 'dwd',
'hive-table' = 'user_behavior_log'
);
-- 统一的特征计算 SQL(可在批式和流式模式下执行)
CREATE VIEW user_1h_click_feature AS
SELECT
user_id,
COUNT(*) AS click_1h,
COUNT(DISTINCT item_id) AS unique_item_1h,
WINDOW_START AS window_start,
WINDOW_END AS window_end
FROM TABLE(
TUMBLE(TABLE user_behavior_stream, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
WHERE behavior = 'click'
GROUP BY user_id, WINDOW_START, WINDOW_END;
-- 将结果写入 Redis(流式模式)
INSERT INTO redis_feature_sink
SELECT user_id, 'click_1h', CAST(click_1h AS STRING), window_end
FROM user_1h_click_feature;
特征平台架构
大型公司通常构建专门的特征平台(Feature Platform / Feature Store)来统一管理批流特征:
# 特征平台核心组件
特征注册中心(Feature Registry):
功能:
- 特征元数据管理(定义、血缘、统计信息)
- 特征版本管理(支持特征灰度发布)
- 特征共享与复用(避免重复开发)
实现:类似 Feast/Tecton 的特征定义 DSL
特征计算引擎(Feature Computation Engine):
批式引擎:Spark/Hive,负责历史特征回填和训练数据生成
流式引擎:Flink,负责实时特征更新
调度系统:Airflow/DolphinScheduler,管理批式任务依赖
特征存储(Feature Store):
离线存储:Hive/Iceberg,存储历史特征快照,供训练使用
在线存储:Redis/HBase,存储最新特征,供推理使用
特征一致性:确保离线和在线特征的计算逻辑完全一致
特征服务(Feature Service):
在线查询:低延迟批量特征查询接口(P99 < 10ms)
特征预计算:对热点用户/商品提前计算并缓存特征
降级策略:特征存储不可用时的默认值/降级逻辑
监控与治理:
数据质量监控:特征分布漂移检测、NULL 率监控
特征使用统计:哪些特征被哪些模型使用
成本优化:下线无人使用的特征,节省计算和存储资源
训练-服务一致性保障
训练-服务一致性(Training-Serving Consistency)是特征生产架构最核心的质量指标,任何细微的不一致都可能导致模型效果下降。
一致性问题的来源
训练-服务偏差来自多个层面:
- 计算逻辑不一致:批式 Hive SQL 和流式 Flink 代码的边界条件处理不同(如 NULL 处理、除零保护、浮点精度)
- 时间语义不一致:批式使用事件时间,流式使用处理时间,在数据延迟时产生差异
- 数据源不一致:训练时使用的数据源和在线推理时的数据源不完全相同(如训练用 T+1 数据,在线用实时流)
- 特征版本不一致:模型训练时用的特征 v1,但线上特征已更新为 v2
一致性保障方案
方案一:统一计算引擎(推荐)
使用 Flink 的 Bounded Stream 模式处理历史数据,与无界流模式共用同一套代码,从根本上消除批流逻辑差异:
// 同一个 Flink Job,通过配置切换批式/流式模式
public class UnifiedFeatureJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
boolean isBatchMode = ParameterTool.fromArgs(args).getBoolean("batch", false);
DataStream<UserBehaviorEvent> source;
if (isBatchMode) {
// 批式模式:读取 Hive 历史数据(有界流)
source = env.fromSource(
FileSource.forRecordStreamFormat(
new JsonFormat<>(UserBehaviorEvent.class),
new Path("hdfs://namenode/user/hive/warehouse/dwd.db/user_behavior/dt=2024-01-15")
).build(),
WatermarkStrategy.forMonotonousTimestamps(),
"Hive Batch Source"
);
} else {
// 流式模式:读取 Kafka 实时数据(无界流)
source = env.fromSource(
KafkaSource.<UserBehaviorEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("user_behavior")
.build(),
WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withTimestampAssigner((e, ts) -> e.getEventTime()),
"Kafka Stream Source"
);
}
// 完全相同的特征计算逻辑,批式和流式共用
computeFeatures(source)
.addSink(isBatchMode ? hdfsSink : redisSink);
env.execute("Unified Feature Job - " + (isBatchMode ? "Batch" : "Stream"));
}
private static DataStream<UserFeature> computeFeatures(
DataStream<UserBehaviorEvent> source) {
// 特征计算逻辑:批式和流式完全共用
return source
.filter(e -> e.getBehaviorType() != null)
.keyBy(UserBehaviorEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new FeatureAggregator());
}
}
方案二:特征日志回流(Feature Logging)
在线推理时,将实际使用的特征值记录到日志,训练时直接使用这些日志特征,彻底避免训练-服务偏差:
# 特征日志回流方案
# 在线推理服务
class RecommendService:
def predict(self, user_id: str, item_ids: List[str]) -> List[float]:
# 1. 从特征存储获取特征
features = self.feature_store.get_features(user_id, item_ids)
# 2. 模型推理
scores = self.model.predict(features)
# 3. 将特征日志写入 Kafka(异步,不影响推理延迟)
feature_log = {
"request_id": generate_uuid(),
"user_id": user_id,
"item_ids": item_ids,
"features": features.to_dict(), # 记录实际使用的特征值
"timestamp": int(time.time() * 1000)
}
self.kafka_producer.send("feature_logs", feature_log)
return scores
# 训练数据生成时,直接使用特征日志
# 避免重新计算特征,消除训练-服务偏差
SELECT
fl.request_id,
fl.user_id,
fl.features, -- 直接使用在线推理时的特征值
l.label -- 从行为日志获取样本标签
FROM feature_logs fl
JOIN sample_labels l ON fl.request_id = l.request_id
WHERE fl.dt = '2024-01-15';
特征一致性监控
生产环境中需要持续监控批流特征的一致性:
# 特征一致性检测
import numpy as np
from scipy.stats import ks_2samp
def check_feature_consistency(
batch_features: pd.DataFrame,
stream_features: pd.DataFrame,
feature_name: str,
threshold: float = 0.1
) -> bool:
"""
使用 KS 检验(Kolmogorov-Smirnov Test)检测批流特征分布一致性
KS 统计量 > threshold 时触发告警
"""
batch_values = batch_features[feature_name].dropna()
stream_values = stream_features[feature_name].dropna()
# KS 检验
ks_stat, p_value = ks_2samp(batch_values, stream_values)
# 计算相对误差
batch_mean = batch_values.mean()
stream_mean = stream_values.mean()
relative_error = abs(batch_mean - stream_mean) / (batch_mean + 1e-8)
is_consistent = ks_stat < threshold and relative_error < 0.05
# 记录监控指标
metrics.gauge("feature.consistency.ks_stat", ks_stat,
tags={"feature": feature_name})
metrics.gauge("feature.consistency.relative_error", relative_error,
tags={"feature": feature_name})
if not is_consistent:
alert(f"Feature {feature_name} consistency check failed: "
f"KS={ks_stat:.4f}, relative_error={relative_error:.4f}")
return is_consistent
特征存储设计
特征存储是批流特征架构的核心基础设施,需要同时满足离线训练的大吞吐读取和在线推理的低延迟查询。
离线特征存储
离线特征存储服务于模型训练和特征回填,核心需求是高吞吐、低成本:
- Apache Iceberg:支持 ACID 事务和 Schema Evolution,Time Travel 能力天然支持 PIT Join,与 Spark/Flink 深度集成,是离线特征存储的首选
- Apache Hudi:专为增量数据湖设计,支持 Upsert 和 Merge-on-Read,适合特征频繁更新的场景
- Delta Lake:Databricks 开源,与 Spark 生态最兼容,ACID 事务和 Time Travel 能力完善
# 使用 Apache Iceberg 作为离线特征存储
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog") \
.getOrCreate()
# 创建 Iceberg 特征表(支持分区和 ACID)
spark.sql("""
CREATE TABLE IF NOT EXISTS feature_store.user_features (
user_id BIGINT NOT NULL,
feature_dt DATE NOT NULL,
click_7d INT,
click_30d INT,
gmv_7d DOUBLE,
gmv_30d DOUBLE,
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (feature_dt)
TBLPROPERTIES (
'write.target-file-size-bytes' = '134217728', -- 128MB per file
'history.expire.min-snapshots-to-keep' = '30' -- 保留30个快照
)
""")
# 增量 Upsert 特征(Iceberg MERGE INTO)
spark.sql("""
MERGE INTO feature_store.user_features t
USING (
SELECT user_id, current_date() AS feature_dt,
click_7d, click_30d, gmv_7d, gmv_30d,
current_timestamp() AS updated_at
FROM daily_feature_computation
) s
ON t.user_id = s.user_id AND t.feature_dt = s.feature_dt
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# 使用 Time Travel 进行 PIT Join
training_features = spark.read \
.option("as-of-timestamp", "1705312800000") \
.table("feature_store.user_features")
在线特征存储
在线特征存储服务于模型推理,核心需求是低延迟(P99 < 10ms)和高可用:
# 特征服务层:统一封装在线特征读取
import redis
import json
from typing import Dict, List, Optional
class FeatureStore:
def __init__(self, redis_cluster_nodes: List[str]):
self.redis = redis.RedisCluster(
startup_nodes=[{"host": h, "port": 6379} for h in redis_cluster_nodes],
decode_responses=True,
max_connections=1000,
socket_timeout=0.005 # 5ms 超时
)
# 本地 LRU 缓存(减少 Redis 请求)
self.local_cache = LRUCache(maxsize=10000, ttl=60)
def get_user_features(
self,
user_ids: List[int],
feature_names: List[str]
) -> Dict[int, Dict[str, float]]:
"""批量获取用户特征,先查本地缓存,再查 Redis"""
result = {}
cache_miss_ids = []
# 1. 查本地缓存
for user_id in user_ids:
cached = self.local_cache.get(f"user:{user_id}")
if cached:
result[user_id] = {k: cached[k] for k in feature_names if k in cached}
else:
cache_miss_ids.append(user_id)
if not cache_miss_ids:
return result
# 2. Redis Pipeline 批量查询(减少网络往返)
pipeline = self.redis.pipeline(transaction=False)
for user_id in cache_miss_ids:
pipeline.hgetall(f"feature:user:{user_id}")
redis_results = pipeline.execute()
for user_id, feature_dict in zip(cache_miss_ids, redis_results):
if feature_dict:
# 解析特征值(Redis 存储字符串,需要转换类型)
parsed = {k: float(v) for k, v in feature_dict.items()}
result[user_id] = {k: parsed[k] for k in feature_names if k in parsed}
# 写入本地缓存
self.local_cache.set(f"user:{user_id}", parsed)
else:
# 特征缺失时使用默认值(降级)
result[user_id] = {k: 0.0 for k in feature_names}
return result
批流特征同步
批式特征(T+1 更新)和流式特征(实时更新)需要在在线存储中合理组织:
# 批流特征在 Redis 中的存储结构设计
# 方案一:分 Key 存储(批式和流式特征分开)
# 批式特征(每天凌晨更新)
feature:user:12345:batch:
click_7d: "156"
click_30d: "892"
gmv_30d: "3456.78"
updated_at: "1705312800"
# 流式特征(实时更新)
feature:user:12345:stream:
click_1h: "12"
click_today: "45"
last_click_item: "item_789"
session_click_cnt: "8"
# 方案二:合并 Key 存储(统一 Hash,批流字段共存)
feature:user:12345:
# 批式特征(前缀 b_)
b_click_7d: "156"
b_click_30d: "892"
# 流式特征(前缀 s_)
s_click_1h: "12"
s_session_cnt: "8"
# 元数据
batch_updated_at: "1705312800"
stream_updated_at: "1705314600"
生产实践与案例
美团特征平台实践
美团在外卖、到店等业务场景中积累了丰富的特征工程经验,其特征平台(Feature Platform)支撑了数百个推荐和风控模型:
- 统一特征 DSL:基于 Flink SQL 扩展的声明式特征定义语言,同一份特征 SQL 自动生成批式(Spark)和流式(Flink)两套执行计划,从根本上解决训练-服务偏差
- 特征血缘追踪:自动解析特征 SQL 的数据血缘,记录每个特征依赖哪些数据源、被哪些模型使用,支持影响面分析和故障快速定位
- 在线特征存储:基于 Cellar(美团自研 Redis 兼容存储)构建在线特征存储,支持数百亿 KV 的存储规模,P99 读取延迟 < 5ms
- 特征质量监控:实时监控特征的 NULL 率、分布漂移、批流一致性,任何异常自动触发告警并暂停相关模型的在线更新
字节跳动特征实践
字节跳动的推荐系统每天处理数千亿次特征请求,其特征工程体系具有以下特点:
- 实时特征优先:抖音、今日头条等产品高度依赖用户实时行为特征,流式特征计算延迟要求达到秒级甚至毫秒级
- 特征压缩存储:用户行为序列特征(Item Sequence)长度可达数千,通过 Protobuf 压缩后存储到 Redis,节省 60% 以上存储空间
- 多级缓存架构:推理服务本地缓存 → 分布式缓存(Redis)→ 特征存储,通过多级缓存将特征读取 P99 延迟控制在 3ms 以内
常见故障与排查
生产环境中特征生产系统的常见问题:
特征延迟升高:
- Flink Job 背压(Backpressure):上游 Kafka 消费速度跟不上生产速度,导致特征更新延迟。排查方向:Flink Web UI 查看背压节点,优化算子性能或增加并行度
- Redis 热点 Key:某些高活跃用户的特征 Key 被频繁写入,导致 Redis Slot 热点。解决方案:对热点 Key 进行 Slot 迁移,或在 Flink 侧对热点 Key 做本地聚合后批量写入
特征数据质量问题:
- Kafka 消息乱序:事件时间和处理时间差异过大,导致窗口计算结果不准确。解决方案:合理设置 Watermark 延迟,对晚到数据使用 Side Output 单独处理
- 上游数据源 Schema 变更:数据源字段新增/删除/类型变更导致特征计算 Job 失败。解决方案:引入 Schema Registry(如 Confluent Schema Registry),强制 Schema 兼容性检查
// 处理晚到数据(Late Data)
DataStream<UserBehaviorEvent> mainStream = behaviorStream
.keyBy(UserBehaviorEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(lateDataTag) // 晚到数据输出到 Side Output
.aggregate(new FeatureAggregator());
// 晚到数据单独处理(例如,补偿更新特征)
DataStream<UserBehaviorEvent> lateData = mainStream
.getSideOutput(lateDataTag);
lateData.addSink(new LateDataCompensationSink());
总结
批流特征生产架构是机器学习系统工程的核心挑战之一,关键要点总结:
- 训练-服务一致性是第一原则:批式和流式特征的计算逻辑必须严格一致,推荐使用统一计算引擎(Flink Bounded Stream)或特征日志回流方案从根本上消除偏差
- 时间语义是核心难题:Point-in-Time Join 避免数据泄漏,Watermark 处理乱序数据,事件时间 vs 处理时间的选择需要根据业务容忍度权衡
- 特征新鲜度分层管理:静态特征用批式 T+1,慢变特征用小时级批式,快变特征用分钟级流式,实时特征用秒级流式,不同层次的特征用不同的计算和存储策略
- 状态管理是流式计算的核心:选择合适的状态类型(ValueState/MapState/ListState),配置 RocksDB 状态后端处理大状态,定期 Checkpoint 保证故障恢复
- 特征存储需要批流分层:离线存储用 Iceberg/Hudi 支持 Time Travel 和高吞吐,在线存储用 Redis/HBase 支持低延迟查询,多级缓存架构控制端到端延迟
- 特征平台是规模化的关键:统一特征 DSL、特征血缘追踪、数据质量监控是特征平台的三大核心能力,支撑数百个模型的特征共享和复用