1. 1. 1. 第一性原理:为什么需要 Variant
    1. 1.1. 1.1 半结构化数据的本质矛盾
    2. 1.2. 1.2 现有方案的权衡
    3. 1.3. 1.3 Variant 带来的收益
  2. 2. 2. Variant 的核心设计
    1. 2.1. 2.1 总体结构:value + metadata 两段独立 bytes
    2. 2.2. 2.2 Header byte:每个 value 节点的第一个字节
    3. 2.3. 2.3 PRIMITIVE 子类型编码(bt = 0)
    4. 2.4. 2.4 OBJECT layout(bt = 2)
    5. 2.5. 2.5 ARRAY layout(bt = 3)
    6. 2.6. 2.6 Metadata layout(字段名字典)
    7. 2.7. 2.7 完整编码实例:{"a": 1, "b": "hi"}
    8. 2.8. 2.8 三个层次:引擎类型、文件格式、表格式
  3. 3. 3. 场景一:只有 Spark 支持(引擎层有 VariantType)
    1. 3.1. 3.1 类型定义
    2. 3.2. 3.2 两种落盘形态决定了能力上限
    3. 3.3. 3.3 类型解析与 Subfield 解析
    4. 3.4. 3.4 谓词下推与列裁剪:取决于是否 shredding
    5. 3.5. 3.5 优缺点小结
  4. 4. 4. 场景二:只有 Iceberg 支持(Spark 引擎层无 VariantType)
    1. 4.1. 4.1 问题的本质
    2. 4.2. 4.2 类型映射的三种选项
    3. 4.3. 4.3 Subfield 解析与下推
    4. 4.4. 4.4 优缺点
  5. 5. 5. 场景三:Spark + Iceberg 两方协同
    1. 5.1. 5.1 两方的职责划分
    2. 5.2. 5.2 类型链与类型解析
    3. 5.3. 5.3 Subfield 解析:DSv2 契约,不是 PushVariantIntoScan
    4. 5.4. 5.4 谓词下推:两层裁剪
    5. 5.5. 5.5 列裁剪:Shredding + DSv2 列剪枝
    6. 5.6. 5.6 严格 Cast 错误的延迟(DSv2 版)
    7. 5.7. 5.7 优缺点
  6. 6. 6. 横向对比:四个关键能力维度
    1. 6.1. 6.1 类型定义
    2. 6.2. 6.2 Subfield 解析
    3. 6.3. 6.3 谓词下推
    4. 6.4. 6.4 列裁剪
    5. 6.5. 6.5 一句话总结每个场景
    6. 6.6. 6.6 现实状态(截至 Spark 4.x / Iceberg main)
    7. 6.7. 附:关键源码索引

Variant原理与实现

1. 第一性原理:为什么需要 Variant

1.1 半结构化数据的本质矛盾

关系型数据库和列式存储引擎建立在一个基本假设上:每一列的类型在写入前已知且固定。这在 OLTP/OLAP 场景运转良好,但在以下场景产生根本性摩擦:

  • 用户行为事件:不同事件类型有完全不同的属性结构
  • API 响应日志:第三方接口的 JSON 格式随版本迭代不断变化
  • IoT 设备数据:不同设备型号上报不同的传感器字段
  • 机器学习 Feature Store:特征集合动态增减

这类数据的共同特征是:schema 在写入时未知,或在不同行之间高度异构

1.2 现有方案的权衡

面对这个矛盾,业界形成了三条路径:

路径 A:存为 STRING(JSON 文本)

1
2
CREATE TABLE events (id BIGINT, payload STRING);
INSERT INTO events VALUES (1, '{"user":"alice","action":"click","x":100}');

优点:完全灵活,任何格式都能存。
缺点:

  • 每次查询都要重新解析 JSON 文本,CPU 开销极高
  • 存储层完全不知道内部结构,无法做列裁剪、谓词下推
  • 不同行的 payload.user 可能是 string,也可能是 int,没有类型约束
  • 压缩效率低,字段名被重复存储在每一行里

路径 B:提前打平为 STRUCT

1
CREATE TABLE events (id BIGINT, user STRING, action STRING, x INT, y INT);

