批流统一特征定义语言:业界最佳实践解析

批流统一的核心矛盾是:同一个特征,离线训练时用 Spark 算,在线服务时用 Flink 或 Java 算,两套代码维护成本高且容易出现语义差异。特征定义语言(Feature Definition Language)试图用一份定义驱动两套实现,从根本上消除这个问题。本文梳理业界四种主流方案的设计思路、代表实现和适用场景,帮助你在自建特征平台时做出合理选择。

问题根因:为什么两套代码必然出现偏差

在深入方案之前,先把问题说清楚。Training-Serving Skew 的根因不是工程师粗心,而是两套执行环境的语义天然不同

时间语义不同:Spark 批处理基于处理时间(当前时间),Flink 流处理可以基于事件时间(数据中的时间戳)。"近7天"在 Spark 里是 current_date() - 7,在 Flink 里是事件时间窗口。如果没有刻意对齐,两者的窗口边界可能差几分钟到几小时。

NULL 处理不同:Spark SQL 和 Java 代码对 NULL 的默认处理行为不同。Spark 中 SUM 忽略 NULL,Java 中如果不判断 null 会抛 NPE。工程师往往在两处用了不同的默认值(0 vs -1 vs null),导致特征分布偏移。

数值精度不同:Spark 的 DECIMAL(10,2) 和 Java 的 double 在精度处理上有差异。浮点数的舍入方式不同,累积后会产生可观的偏差。

边界条件处理不同:时间窗口的左闭右开还是左闭右闭,分组键为空时的行为,这些细节在两套代码中往往没有被对齐。

这些偏差单独看都很小,但叠加在一起,足以让模型的线上效果比离线评估差 1-3% AUC,而且极难排查——你永远不确定是模型问题还是特征问题。

方案一:Python SDK 驱动代码生成(Tecton 路线)

设计思路

用 Python 装饰器或函数定义特征的计算逻辑,平台解析这份定义,自动生成对应的 Spark 批处理代码和 Flink/Spark Streaming 流处理代码。用户只写一份 Python,平台负责翻译。

这是 Tecton 的核心方案,也是目前功能最完整的商业实现。

实现示例

from tecton import batch_feature_view, stream_feature_view, Entity
from tecton.types import Field, Int64, Float64
from datetime import timedelta

user = Entity(name="user", join_keys=["user_id"])

# 一份定义,平台自动生成 Spark 批作业(每小时重算)
@batch_feature_view(
    sources=[orders_batch_source],   # Hive/S3 数据源
    entities=[user],
    mode="spark_sql",
    batch_schedule=timedelta(hours=1),
    ttl=timedelta(days=30),
    features=[
        Field("buy_cnt_7d",      Int64),
        Field("total_amount_7d", Float64),
    ],
)
def user_purchase_batch(orders):
    return """
        SELECT
            user_id,
            COUNT(*) AS buy_cnt_7d,
            SUM(amount) AS total_amount_7d
        FROM orders
        WHERE event_time >= NOW() - INTERVAL 7 DAYS
          AND status = 'paid'
        GROUP BY user_id
    """

# 同一实体的流式版本:平台自动生成 Flink 作业(实时更新)
@stream_feature_view(
    sources=[orders_stream_source],  # Kafka 数据源
    entities=[user],
    mode="spark_sql",
    stream_processing_mode="continuous",
    features=[
        Field("buy_cnt_1h",      Int64),
        Field("total_amount_1h", Float64),
    ],
)
def user_purchase_stream(orders):
    return """
        SELECT
            user_id,
            COUNT(*) AS buy_cnt_1h,
            SUM(amount) AS total_amount_1h
        FROM orders
        WHERE event_time >= NOW() - INTERVAL 1 HOUR
          AND status = 'paid'
        GROUP BY user_id
    """

优劣分析

优点

  • 用户体验最好,Python 是算法工程师最熟悉的语言,上手成本低
  • 逻辑集中在一处,批流特征的关系一目了然
  • 平台可以自动管理作业调度、依赖关系、监控告警

缺点

  • SQL 字符串内嵌在 Python 中,IDE 无法做语法检查,容易出现运行时错误
  • 批流两个函数的 SQL 逻辑是分开写的,仍然存在人为不一致的风险(虽然比完全分离好得多)
  • 自研实现这套框架的工程量极大,基本只有 Tecton 这样的专业团队才能做好

适用场景:采购了 Tecton,或有足够工程资源自研类似框架的大型团队。

设计思路

Flink 1.12 之后原生支持批流一体:同一套 Flink SQL,通过切换执行模式(SET 'execution.runtime-mode' = 'batch''streaming'),分别以批处理和流处理方式执行。

