批流统一的核心矛盾是:同一个特征,离线训练时用 Spark 算,在线服务时用 Flink 或 Java 算,两套代码维护成本高且容易出现语义差异。特征定义语言(Feature Definition Language)试图用一份定义驱动两套实现,从根本上消除这个问题。本文梳理业界四种主流方案的设计思路、代表实现和适用场景,帮助你在自建特征平台时做出合理选择。
问题根因:为什么两套代码必然出现偏差
在深入方案之前,先把问题说清楚。Training-Serving Skew 的根因不是工程师粗心,而是两套执行环境的语义天然不同:
时间语义不同:Spark 批处理基于处理时间(当前时间),Flink 流处理可以基于事件时间(数据中的时间戳)。"近7天"在 Spark 里是 current_date() - 7,在 Flink 里是事件时间窗口。如果没有刻意对齐,两者的窗口边界可能差几分钟到几小时。
NULL 处理不同:Spark SQL 和 Java 代码对 NULL 的默认处理行为不同。Spark 中 SUM 忽略 NULL,Java 中如果不判断 null 会抛 NPE。工程师往往在两处用了不同的默认值(0 vs -1 vs null),导致特征分布偏移。
数值精度不同:Spark 的 DECIMAL(10,2) 和 Java 的 double 在精度处理上有差异。浮点数的舍入方式不同,累积后会产生可观的偏差。
边界条件处理不同:时间窗口的左闭右开还是左闭右闭,分组键为空时的行为,这些细节在两套代码中往往没有被对齐。
这些偏差单独看都很小,但叠加在一起,足以让模型的线上效果比离线评估差 1-3% AUC,而且极难排查——你永远不确定是模型问题还是特征问题。
方案一:Python SDK 驱动代码生成(Tecton 路线)
设计思路
用 Python 装饰器或函数定义特征的计算逻辑,平台解析这份定义,自动生成对应的 Spark 批处理代码和 Flink/Spark Streaming 流处理代码。用户只写一份 Python,平台负责翻译。
这是 Tecton 的核心方案,也是目前功能最完整的商业实现。
实现示例
from tecton import batch_feature_view, stream_feature_view, Entity
from tecton.types import Field, Int64, Float64
from datetime import timedelta
user = Entity(name="user", join_keys=["user_id"])
# 一份定义,平台自动生成 Spark 批作业(每小时重算)
@batch_feature_view(
sources=[orders_batch_source], # Hive/S3 数据源
entities=[user],
mode="spark_sql",
batch_schedule=timedelta(hours=1),
ttl=timedelta(days=30),
features=[
Field("buy_cnt_7d", Int64),
Field("total_amount_7d", Float64),
],
)
def user_purchase_batch(orders):
return """
SELECT
user_id,
COUNT(*) AS buy_cnt_7d,
SUM(amount) AS total_amount_7d
FROM orders
WHERE event_time >= NOW() - INTERVAL 7 DAYS
AND status = 'paid'
GROUP BY user_id
"""
# 同一实体的流式版本:平台自动生成 Flink 作业(实时更新)
@stream_feature_view(
sources=[orders_stream_source], # Kafka 数据源
entities=[user],
mode="spark_sql",
stream_processing_mode="continuous",
features=[
Field("buy_cnt_1h", Int64),
Field("total_amount_1h", Float64),
],
)
def user_purchase_stream(orders):
return """
SELECT
user_id,
COUNT(*) AS buy_cnt_1h,
SUM(amount) AS total_amount_1h
FROM orders
WHERE event_time >= NOW() - INTERVAL 1 HOUR
AND status = 'paid'
GROUP BY user_id
"""
优劣分析
优点:
- 用户体验最好,Python 是算法工程师最熟悉的语言,上手成本低
- 逻辑集中在一处,批流特征的关系一目了然
- 平台可以自动管理作业调度、依赖关系、监控告警
缺点:
- SQL 字符串内嵌在 Python 中,IDE 无法做语法检查,容易出现运行时错误
- 批流两个函数的 SQL 逻辑是分开写的,仍然存在人为不一致的风险(虽然比完全分离好得多)
- 自研实现这套框架的工程量极大,基本只有 Tecton 这样的专业团队才能做好
适用场景:采购了 Tecton,或有足够工程资源自研类似框架的大型团队。
方案二:Flink SQL 统一批流(流批一体路线)
设计思路
Flink 1.12 之后原生支持批流一体:同一套 Flink SQL,通过切换执行模式(SET 'execution.runtime-mode' = 'batch' 或 'streaming'),分别以批处理和流处理方式执行。
这意味着:特征的计算逻辑只需要写一套 Flink SQL,离线训练时以批模式跑(读 Hive/HDFS),在线服务时以流模式跑(消费 Kafka),两种模式共享同一份 SQL,从语言层面消除了不一致。
实现示例
-- 这一份 SQL,批模式和流模式都能运行
-- 批模式:读 Hive 表,计算历史特征
SET 'execution.runtime-mode' = 'batch';
CREATE TABLE orders_batch (
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'hive',
'default-database' = 'dwd',
'table-name' = 'orders'
);
-- 流模式:消费 Kafka,计算实时特征
SET 'execution.runtime-mode' = 'streaming';
CREATE TABLE orders_stream (
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'order_events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 核心计算逻辑:批流共用,只需写一次
-- 批模式时 source 是 orders_batch,流模式时是 orders_stream
CREATE VIEW user_purchase_features AS
SELECT
user_id,
COUNT(*) AS buy_cnt,
SUM(CASE WHEN status = 'paid' THEN amount ELSE 0 END) AS total_amount,
-- 窗口聚合:批模式用静态时间范围,流模式用滑动窗口
window_end
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
WHERE status = 'paid'
GROUP BY user_id, window_start, window_end;
# Python 驱动层:根据场景切换执行模式
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def run_feature_job(mode: str):
if mode == "batch":
settings = EnvironmentSettings.in_batch_mode()
else:
settings = EnvironmentSettings.in_streaming_mode()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, settings)
# 加载同一份特征定义 SQL
with open("features/user_purchase.sql") as f:
sql = f.read()
# 根据模式绑定不同的数据源
if mode == "batch":
t_env.execute_sql(HIVE_SOURCE_DDL)
else:
t_env.execute_sql(KAFKA_SOURCE_DDL)
t_env.execute_sql(sql)
优劣分析
优点:
- 计算逻辑真正只写一次,批流语义由 Flink 引擎保证一致,不依赖人工对齐
- Flink SQL 是标准 SQL 方言,可读性好,算法工程师容易上手
- 开源方案,不依赖商业平台,自主可控
缺点:
- Flink 批模式的性能和成熟度不如 Spark(大规模复杂 ETL 场景 Spark 仍是首选)
- 窗口语义在批流两种模式下仍有差异(批模式的静态时间范围 vs 流模式的滑动窗口),需要仔细设计 SQL 确保语义等价
- 数据源切换(批用 Hive,流用 Kafka)需要额外的配置管理
适用场景:已经重度使用 Flink 的团队,特征计算逻辑相对简单(窗口聚合为主),追求开源自主可控。
方案三:声明式 YAML/JSON DSL(字节/阿里路线)
设计思路
用声明式的 YAML 或 JSON 描述特征的"是什么"(实体、数据源、窗口、聚合方式),平台的代码生成器负责翻译为具体的 Spark SQL 和 Flink SQL。用户不写任何代码,只填配置。
这是字节跳动、阿里巴巴等大厂内部特征平台的主流方案,适合特征类型相对标准化(以窗口聚合为主)的场景。
实现示例
# 特征定义(YAML DSL)
feature_group:
name: user_purchase_features
description: 用户购买行为特征
entity:
name: user_id
type: BIGINT
source:
name: order_events
kafka_topic: order_events # 流数据源
hive_table: dwd.orders # 批数据源(同一份数据的两种形态)
timestamp_field: event_time
filter: "status = 'paid'"
features:
- name: buy_cnt_1h
type: INT64
window: {type: sliding, size: 1h, slide: 10m}
aggregation: COUNT
description: 近1小时购买次数
- name: buy_cnt_7d
type: INT64
window: {type: fixed, size: 7d}
aggregation: COUNT
description: 近7天购买次数
- name: total_amount_7d
type: FLOAT64
window: {type: fixed, size: 7d}
aggregation: SUM
field: amount
description: 近7天购买总金额
- name: avg_amount_30d
type: FLOAT64
window: {type: fixed, size: 30d}
aggregation: AVG
field: amount
description: 近30天平均购买金额
null_fill: 0.0 # 统一的缺失值填充,批流一致
平台的代码生成器读取上述 YAML,自动生成:
-- 自动生成的 Spark SQL(批处理,用于离线特征和训练样本拼接)
SELECT
user_id,
COUNT(*) AS buy_cnt_7d,
SUM(amount) AS total_amount_7d,
COALESCE(AVG(amount), 0.0) AS avg_amount_30d
FROM dwd.orders
WHERE status = 'paid'
AND event_time >= date_sub(from_unixtime(unix_timestamp()), 7)
AND event_time >= date_sub(from_unixtime(unix_timestamp()), 30)
GROUP BY user_id;
-- 自动生成的 Flink SQL(流处理,用于实时特征更新)
SELECT
user_id,
COUNT(*) AS buy_cnt_1h,
window_start,
window_end
FROM TABLE(
SLIDE(TABLE order_events, DESCRIPTOR(event_time),
INTERVAL '10' MINUTE, INTERVAL '1' HOUR)
)
WHERE status = 'paid'
GROUP BY user_id, window_start, window_end;
代码生成器的核心设计
YAML DSL 方案的工程核心是代码生成器,需要处理以下映射关系:
- 聚合算子映射:COUNT/SUM/AVG/MAX/MIN → Spark SQL 聚合函数 / Flink 聚合算子
- 窗口类型映射:fixed(固定窗口)→ Spark 的时间范围过滤 / Flink 的 Tumble Window;sliding(滑动窗口)→ Spark 的滑动时间过滤 / Flink 的 Slide Window
- 时间语义对齐:批处理的"近7天"用处理时间计算截止点,流处理用事件时间 Watermark,需要在 DSL 层面统一定义,代码生成时分别翻译
- NULL 处理统一:DSL 中声明
null_fill,代码生成时在批处理和流处理中都加上COALESCE(field, null_fill)
优劣分析
优点:
- 用户体验最简单,不需要写任何代码,填配置即可
- 批流语义由代码生成器统一保证,人为引入不一致的风险最低
- 特征定义标准化后,平台可以自动做特征质量监控、血缘分析、影响分析
- 非常适合大规模特征工厂(数千个特征),统一管理成本低
缺点:
- 表达能力受限:只能表达标准的窗口聚合特征,复杂的自定义逻辑(如序列特征、图特征)无法用 DSL 描述
- 代码生成器本身是复杂的工程,需要投入大量开发资源,且生成的代码难以调试
- DSL 的演进(新增特征类型)需要同时修改 DSL 规范和代码生成器,迭代较慢
适用场景:特征以标准窗口聚合为主(占 80% 以上),特征数量大(数百到数千),有足够工程资源开发和维护代码生成器的大型团队。字节、阿里、美团等大厂的特征平台核心都是这个方案。
方案四:共享算子库(最务实的方案)
设计思路
前三个方案都试图从"定义层"解决问题,但有一个更务实的思路:不统一定义,而是统一实现。把特征计算中容易出现偏差的部分(时间窗口计算、NULL 处理、数值归一化)抽象为共享算子库,批处理(Spark)和流处理(Flink/在线 Java 服务)都调用同一套算子库的实现。
这不能从根本上消除 Training-Serving Skew,但能大幅降低偏差的概率,且工程实现成本远低于前三种方案。
实现示例
// 共享算子库(Java 实现,Spark/Flink/在线服务共用)
public class FeatureOperators {
/**
* 标准化时间窗口计算:统一定义"近N天"的边界
* 批处理和流处理都调用这个方法,保证边界一致
*/
public static Timestamp windowStart(Timestamp referenceTime, int days) {
// 精确到天,不是到秒,避免时区和精度问题
LocalDate refDate = referenceTime.toLocalDateTime().toLocalDate();
return Timestamp.valueOf(refDate.minusDays(days).atStartOfDay());
}
/**
* 统一的缺失值处理:所有特征的 NULL 填充规则在这里集中定义
*/
public static double fillNull(Double value, double defaultValue) {
return value == null ? defaultValue : value;
}
/**
* 统一的数值归一化:训练和线上用同一套参数和逻辑
*/
public static double normalize(double value, double mean, double std) {
if (std < 1e-8) return 0.0;
return (value - mean) / std;
}
}
// Spark UDF:注册共享算子为 Spark 函数
import org.apache.spark.sql.functions.udf
val windowStartUDF = udf((ts: java.sql.Timestamp, days: Int) =>
FeatureOperators.windowStart(ts, days))
spark.udf.register("window_start", windowStartUDF)
// Spark SQL 中使用共享算子
spark.sql("""
SELECT
user_id,
COUNT(*) AS buy_cnt_7d
FROM orders
WHERE event_time >= window_start(current_timestamp(), 7)
AND status = 'paid'
GROUP BY user_id
""")
// Flink 自定义函数:复用同一套算子
public class WindowStartFunction extends ScalarFunction {
public Timestamp eval(Timestamp ts, Integer days) {
return FeatureOperators.windowStart(ts, days);
}
}
// 在线 Java 服务:直接调用,不需要适配
public class FeatureService {
public Map<String, Object> buildFeatures(long userId, Timestamp now) {
// 查询 Redis 中的原始计数
long rawCount = redisClient.getCount(userId);
// 使用共享算子做后处理
double normalized = FeatureOperators.normalize(rawCount, MEAN, STD);
return Map.of("buy_cnt_7d_norm", normalized);
}
}
优劣分析
优点:
- 工程实现最简单,不需要代码生成器或复杂框架,一个 JAR 包解决问题
- 灵活性高,复杂特征逻辑可以在算子库中自由实现
- 对现有代码侵入性小,可以渐进式引入
缺点:
- 只能统一"共性逻辑",特征的整体计算流程仍然是分散的
- 需要工程师主动使用共享算子库,有遗漏的风险
- 不能从根本上保证批流一致,只是降低了偏差概率
适用场景:中小型团队,特征数量不多,没有资源开发完整特征平台,但又想降低 Training-Serving Skew 风险的实用方案。也适合作为其他方案的补充(在 DSL 方案无法覆盖的复杂特征上使用)。
四方案横向对比
- 一致性保证强度:Flink SQL 统一 ≈ YAML DSL > Python SDK > 算子库共享
- 表达能力:Python SDK ≈ 算子库共享 > Flink SQL > YAML DSL
- 用户体验(易用性):YAML DSL > Python SDK > Flink SQL > 算子库共享
- 工程实现成本:算子库共享(低)< Flink SQL(中)< Python SDK(高)< YAML DSL(高)
- 适合团队规模:算子库共享(小团队)、Flink SQL(中等)、Python SDK/YAML DSL(大团队)
实践建议:如何选择和落地
渐进式演进路径
没有哪个方案是一步到位的,建议按以下路径演进:
- 起步阶段:先用算子库共享解决最常见的偏差(时间窗口边界、NULL 处理),成本低,立竿见影
- 成长阶段:特征数量增长到 100+ 后,引入 YAML DSL 管理标准窗口聚合特征,复杂特征仍用算子库
- 成熟阶段:特征数量 1000+ 后,考虑 Flink SQL 统一批流,或引入 Feast/Tecton 管理特征元数据和服务
不管哪种方案都要做的事
无论选择哪个方案,以下工程实践都是必须的:
- 特征一致性校验:定期(每天)对比离线特征和在线特征的分布(均值、方差、分位数),超过阈值告警。这是发现 Skew 的最后一道防线。
- 特征版本管理:每次特征计算逻辑变更都要打版本号,模型训练时记录使用的特征版本,上线时确保推理服务使用的特征版本与训练时一致。
- Shadow Mode 验证:新的特征计算逻辑上线前,在影子模式(Shadow Mode)下同时运行新旧两套逻辑,对比输出,确认无偏差后再切换。
# 特征一致性校验示例
def check_feature_consistency(feature_name: str, date: str):
# 从离线存储(Hive)读取批量计算的特征值
offline = spark.sql(f"""
SELECT user_id, {feature_name}
FROM feature_store.user_features
WHERE dt = '{date}'
""").toPandas()
# 从在线存储(Redis)读取当前特征值(抽样)
sample_users = offline['user_id'].sample(1000).tolist()
online_values = redis_client.hmget_batch(
[f"user:features:{uid}" for uid in sample_users],
feature_name
)
online = pd.DataFrame({'user_id': sample_users, feature_name: online_values})
# 计算 PSI(分布漂移指标)
psi = calculate_psi(offline[feature_name], online[feature_name])
if psi > 0.1:
alert(f"特征 {feature_name} 批流一致性告警:PSI={psi:.3f}")
return psi
总结
批流统一特征定义语言本质上是在解决一个抽象层次的问题:把特征的"语义"(这个特征计算的是什么)和"实现"(怎么计算)分离。语义层只有一份,实现层可以有多份(Spark、Flink、Java),但由平台保证各实现的语义等价。
四种方案在这个抽象层次上的位置不同:
- 算子库共享:只统一了最底层的工具函数,抽象层次最低
- Flink SQL 统一:在 SQL 层面统一,引擎保证语义一致,但要求批流都用 Flink
- Python SDK:在 Python 函数层面统一,平台负责翻译,灵活性和一致性兼顾
- YAML DSL:在声明式配置层面统一,抽象层次最高,代价是表达能力受限
没有银弹。选择的关键是:你的特征有多复杂、你的团队有多大、你愿意投入多少工程资源。从算子库共享起步,随着规模增长逐步演进,是最稳健的路径。