优点:完全利用列式存储的优势,列裁剪、谓词下推、编码压缩全部生效。
缺点:

  • 必须提前定义完整 schema,半结构化场景不可行
  • schema 变更成本高(加列需要 ALTER TABLE,历史数据填 NULL)
  • 高度稀疏:大量列对大多数行是 NULL,浪费存储
  • 100 种事件类型需要 union 所有字段,产生数千列

路径 C:Variant

Variant 的出发点是:能不能在保持 schema 灵活性的同时,让存储和查询引擎理解内部结构?

核心思路是把半结构化值编码成一种自描述的二进制格式:每个值携带自己的类型信息,字段名存在共享字典里(避免重复),嵌套结构保持树形编码。

1.3 Variant 带来的收益

能力 STRING STRUCT VARIANT
schema 灵活性 完全灵活 提前固定 灵活,schema-on-read
类型安全 写入时强制 读取时可强制转换
列裁剪(只读需要的字段) 不可能 完整支持 需 shredding 才可能
谓词下推到存储 不可能 完整支持 需 shredding 才可能
字段名压缩 每行重复 无字段名 共享字典,一份
跨行 schema 不一致 透明 不支持 完整支持
查询时 CPU 每次全量解析 零解析 按需解析(仅访问的字段)

2. Variant 的核心设计

Variant 是 Apache Parquet 社区联合制定的开放格式规范(VariantEncoding.md)。下面的 layout 描述全部以 Spark 实现 VariantUtil.java 为准。

2.1 总体结构:value + metadata 两段独立 bytes

1
2
3
VariantVal
├── value bytes —— 一棵自描述的值树:编码每个节点的类型和内容,但不含字段名
└── metadata bytes —— 一张字段名字典(去重的 string pool),value 里用整数 id 引用它

两段各自独立、各自有 ≤ 128 MiB 的大小上限(测试环境 16 MiB)。把字段名抽到 metadata 单独存,是为了:① 同名字段(如数组里每个对象都有的 "id")只存一份;② value 树里用定长整数 id 代替变长字符串,定位更快。

2.2 Header byte:每个 value 节点的第一个字节

value 里每个节点都以一个 header byte 开头:

1
2
3
4
5
 7 6 5 4 3 2 1 0   bit
┌─────────────┬───┐
│ type_info │bt │ bt (basic type) = 低 2 bit
│ 6 bit │2b │ type_info = 高 6 bit
└─────────────┴───┘

basic type(低 2 bit)分四大类,type_info(高 6 bit)的含义随大类而变:

bt 大类 type_info 的含义
0 PRIMITIVE 具体的标量子类型编号(见 2.3)
1 SHORT_STR 字符串长度(0–63),字符串内容紧跟 header
2 OBJECT 三个宽度参数 0_b4_b3b2_b1b0(见 2.4)
3 ARRAY 两个宽度参数 000_b2_b1b0(见 2.5)

2.3 PRIMITIVE 子类型编码(bt = 0)

type_info 此时是子类型编号。下表的”总字节数”= header(1) + 内容,已逐项与 VariantUtil.valueSize 核对:

type_info 类型 内容编码 总字节
0 NULL(JSON null) 1
1 TRUE 1
2 FALSE 1
3 INT1 1 字节小端有符号 2
4 INT2 2 字节小端有符号 3
5 INT4 4 字节小端有符号 5
6 INT8 8 字节小端有符号 9
7 DOUBLE 8 字节 IEEE 754 9
8 DECIMAL4 1 字节 scale + 4 字节小端 unscaled 6
9 DECIMAL8 1 字节 scale + 8 字节小端 unscaled 10
10 DECIMAL16 1 字节 scale + 16 字节小端 unscaled 18
11 DATE 4 字节小端(距 epoch 天数) 5
12 TIMESTAMP 8 字节小端微秒(UTC) 9
13 TIMESTAMP_NTZ 8 字节小端微秒(按 UTC 解释) 9
14 FLOAT 4 字节 IEEE 754 5
15 BINARY 4 字节小端长度 + N 字节内容 5+N
16 LONG_STR 4 字节小端长度 + N 字节 UTF-8 5+N
20 UUID 16 字节大端 17

注意两个反直觉点(都已从源码核实):DECIMAL16 的 unscaled 整数是小端存储(读取时 getDecimal 才反转成大端交给 BigInteger);而 UUID 反而是大端getUuid 显式用 BIG_ENDIAN)。整数写入时自动选最小宽度:值 1 存成 INT1,10 万存成 INT4。字符串 ≤63 字节走 SHORT_STR(长度塞进 header),否则走 LONG_STR。