这意味着:特征的计算逻辑只需要写一套 Flink SQL,离线训练时以批模式跑(读 Hive/HDFS),在线服务时以流模式跑(消费 Kafka),两种模式共享同一份 SQL,从语言层面消除了不一致。

实现示例

-- 这一份 SQL,批模式和流模式都能运行

-- 批模式:读 Hive 表,计算历史特征
SET 'execution.runtime-mode' = 'batch';

CREATE TABLE orders_batch (
    user_id    BIGINT,
    amount     DECIMAL(10,2),
    status     STRING,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'hive',
    'default-database' = 'dwd',
    'table-name' = 'orders'
);

-- 流模式:消费 Kafka,计算实时特征
SET 'execution.runtime-mode' = 'streaming';

CREATE TABLE orders_stream (
    user_id    BIGINT,
    amount     DECIMAL(10,2),
    status     STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic'     = 'order_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'    = 'json'
);

-- 核心计算逻辑:批流共用,只需写一次
-- 批模式时 source 是 orders_batch,流模式时是 orders_stream
CREATE VIEW user_purchase_features AS
SELECT
    user_id,
    COUNT(*) AS buy_cnt,
    SUM(CASE WHEN status = 'paid' THEN amount ELSE 0 END) AS total_amount,
    -- 窗口聚合:批模式用静态时间范围,流模式用滑动窗口
    window_end
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
WHERE status = 'paid'
GROUP BY user_id, window_start, window_end;
# Python 驱动层:根据场景切换执行模式
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def run_feature_job(mode: str):
    if mode == "batch":
        settings = EnvironmentSettings.in_batch_mode()
    else:
        settings = EnvironmentSettings.in_streaming_mode()

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env, settings)

    # 加载同一份特征定义 SQL
    with open("features/user_purchase.sql") as f:
        sql = f.read()

    # 根据模式绑定不同的数据源
    if mode == "batch":
        t_env.execute_sql(HIVE_SOURCE_DDL)
    else:
        t_env.execute_sql(KAFKA_SOURCE_DDL)

    t_env.execute_sql(sql)

优劣分析

优点

  • 计算逻辑真正只写一次,批流语义由 Flink 引擎保证一致,不依赖人工对齐
  • Flink SQL 是标准 SQL 方言,可读性好,算法工程师容易上手
  • 开源方案,不依赖商业平台,自主可控

缺点

  • Flink 批模式的性能和成熟度不如 Spark(大规模复杂 ETL 场景 Spark 仍是首选)
  • 窗口语义在批流两种模式下仍有差异(批模式的静态时间范围 vs 流模式的滑动窗口),需要仔细设计 SQL 确保语义等价
  • 数据源切换(批用 Hive,流用 Kafka)需要额外的配置管理

适用场景:已经重度使用 Flink 的团队,特征计算逻辑相对简单(窗口聚合为主),追求开源自主可控。

方案三:声明式 YAML/JSON DSL(字节/阿里路线)

设计思路

用声明式的 YAML 或 JSON 描述特征的"是什么"(实体、数据源、窗口、聚合方式),平台的代码生成器负责翻译为具体的 Spark SQL 和 Flink SQL。用户不写任何代码,只填配置。

这是字节跳动、阿里巴巴等大厂内部特征平台的主流方案,适合特征类型相对标准化(以窗口聚合为主)的场景。

实现示例

# 特征定义(YAML DSL)
feature_group:
  name: user_purchase_features
  description: 用户购买行为特征
  entity:
    name: user_id
    type: BIGINT

  source:
    name: order_events
    kafka_topic: order_events          # 流数据源
    hive_table: dwd.orders             # 批数据源(同一份数据的两种形态)
    timestamp_field: event_time
    filter: "status = 'paid'"

  features:
    - name: buy_cnt_1h
      type: INT64
      window: {type: sliding, size: 1h, slide: 10m}
      aggregation: COUNT
      description: 近1小时购买次数

    - name: buy_cnt_7d
      type: INT64
      window: {type: fixed, size: 7d}
      aggregation: COUNT
      description: 近7天购买次数

    - name: total_amount_7d
      type: FLOAT64
      window: {type: fixed, size: 7d}
      aggregation: SUM
      field: amount
      description: 近7天购买总金额

    - name: avg_amount_30d
      type: FLOAT64
      window: {type: fixed, size: 30d}
      aggregation: AVG
      field: amount
      description: 近30天平均购买金额
      null_fill: 0.0                   # 统一的缺失值填充,批流一致

平台的代码生成器读取上述 YAML,自动生成:

-- 自动生成的 Spark SQL(批处理,用于离线特征和训练样本拼接)
SELECT
    user_id,
    COUNT(*) AS buy_cnt_7d,
    SUM(amount) AS total_amount_7d,
    COALESCE(AVG(amount), 0.0) AS avg_amount_30d