2.4 OBJECT layout(bt = 2)

type_info0_b4_b3b2_b1b0,三个宽度参数让 layout 随对象大小自适应:

  • b4 → size 字段宽度:0 = 1 字节,1 = 4 字节
  • b3b2 → id 列每个元素宽度:0/1/2 = 1/2/3 字节
  • b1b0 → offset 列每个元素宽度:0/1/2 = 1/2/3 字节

字节布局(size = 字段数;字段按字段名字典序排列,不允许重复名):

1
2
3
4
5
6
7
┌────────┬──────┬─────────────────┬───────────────────────┬──────────────┐
│ header │ size │ id 列 │ offset 列 │ field data │
│ 1 byte │ 1/4B │ size × idSize │ (size+1) × offsetSize │ 各字段值拼接 │
└────────┴──────┴─────────────────┴───────────────────────┴──────────────┘
↑ 每个 id 是 ↑ offset[i] = 第 i 个字段值
metadata 字典 相对 data 区起点的偏移;
里的下标 offset[size] = data 区总长
  • id[i]:第 i 个字段的字段名在 metadata 字典里的 id(不是值!)
  • offset[i]:第 i 个字段在 data 区的起始偏移;offset[i+1]-offset[i] 即第 i 个字段值的字节数
  • 取字段名:getMetadataKey(metadata, id[i]);取字段值:递归解析 data 区起点 + offset[i] 处的节点

2.5 ARRAY layout(bt = 3)

type_info000_b2_b1b0b2 → size 宽度,b1b0 → offset 宽度。与 OBJECT 相比省掉了 id 列(数组元素无字段名):

1
2
3
4
┌────────┬──────┬───────────────────────┬──────────────┐
│ header │ size │ offset 列 │ element data │
│ 1 byte │ 1/4B │ (size+1) × offsetSize │ 各元素值拼接 │
└────────┴──────┴───────────────────────┴──────────────┘

2.6 Metadata layout(字段名字典)

metadata 的第一个字节是它自己的 header,与 value 的 header 编码规则不同

1
2
3
4
5
 7 6 5 4 3 2 1 0   bit
┌───┬─┬───────┐
│oss│ │version│ version = 低 4 bit,当前恒为 1(VERSION_MASK = 0x0F)
│2b │ │ 4 bit │ oss = 最高 2 bit = offsetSize - 1
└───┴─┴───────┘

这是个容易写错的点:version 在低 4 bit,offset 宽度在最高 2 bit((metadata[0] >> 6) & 0x3),中间 bit 4(sorted_strings 标志)此实现未使用。

整体布局(offsetSize 由上面 header 的最高 2 bit 决定,取 1–4 字节):

1
2
3
4
┌────────┬───────────┬───────────────────────────┬──────────────────┐
│ header │ dict_size │ offset 列 │ 字符串数据 │
│ 1 byte │ offsetSize│ (dict_size+1) × offsetSize │ 所有 key 拼接 │
└────────┴───────────┴───────────────────────────┴──────────────────┘
  • dict_size:字典里 key 的个数
  • offset[i]:第 i 个 key 在字符串数据区的起始偏移;key 连续拼接、不存各自长度,靠 offset[i+1]-offset[i] 求长
  • 字符串数据区起点 = 1 + (dict_size + 2) × offsetSize

2.7 完整编码实例:{"a": 1, "b": "hi"}

把这三层拼起来走一遍。字段名字典 = ["a","b"](id 0 = a,id 1 = b)。

metadata(7 字节,offsetSize=1):

1
2
3
4
01    header:version=1, offsetSize-1=0
02 dict_size = 2
00 01 02 offsets:a∈[0,1), b∈[1,2),末尾 2 = 总长
61 62 字符串数据 "ab"

value(12 字节,一个 OBJECT)。字段按名排序后是 aba 的值 1 → INT1(0C 01),b 的值 "hi" → SHORT_STR 长度2(09 68 69):

1
2
3
4
5
6
02    header:OBJECT, largeSize=0, idSize=1, offsetSize=1
02 size = 2(两个字段)
00 01 id 列:字段 a→id0, 字段 b→id1
00 02 05 offset 列:a 的值∈[0,2), b 的值∈[2,5),末尾 5 = data 总长
0C 01 data[0..2):字段 a 的值 = INT1(1)
09 68 69 data[2..5):字段 b 的值 = SHORT_STR("hi")

要取 $.b:读 value header 知道是 OBJECT → 在 id 列二分找 getMetadataKey(meta, id)=="b" 的位置(这里第 1 个)→ 用对应 offset 跳到 data 起点+2 → 解析出 SHORT_STR "hi"。整个过程只触碰需要的字节,不必解析字段 a

2.8 三个层次:引擎类型、文件格式、表格式

理解 Variant 实现的关键,是把三个不同层次的”支持”分开看。它们各自独立,可以单独具备或缺失:

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────┐
│ 引擎层 (Spark catalyst) │
│ VariantType —— plan / expression / InternalRow 里的类型 │
│ variant_get / parse_json 等函数在这里求值 │
├─────────────────────────────────────────────────────────┤
│ 表格式层 (Iceberg / Delta / Hive) │
│ 表 schema 里的列类型;负责文件级元数据、统计、裁剪 │
├─────────────────────────────────────────────────────────┤
│ 文件格式层 (Parquet / ORC / Avro) │
│ 物理字节如何落盘;shredding 是这一层的能力 │
└─────────────────────────────────────────────────────────┘
  • 内存中:Variant 是 VariantValvalue bytes + metadata bytes),只在 executor 里存在,是引擎层的概念。
  • 持久化时:需要文件格式来保存这两段 bytes;表格式负责把这些文件组织成”表”。

parse_json('{"a":1}') 这个表达式只发生在引擎层,构建一个 VariantVal 放进内存,与任何文件/表格式无关。这就是为什么”Variant 支持”在不同层次可以解耦——后面三个场景正是这三层的不同组合。

一个常被忽略的事实:同一个 Spark 集群里有两条完全不同的 Parquet 读写路径

  1. Spark 自带的 Parquet 数据源ParquetFileFormat,V1 file source)——spark.read.parquet(path) 走这条
  2. DataSourceV2 表格式自带的 Parquet 读写——Iceberg / Delta 用自己的 iceberg-parquet reader,不经过 ParquetFileFormat

这两条路径对 Variant 的处理机制完全不同,是区分场景一和场景三的根本。


3. 场景一:只有 Spark 支持(引擎层有 VariantType)

这里”只有 Spark”指:用 Spark 自带的 Parquet/JSON 数据源读写,不经过 Iceberg 等表格式。

3.1 类型定义

Spark 4.0 在 catalyst 类型系统中引入了 VariantType@Unstable@since 4.0.0):

1
2
3
4
5
// sql/api/src/main/scala/org/apache/spark/sql/types/VariantType.scala
case object VariantType extends AtomicType {
override def defaultSize: Int = 2048
// Variant 值永远 nullable,asNullable 是 no-op
}

这是 Spark 引擎层的原生类型,存在于 catalyst plan、expression tree 和运行时的 InternalRow 里。

3.2 两种落盘形态决定了能力上限

Spark 自带 Parquet 写 Variant 有两种形态,能力差异巨大:

形态 A:不透明 blob(无 shredding)

VariantType 映射为一个 Parquet GROUP(ParquetSchemaConverter.scala:902):

1
2
3
4
col_name: GROUP (LogicalType = variant) {
value: BYTE_ARRAY (required) ← 完整 value bytes
metadata: BYTE_ARRAY (required) ← 完整 metadata bytes
}

整个 Variant 是两块对存储引擎不透明的 bytes。

形态 B:shredding(拆字段)

Spark 自带 Parquet 写入器可开启 shredding(ParquetOutputWriterWithVariantShredding.scala),把高频字段拆成独立列:

1
2
3
4
5
6
7
8
col_name: GROUP (LogicalType = variant) {
value: BYTE_ARRAY ← 未被拆出的部分(fallback)
metadata: BYTE_ARRAY ← 字段名字典
typed_value: GROUP { ← 拆出的字段,独立 Parquet 列
a: INT64
b: BYTE_ARRAY
}
}