FROM dwd.orders
WHERE status = 'paid'
  AND event_time >= date_sub(from_unixtime(unix_timestamp()), 7)
  AND event_time >= date_sub(from_unixtime(unix_timestamp()), 30)
GROUP BY user_id;
-- 自动生成的 Flink SQL(流处理,用于实时特征更新)
SELECT
    user_id,
    COUNT(*) AS buy_cnt_1h,
    window_start,
    window_end
FROM TABLE(
    SLIDE(TABLE order_events, DESCRIPTOR(event_time),
          INTERVAL '10' MINUTE, INTERVAL '1' HOUR)
)
WHERE status = 'paid'
GROUP BY user_id, window_start, window_end;

代码生成器的核心设计

YAML DSL 方案的工程核心是代码生成器,需要处理以下映射关系:

  • 聚合算子映射:COUNT/SUM/AVG/MAX/MIN → Spark SQL 聚合函数 / Flink 聚合算子
  • 窗口类型映射:fixed(固定窗口)→ Spark 的时间范围过滤 / Flink 的 Tumble Window;sliding(滑动窗口)→ Spark 的滑动时间过滤 / Flink 的 Slide Window
  • 时间语义对齐:批处理的"近7天"用处理时间计算截止点,流处理用事件时间 Watermark,需要在 DSL 层面统一定义,代码生成时分别翻译
  • NULL 处理统一:DSL 中声明 null_fill,代码生成时在批处理和流处理中都加上 COALESCE(field, null_fill)

优劣分析

优点

  • 用户体验最简单,不需要写任何代码,填配置即可
  • 批流语义由代码生成器统一保证,人为引入不一致的风险最低
  • 特征定义标准化后,平台可以自动做特征质量监控、血缘分析、影响分析
  • 非常适合大规模特征工厂(数千个特征),统一管理成本低

缺点

  • 表达能力受限:只能表达标准的窗口聚合特征,复杂的自定义逻辑(如序列特征、图特征)无法用 DSL 描述
  • 代码生成器本身是复杂的工程,需要投入大量开发资源,且生成的代码难以调试
  • DSL 的演进(新增特征类型)需要同时修改 DSL 规范和代码生成器,迭代较慢

适用场景:特征以标准窗口聚合为主(占 80% 以上),特征数量大(数百到数千),有足够工程资源开发和维护代码生成器的大型团队。字节、阿里、美团等大厂的特征平台核心都是这个方案。

方案四:共享算子库(最务实的方案)

设计思路

前三个方案都试图从"定义层"解决问题,但有一个更务实的思路:不统一定义,而是统一实现。把特征计算中容易出现偏差的部分(时间窗口计算、NULL 处理、数值归一化)抽象为共享算子库,批处理(Spark)和流处理(Flink/在线 Java 服务)都调用同一套算子库的实现。

这不能从根本上消除 Training-Serving Skew,但能大幅降低偏差的概率,且工程实现成本远低于前三种方案。

实现示例

// 共享算子库(Java 实现,Spark/Flink/在线服务共用)
public class FeatureOperators {

    /**
     * 标准化时间窗口计算:统一定义"近N天"的边界
     * 批处理和流处理都调用这个方法,保证边界一致
     */
    public static Timestamp windowStart(Timestamp referenceTime, int days) {
        // 精确到天,不是到秒,避免时区和精度问题
        LocalDate refDate = referenceTime.toLocalDateTime().toLocalDate();
        return Timestamp.valueOf(refDate.minusDays(days).atStartOfDay());
    }

    /**
     * 统一的缺失值处理:所有特征的 NULL 填充规则在这里集中定义
     */
    public static double fillNull(Double value, double defaultValue) {
        return value == null ? defaultValue : value;
    }

    /**
     * 统一的数值归一化:训练和线上用同一套参数和逻辑
     */
    public static double normalize(double value, double mean, double std) {
        if (std < 1e-8) return 0.0;
        return (value - mean) / std;
    }
}
// Spark UDF:注册共享算子为 Spark 函数
import org.apache.spark.sql.functions.udf

val windowStartUDF = udf((ts: java.sql.Timestamp, days: Int) =>
    FeatureOperators.windowStart(ts, days))

spark.udf.register("window_start", windowStartUDF)

// Spark SQL 中使用共享算子
spark.sql("""
    SELECT
        user_id,
        COUNT(*) AS buy_cnt_7d
    FROM orders
    WHERE event_time >= window_start(current_timestamp(), 7)
      AND status = 'paid'
    GROUP BY user_id
""")
// Flink 自定义函数:复用同一套算子
public class WindowStartFunction extends ScalarFunction {
    public Timestamp eval(Timestamp ts, Integer days) {
        return FeatureOperators.windowStart(ts, days);
    }
}