写入流程:缓冲前 4096 行(或 64 MiB)→ InferVariantShreddingSchema 分析字段分布并推断 shredding schema(整数统一 widen 为 long,类型冲突或超过 300 字段的退回 value blob)→ 落盘。

写 JSON 文件时则完全不同:Variant 被 v.toJson() 还原为 JSON 文本写出(JacksonGenerator.scala:341)。CSV 仅支持读不支持写,ORC 完全不支持。

3.3 类型解析与 Subfield 解析

variant_get(col, '$.a.b', 'int') 的解析过程:

Analysis 阶段

  1. 解析 col → 确认类型为 VariantType,通过
  2. 解析 '$.a.b' → 这只是字符串字面量,不做路径有效性检查,不在 schema 里查找字段 a.b
  3. dataType 由第三个参数决定,返回 IntegerType

这与 STRUCT 子字段解析截然不同——后者在 analysis 阶段就验证字段存在性:

1
2
SELECT s.nonexistent FROM t;                            -- STRUCT:编译报错
SELECT variant_get(v, '$.nonexistent', 'int') FROM t; -- VARIANT:运行时返回 NULL

运行时VariantPathParser 把路径解析为 Array[VariantPathSegment] → 在 VariantVal binary 里逐层定位 → 用 Cast 转目标类型 → 找不到返回 NULL;转换失败时 variant_get 抛异常、try_variant_get 返回 NULL。

3.4 谓词下推与列裁剪:取决于是否 shredding

这是形态 A 与形态 B 的分水岭。纯引擎层的 VariantType 本身不带来下推能力,下推来自 shredding + 一条优化器规则。

**关键优化器规则 PushVariantIntoScan**(PushVariantIntoScan.scala):

它把 Variant 列改写成一个带注解的 struct,把 variant_get 改写成普通 struct 字段访问:

1
2
3
4
5
6
7
8
9
改写前:
Project [v:a::int, v:b::string]
Filter [v:a::int = 1]
Relation [v: variant]

改写后:
Project [v.0, v.1]
Filter [v.0 = 1]
Relation [v: struct<0: int @path="$.a", 1: string @path="$.b">]

之后 v.0 = 1 就是一个普通 struct 字段谓词,能下推到 Parquet 的 typed_value.a 列,利用 row group min/max 统计跳过数据;列裁剪也只读 typed_value.a 这一列,不读完整 value blob。

形态 A(blob) 形态 B(shredding)
谓词下推 不可能(blob 对存储不透明) 可能(仅 shredded 字段)
列裁剪 不可能(必读整个 blob) 可能(仅 shredded 列)
Parquet statistics 无效 shredded 字段有效

关键限制PushVariantIntoScan 的匹配条件被**硬编码为 ParquetFileFormat**(PushVariantIntoScan.scala:374):

1
case ...HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _)... =>

也就是说,这条规则只对 Spark 自带的 Parquet 数据源生效。Iceberg 等 DataSourceV2 表走的是完全不同的代码路径,这条规则对它们不触发——这正是场景三需要另一套机制的根本原因。

3.5 优缺点小结

优点:完整 SQL 函数集;不依赖表格式;shredding 下可获得字段级下推和裁剪;跨文件格式(JSON/XML/CSV 读、Parquet 写)。

缺点

  • 无 shredding 时,每行 Variant blob 必须全量读入,无下推无裁剪
  • 下推能力强绑定 Spark 自带 Parquet 数据源,换成 DSv2 表格式即失效
  • 没有表级元数据/事务/schema 演进(那是表格式的职责)
  • Spark 写出的 Parquet(带 variant logical type 或 shredding)需 reader 认识该约定才能正确读回

4. 场景二:只有 Iceberg 支持(Spark 引擎层无 VariantType)

4.1 问题的本质

假设 Iceberg 在其类型系统中加入了 Types.VariantType,但 Spark 的 catalyst 层没有 VariantType(例如旧版本 Spark)。Spark-Iceberg 连接器读到 Iceberg 表 schema 里的 VARIANT 时,必须把它映射到某个 Spark 能理解的类型。这是问题的核心。

4.2 类型映射的三种选项

映射 用户看到的类型 可用函数 代价
A: BinaryType Array[Byte] 完全不可用,仅作 bytes 逃生通道
B: StringType JSON 文本 get_json_object/json_tuple/from_json 精度损失、UDF 黑盒、反序列化贵
C: StructType struct<a:bigint,b:string> 普通字段访问 schema 静态,新字段需 migration

选项 B(StringType) 是最容易落地的:连接器读 Variant 后 toJson() 暴露为字符串。

1
SELECT get_json_object(variant_col, '$.age') FROM t;

get_json_object 对 Spark 优化器是黑盒 UDF,无法产生下推计划;JSON 文本对 timestamp/decimal 有精度歧义。

选项 C(StructType) 让连接器通过 schema 推断把 Variant 暴露成 struct:

1
SELECT variant_col.user_id, variant_col.event_type FROM t;  -- Spark 当成普通 struct

子字段在 analysis 阶段解析(GetStructField),常规谓词下推和列裁剪能生效,但 schema 是静态的——不在推断 schema 里的字段无法访问,新字段写入后需要更新映射。

4.3 Subfield 解析与下推

映射 Subfield 解析时机 未知字段行为 下推 / 裁剪
BinaryType 不支持 无法访问
StringType 运行时(UDF) 返回 NULL 无(优化器黑盒)
StructType Analysis 阶段 编译报错 有(在连接器层实现)

在这个场景下,Spark 引擎完全不懂 Variant,所有优化只能在 Iceberg 连接器内部、用 Spark 已有的类型语义(struct)去实现。

4.4 优缺点

优点:对 Spark 透明,旧版本 Spark 也能用;列裁剪、部分下推可在连接器层独立实现;schema 演进由 Iceberg 管理。

缺点:映射类型丢失语义(Binary 不可用 / String 精度损失 / Struct 不灵活);用不了 variant_get 等原生函数;codegen 无法针对 Variant 优化;除 StructType 外路径错误延迟到运行时。


5. 场景三:Spark + Iceberg 两方协同

这是真正”原生支持”的形态:Spark 引擎懂 Variant,Iceberg 表格式也懂 Variant,两方各司其职。注意 Parquet 在这里只是两方共用的物理底座——Iceberg 用自己的 iceberg-parquet reader 读写它,不经过 Spark 的 ParquetFileFormat,所以这是”两方”而非”三方”。

5.1 两方的职责划分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────────────────── Spark(引擎,"提问方")────────────────────────────┐
│ - VariantType(catalyst 类型) │
│ - variant_get / parse_json / schema_of_variant 等函数的求值 │
│ - 把用户查询中的 variant 提取,通过 DSv2 接口"递"给数据源 │
│ SupportsPushDownVariantExtractions(@since 4.1.0, @Experimental) │
└────────────────────────────────────┬───────────────────────────────────────────┘
│ DSv2 pushdown 契约
┌────────────────────────────────────┴──── Iceberg(表格式,"执行方")────────────┐
│ - Types.VariantType(Iceberg schema 类型 + field-id) │
│ - 连接器类型转换:Iceberg VariantType ←→ Spark VariantType │
│ - 自己的 Parquet reader/writer,理解 shredding 布局,重建 VariantVal │
│ - 实现 SupportsPushDownVariantExtractions,决定接受哪些提取 │
│ - manifest 级统计与文件裁剪(built-in Parquet 路径没有的额外一层) │
└──────────────────────────────────────────────────────────────────────────────────┘

现状提示:截至当前,Apache Iceberg 的 TypeID 枚举里还没有 VARIANT,Spark-Iceberg 连接器也还没有 VariantType 映射。本节描述的是 Spark 侧已经备好的契约(SupportsPushDownVariantExtractions 等接口确实存在于代码中)和 Iceberg 侧需要补齐的实现。

5.2 类型链与类型解析

三层类型,但读写由 Spark 引擎和 Iceberg 两方负责:

1
2
3
4
5
Iceberg 表 schema:   Types.VariantType(带 field-id)
↕ 连接器 TypeToSparkType / SparkTypeToType
Spark catalyst: VariantType
↕ Iceberg 自己的 iceberg-parquet reader(非 ParquetFileFormat)
Parquet 物理: GROUP { value, metadata, typed_value{...} }

读路径类型解析:Iceberg reader 读到带 variant logical type 的 Parquet GROUP → 还原为 VariantVal → 连接器声明该列 Spark 类型为 VariantType → 用户 SQL 里 col 即 VariantType。

5.3 Subfield 解析:DSv2 契约,不是 PushVariantIntoScan