// 在线 Java 服务:直接调用,不需要适配
public class FeatureService {
    public Map<String, Object> buildFeatures(long userId, Timestamp now) {
        // 查询 Redis 中的原始计数
        long rawCount = redisClient.getCount(userId);
        // 使用共享算子做后处理
        double normalized = FeatureOperators.normalize(rawCount, MEAN, STD);
        return Map.of("buy_cnt_7d_norm", normalized);
    }
}

优劣分析

优点

  • 工程实现最简单,不需要代码生成器或复杂框架,一个 JAR 包解决问题
  • 灵活性高,复杂特征逻辑可以在算子库中自由实现
  • 对现有代码侵入性小,可以渐进式引入

缺点

  • 只能统一"共性逻辑",特征的整体计算流程仍然是分散的
  • 需要工程师主动使用共享算子库,有遗漏的风险
  • 不能从根本上保证批流一致,只是降低了偏差概率

适用场景:中小型团队,特征数量不多,没有资源开发完整特征平台,但又想降低 Training-Serving Skew 风险的实用方案。也适合作为其他方案的补充(在 DSL 方案无法覆盖的复杂特征上使用)。

四方案横向对比

  • 一致性保证强度:Flink SQL 统一 ≈ YAML DSL > Python SDK > 算子库共享
  • 表达能力:Python SDK ≈ 算子库共享 > Flink SQL > YAML DSL
  • 用户体验(易用性):YAML DSL > Python SDK > Flink SQL > 算子库共享
  • 工程实现成本:算子库共享(低)< Flink SQL(中)< Python SDK(高)< YAML DSL(高)
  • 适合团队规模:算子库共享(小团队)、Flink SQL(中等)、Python SDK/YAML DSL(大团队)

实践建议:如何选择和落地

渐进式演进路径

没有哪个方案是一步到位的,建议按以下路径演进:

  1. 起步阶段:先用算子库共享解决最常见的偏差(时间窗口边界、NULL 处理),成本低,立竿见影
  2. 成长阶段:特征数量增长到 100+ 后,引入 YAML DSL 管理标准窗口聚合特征,复杂特征仍用算子库
  3. 成熟阶段:特征数量 1000+ 后,考虑 Flink SQL 统一批流,或引入 Feast/Tecton 管理特征元数据和服务

不管哪种方案都要做的事

无论选择哪个方案,以下工程实践都是必须的:

  • 特征一致性校验:定期(每天)对比离线特征和在线特征的分布(均值、方差、分位数),超过阈值告警。这是发现 Skew 的最后一道防线。
  • 特征版本管理:每次特征计算逻辑变更都要打版本号,模型训练时记录使用的特征版本,上线时确保推理服务使用的特征版本与训练时一致。
  • Shadow Mode 验证:新的特征计算逻辑上线前,在影子模式(Shadow Mode)下同时运行新旧两套逻辑,对比输出,确认无偏差后再切换。
# 特征一致性校验示例
def check_feature_consistency(feature_name: str, date: str):
    # 从离线存储(Hive)读取批量计算的特征值
    offline = spark.sql(f"""
        SELECT user_id, {feature_name}
        FROM feature_store.user_features
        WHERE dt = '{date}'
    """).toPandas()

    # 从在线存储(Redis)读取当前特征值(抽样)
    sample_users = offline['user_id'].sample(1000).tolist()
    online_values = redis_client.hmget_batch(
        [f"user:features:{uid}" for uid in sample_users],
        feature_name
    )
    online = pd.DataFrame({'user_id': sample_users, feature_name: online_values})

    # 计算 PSI(分布漂移指标)
    psi = calculate_psi(offline[feature_name], online[feature_name])

    if psi > 0.1:
        alert(f"特征 {feature_name} 批流一致性告警:PSI={psi:.3f}")
    return psi

总结

批流统一特征定义语言本质上是在解决一个抽象层次的问题:把特征的"语义"(这个特征计算的是什么)和"实现"(怎么计算)分离。语义层只有一份,实现层可以有多份(Spark、Flink、Java),但由平台保证各实现的语义等价。

四种方案在这个抽象层次上的位置不同:

  • 算子库共享:只统一了最底层的工具函数,抽象层次最低
  • Flink SQL 统一:在 SQL 层面统一,引擎保证语义一致,但要求批流都用 Flink
  • Python SDK:在 Python 函数层面统一,平台负责翻译,灵活性和一致性兼顾
  • YAML DSL:在声明式配置层面统一,抽象层次最高,代价是表达能力受限

没有银弹。选择的关键是:你的特征有多复杂、你的团队有多大、你愿意投入多少工程资源。从算子库共享起步,随着规模增长逐步演进,是最稳健的路径。