这是与场景一最大的区别。Iceberg 是 DSv2 源,**PushVariantIntoScan 不触发**。取而代之,Spark 通过 SupportsPushDownVariantExtractions 把提取请求”递”给 Iceberg:

1
2
// Spark 调用,Iceberg 的 SparkScanBuilder 实现
boolean[] pushVariantExtractions(VariantExtraction[] extractions);

每个 VariantExtraction 描述一次提取(VariantExtraction.java):

1
2
3
4
5
interface VariantExtraction {
String[] columnName(); // ["payload"] 或 ["structCol","innerVariant"]
DataType expectedDataType(); // 目标类型,如 IntegerType
Metadata metadata(); // path="$.age", failOnError, timeZoneId
}

协同流程:

1
2
3
4
5
6
7
8
9
用户:variant_get(payload, '$.age', 'int')

Spark 优化器:构造 VariantExtraction(columnName=["payload"], path="$.age", type=int)

调用 iceberg.scanBuilder.pushVariantExtractions([该 extraction])

Iceberg 检查:$.age 是否被 shredding?类型/路径是否支持?
├─ 接受 → 返回 true,Iceberg 负责读 typed_value.age 并产出 int 列
└─ 拒绝 → 返回 false,Spark 读回完整 Variant 后自己用 variant_get 求值

返回的 boolean[] 让下推可以部分接受:Iceberg 能处理的字段下推,不能处理的回退给 Spark。这比场景一的”全有或全无”更灵活。

Subfield → 物理列的绑定发生在 Iceberg reader 内部(根据 Spark 递来的 extractions),而不是 Spark 的优化器里。路径字符串本身依旧不在 analysis 阶段校验,未命中返回 NULL。

5.4 谓词下推:两层裁剪

Iceberg 路径比场景一多了一层 manifest 级裁剪:

1
SELECT * FROM iceberg_tbl WHERE variant_get(payload, '$.age', 'int') > 18;
1
2
3
4
5
6
7
8
9
10
① Iceberg manifest 级(文件裁剪,built-in Parquet 没有这层)
若 manifest 记录了 shredded 字段 age 的 min/max
→ 整个 data file 在 scan planning 阶段直接跳过

② Iceberg Parquet reader 级(row group 裁剪)
typed_value.age 列的 row group min/max
→ 跳过不满足的 row group

③ 引擎层(剩余行)
Iceberg 未接受的部分,Spark 读回 VariantVal 后用 variant_get 过滤

能否下推的前提同样是:$.age 被 shredding 拆出来了。未 shred 的字段留在 value blob 里,对统计不可见,只能回退到引擎层逐行过滤。

5.5 列裁剪:Shredding + DSv2 列剪枝

Iceberg reader 根据 Spark 递来的 extractions 与列剪枝信息,只从 Parquet 读被请求的 shredded 列(如只读 typed_value.name),不读 value blob、不读未访问字段。这与场景一形态 B 的效果一致,但实现位于 Iceberg 自己的 reader 中。

5.6 严格 Cast 错误的延迟(DSv2 版)

variant_get 严格模式下,若把 cast 下推给 reader,出错时会丢失原始值信息。DSv2 契约用 companion 字段解决(SupportsPushDownVariantExtractions.supportsDeferCastError()):reader 正常填数据字段,cast 失败时填 companion 字段存原始值,引擎层 UnwrapVariantCastError 再据此抛出带原始值的错误。这套机制在场景一(built-in Parquet,VariantMetadata.castErrorFor)和场景三(DSv2,castErrorFor metadata key)里是对称的两套实现。

5.7 优缺点

优点

  • Spark 原生函数 + Iceberg 表能力(事务、snapshot、schema 演进、隐藏分区)兼得
  • 两层裁剪:Iceberg manifest 文件级 + Parquet row group 级
  • 部分下推(boolean[])比 built-in 路径更灵活
  • field-id 让 Variant 列也能安全地重命名/演进
  • 写出的文件任何认识 Variant shredding 规范的引擎都能读(Trino、Flink 等)

缺点

  • 需要 Iceberg 在类型系统、连接器、iceberg-parquet reader/writer、manifest 统计四处都补齐实现(目前尚未完成)
  • shredding schema 由谁决定、如何随 snapshot 演进,是额外的工程复杂度
  • 依赖较新的 Spark(DSv2 变体提取接口 @since 4.1.0)和未来的 Iceberg 版本

6. 横向对比:四个关键能力维度

为精确起见,把”只有 Spark”按是否 shredding 拆成两列。

6.1 类型定义

Spark·blob Spark·shredding 只有 Iceberg Spark+Iceberg
引擎(catalyst) VariantType VariantType Binary/String/Struct VariantType
表格式(Iceberg) 不经过 不经过 Types.VariantType Types.VariantType
文件(Parquet) GROUP{value,metadata} +typed_value +typed_value +typed_value
读写者 Spark ParquetFileFormat Spark ParquetFileFormat iceberg-parquet iceberg-parquet

6.2 Subfield 解析

Spark·blob Spark·shredding Iceberg·Struct 映射 Spark+Iceberg
解析时机 运行时 优化器(PushVariantIntoScan) Analysis Iceberg reader(DSv2 extractions)
路径校验 编译期
新字段 透明 透明 需 migration 透明
绑定机制 VariantPathParser 改写为 GetStructField GetStructField pushVariantExtractions

6.3 谓词下推

Spark·blob Spark·shredding 只有 Iceberg Spark+Iceberg
触发机制 PushVariantIntoScan(仅 ParquetFileFormat) 连接器内部 SupportsPushDownVariantExtractions
Parquet row group ✓(shredded) ✓(Struct 映射) ✓(shredded)
Iceberg 文件级 ✓(manifest) ✓(manifest)
部分下推 全有或全无 ✓(boolean[])

6.4 列裁剪

Spark·blob Spark·shredding 只有 Iceberg Spark+Iceberg
整列跳过
字段级 I/O 裁剪 ✗(读整个 blob) ✓(shredded 列) 连接器可实现 ✓(shredded 列)
实现位置 Spark 内置 Parquet reader Iceberg reader Iceberg reader

6.5 一句话总结每个场景

  • Spark·blob:引擎懂 Variant,但落盘是不透明 bytes —— 函数齐全,零下推零裁剪。
  • Spark·shredding:引擎 + 自带 Parquet 协同,PushVariantIntoScan 把提取改写成 struct 访问 —— 仅对 ParquetFileFormat 生效。
  • 只有 Iceberg:引擎不懂 Variant,连接器把它”翻译”成 String/Struct —— 损失语义,优化在连接器内闭环。
  • Spark+Iceberg:引擎出 VariantType + 下推契约,Iceberg 出类型 + 自有 reader + manifest 裁剪 —— 走 DSv2 而非 PushVariantIntoScan,多一层文件级裁剪,支持部分下推。

6.6 现实状态(截至 Spark 4.x / Iceberg main)

  • Spark:完整支持 VariantType(4.0),shredding 写入 + PushVariantIntoScan(仅 ParquetFileFormat);DSv2 变体提取接口 SupportsPushDownVariantExtractions(4.1,@Experimental)已就位
  • IcebergTypeID 枚举 VARIANT,连接器 VariantType 映射 —— 场景三尚未在 Iceberg 侧落地
  • 当前可用路径:用 Spark 自带 Parquet 读写 Variant(场景一);或在 Iceberg 表把 JSON 存为 STRING 列,查询时 parse_json() 动态转换

附:关键源码索引

组件 文件
引擎类型定义 sql/api/.../types/VariantType.scala
运行时表示 common/unsafe/.../types/VariantVal.java
二进制编码 common/variant/.../variant/VariantUtil.java
SQL 表达式 sql/catalyst/.../expressions/variant/variantExpressions.scala
优化器下推规则(仅 built-in Parquet) sql/core/.../datasources/PushVariantIntoScan.scala
DSv2 下推接口(表格式用) sql/catalyst/.../connector/read/SupportsPushDownVariantExtractions.java
DSv2 提取描述 sql/catalyst/.../connector/read/VariantExtraction.java
Parquet 类型映射 sql/core/.../parquet/ParquetSchemaConverter.scala:902
Shredding 推断 sql/core/.../parquet/InferVariantShreddingSchema.scala
Shredding Writer sql/core/.../parquet/ParquetOutputWriterWithVariantShredding.scala
Iceberg 类型系统 apache/iceberg: api/.../types/Type.java