1. 1. 1. 整体架构
  2. 2. 2. 基础设施层详解
    1. 2.1. 2.1 SpillConfig — 配置层
    2. 2.2. 2.2 SpillStats — 统计层
    3. 2.3. 2.3 SpillFile — 文件 I/O 层
      1. 2.3.1. SpillFileInfo — 元数据
      2. 2.3.2. SpillWriter — 写路径
      3. 2.3.3. SpillReadFile — 读路径
    4. 2.4. 2.4 SpillPartitionId — 层级分区标识
    5. 2.5. 2.5 SpillState — 分区状态管理
    6. 2.6. 2.6 SpillPartition / SpillPartitionSet — 读回路径
    7. 2.7. 2.7 读流层:SpillMergeStream / BatchStream
  3. 3. 3. Spiller 层详解
    1. 3.1. 3.1 SpillerBase — 抽象基类
    2. 3.2. 3.2 SpillRun — 内存中的 spill 缓冲
    3. 3.3. 3.3 fillSpillRuns — 数据填充
    4. 3.4. 3.4 SpillRun 为何切分 / maxSpillRunRows 的作用
    5. 3.5. 3.5 行列转换:extractSpill 与 extractSpillVector
      1. 3.5.1. extractSpillVector — 控制每批大小
      2. 3.5.2. extractSpill — 行式 → 列式核心转换
    6. 3.6. 3.6 RowContainer 行布局与列提取原理
    7. 3.7. 3.7 Accumulator 中间态提取
    8. 3.8. 3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes)
    9. 3.9. 3.9 runSpill — 并发写入
    10. 3.10. 3.10 finishSpill — 收尾
    11. 3.11. 3.11 各子类 Spiller 详解
      1. 3.11.1. HashBuildSpiller
      2. 3.11.2. AggregationInputSpiller
      3. 3.11.3. AggregationOutputSpiller
      4. 3.11.4. SortInputSpiller
      5. 3.11.5. SortOutputSpiller
      6. 3.11.6. NoRowContainerSpiller / MergeSpiller
    12. 3.12. 各子类对比
  4. 4. 4. HashJoin Spill 实现
    1. 4.1. 4.1 整体流程
    2. 4.2. 4.2 HashBuildSpiller
    3. 4.3. 4.3 setupSpiller
    4. 4.4. 4.4 ensureInputFits — 内存压力检测
    5. 4.5. 4.5 spillInput — 正在 spill 时的直通路径
    6. 4.6. 4.6 finishHashBuild — 收尾与传递
    7. 4.7. 4.7 reclaim — 内存仲裁触发路径
    8. 4.8. 4.8 postHashBuildProcess / setupSpillInput — 递归恢复
    9. 4.9. 4.9 processSpillInput — 从 spill 文件重建
    10. 4.10. 4.10 Probe 侧 Spill 详解
      1. 4.10.1. HashBuildResult — Build/Probe 信息传递载体
      2. 4.10.2. 路径 1:input spill(探测侧主路径)
      3. 4.10.3. 路径 2:restore 轮的 probe(从 spill 文件读取 probe 行)
      4. 4.10.4. 路径 3:output spill(内存仲裁触发 reclaim)
      5. 4.10.5. prepareForSpillRestore — 多轮恢复的关键
      6. 4.10.6. Probe 侧 Spill 全生命周期
  5. 5. 5. HashAggregation Spill 实现
    1. 5.1. 5.1 两种 Spiller
    2. 5.2. 5.2 GroupingSet::spill() — input spill
    3. 5.3. 5.3 GroupingSet::spill(rowIterator) — output spill
    4. 5.4. 5.4 reclaim 触发路径
    5. 5.5. 5.5 getOutputWithSpill — 归并输出
    6. 5.6. 5.6 mergeNextWithAggregates — 有序归并聚合
    7. 5.7. 5.7 mergeNextWithoutAggregates — distinct 归并
  6. 6. 6. OrderBy Spill 实现
    1. 6.1. 6.1 两种 Spiller
    2. 6.2. 6.2 SortBuffer::spillInput
    3. 6.3. 6.3 SortBuffer::spillOutput
    4. 6.4. 6.4 SortBuffer::finishSpill
    5. 6.5. 6.5 SortBuffer::getOutputWithSpill
  7. 7. 7. 关键设计决策总结
    1. 7.1. 7.1 有序 vs. 无序 Spill
    2. 7.2. 7.2 两阶段 Spill(Input/Output Spiller 分离)
    3. 7.3. 7.3 内存预留机制(proactive spill)
    4. 7.4. 7.4 HashBuild 多线程 Spill 协调
    5. 7.5. 7.5 递归 Spill(HashJoin 专属)
    6. 7.6. 7.6 probed flag Spill(Right/Full Outer Join)
  8. 8. 8. 递归 Spill 流程图
  9. 9. 9. C++ 编码品味:值得学习的细节
    1. 9.1. 9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑
    2. 9.2. 9.2 folly::makeGuard:异步任务的”排水阀”
    3. 9.3. 9.3 跨线程异常传递:exception_ptr 模式
    4. 9.4. 9.4 “1 + N” 并发策略:第一个任务不切线程
    5. 9.5. 9.5 CHECK vs DCHECK:按热度分配断言代价
    6. 9.6. 9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码
    7. 9.7. 9.7 TestValue::adjust:无侵入式测试注入
    8. 9.8. 9.8 NanosecondTimer:RAII 计时的最小实现
    9. 9.9. 9.9 SpillPartitionId:把所有接口设施配齐
    10. 9.10. 9.10 withWLock / withRLock:锁的最小化持有
    11. 9.11. 9.11 SpillRun::sorted:防止双重排序的状态位
    12. 9.12. 9.12 pool_->release():主动归还,而非等待回收
    13. 9.13. 9.13 succinctBytes():有信息量的日志
    14. 9.14. 9.14 testing* 前缀:测试可观测性的统一约定
    15. 9.15. 9.15 constexpr 局部常量:让魔数有名字
    16. 9.16. 编码品味小结
  10. 10. 10. 架构设计解读:优雅背后的工程思考
    1. 10.1. 10.1 最根本的约束:OOM 不能是答案
    2. 10.2. 10.2 分层:每层只管自己的事
    3. 10.3. 10.3 分区:把”全量恢复”变成”分批恢复”
    4. 10.4. 10.4 有序与无序的分叉:根据”读回时的需求”决定
    5. 10.5. 10.5 两阶段 Spill:尊重算子的生命周期
    6. 10.6. 10.6 SpillRun 切分:在多个约束之间找到平衡点
    7. 10.7. 10.7 批量行列转换:峰值内存与 I/O 效率的双重优化
    8. 10.8. 10.8 主动预留 vs. 被动仲裁:两道防线
    9. 10.9. 10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口
    10. 10.10. 10.10 HashJoinBridge:异步协调的最小化接口
    11. 10.11. 10.11 DictionaryVector wrap:零拷贝分区
    12. 10.12. 10.12 小结:设计模式归纳

Velox Spiller

1. 整体架构

Velox 的 spill 系统是一个分层架构,每层职责明确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────────────────────────────┐
│ 算子层 HashBuild / GroupingSet(HashAggregation) / SortBuffer │
│ 触发时机:ensureInputFits / ensureOutputFits / reclaim() │
├─────────────────────────────────────────────────────────────────┤
│ Spiller 层 SpillerBase 及子类 │
│ 职责:从 RowContainer 提取行 → 按 hash 分区 → 排序 → 批量写入 │
│ 子类:HashBuildSpiller / AggregationInputSpiller / │
│ AggregationOutputSpiller / SortInputSpiller / │
│ SortOutputSpiller / NoRowContainerSpiller / MergeSpiller │
├─────────────────────────────────────────────────────────────────┤
│ 状态管理层 SpillState │
│ 职责:管理每个分区的 SpillWriter,追踪已 spill 分区集合 │
├─────────────────────────────────────────────────────────────────┤
│ 分区管理层 SpillPartitionId / SpillPartition / SpillPartitionSet│
│ 职责:层级分区 ID,持有 SpillFiles,提供有序/无序读取接口 │
├─────────────────────────────────────────────────────────────────┤
│ 文件 I/O 层 SpillWriter / SpillReadFile │
│ 职责:RowVector ↔ PrestoPage 序列化/反序列化,管理多文件切分 │
├─────────────────────────────────────────────────────────────────┤
│ 配置与统计层 SpillConfig / SpillStats │
│ 职责:全局配置,原子统计计数器 │
└─────────────────────────────────────────────────────────────────┘

核心设计理念

  • 分区化:spill 数据按 hash key 分区,使得每次只需将一个分区加载回内存,避免全量恢复
  • 递归 spill:若恢复某分区后仍 OOM,可对该分区再次 spill(最多 maxSpillLevel 层)
  • 有序与无序分离:sort-based 算子(Aggregation/OrderBy)spill 时保序,以支持归并输出;HashJoin 无需排序
  • 两阶段 spill:Aggregation/OrderBy 区分 input 阶段与 output 阶段,各用不同的 Spiller

2. 基础设施层详解

2.1 SpillConfig — 配置层

文件:velox/common/base/SpillConfig.h

SpillConfig 是所有 spill 行为的控制中枢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct SpillConfig {
// 回调函数(避免与具体 Task/Driver 实现耦合)
GetSpillDirectoryPathCB getSpillDirPathCb; // 惰性获取 spill 目录
UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb; // 更新全局 quota 并检查是否超限

// 文件参数
std::string fileNamePrefix; // spill 文件名前缀
uint64_t maxFileSize; // 单文件大小上限,超过后自动 rotate
uint64_t writeBufferSize; // 写缓冲区,攒够再 flush
uint64_t readBufferSize; // 读缓冲区,支持 async 双缓冲预读

// 执行参数
folly::Executor* executor; // 非 null 时 spill 写入在后台线程并发执行

// 内存触发门槛
int32_t minSpillableReservationPct; // 可 spill reservation 不低于 used 的 X%
int32_t spillableReservationGrowthPct; // 一次 reserve 扩容 used 的 X%
uint64_t maxSpillRunRows; // 单 run 最多行数(防止 OOM)
uint64_t writerFlushThresholdSize; // TableWriter flush 门槛

// 分区参数(决定 spill 时的并行度)
uint8_t startPartitionBit; // hash 分区起始 bit(e.g. 29)
uint8_t numPartitionBits; // 每层分区 bit 数(e.g. 3 → 8 分区)
int32_t maxSpillLevel; // 最大递归 spill 层数,-1 表示不限

// 归并参数
uint32_t numMaxMergeFiles; // 归并时最多同时 open 的文件数(防 FD 耗尽)

// 排序优化
std::optional<PrefixSortConfig> prefixSortConfig; // 若设置则用 PrefixSort 代替 TimSort

// 其他
common::CompressionKind compressionKind; // 压缩方式
std::string fileCreateConfig; // 传递给底层 FileSystem 的创建选项
uint32_t windowMinReadBatchRows; // Window 算子读取 batch 最小行数
};

关键计算方法

1
2
3
4
5
// 返回当前 bit offset 对应的 spill level
int32_t spillLevel(uint8_t startBitOffset) const;

// 判断是否已超过最大 spill 层数
bool exceedSpillLevelLimit(uint8_t startBitOffset) const;

递归 spill 时,每下一层 startPartitionBit 向右移 numPartitionBits 位,
spillLevel 就是 (startBitOffset - config.startPartitionBit) / numPartitionBits


2.2 SpillStats — 统计层

文件:velox/exec/SpillStats.h

所有字段都是 std::atomic,线程安全,支持跨线程并发更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct SpillStats {
// 计数
std::atomic_uint64_t spillRuns; // spill 触发次数
std::atomic_uint64_t spilledRows; // spill 的行数
std::atomic_uint64_t spilledInputBytes; // spill 前内存中的原始字节数
std::atomic_uint64_t spilledBytes; // 写到磁盘的字节数(含压缩)
std::atomic_uint32_t spilledPartitions; // spill 的分区数
std::atomic_uint64_t spilledFiles; // 生成的 spill 文件数
std::atomic_uint64_t spillMaxLevelExceededCount; // 超过最大 spill 层数次数

// 写路径时间(纳秒)
std::atomic_uint64_t spillFillTimeNanos; // 填充 SpillRun(哈希分区)
std::atomic_uint64_t spillSortTimeNanos; // 排序
std::atomic_uint64_t spillExtractVectorTimeNanos; // 从 RowContainer 提取 RowVector
std::atomic_uint64_t spillSerializationTimeNanos; // PrestoPage 序列化
std::atomic_uint64_t spillFlushTimeNanos; // 压缩 + 拷贝到写缓冲
std::atomic_uint64_t spillWriteTimeNanos; // 写到文件系统
std::atomic_uint64_t spillWrites; // write() 调用次数

// 读路径时间(纳秒)
std::atomic_uint64_t spillReadBytes;
std::atomic_uint64_t spillReads;
std::atomic_uint64_t spillReadTimeNanos;
std::atomic_uint64_t spillDeserializationTimeNanos;
IoStats ioStats;
};

每个算子持有自己的 SpillStats 对象(通过 spillStats_.get() 传递),同时每次更新都调用对应的 updateGlobal*() 函数汇总到进程级全局统计(globalSpillStats()),供 query profile 使用。


2.3 SpillFile — 文件 I/O 层

文件:velox/exec/SpillFile.h/.cpp

SpillFileInfo — 元数据

1
2
3
4
5
6
7
8
9
struct SpillFileInfo {
uint32_t id; // 单调递增的文件 ID
RowTypePtr type; // 数据类型
std::string path; // 文件路径
uint64_t size; // 文件字节数
std::vector<SpillSortKey> sortingKeys; // 排序键(无序 spill 则为空)
common::CompressionKind compressionKind; // 压缩方式
};
using SpillFiles = std::vector<SpillFileInfo>;

SpillWriter — 写路径

继承 SerializedPageFileWriter(底层用 PrestoPage 格式)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
构造参数:
type, sortingKeys : 数据类型与排序键
compressionKind : 压缩方式
pathPrefix : 文件路径前缀,实际路径为 prefix + "_" + fileId
targetFileSize : 单文件目标大小
writeBufferSize : 写缓冲区大小
updateAndCheckSpillLimitCb : quota 回调

关键流程:
write(rows, ranges) → 序列化 RowVector 片段,写入缓冲
finishFile() → 强制 flush + 关闭当前文件,记录 SpillFileInfo
finish() → 收尾所有文件,返回 SpillFiles 列表

文件切分逻辑(父类 SerializedPageFileWriter 实现):
每次 write 后,若当前文件大小 >= targetFileSize → 自动 finishFile()

每次 updateFileStats() 回调会调用 updateAndCheckSpillLimitCb,若全局 spill 字节超过 query limit 则抛 VELOX_SPILL_LIMIT_EXCEEDED 异常。

SpillReadFile — 读路径

1
2
3
create(fileInfo, bufferSize, pool, stats) → 打开文件,设置读缓冲
nextBatch(batch) → 反序列化下一 batch RowVector
→ 若文件系统支持 async IO,双缓冲预读

2.4 SpillPartitionId — 层级分区标识

文件:velox/exec/Spill.h:277

这是支持递归 spill 的核心数据结构。用单个 uint32_t 编码完整的层级路径:

1
2
3
4
5
6
7
8
9
10
11
Bit 布局(低位在前):
bits [2:0] : Level 1 分区号(最多 8 个)
bits [5:3] : Level 2 分区号
bits [8:6] : Level 3 分区号
bits [11:9] : Level 4 分区号
bits [28:12] : 未使用
bits [31:29] : 当前所在层级(0-3)

常量:
kMaxSpillLevel = 3 (最多 4 层,层级编号 0~3)
kMaxPartitionBits = 3 (每层最多 8 分区)

构造方式

1
2
SpillPartitionId(uint32_t partitionNumber);           // 创建 level 0 分区
SpillPartitionId(SpillPartitionId parent, uint32_t n); // 创建子分区

有序性operator<):

1
2
3
4
5
6
排序规则:
1. 相同父节点的子分区按分区号从小到大排列
2. 不同层级时,按层级路径字典序比较
3. 浅层节点排在其子节点前面(DFS 前序)

示例:p_0 < p_1 < p_2_0 < p_2_1 < p_2_2 < p_3

这个有序性使 SpillPartitionSetstd::map<SpillPartitionId, ...>)能以 DFS 前序遍历,确保处理完父 partition 后再处理其子 partition(递归 spill 场景)。


2.5 SpillState — 分区状态管理

文件:velox/exec/Spill.h:581, Spill.cpp:131

每个 Spiller 持有一个 SpillState,负责管理所有分区的写入器:

1
2
3
4
5
6
7
8
class SpillState {
// 已 spill 分区的 ID 集合(用于快速判断某 partition 是否已开始 spill)
SpillPartitionIdSet spilledPartitionIdSet_;

// partition -> SpillWriter 的映射,folly::Synchronized 保证线程安全
// 不同 partition 可以并发写入(不同线程写各自的 partition)
folly::Synchronized<SpillPartitionWriterSet> partitionWriters_;
};

**核心方法 appendToPartition**:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
uint64_t SpillState::appendToPartition(
const SpillPartitionId& id, const RowVectorPtr& rows) {

// 1. 惰性获取 spill 目录(每次调用都通过 callback,目录可能在首次写时才创建)
auto spillDir = getSpillDirPathCb_();

// 2. 若该 partition 还没有 writer,在 wLock 内创建(路径格式:dir/prefix-spill-encodedId)
partitionWriters_.withWLock([&](auto& lockedWriters) {
if (!lockedWriters.contains(id)) {
lockedWriters.emplace(id, std::make_unique<SpillWriter>(
rows->type(), sortingKeys_,
compressionKind_,
fmt::format("{}/{}-spill-{}", spillDir, fileNamePrefix_, id.encodedId()),
targetFileSize_, writeBufferSize_, ...));
}
});

// 3. 验证序列化大小不超过 2GB(PrestoPage 限制)
validateSpillBytesSize(rows->estimateFlatSize());
updateSpilledInputBytes(bytes);

// 4. 写入(不持有 wLock,不同 partition 并发写互不干扰)
IndexRange range{0, rows->size()};
return partitionWriter(id)->write(rows, folly::Range<IndexRange*>(&range, 1));
}

关键设计partitionWriters_ 只在创建 writer 时加 wLock,实际 write() 不加锁——因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突。


2.6 SpillPartition / SpillPartitionSet — 读回路径

文件:velox/exec/Spill.h:457

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class SpillPartition {
SpillPartitionId id_;
SpillFiles files_; // 该 partition 的所有文件
uint64_t size_; // 总字节数

// 无序读(HashJoin 用):逐文件顺序消费,不保序
std::unique_ptr<UnorderedStreamReader<BatchStream>> createUnorderedReader(...);

// 有序读(Aggregation/OrderBy 用):N 路败者树归并,保序
std::unique_ptr<TreeOfLosers<SpillMergeStream>> createOrderedReader(...);

// 均分文件列表为 N 个 shard(并行算子使用)
std::vector<std::unique_ptr<SpillPartition>> split(int numShards);
};

// SpillPartitionSet 是按 SpillPartitionId 有序排列的 map
using SpillPartitionSet = std::map<SpillPartitionId, std::unique_ptr<SpillPartition>>;

createOrderedReader 的防 FD 耗尽机制

1
2
3
4
5
6
7
若 files_.size() > numMaxMergeFiles(且 numMaxMergeFiles >= 2):
1. 将文件分组,每组最多 numMaxMergeFiles 个
2. 对每组文件创建 TreeOfLosers 做局部归并,写出中间文件
3. 递归,直到文件数 <= numMaxMergeFiles
4. 最终对剩余文件建 TreeOfLosers 返回

这避免了同时打开 O(所有文件) 个文件描述符导致的 OOM 或 FD 耗尽。

2.7 读流层:SpillMergeStream / BatchStream

文件:velox/exec/Spill.h:56

BatchStream — 无序批次流(HashJoin 用):

1
2
3
接口:nextBatch(batch) → bool
实现:FileSpillBatchStream - 单文件
ConcatFilesSpillBatchStream - 多文件顺序串联

SpillMergeStream — 有序合并流(Aggregation/OrderBy 用):

1
2
3
4
5
6
接口:hasData() / currentIndex() / current() / pop()
实现:FileSpillMergeStream - 单文件
ConcatFilesSpillMergeStream - 多文件顺序串联(文件间已有序)

关键方法 compare():按 sortingKeys_ 逐列比较当前行,供 TreeOfLosers 使用
关键方法 pop():消费当前行;若当前 batch 用完,调用 nextBatch() 加载下一批

TreeOfLosers<SpillMergeStream> 是 N 路归并排序的败者树实现,next() 每次返回所有流中最小的行所在的流。


3. Spiller 层详解

3.1 SpillerBase — 抽象基类

文件:velox/exec/Spiller.h:29, Spiller.cpp:31

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class SpillerBase {
protected:
RowContainer* const container_; // 持有行数据的容器(nullptr 表示无容器)
folly::Executor* const executor_; // 非 null 时并发写 spill
const HashBitRange bits_; // hash 分区 bit 范围
const RowTypePtr rowType_;
const uint64_t maxSpillRunRows_; // 单 run 最大行数
const std::optional<SpillPartitionId> parentId_; // 递归 spill 时的父 ID

bool finalized_{false}; // finishSpill 后置 true,不允许再写
SpillState state_; // 管理所有 partition writer
folly::F14FastMap<SpillPartitionId, SpillRun> spillRuns_; // 内存中的行缓冲

// 纯虚方法(子类决定行为差异)
virtual bool needSort() const = 0; // 是否在写前排序
virtual std::string type() const = 0; // 类型名称(日志用)
virtual void extractSpill(folly::Range<char**>, RowVectorPtr&); // 从 container 提取
virtual void runSpill(bool lastRun); // 执行一次 spill run 的写入
};

3.2 SpillRun — 内存中的 spill 缓冲

1
2
3
4
5
6
7
8
9
10
11
12
struct SpillRun {
SpillRows rows; // char* 指针数组,指向 RowContainer 内部的行
uint64_t numBytes{0}; // 所有行的估算总字节数(通过 rowSize() 累加)
bool sorted{false}; // 是否已排序(避免同一 run 被重复排序)

void clear() {
rows.clear();
rows.shrink_to_fit(); // 主动归还内存,spill 后立即回收指针数组
numBytes = 0;
sorted = false;
}
};

SpillRun::rows 存的是 RowContainer 内部的原始行指针(char*),不拷贝行数据本身,仅持有指针。真正的行数据仍在 RowContainer 的 Arena 内存中,直到 extractSpill 时才物化为列式 RowVector。每个分区(SpillPartitionId)对应一个 SpillRun,存在 spillRuns_ map 中。


3.3 fillSpillRuns — 数据填充

这是 spill 流程的第一步,负责扫描 RowContainer、计算 hash、将行指针按分区放入对应 SpillRun。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) {
constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行

bool lastRun{false};
uint64_t totalRows{0};

for (;;) {
// 1. 从 RowContainer 按迭代器顺序取下一批行指针(迭代器原地推进)
const auto numRows =
container_->listRows(iterator, kHashBatchSize, rows.data());
if (numRows == 0) { lastRun = true; break; } // 全部取完,这是最后一轮

auto rowSet = folly::Range<char**>(rows.data(), numRows);

// 2. 计算 hash(逐列迭代,第 2 列起 combine=true,追加 XOR 到已有 hash)
if (!isSinglePartition) {
for (auto i = 0; i < container_->keyTypes().size(); ++i) {
container_->hash(i, rowSet, /*combine=*/i > 0, hashes.data());
}
}

// 3. 按 hash 将行指针分配到对应分区的 SpillRun
for (auto i = 0; i < numRows; ++i) {
const auto partitionNum =
isSinglePartition ? 0 : bits_.partition(hashes[i]);
auto& spillRun = createOrGetSpillRun(SpillPartitionId(partitionNum));
spillRun.rows.push_back(rows[i]);
spillRun.numBytes += container_->rowSize(rows[i]); // 估算行字节数
}

totalRows += numRows;
// 4. 达到 maxSpillRunRows_ 上限时暂停,进入 runSpill,之后再继续
if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) break;
}

markSeenPartitionsSpilled(); // 标记本次出现的所有 partition 为已 spill
return lastRun;
}

hash() 的多列迭代细节:第 i=0combine=false,直接写 hashes[];后续列 combine=true,将新 hash 值 XOR 到已有结果上。这使多列 hash 只需遍历一次行集合,CPU 友好。


3.4 SpillRun 为何切分 / maxSpillRunRows 的作用

spill() 的主循环是 do { fillSpillRuns; runSpill; } while (!lastRun)maxSpillRunRows_ 控制每轮 fillSpillRuns 最多处理多少行,从而将整个大 RowContainer 的 spill 分成多个小 run 依次执行。这样做有四个原因:

1. 指针数组本身的内存代价

SpillRun::rowsstd::vector<char*>,每个元素 8 字节。若 RowContainer 有 1 亿行,不切分时仅指针数组就需要 800 MBmaxSpillRunRows_ 通常设为数十万行,指针数组只占几 MB。

2. 排序的内存代价(仅 sorted spill)

ensureSortedrun.rows 原地排序(TimSort / PrefixSort),比较函数需要间接访问行数据。run.rows.size() 越大,排序所需的辅助内存和 cache miss 越高。切小 run 使每次排序在 cache 中完成。

3. 分步释放,配合内存仲裁

每轮 runSpill 结束后调用 run.clear()(含 shrink_to_fit()),立即归还指针数组内存。内存仲裁器在两轮 run 之间可观察到内存已下降,有机会停止 spill(若目标内存已释放够了)。若一次性塞满所有行,仲裁器只能在整个 spill 完成后才能看到效果。

4. PrestoPage 序列化限制

SpillState::appendToPartition 调用 validateSpillBytesSize,检查单次写入的估算字节数不超过 2 GBstd::numeric_limits<int32_t>::max())——这是 PrestoPage 协议的字段宽度限制。切分 run 使每次写入远小于这个上限,避免序列化失败。

1
2
3
4
5
6
7
maxSpillRunRows_ 的取值:
HashBuildSpiller → spillConfig->maxSpillRunRows(默认数十万行)
AggregationInputSpiller → spillConfig->maxSpillRunRows
AggregationOutputSpiller → spillConfig->maxSpillRunRows
SortInputSpiller → spillConfig->maxSpillRunRows
SortOutputSpiller → spillConfig->maxSpillRunRows
NoRowContainerSpiller → 0(无上限,直接 appendToPartition,无 RowContainer)

3.5 行列转换:extractSpill 与 extractSpillVector

spill 的核心转换:行式 RowContainer → 列式 RowVector。这发生在 writeSpill → extractSpillVector → extractSpill 调用链中。

extractSpillVector — 控制每批大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int64_t SpillerBase::extractSpillVector(
SpillRows& rows,
int32_t maxRows, // 上限:64 行
int64_t maxBytes, // 上限:256 KB(估算)
RowVectorPtr& spillVector,
size_t& nextBatchIndex) { // 游标,指向 rows[] 中下一个未处理的行

// 1. 确定本批行数:不超过 maxRows,且估算总字节不超过 maxBytes
int32_t numRows = 0;
int64_t bytes = 0;
auto limit = std::min(rows.size() - nextBatchIndex, (size_t)maxRows);

for (; numRows < limit; ++numRows) {
bytes += container_->rowSize(rows[nextBatchIndex + numRows]);
if (bytes > maxBytes) {
++numRows; // 超了也要包含这一行(至少保证提取 1 行,避免死循环)
break;
}
}

// 2. 真正执行行 → 列转换
extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector);
nextBatchIndex += numRows;
return bytes;
}

rowSize() 的计算

1
2
3
4
5
6
uint32_t RowContainer::rowSize(const char* row) const {
return fixedRowSize_ +
(rowSizeOffset_
? *reinterpret_cast<const uint32_t*>(row + rowSizeOffset_)
: 0);
}

fixedRowSize_ 是编译时确定的固定部分大小;rowSizeOffset_ 指向行尾的一个 uint32_t 字段,记录该行的变长部分(out-of-line 字符串)总字节数,两者相加为该行的完整内存占用估算。

extractSpill — 行式 → 列式核心转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void SpillerBase::extractSpill(
folly::Range<char**> rows, // 本批 char* 指针
RowVectorPtr& resultPtr) {

// 1. 分配或复用 RowVector
if (resultPtr == nullptr) {
resultPtr = BaseVector::create<RowVector>(
rowType_, rows.size(), memory::spillMemoryPool());
} else {
resultPtr->prepareForReuse(); // 复用已有分配,避免重复 malloc
resultPtr->resize(rows.size());
}

auto* result = resultPtr.get();

// 2. 提取各普通列(key + dependent 列)
for (auto i = 0; i < container_->columnTypes().size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}

// 3. 提取各 accumulator 列(聚合中间状态)
const auto& accumulators = container_->accumulators();
for (auto i = 0; i < accumulators.size(); ++i) {
accumulators[i].extractForSpill(
rows, result->childAt(i + container_->columnTypes().size()));
}
}

spillMemoryPool() 是专用的 spill 内存池,与算子的 operator pool 独立,防止 spill 写入过程中分配的临时内存被算子内存统计。


3.6 RowContainer 行布局与列提取原理

理解行列转换,需要先了解 RowContainer 的行内存布局:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
每行在 Arena 中的内存布局(低地址 → 高地址):

┌──────────────────────────────────────────────────────┐
│ normalizedKey (可选,8 字节) │ ← 排序优化前缀
├──────────────────────────────────────────────────────┤
│ null bits(每列 1 bit,对齐到 8 位) │ ← 列 nullable 标志
├──────────────────────────────────────────────────────┤
│ 固定宽度列值(int32/int64/float/double 等,inline 存储)│
│ 变长列引用(StringView = 8B ptr + 8B size,实际数据 │
│ 通过 HashStringAllocator out-of-line 存储)│
├──────────────────────────────────────────────────────┤
│ accumulator 固定部分(各 aggregate 状态,inline 存储) │ ← 每个 agg 对齐存放
├──────────────────────────────────────────────────────┤
│ rowSizeOffset(uint32_t,变长部分总字节数) │ ← 仅含变长列时存在
├──────────────────────────────────────────────────────┤
│ probed flag(1 bit,仅 right/full join) │
└──────────────────────────────────────────────────────┘

extractColumn(rows[], numRows, columnIndex, result) 的工作:

1
2
3
4
5
6
1. 确定该列在行内的 byte offset(通过 RowColumn 对象,编译时固定)
2. 读取 null bit 填充 result 的 null bitmap
3. 对每行读取 offset 处的值:
- 固定宽度列:直接 memcpy 4/8 字节
- StringView 列:读取 ptr+size,若数据是 inline(≤12B)直接复制;
否则从 HashStringAllocator 的堆上读取实际字符串,写入 result 的 string buffer

关键性能特征

  • 同一列的 offset 在所有行中相同(columnAt(i) 编译时固定),无需解析行 header
  • 变长列(StringView)在行中只存引用,实际字符串在 HashStringAllocator 堆上;提取时需要一次间接访问,可能有 cache miss
  • freezeAndExecute 在 spill 期间冻结 HashStringAllocator,防止其他线程并发修改变长数据

3.7 Accumulator 中间态提取

对于聚合算子,extractForSpill 提取的不是最终聚合结果,而是中间状态(partial intermediate)

1
2
3
4
void Accumulator::extractForSpill(folly::Range<char**> groups, VectorPtr& result) const {
spillExtractFunction_(groups, result);
// 实际调用:aggregate->extractAccumulators(groups.data(), groups.size(), &result)
}

extractAccumulatorsAggregate 虚函数,每种聚合函数自己实现。例如:

  • sum(int64) → 提取 int64 partial sum
  • avg(double) → 提取 (sum: double, count: int64) row struct
  • array_agg(T) → 提取序列化的数组(out-of-line 存储,需要 copy)

恢复(unspill)时,updateRow 调用 aggregate->addSingleGroupIntermediateResults,将磁盘读回的中间状态与内存中的行合并:

1
2
3
4
5
6
7
8
9
void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
mergeSelection_.setValid(input.currentIndex(), true);
for (auto i = 0; i < aggregates_.size(); ++i) {
mergeArgs_[0] = input.current().childAt(i + keyChannels_.size());
// 将 spill 文件中的中间状态合并到 row 的 accumulator
aggregates_[i].function->addSingleGroupIntermediateResults(
row, mergeSelection_, mergeArgs_, false);
}
}

这实现了”流式归并聚合”:相同 key 的行按排好的顺序依次到来,逐行 merge accumulator,避免将整个 group 缓存在内存中。


3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes)

即使在同一个 SpillRun 内部,writeSpill 也不是一次性将所有行物化为 RowVector,而是每次 extractSpillVector 只处理 最多 64 行或 256 KB(whichever first):

1
2
3
4
5
6
7
SpillRun.rows = [ptr0, ptr1, ..., ptr_N]   // N 可能有数十万

writeSpill 循环:
batch 0: [ptr0 .. ptr63] → extractSpill → RowVector(64行) → appendToPartition
batch 1: [ptr64 .. ptr127] → extractSpill → RowVector(64行) → appendToPartition
...
batch K: [ptr_last_start .. ptr_N] → extractSpill → RowVector(M行) → appendToPartition

为什么要切成 64 行/256KB 的小 batch?

① RowVector 内存峰值控制

每次 extractSpill 分配(或复用)一个 RowVector,其内存由 spillMemoryPool 分配。单批 64 行的 RowVector 内存占用可控(通常几 KB 到几百 KB),而 prepareForReuse() 使相邻批次复用同一块内存,峰值内存 ≈ 单批大小,而非整个 run 大小。

② 与 SpillWriter 写缓冲的配合

SpillWriter 内部有 writeBufferSize 的写缓冲,每次 appendToPartition → write() 将 PrestoPage 数据追加到缓冲;缓冲满时才真正调用文件系统 write()(刷盘)。小批次使每次 write() 调用的数据大小贴近 writeBufferSize,I/O 效率高。

③ “至少 1 行”的保证防止死循环

1
2
3
4
if (bytes > maxBytes) {
++numRows; // 即使超了也把这一行纳入
break;
}

若某行本身 > 256 KB(如包含超大字符串),不加这一行就会无限循环(numRows=0 → written 不推进)。++numRows 确保每次循环至少推进一行。

④ 64 行是何依据?

64 行是经验值,使得:

  • 一个 RowVector 的列式存储(FlatVector<int64> = 64×8=512B)可以完全装入 L1 cache
  • extractColumn 的内层循环(每列 64 次写入)具有良好的向量化潜力
  • 与 SpillMergeStream 归并读取的 batch 大小相配(读时也是小批量迭代,减少归并树的重平衡次数)

3.9 runSpill — 并发写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void SpillerBase::runSpill(bool lastRun) {
spillStats_->spillRuns.fetch_add(1);

std::vector<std::shared_ptr<AsyncSource<SpillStatus>>> writes;
for (const auto& [id, spillRun] : spillRuns_) {
if (spillRun.rows.empty()) continue;
writes.push_back(
memory::createAsyncMemoryReclaimTask<SpillStatus>(
[partitionId = id, this]() { return writeSpill(partitionId); }));
// 第 2 个及之后的任务提交到 executor(后台线程),第 1 个在当前线程执行
if ((writes.size() > 1) && executor_ != nullptr) {
executor_->add([source = writes.back()]() { source->prepare(); });
}
}

// 折叠守卫:无论成功失败,确保所有后台任务都被 move()(drain)
auto sync = folly::makeGuard([&]() {
for (auto& write : writes) { try { write->move(); } catch (...) {} }
});

// 主线程阻塞等待所有任务(含第 1 个本线程任务)
for (auto& write : writes) {
results.push_back(write->move());
}

// 检查错误 + 清空 SpillRun(归还指针数组内存)
for (auto& result : results) {
if (result->error) std::rethrow_exception(result->error);
spillRuns_.at(result->partitionId).clear();
if (needSort()) {
state_.finishFile(result->partitionId); // sorted spill:每 run 一个文件
}
}
}

并发策略细节

  • 第 1 个分区的写任务在调用线程上通过 write->move() 触发(AsyncSource::prepare() 从未被调用,move() 内联执行)
  • 第 2+ 个分区提交到 executor_(后台 CPU 线程池)并发执行
  • createAsyncMemoryReclaimTask 确保任务在内存仲裁器视角下是”可取消的回收任务”,若系统 OOM 时仲裁器能感知到这些任务正在运行

state_.finishFile(partitionId) 调用 SpillWriter::finishFile(),flush 当前文件并记录 SpillFileInfo,下次 appendToPartition 会创建新文件。这使每个 sorted run 独立成文,后续归并时文件间已有序。


3.10 finishSpill — 收尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void SpillerBase::finishSpill(SpillPartitionSet& partitionSet) {
finalizeSpill(); // finalized_ = true,之后不再允许写入

for (const auto& partitionId : state_.spilledPartitionIdSet()) {
// 递归 spill 时,要将本层 partition ID 包装进父 ID 形成完整路径
auto wholeId = parentId_.has_value()
? SpillPartitionId(parentId_.value(), partitionId.partitionNumber())
: partitionId;

// state_.finish(partitionId) 调用 SpillWriter::finish(),
// 关闭所有文件,返回 SpillFiles(文件路径、大小、排序键等元数据)
if (partitionSet.count(wholeId) == 0) {
partitionSet.emplace(wholeId,
std::make_unique<SpillPartition>(wholeId, state_.finish(partitionId)));
} else {
// 多个 Spiller(多线程 HashBuild)的文件合并到同一 partition
partitionSet[wholeId]->addFiles(state_.finish(partitionId));
}
}
}

partitionSetstd::map<SpillPartitionId, ...>,按 SpillPartitionId 有序排列(DFS 前序),确保递归 spill 场景下父 partition 先于子 partition 被处理。


3.11 各子类 Spiller 详解

HashBuildSpiller

1
2
3
4
5
needSort      : false(hash join 恢复时重建 hash 表,顺序无关)
HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits)(分区存储)
targetFileSize: spillConfig->maxFileSize
maxSpillRunRows: spillConfig->maxSpillRunRows
parentId_ : restoringPartitionId_(递归时指定父 partition)

差异点——重写 extractSpill 附加 probed flag 列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
// 先提取普通列(key + dependent)
for (auto i = 0; i < container_->columnTypes().size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}
// 若是 right/full join,额外提取每行的 probed 标志位
if (spillProbeFlag_) {
auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
for (auto i = 0; i < rows.size(); ++i) {
flagVector->set(i, container_->hasProbedFlag(rows[i]));
// hasProbedFlag 直接读行内 probedFlagOffset_ 处的 bit
}
}
}

spill() 重载1(spillTriggered_=true + 全量 spill):

1
2
3
4
void HashBuildSpiller::spill() {
spillTriggered_ = true;
SpillerBase::spill(nullptr); // fillSpillRuns(nullptr) → 从头扫全部行
}

spill(partitionId, spillVector) 重载2(直通路径,已 triggered 后):

1
2
3
4
5
void HashBuildSpiller::spill(const SpillPartitionId& id, const RowVectorPtr& vec) {
// 无需经过 RowContainer,直接 appendToPartition
if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
state_.appendToPartition(id, vec);
}

AggregationInputSpiller

1
2
3
4
5
needSort      : true(按所有 group key 排序,支持流式归并聚合)
HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits)
targetFileSize: uint64_t::max(文件大小不限,由 maxFileSize 配置控制)
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys : 所有 key 列,升序,nulls last

继承默认 extractSpill:提取 key 列 + 各 accumulator 的 extractAccumulators 中间状态。

每次 runSpill 后(needSort=true),调用 state_.finishFile(id) 关闭当前文件——每个 run 的数据写到独立的 sorted 文件。后续 createOrderedReader 用败者树对所有文件做 N 路归并,还原全局有序流。

关键约束:rows->stringAllocator().freezeAndExecute([&]() { inputSpiller_->spill(); }) 在 spill 期间冻结 HashStringAllocator,防止 accumulator 在 spill 时并发分配/释放变长内存。


AggregationOutputSpiller

1
2
3
4
needSort      : false(hash 表 scan 顺序即为输出顺序,不需要重排)
HashBitRange : {}(空,单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows

spill(startRowIter)rowIterator 指定的偏移开始扫描(支持从已输出的位置继续):

1
2
3
4
void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
state_.setPartitionSpilled(SpillPartitionId(0)); // 单分区
SpillerBase::spill(&startRowIter); // 从 startRowIter 开始迭代
}

重写 runSpill:在最后一次 run 后强制 finishFile

1
2
3
4
5
6
7
8
void AggregationOutputSpiller::runSpill(bool lastRun) {
SpillerBase::runSpill(lastRun);
if (lastRun) {
for (const auto& [id, _] : spillRuns_) {
state_.finishFile(id); // 确保最后的数据落盘
}
}
}

SortInputSpiller

1
2
3
4
5
needSort      : true(按 sort key 排序)
HashBitRange : {}(空,单 partition 0,OrderBy 无 hash 分区)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys : 所有 sort 列,对应 CompareFlags(方向 + nulls 位置)

spillerStoreType_ 把 sort 列排在最前面,这样归并时只需比较前 N 列(减少 cache miss)。

spill() 直接调用 SpillerBase::spill(nullptr),无参数——从 RowContainer 头部开始扫全部行。


SortOutputSpiller

1
2
3
4
needSort      : false(调用方已传入排好序的 SpillRows)
HashBitRange : {}(单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows

接口与其他 spiller 不同——接受外部已排序的行指针数组:

1
2
3
4
5
6
7
8
void SortOutputSpiller::spill(SpillRows& rows) {
auto& run = createOrGetSpillRun(SpillPartitionId(0));
// 直接接管外部已排序的行指针(sortedRows_ 的剩余部分)
run.rows = SpillRows(rows.begin(), rows.end(), run.rows.get_allocator());
for (const auto* row : rows) run.numBytes += container_->rowSize(row);
markSeenPartitionsSpilled();
runSpill(/*lastRun=*/true); // 一次性完成,不分多 run
}

重写 runSpill:执行完后立即 finishFile,因为这是唯一一次写入。


NoRowContainerSpiller / MergeSpiller

1
2
3
needSort      : false
container_ : nullptr(无 RowContainer)
maxSpillRunRows: 0(不使用)

无 RowContainer,通过 spill(partitionId, spillVector) 直接将外部 RowVector 写入磁盘,跳过整个 fillSpillRuns → runSpill → extractSpill 流程:

1
2
3
4
5
void NoRowContainerSpiller::spill(
const SpillPartitionId& id, const RowVectorPtr& spillVector) {
if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
state_.appendToPartition(id, spillVector); // 直接写磁盘
}

MergeSpillerNoRowContainerSpiller 的子类,额外携带 sortingKeys(用于 hash join probe 侧的归并 spill,需要保留排序信息以便后续 probe 侧归并读取)。


各子类对比

子类 needSort HashBitRange container_ 特殊行为
HashBuildSpiller false 多分区 重写 extractSpill:附加 probed flag 列;双重 spill() 入口
AggregationInputSpiller true 多分区 每 run 对应独立 sorted 文件;freeze allocator
AggregationOutputSpiller false 单分区(空) 从迭代器偏移开始;重写 runSpill 收尾
SortInputSpiller true 单分区(空) sort 列前置;每 run 独立 sorted 文件
SortOutputSpiller false 单分区(空) 接受外部已排序 SpillRows;重写 runSpill
NoRowContainerSpiller false 多分区 nullptr 直接 appendToPartition,无 fill/extract 流程
MergeSpiller false 多分区 nullptr 同上,额外保留 sortingKeys

4. HashJoin Spill 实现

4.1 整体流程

HashJoin spill 只发生在 Build 侧(HashBuild),主要用于处理 build 表过大的情况。Probe 侧配合 build 侧,读取对应的 spill 分区数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────────────────────────┐
│ Build 阶段 │
│ addInput → ensureInputFits → [内存不足] → reclaim │
│ ↓ spill triggered 后 │
│ addInput → spillInput → 直接 spill 到对应 partition │
│ ↓ noMoreInput │
│ finishHashBuild → finishSpill → joinBridge.setHashTable │
│ ↓ │
│ postHashBuildProcess → waitForProbe / setupSpillInput │
├─────────────────────────────────────────────────────────────┤
│ Probe 阶段 │
│ 探测内存中的 hash 表 + 同步 spill 对应分区的 probe 侧数据 │
│ joinBridge.setSpillPartitions → build 侧 setupSpillInput │
├─────────────────────────────────────────────────────────────┤
│ Restore 阶段(递归,每个 spill partition 一轮) │
│ setupSpillInput → processSpillInput → addInput(重建表) │
│ → finishHashBuild → 新一轮 probe / 再次 spill(递归) │
└─────────────────────────────────────────────────────────────┘

4.2 HashBuildSpiller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class HashBuildSpiller : public SpillerBase {
const bool spillProbeFlag_; // 是否需要 spill probed 标记(right/full join)
bool spillTriggered_{false}; // 一旦触发 spill,后续所有输入直接 spill

// spill() 重载1:spill RowContainer 中的所有行(内存仲裁触发)
void spill() {
spillTriggered_ = true;
SpillerBase::spill(nullptr); // → fillSpillRuns → runSpill → writeSpill
}

// spill() 重载2:spill 一个 partition 的输入向量(spillTriggered 后的直通路径)
void spill(const SpillPartitionId& partitionId, const RowVectorPtr& spillVector) {
VELOX_CHECK(spillTriggered_);
if (!state_.isPartitionSpilled(partitionId)) {
state_.setPartitionSpilled(partitionId);
}
state_.appendToPartition(partitionId, spillVector);
}
};

**重写 extractSpill**:对于 right/full outer join,需要在 spill 数据中附加一列 bool probed_flag,记录该行是否已被 probe 侧命中:

1
2
3
4
5
6
7
8
9
10
11
12
13
void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
// 提取普通列
for (auto i = 0; i < types.size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}
// 若需要 probed flag,从 RowContainer 的标记位提取
if (spillProbeFlag_) {
auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
for (auto i = 0; i < rows.size(); ++i) {
flagVector->set(i, container_->hasProbedFlag(rows[i]));
}
}
}

4.3 setupSpiller

HashBuild 初始化时和每次 restore 时调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void HashBuild::setupSpiller(SpillPartition* spillPartition) {
if (!canSpill()) return;

// 首次调用:构造 spillType_(在普通列后可能追加 bool 列用于 probed flag)
if (spillType_ == nullptr) {
spillType_ = hashJoinTableSpillType(tableType_, joinType_);
if (needProbedFlagSpill_) {
spillProbedFlagChannel_ = spillType_->size() - 1;
// 初始化 false 常量向量(build 侧 spill 时所有行未被 probe)
spillProbedFlagVector_ = std::make_shared<ConstantVector<bool>>(
pool(), 0, /*isNull=*/false, BOOLEAN(), false);
}
}

uint8_t startPartitionBit = config->startPartitionBit;

if (spillPartition != nullptr) {
// Restore 场景:从 spill 文件中读取数据重建 hash 表
spillInputReader_ = spillPartition->createUnorderedReader(
config->readBufferSize, pool(), spillStats_.get());
restoringPartitionId_ = spillPartition->id();

// 递归 spill:bit offset 右移一层
startPartitionBit = partitionBitOffset(
spillPartition->id(), startPartitionBit, numPartitionBits) + numPartitionBits;

// 若超过最大 spill 层数,禁用本轮 spill(只能尽力处理)
if (config->exceedSpillLevelLimit(startPartitionBit)) {
exceededMaxSpillLevelLimit_ = true;
return;
}
}

spiller_ = std::make_unique<HashBuildSpiller>(
joinType_, restoringPartitionId_,
table_->rows(), spillType_,
HashBitRange(startPartitionBit, startPartitionBit + config->numPartitionBits),
config, spillStats_.get());
}

4.4 ensureInputFits — 内存压力检测

每次 addInput 前调用,是 proactive spill 的入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void HashBuild::ensureInputFits(RowVectorPtr& input) {
if (!canSpill() || spiller_ == nullptr || spiller_->spillTriggered()) return;

// 估算本批 input 引起的内存增量
const auto tableIncrementBytes = table_->hashTableSizeIncrease(input->size());
const auto rowContainerIncrementBytes = rows->sizeIncrement(input->size(), ...);
const auto incrementBytes = rowContainerIncrementBytes + tableIncrementBytes;

// 若当前预留充足,直接返回
if (availableReservationBytes >= minReservationBytes) {
if (freeRows > input->size() && outOfLineFreeBytes >= flatBytes) return;
if (pool()->availableReservation() > 2 * incrementBytes) return;
}

// 尝试扩容 reservation(扩容本身可能触发内存仲裁,进而触发 spill)
const auto targetIncrementBytes = std::max<int64_t>(
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

{
Operator::ReclaimableSectionGuard guard(this); // 标记为可被仲裁
if (pool()->maybeReserve(targetIncrementBytes)) {
if (spiller_->spillTriggered()) {
// 扩容过程中仲裁器触发了 spill,不需要这份预留了
pool()->release();
}
return;
}
}
// maybeReserve 失败(OOM),后续仲裁器会调用 reclaim()
}

4.5 spillInput — 正在 spill 时的直通路径

一旦 spiller_->spillTriggered() 为 true,所有新输入不再进入 hash 表,而是直接按 partition 分发到 spill 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void HashBuild::spillInput(const RowVectorPtr& input) {
if (!canSpill() || spiller_ == nullptr || !spiller_->spillTriggered()
|| !activeRows_.hasSelections()) return;

// 1. 重新计算所有行的 hash(与 addInput 中的计算可能重复,代价可接受)
computeSpillPartitions(input); // 结果存入 spillPartitions_[row]

// 2. 按 partition 分组,并从 activeRows_ 中移除(防止重复插入 hash 表)
for (auto row = 0; row < numInput; ++row) {
if (!activeRows_.isValid(row)) continue;
activeRows_.setValid(row, false); // 从 active 中移除
rawSpillInputIndicesBuffers_[spillPartitions_[row]][numSpillInputs_[partition]++] = row;
}

// 3. 为非 spill 原始输入构建 spillChildVectors_(key + dependent + probed_flag)
maybeSetupSpillChildVectors(input);

// 4. 按 partition 逐批 spill
for (uint32_t partition = 0; partition < numSpillInputs_.size(); ++partition) {
spillPartition(partition, numSpillInputs_[partition],
spillInputIndicesBuffers_[partition], input);
}
}

4.6 finishHashBuild — 收尾与传递

所有 build 线程完成后,最后一个线程执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
bool HashBuild::finishHashBuild() {
// ...(等待所有 peer 线程完成)

SpillPartitionSet spillPartitions;

// 收集所有 peer 的 spill 文件(多线程 build 的各自 spill 合并)
for (auto* build : otherBuilds) {
if (build->spiller_ != nullptr) {
build->spiller_->finishSpill(spillPartitions);
}
}
if (spiller_ != nullptr) {
spiller_->finishSpill(spillPartitions);
removeEmptyPartitions(spillPartitions);
}

// 构建 hash 表(只包含未 spill 的行)
table_->prepareJoinTable(std::move(otherTables), ...);

// 注册 tableSpillFunc:允许 join bridge 在 probe 后 spill hash 表
HashJoinTableSpillFunc tableSpillFunc;
if (canReclaim()) {
tableSpillFunc = [hashBitRange = spiller_->hashBits(), ...](auto table) {
return spillHashJoinTable(table, ...);
};
}

// 将 hash 表 + spill partitions 一起传给 join bridge
joinBridge_->setHashTable(table, std::move(spillPartitions),
joinHasNullKeys_, std::move(tableSpillFunc));
}

4.7 reclaim — 内存仲裁触发路径

内存仲裁器调用 reclaim() 强制 spill:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void HashBuild::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
// 状态检查:只有 kRunning/kWaitForBuild/kYield 状态且不在非可回收区才能 spill
if (nonReclaimableState()) {
++stats.numNonReclaimableAttempts;
return;
}

// 暂停所有 peer driver(task->pauseRequested() 保证)
const std::vector<Operator*> operators =
task->findPeerOperators(pipelineId, this);

// 检查所有 peer 都可被回收
for (auto* op : operators) {
if (static_cast<HashBuild*>(op)->nonReclaimableState()) {
++stats.numNonReclaimableAttempts;
return;
}
}

// 收集所有 peer 的 spiller,统一 spill 整个 hash 表
std::vector<HashBuildSpiller*> spillers;
for (auto* op : operators) {
spillers.push_back(static_cast<HashBuild*>(op)->spiller_.get());
}
spillHashJoinTable(spillers, config); // 核心:spill 所有 peer 的 hash 表

// 清空 hash 表(已 spill,不再需要)+ 释放 memory pool reservation
for (auto* op : operators) {
static_cast<HashBuild*>(op)->table_->clear(true);
static_cast<HashBuild*>(op)->pool()->release();
}
}

spillHashJoinTable 内部对每个 spiller 调用 spiller->spill()(即 HashBuildSpiller::spill()),将所有行按 hash 分区写到磁盘。

4.8 postHashBuildProcess / setupSpillInput — 递归恢复

Build 完成后,询问 join bridge 是否有 spill 数据需要 restore:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void HashBuild::postHashBuildProcess() {
if (!canSpill()) {
setState(State::kFinish);
return;
}

// 向 join bridge 请求下一个要恢复的 spill partition
auto spillInput = joinBridge_->spillInputOrFuture(&future_);
if (!spillInput.has_value()) {
setState(State::kWaitForProbe); // 等待 probe 侧完成并传递 spill partition
return;
}
setupSpillInput(std::move(spillInput.value()));
}

void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
if (spillInput.spillPartition == nullptr) {
setState(State::kFinish); // 没有更多 spill partition,结束
return;
}

// 重置所有状态,为本次 restore 重新初始化
table_.reset(); spiller_.reset(); spillInputReader_.reset();
restoringPartitionId_.reset();

// 列顺序重置(spill 文件中列已按 key + dependent 排好)
std::iota(keyChannels_.begin(), keyChannels_.end(), 0);
std::iota(dependentChannels_.begin(), dependentChannels_.end(), keyChannels_.size());

setupTable();
setupSpiller(spillInput.spillPartition.get()); // 建立新的 spiller(可能是递归层)
processSpillInput(); // 开始读取并重建
}

4.9 processSpillInput — 从 spill 文件重建

1
2
3
4
5
6
7
8
9
10
11
12
13
void HashBuild::processSpillInput() {
while (spillInputReader_->nextBatch(spillInput_)) {
// 复用 addInput 路径:重新 hash + 插入 hash 表(或再次 spill 到下一层)
addInput(std::move(spillInput_));
if (!isRunning()) return;
if (shouldYield()) {
state_ = State::kYield;
future_ = ContinueFuture{folly::Unit{}};
return;
}
}
noMoreInputInternal(); // → finishHashBuild → postHashBuildProcess(循环)
}

这形成了递归循环:processSpillInput → addInput → ensureInputFits → [spill] → noMoreInput → finishHashBuild → postHashBuildProcess → setupSpillInput → processSpillInput

4.10 Probe 侧 Spill 详解

Probe 侧(HashProbe)的 spill 涉及三条路径,通过 HashJoinBridge 与 build 侧协调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────────────┐
│ HashJoinBridge — Build/Probe 协调中枢 │
│ │
│ setHashTable(table, spillPartitionSet, ...) │
│ ↓ probe 侧 asyncWaitForHashTable 取到 HashBuildResult │
│ ↓ HashBuildResult.spillPartitionIds 非空 → 触发 input spill │
│ │
│ probeFinished() │
│ ↓ last prober 调用,通知 bridge 准备下一个要恢复的 partition │
│ ↓ bridge 从 spillPartitionSets_ 取下一个 partition │
│ ↓ split 为 N shard → 分发给各 build 线程 │
│ │
│ appendSpilledHashTablePartitions(spillPartitionSet) │
│ ↓ probe 侧 reclaim 时,将被 spill 的 hash table 分区交还 bridge│
└─────────────────────────────────────────────────────────────────┘

HashBuildResult — Build/Probe 信息传递载体

1
2
3
4
5
6
struct HashBuildResult {
std::shared_ptr<BaseHashTable> table; // 刚建好的 hash 表
std::optional<SpillPartitionId> restoredPartitionId; // 非 null 表示本次是 restore 轮
SpillPartitionIdSet spillPartitionIds; // 本轮 build 中被 spill 的 partition ID
bool hasNullKeys;
};

spillPartitionIds 非空 ⟺ build 侧触发了 spill,probe 侧必须同步将对应行 spill 到磁盘。
restoredPartitionId 非 null ⟺ 本次 hash 表是从 spill 文件 restore 得来的,probe 侧需要从对应的 spill 文件中读取之前被 spill 的 probe 行。


路径 1:input spill(探测侧主路径)

触发时机asyncWaitForHashTable 拿到 HashBuildResult 后,发现 spillPartitionIds 非空。

1
2
3
4
5
6
7
8
9
10
11
12
void HashProbe::asyncWaitForHashTable() {
auto hashBuildResult = joinBridge_->tableOrFuture(&future_);
table_ = hashBuildResult->table;
initializeResultIter();

// 1. 若本次是 restore 轮,打开对应 partition 的 probe spill 文件
maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId);

// 2. 若 build 侧有 spill partition,初始化 inputSpiller_
maybeSetupInputSpiller(hashBuildResult->spillPartitionIds);
checkMaxSpillLevel(hashBuildResult->restoredPartitionId);
}

**maybeSetupInputSpiller**:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void HashProbe::maybeSetupInputSpiller(
const SpillPartitionIdSet& spillPartitionIds) {

spillInputPartitionIds_ = spillPartitionIds;
if (spillInputPartitionIds_.empty()) return;

// 计算 bit offset(与 build 侧一致,保证 hash 分区对齐)
const auto bitOffset = partitionBitOffset(
*spillInputPartitionIds_.begin(),
spillConfig()->startPartitionBit,
spillConfig()->numPartitionBits);

// NoRowContainerSpiller:无 RowContainer,直接将外部 RowVector 写到磁盘
inputSpiller_ = std::make_unique<NoRowContainerSpiller>(
probeType_,
restoringPartitionId_, // 递归时携带父 partition ID
HashBitRange(bitOffset, bitOffset + spillConfig()->numPartitionBits),
spillConfig(),
spillStats_.get());

// 只 spill build 侧已 spill 的那些 partition(其余 partition 可以正常 probe)
inputSpiller_->setPartitionsSpilled(spillInputPartitionIds_);

// SpillPartitionFunction:计算每行属于哪个 partition(与 build 侧使用相同的 hash bit 范围)
spillPartitionFunction_ = std::make_unique<SpillPartitionFunction>(
SpillPartitionIdLookup(spillInputPartitionIds_, ...),
probeType_, keyChannels_);
}

**spillInput**(在 addInput 中调用):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void HashProbe::spillInput(RowVectorPtr& input) {
const auto numInputRows = input->size();
// 预分配各 partition 的 indices buffer(复用避免重复 malloc)
prepareInputIndicesBuffers(numInputRows,
inputSpiller_->state().spilledPartitionIdSet());

// 为每行计算所属 partition
spillPartitionFunction_->partition(*input, spillPartitions_);

vector_size_t numNonSpillingInput = 0;
for (auto row = 0; row < numInputRows; ++row) {
const auto& partitionId = spillPartitions_[row];
if (!inputSpiller_->state().isPartitionSpilled(partitionId)) {
// 该 partition 未被 build 侧 spill,行留在内存继续 probe
rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row;
} else {
// 该 partition 已被 build 侧 spill,行需要 spill 到磁盘
rawSpillInputIndicesBuffers_.at(partitionId)
[numSpillInputs_.at(partitionId)++] = row;
}
}

if (numNonSpillingInput == numInputRows) return; // 无行需要 spill

// 强制加载所有 lazy 列(spill 序列化需要实际数据)
for (int32_t i = 0; i < input->childrenSize(); ++i) {
input->childAt(i)->loadedVector();
}

// 按 partition 分批 spill(wrap 创建 DictionaryVector,不拷贝数据,只共享 indices)
for (const auto& [partitionId, numRows] : numSpillInputs_) {
if (numRows == 0) continue;
inputSpiller_->spill(
partitionId,
wrap(numRows, spillInputIndicesBuffers_.at(partitionId), input));
}

if (numNonSpillingInput == 0) {
input = nullptr; // 所有行都 spill 了,无需继续 probe
} else {
input = wrap(numNonSpillingInput, nonSpillInputIndicesBuffer_, input);
// 只保留未 spill 的行继续 probe
}
}

关键设计wrap 使用 DictionaryVector,不拷贝行数据,只是 indices 引用,spill 写入时才通过 loadedVector() 强制物化。这样同一个 input batch 里属于不同 partition 的行可以零拷贝地分发到各自的 spill 文件。

**noMoreInputInternal**(探测完成,收尾 input spill):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void HashProbe::noMoreInputInternal() {
noMoreSpillInput_ = true;
if (!spillInputPartitionIds_.empty()) {
// 关闭 inputSpiller_,将所有分区的 SpillFiles 收集到 inputSpillPartitionSet_
inputSpiller_->finishSpill(inputSpillPartitionSet_);
// 注:NoRowContainerSpiller 不排序,spillSortTimeNanos 永远为 0
}

// 等待所有 peer probe 线程完成(防止一个线程先收尾而另一个还在写同一 partition)
if (!operatorCtx_->task()->allPeersFinished(...)) {
setState(ProbeOperatorState::kWaitForPeers);
return;
}

lastProber_ = true; // 最后一个完成的 probe 线程负责通知 build 侧
}

路径 2:restore 轮的 probe(从 spill 文件读取 probe 行)

当 build 侧从某个 spill partition 重建 hash 表后,probe 侧必须把之前 spill 到磁盘的对应 probe 行读回重新 probe。

**maybeSetupSpillInputReader**:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void HashProbe::maybeSetupSpillInputReader(
const std::optional<SpillPartitionId>& restoredPartitionId) {
if (!restoredPartitionId.has_value()) return;

// 从 inputSpillPartitionSet_ 中取出对应 partition 的 SpillPartition
auto iter = inputSpillPartitionSet_.find(restoredPartitionId.value());
auto partition = std::move(iter->second);
restoringPartitionId_ = restoredPartitionId;

// 创建无序读取器(probe 行之间无顺序要求,顺序读即可)
spillInputReader_ = partition->createUnorderedReader(
spillConfig_->readBufferSize, pool(), spillStats_.get());
inputSpillPartitionSet_.erase(iter);
}

**addSpillInput**(在 isBlockedkRunning 状态时轮询调用):

1
2
3
4
5
6
7
8
9
10
11
12
void HashProbe::addSpillInput() {
if (input_ != nullptr || noMoreSpillInput_) return;

if (!spillInputReader_->nextBatch(input_)) {
// spill 文件读完,走正常的 noMoreInput 流程
noMoreInputInternal();
return;
}

// 复用 addInput 路径(包含 spillInput 检查,支持递归 spill)
addInput(std::move(input_));
}

路径 3:output spill(内存仲裁触发 reclaim)

触发条件:probe 正在处理某批 input 时,输出过大导致内存仲裁器调用 reclaim()

**reclaim**:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void HashProbe::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
if (exceededMaxSpillLevelLimit_ || nonReclaimableState()) {
++stats.numNonReclaimableAttempts; return;
}

const auto probeOps = findPeerOperators();
bool hasMoreProbeInput = false;
for (auto* probeOp : probeOps) {
if (probeOp->nonReclaimableState()) { ++stats.numNonReclaimableAttempts; return; }
hasMoreProbeInput |= !probeOp->noMoreSpillInput_;
}

// 步骤 1:并发将所有 peer 的 pending input 产出的 output 先 spill 到磁盘
spillOutput(probeOps);

// 步骤 2:若还有 probe input 未处理,spill hash 表(释放 build 侧内存)
SpillPartitionSet spillPartitionSet;
if (hasMoreProbeInput) {
spillPartitionSet = spillHashJoinTable(
table_, restoringPartitionId_, tableSpillHashBits_,
joinNode_, spillConfig(), spillStats_.get());
}

// 步骤 3:通知各 probe 算子设置 inputSpiller_(后续输入按新 partitions spill)
const auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
for (auto* probeOp : probeOps) {
probeOp->clearBuffers();
if (!spillPartitionSet.empty()) {
probeOp->maybeSetupInputSpiller(spillPartitionIdSet);
}
probeOp->pool()->release();
}

// 步骤 4:清空 hash 表 + 将 spill partitions 注册到 join bridge(供 build 侧后续恢复)
table_->clear(true);
if (!spillPartitionIdSet.empty()) {
joinBridge_->appendSpilledHashTablePartitions(std::move(spillPartitionSet));
}
}

spillOutput(单个算子)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void HashProbe::spillOutput() {
if (input_ == nullptr && !needLastProbe()) return;

// 用 NoRowContainerSpiller 将 pending input 产生的所有 output 写到磁盘(单一 partition 0)
auto outputSpiller = std::make_unique<NoRowContainerSpiller>(
outputType_, std::nullopt, HashBitRange{}, spillConfig(), spillStats_.get());
outputSpiller->setPartitionsSpilled({SpillPartitionId(0)});

// 调用 getOutputInternal(toSpillOutput=true):产出 output 但直接 spill 而非返回
for (;;) {
auto output = getOutputInternal(/*toSpillOutput=*/true);
if (output != nullptr) {
for (int32_t i = 0; i < output->childrenSize(); ++i) {
output->childAt(i)->loadedVector(); // 强制物化 lazy 列
}
outputSpiller->spill(SpillPartitionId(0), output);
continue;
}
if (input_ == nullptr) break; // 当前 input 批次已处理完
// right semi join 特殊:input_ 不为 null 但 output 为 null 属于正常状态
break;
}

// 收尾:将 spill 文件信息存入 spillOutputPartitionSet_(供后续读回)
outputSpiller->finishSpill(spillOutputPartitionSet_);
removeEmptyPartitions(spillOutputPartitionSet_);
}

读回阶段:下次 getOutput 调用时,maybeReadSpillOutput() 将磁盘数据读回:

1
2
3
4
5
6
7
8
9
10
11
12
bool HashProbe::maybeReadSpillOutput() {
if (spillOutputReader_ == nullptr) {
maybeSetupSpillOutputReader(); // 首次:打开 UnorderedStreamReader
}
if (spillOutputReader_ == nullptr) return false;

if (!spillOutputReader_->nextBatch(output_)) {
spillOutputReader_.reset(); // 读完,重置
return false;
}
return true; // 将读回的 output 直接返回给调用方
}

output spill 特殊注意getOutputInternal(toSpillOutput=true) 被调用时,若发现 input_ 为 null(即没有 pending input),会提前返回 null——这保证了 reclaim 路径不会误进入”probe 完成逻辑”(如 prepareForSpillRestore),避免状态混乱。


prepareForSpillRestore — 多轮恢复的关键

当一轮 probe(含当前 hash 表和对应 spill 文件的 probe 行)全部完成后,最后一个 probe 线程(lastProber_)负责启动下一轮恢复:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void HashProbe::prepareForSpillRestore() {
// 重置本轮相关状态
noMoreSpillInput_ = false;
if (lastProber_) {
table_->clear(true); // 清空当前 hash 表,释放内存
}
table_.reset();
inputSpiller_.reset();
spillInputReader_.reset();
restoringPartitionId_.reset();
spillInputPartitionIds_.clear();
spillOutputReader_.reset();

if (!lastProber_) return;

// 通知 join bridge:probe 侧已完成,请准备下一个 spill partition
joinBridge_->probeFinished();
// bridge 内部:从 spillPartitionSets_ 取下一个 partition,均分为 N shard,
// 放入 restoringSpillShards_,build 侧 spillInputOrFuture() 返回各自的 shard

// 唤醒等待的 peer probe 线程(之前在 kWaitForPeers 状态阻塞)
wakeupPeerOperators();
lastProber_ = false;
}

Probe 侧 Spill 全生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
asyncWaitForHashTable()

├─ spillPartitionIds 非空 → maybeSetupInputSpiller()
│ 创建 NoRowContainerSpiller,标记需要 spill 的 partition 集合

├─ restoredPartitionId 非 null → maybeSetupSpillInputReader()
│ 打开 UnorderedStreamReader,顺序读取之前 spill 的 probe 行

addInput(input)

└─ needToSpillInput()? → spillInput(input)
按 hash 分 partition:
spilled partition → DictionaryVector wrap → inputSpiller_.spill()
non-spilled partition → 继续正常 probe(无拷贝)

getOutput() → ensureOutputFits()

├─ maybeReadSpillOutput():读取上次 spillOutput 写到磁盘的 output batch

└─ 内存不足 → reclaim():
1. spillOutput(probeOps) 并发 spill 各 probe 的 pending output
2. spillHashJoinTable(table) 将 hash 表 spill 到磁盘
3. maybeSetupInputSpiller(ids) 各 probe 设置新 inputSpiller_
4. appendSpilledHashTablePartitions() → 通知 bridge

isBlocked(kRunning) + spillInputReader_ 非 null → addSpillInput()
│ 逐 batch 从 spill 文件读取 probe 行 → addInput() → spillInput() → probe

noMoreInputInternal()

├─ inputSpiller_.finishSpill(inputSpillPartitionSet_)
└─ allPeersFinished() → lastProber_ = true


hasMoreSpillData()? → prepareForSpillRestore()
│ joinBridge_->probeFinished()
│ wakeupPeerOperators()

asyncWaitForHashTable() ← 循环(等待 build 侧重建 hash 表)

└─ 所有 spill partition 处理完 → setState(kFinish)

5. HashAggregation Spill 实现

5.1 两种 Spiller

HashAggregation 的 spill 逻辑集中在 GroupingSet 中,区分两个阶段:

1
2
3
4
5
6
7
8
9
10
11
inputSpiller_  (AggregationInputSpiller):
- 在 input 处理阶段触发(hash 表太大)
- needSort = true:按 group key 排序
- 分区:按 hash(group key)
- 目的:spill 后 hash 表清空,腾出内存继续处理输入

outputSpiller_ (AggregationOutputSpiller):
- 在 output 处理阶段触发(输出时内存不足)
- needSort = false:已是输出顺序(hash 表 scan 顺序),不需要重排
- 无分区(HashBitRange 为空)
- 目的:将剩余输出行先 spill 到磁盘,之后边读边输出

两者互斥:同一个 GroupingSet 要么用 inputSpiller_,要么用 outputSpiller_,不会同时存在。

5.2 GroupingSet::spill() — input spill

HashAggregation::reclaim()ensureInputFits 调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void GroupingSet::spill() {
if (table_ == nullptr || table_->numDistinct() == 0) return;

auto* rows = table_->rows();
VELOX_CHECK_NULL(outputSpiller_);

// 首次 spill:初始化 inputSpiller_
if (inputSpiller_ == nullptr) {
// sortingKeys:按所有 group key 列排序(升序,nulls 在末尾)
const auto sortingKeys = SpillState::makeSortingKeys(
std::vector<CompareFlags>(rows->keyTypes().size()));
inputSpiller_ = std::make_unique<AggregationInputSpiller>(
rows,
makeSpillType(), // key 列 + accumulator 列的类型
HashBitRange(startPartitionBit, startPartitionBit + numPartitionBits),
sortingKeys,
spillConfig_,
spillStats_);
}

// 冻结 HashStringAllocator:spill 可能多线程执行,防止 accumulator 并发分配
rows->stringAllocator().freezeAndExecute([&]() {
inputSpiller_->spill();
});

// distinct 聚合:记录本次 spill 生成的文件数(用于后续归并去重)
if (isDistinct() && numDistinctSpillFilesPerPartition_.empty()) {
// 记录每个 partition 的文件数,文件 ID < 该数的是 distinct 文件
for (int partition = 0; partition < maxPartitions; ++partition) {
numDistinctSpillFilesPerPartition_[partition] =
inputSpiller_->state().numFinishedFiles(SpillPartitionId(partition));
}
}

// 清空 hash 表(所有行已 spill 到磁盘)
table_->clear(/*freeTable=*/true);
}

makeSpillType() 生成 spill 类型:

1
2
3
4
5
6
7
8
9
10
RowTypePtr GroupingSet::makeSpillType() const {
std::vector<TypePtr> types;
// Group key 列
for (auto& hasher : hashers_) types.push_back(hasher->type());
// Accumulator 的中间状态类型(不是最终结果类型)
for (auto& accumulator : accumulators(false)) {
types.push_back(accumulator.spillType());
}
return ROW(names, types);
}

5.3 GroupingSet::spill(rowIterator) — output spill

HashAggregation::reclaim() 在 output 阶段调用:

1
2
3
4
5
6
7
8
9
10
11
12
void GroupingSet::spill(const RowContainerIterator& rowIterator) {
VELOX_CHECK(!hasSpilled()); // 确保没有 inputSpiller_

auto* rows = table_->rows();
outputSpiller_ = std::make_unique<AggregationOutputSpiller>(
rows, makeSpillType(), spillConfig_, spillStats_);

rows->stringAllocator().freezeAndExecute([&]() {
outputSpiller_->spill(rowIterator); // 从 rowIterator 指定的位置开始 spill
});
table_->clear(/*freeTable=*/true);
}

AggregationOutputSpiller::spill(rowIterator) 内部:

1
2
3
4
5
void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
// 只标记单一分区(partition 0),无 hash 分区
state_.setPartitionSpilled(SpillPartitionId(0));
SpillerBase::spill(&startRowIter); // 从 rowIterator 位置开始迭代
}

AggregationOutputSpiller::runSpill 重写:每次 run 后立即 finishFile(保证每个 run 独立一个文件,便于后续归并)。

5.4 reclaim 触发路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void HashAggregation::reclaim(uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
// 1. 先尝试 compact(清理 accumulator 的字符串存储),代价低
const auto compactedBytes = groupingSet_->compact();
if (compactedBytes >= targetBytes) return;

if (!canSpill()) { /* ... */ return; }

if (groupingSet_->hasSpilled()) {
if (isOutputProcessing) {
// 已在输出阶段且已 spill:无法再次 spill,报告无法回收
return;
}
}

if (isOutputProcessing) {
// 在输出阶段:spill 从 resultIterator_ 位置起的剩余行
groupingSet_->spill(resultIterator_);
} else {
// 在输入阶段:spill 所有行(hash 表全量 dump)
groupingSet_->spill();
}
}

5.5 getOutputWithSpill — 归并输出

groupingSet_->hasSpilled() 为 true 时,getOutput 路径切换为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
bool GroupingSet::getOutputWithSpill(int32_t maxOutputRows,
int32_t maxOutputBytes,
const RowVectorPtr& result) {
if (outputSpillPartition_ == -1) {
// 首次进入:初始化归并结构
mergeRows_ = std::make_unique<RowContainer>(keyTypes, ...); // 临时 RowContainer
initializeAggregates(*mergeRows_, false);

// 将所有 spill 文件信息收集到 spillPartitionSet_
inputSpiller_->finishSpill(spillPartitionSet_);
// 或:outputSpiller_->finishSpill(spillPartitionSet_);

removeEmptyPartitions(spillPartitionSet_);
prepareNextSpillPartitionOutput(); // 打开第一个 partition 的 TreeOfLosers
}

// 逐 partition 归并输出
return mergeNext(maxOutputRows, maxOutputBytes, result);
}

bool GroupingSet::prepareNextSpillPartitionOutput() {
merge_ = nullptr;
if (spillPartitionSet_.empty()) return false;

auto it = spillPartitionSet_.begin();
outputSpillPartition_ = it->first.partitionNumber();
// createOrderedReader:构建 N 路败者树(含文件数超限时的预归并)
merge_ = it->second->createOrderedReader(*spillConfig_, pool_, spillStats_);
spillPartitionSet_.erase(it);
return true;
}

5.6 mergeNextWithAggregates — 有序归并聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
bool GroupingSet::mergeNextWithAggregates(
int32_t maxOutputRows, int32_t maxOutputBytes, const RowVectorPtr& result) {

bool nextKeyIsEqual{false};
for (;;) {
// 从败者树取下一行(带 equal 标志:与上一行 key 是否相同)
auto [stream, equal] = merge_->nextWithEquals();

if (stream == nullptr) {
// 当前 partition 耗尽:输出已积累的行
extractSpillResult(result);
if (result->size() > 0) return true;
// 换下一个 partition
if (!prepareNextSpillPartitionOutput()) return false;
continue;
}

if (!nextKeyIsEqual) {
// 新的 group key:在 mergeRows_ 中创建新行,初始化 accumulators
mergeState_ = mergeRows_->newRow();
initializeRow(*stream, mergeState_); // 复制 key 列 + 初始化 accumulator
}
// 用当前行更新 accumulator(partial merge)
updateRow(*stream, mergeState_);
nextKeyIsEqual = equal;
stream->pop();

// 达到行数/字节数上限:输出
if (!nextKeyIsEqual && (mergeRows_->numRows() >= maxOutputRows
|| mergeRowBytes() >= maxOutputBytes)) {
extractSpillResult(result);
return true;
}
}
}

关键merge_->nextWithEquals() 利用已排序的特性,保证相同 key 的行连续出现,从而可以在流式归并的同时完成聚合(无需将整个 group 缓存在内存中)。

5.7 mergeNextWithoutAggregates — distinct 归并

对于 SELECT DISTINCT 场景,spill 文件分为两类:

  • distinct 文件(文件 ID < numDistinctSpillFilesPerPartition_):已输出过的 distinct 行
  • input 文件(文件 ID >= 阈值):新输入的行

归并时:若某个相同 key 的 run 中包含来自 distinct 文件的行,则跳过该 run(因为已经输出过);否则输出该 run 中的一行。


6. OrderBy Spill 实现

6.1 两种 Spiller

OrderBy 的 spill 通过 SortBuffer 实现,同样区分两个阶段:

1
2
3
4
5
6
7
8
9
10
inputSpiller_  (SortInputSpiller):
- input 阶段:RowContainer 行数过多时触发
- needSort = true:按 sort key 排序后写入
- 无 hash 分区(HashBitRange 为空)
- 写入一个 partition(partition 0)

outputSpiller_ (SortOutputSpiller):
- output 阶段:已完成内排序,输出时内存不足
- needSort = false:外部传入已排序的 SpillRows
- 接收 sortedRows_ 中未输出部分,直接写入

6.2 SortBuffer::spillInput

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void SortBuffer::spillInput() {
if (inputSpiller_ == nullptr) {
// sortingKeys:将所有 sort column + 其余列按 sort 顺序排列
const auto sortingKeys = SpillState::makeSortingKeys(sortCompareFlags_);
inputSpiller_ = std::make_unique<SortInputSpiller>(
data_.get(), // RowContainer
spillerStoreType_, // sort 列在前,其余列在后的类型
sortingKeys,
spillConfig_,
spillStats_);
}
inputSpiller_->spill(); // fillSpillRuns → sort → writeSpill → 写磁盘
data_->clear(); // 清空 RowContainer,腾出内存
}

spillerStoreType_ 的列顺序重排(构造时完成):

1
2
3
4
5
6
7
8
9
10
11
12
// sort 列排在最前面(归并时只需比较前几列)
for (auto i = 0; i < numSortKeys; ++i) {
sorted_types.push_back(input->childAt(sortColumnIndices[i])->type());
sorted_names.push_back(input->nameOf(sortColumnIndices[i]));
}
// 其余列追加在后面
for (auto i = 0; i < input->size(); ++i) {
if (!isSortKey(i)) {
sorted_types.push_back(input->childAt(i)->type());
sorted_names.push_back(input->nameOf(i));
}
}

这样 spill 文件中每行的前 N 列就是排序键,归并比较时只需读取前 N 列,节省反序列化开销。

6.3 SortBuffer::spillOutput

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void SortBuffer::spillOutput() {
if (hasSpilled()) return; // 已经 spill 过
if (numOutputRows_ == sortedRows_.size()) return; // 全部输出完了

outputSpiller_ = std::make_unique<SortOutputSpiller>(
data_.get(), spillerStoreType_, spillConfig_, spillStats_);

// 将 sortedRows_ 中未输出的部分([numOutputRows_, end))直接传给 outputSpiller_
auto spillRows = SpillerBase::SpillRows(
sortedRows_.begin() + numOutputRows_, sortedRows_.end(),
*memory::spillMemoryPool());
outputSpiller_->spill(spillRows);

data_->clear();
sortedRows_.clear(); sortedRows_.shrink_to_fit();
finishSpill(); // output spill 只触发一次,立即收尾
}

SortOutputSpiller::spill(SpillRows) 内部:

1
2
3
4
5
6
7
8
9
void SortOutputSpiller::spill(SpillRows& rows) {
auto& spillRun = createOrGetSpillRun(SpillPartitionId(0));
spillRun.rows = SpillRows(rows.begin(), rows.end(), ...);
// 计算 numBytes
for (const auto* row : rows) spillRun.numBytes += container_->rowSize(row);
markSeenPartitionsSpilled();

runSpill(true); // rows 已按 sort key 有序,跳过排序直接写入
}

SortOutputSpiller::runSpill 重写:在父类 runSpill 后立即调用 finishFile(保证文件封尾)。

6.4 SortBuffer::finishSpill

1
2
3
4
5
6
7
8
9
10
void SortBuffer::finishSpill() {
// inputSpiller_ 和 outputSpiller_ 互斥,只有一个会被初始化
if (inputSpiller_ != nullptr) {
inputSpiller_->finishSpill(spillPartitionSet_);
} else {
outputSpiller_->finishSpill(spillPartitionSet_);
}
// OrderBy 无 hash 分区,只有一个 partition
VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
}

6.5 SortBuffer::getOutputWithSpill

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void SortBuffer::prepareOutputWithSpill() {
if (spillMerger_ != nullptr) return; // 已初始化

VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
// 创建 TreeOfLosers:归并所有 spill 文件(可能包含内存中剩余行)
spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader(
*spillConfig_, pool(), spillStats_);
spillPartitionSet_.clear();
}

void SortBuffer::getOutputWithSpill() {
int32_t outputRow = 0, outputSize = 0;
while (outputRow + outputSize < output_->size()) {
SpillMergeStream* stream = spillMerger_->next();
VELOX_CHECK_NOT_NULL(stream);

spillSources_[outputSize] = &stream->current();
spillSourceRows_[outputSize] = stream->currentIndex(&isEndOfBatch);
++outputSize;

if (isEndOfBatch) {
// batch 到边界时先复制出来,再 pop 触发加载下一 batch
gatherCopy(output_.get(), outputRow, outputSize,
spillSources_, spillSourceRows_, columnMap_);
outputRow += outputSize; outputSize = 0;
}
stream->pop();
}
// 处理最后一批
if (outputSize != 0) {
gatherCopy(output_.get(), outputRow, outputSize, ...);
}
numOutputRows_ += output_->size();
}

gatherCopy 使用 columnMap_ 将 spill 文件中的列顺序(sort 列在前)映射回原始输出列顺序。


7. 关键设计决策总结

7.1 有序 vs. 无序 Spill

算子 Spill 类型 原因
HashJoin Build 无序 恢复时重建 hash 表,顺序无关
HashAggregation Input 有序(按 group key) 支持流式归并聚合,避免全量恢复
HashAggregation Output 无序(hash 表 scan 顺序) 已是输出顺序,不需要重排
OrderBy Input 有序(按 sort key) 支持多路归并产生有序输出
OrderBy Output 有序(已排序传入) 同上

7.2 两阶段 Spill(Input/Output Spiller 分离)

Aggregation 和 OrderBy 都支持两阶段 spill:

  • Input 阶段:按 hash 分区 spill,允许多次触发;之后 hash 表清空继续处理输入
  • Output 阶段:无分区,只触发一次;将剩余行 spill 后用归并输出

两阶段互斥,通过 inputSpiller_/outputSpiller_ 非空判断。

7.3 内存预留机制(proactive spill)

1
2
3
4
5
触发链:ensureInputFits → maybeReserve → 触发内存仲裁 → reclaim → spill

关键参数:
minSpillableReservationPct 确保始终有足够"可 spill"预留空间
spillableReservationGrowthPct 每次扩容幅度,平衡内存利用率与 spill 频率

设计意图:通过 proactive reservation,在真正 OOM 之前触发 spill,避免在不可中断的关键路径上被动 OOM。

7.4 HashBuild 多线程 Spill 协调

1
2
3
4
5
6
7
8
所有 peer HashBuild 线程共享同一个 joinBridge
仲裁器触发 reclaim 时:
1. 暂停所有 peer driver(task->pause())
2. 检查所有 peer 均可回收
3. 统一 spill 所有 peer 的 hash 表
4. 清空所有 peer 的表 + 释放预留

这确保所有线程的 hash 表碎片被一次性清理,而不是只清理一个线程的。

7.5 递归 Spill(HashJoin 专属)

1
2
3
4
5
6
7
8
初次 spill:bit [32:29] 用于分区 → 8 个 spill partitions
第一次递归:bit [28:26] 用于分区 → 8 个子 partitions
最多 maxSpillLevel 层(默认 3 层)

终止条件:
1. 达到 maxSpillLevel
2. 某 partition 能完全装入内存(不再 spill)
3. partition 数据为空

7.6 probed flag Spill(Right/Full Outer Join)

对于 right join / full outer join,hash 表中每行有一个 probed 标记位,记录该行是否已与 probe 行匹配。Spill 时需要将此标记一同写入磁盘(作为额外的 bool 列),恢复时读回并设置回 RowContainer 的对应行。


8. 递归 Spill 流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
HashBuild 初始化


addInput(循环)


ensureInputFits

内存不足?
┌─YES──────────────────────────────────────────────────────┐
│ reclaim() / maybeReserve 触发仲裁 │
│ spillHashJoinTable(spillers) │
│ ├─ spiller->spill() 对每个 partition │
│ │ fillSpillRuns → sort(no) → writeSpill → 磁盘 │
│ └─ table_->clear() │
│ spillTriggered_ = true │
└──────────────────────────────────────────────────────────┘

spillTriggered?
├─YES: spillInput(input) → 直接按 partition 写到 spill 文件
└─NO : insertIntoHashTable(input)


noMoreInput → finishHashBuild


所有 peer 完成 → 合并 spill files → prepareJoinTable(只含未 spill 行)


joinBridge_.setHashTable(table, spillPartitions)


Probe 侧开始 probe + 同步 spill 对应的 probe 行


postHashBuildProcess

有 spill partition?
├─NO : setState(kFinish) ──────────────────── END
└─YES: setupSpillInput(nextSpillPartition)


processSpillInput


addInput(从 spill 文件读取,复用 addInput 路径)

内存不足? ──YES──► 同上(递归 spill,bit offset +numPartitionBits)


finishHashBuild → joinBridge(递归)

└──────────► 下一个 spill partition(循环直至所有 partition 处理完)

9. C++ 编码品味:值得学习的细节

这一章从代码本身出发,梳理 Velox spill 系统在 C++ 工程实践上值得学习的具体技法。每一条都有对应的代码位置。


9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑

1
2
3
4
5
6
7
8
// HashBuild::finishHashBuild
SCOPE_EXIT {
// 无论函数从哪个 return 路径退出,都必须唤醒等待的 peer
peers.clear();
for (auto& promise : promises) {
promise.setValue();
}
};
1
2
3
4
// SortBuffer::getOutput
SCOPE_EXIT {
pool_->release(); // 每次 getOutput 返回后都归还未用预留
};

SCOPE_EXIT(底层是 folly::ScopeGuard)比 try-catch 清晰得多:清理代码与触发它的上下文紧邻,阅读者一眼就能看到”这个函数结束时会做什么”,而不需要去找 finally 块或析构函数。

对比写法

1
2
3
4
5
6
7
8
9
10
11
12
// ❌ 反例:try-catch 把清理和逻辑分离
try {
doWork();
pool_->release(); // 容易漏掉 early return 路径
} catch (...) {
pool_->release();
throw;
}

// ✅ SCOPE_EXIT:写一次,所有路径都覆盖
SCOPE_EXIT { pool_->release(); };
doWork();

9.2 folly::makeGuard:异步任务的”排水阀”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// SpillerBase::runSpill
auto sync = folly::makeGuard([&]() {
for (auto& write : writes) {
try {
write->move(); // 强制消费所有 pending 任务
} catch (const std::exception&) {
// 清理路径不能 throw
}
}
});

// 主线程消费结果(可能 throw)
for (auto& write : writes) {
results.push_back(write->move()); // 若某个任务失败,这里 throw
}
// 若 results 循环 throw,guard 析构时仍会 drain 剩余任务

这里的 makeGuard 扮演”排水阀”角色:无论主逻辑成功还是抛异常,都确保所有后台异步任务被 move()(消费)掉,避免后台任务持有的资源(文件句柄、内存)泄漏。

关键细节:guard 内部的 catch (const std::exception&) 是有意的——清理路径本身不能 throw,第一个错误已经在 results 循环里被捕获了。


9.3 跨线程异常传递:exception_ptr 模式

多线程 spill 时,后台线程的异常无法直接传播到主线程。Velox 使用 std::exception_ptr 将异常”装箱”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 后台线程(writeSpill)
try {
// ... 做实际工作
return std::make_unique<SpillStatus>(id, written, nullptr); // 成功
} catch (const std::exception&) {
// 捕获任何异常,装箱为 exception_ptr
return std::make_unique<SpillStatus>(id, 0, std::current_exception());
}

// 主线程(runSpill)
for (auto& result : results) {
if (result->error != nullptr) {
std::rethrow_exception(result->error); // 在主线程重新抛出
}
// ... 处理正常结果
}

结果对象 SpillStatus 的定义简洁体现了这个模式:

1
2
3
4
5
struct SpillStatus {
SpillPartitionId partitionId;
uint32_t rowsWritten;
std::exception_ptr error; // nullptr 表示成功,否则携带异常
};

这使得多线程错误处理与单线程代码风格一致——主线程的调用者只需检查返回值或 catch 异常,不需要感知线程。


9.4 “1 + N” 并发策略:第一个任务不切线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (const auto& [id, spillRun] : spillRuns_) {
writes.push_back(
memory::createAsyncMemoryReclaimTask<SpillStatus>(
[partitionId = id, this]() { return writeSpill(partitionId); }));

// 只有第 2 个及之后的任务才提交到后台 executor
if ((writes.size() > 1) && executor_ != nullptr) {
executor_->add([source = writes.back()]() { source->prepare(); });
}
}

// 主线程顺序 move(),第一个任务在这里内联执行
for (auto& write : writes) {
results.push_back(write->move()); // 若未 prepare(),move() 内联执行 lambda
}

设计精妙处AsyncSource::move() 的语义是”如果还没执行就内联执行,已经执行就等待结果”。第一个任务从未被 prepare(),所以在 move() 调用时内联执行于主线程。

好处:

  • 若只有一个 partition(常见情况),完全不产生线程切换开销
  • 主线程始终有工作做,不会空等
  • N 个后台线程 + 主线程并发,吞吐量最大化

HashProbe::spillOutput 对多个 probe 算子也用了完全一样的模式。


9.5 CHECK vs DCHECK:按热度分配断言代价

1
2
3
4
5
6
7
// 生产代码的不变式用 VELOX_CHECK(release 也执行)
VELOX_CHECK(!finalized_); // finishSpill 之后不能再写
VELOX_CHECK_EQ(numWritten, run.rows.size()); // 写出的行数必须对得上

// 热路径上的显然正确断言用 VELOX_DCHECK(只在 debug 执行)
VELOX_DCHECK_GE(partitionNum, 0); // hash 分区号必然 >= 0(inner loop)
VELOX_DCHECK(isPartitionSpilled(id)); // appendToPartition 前必须已设置

原则:

  • VELOX_CHECK_* 用于外部可观察的不变式,违反时意味着调用方 bug 或状态异常,即使在 release 也要发现
  • VELOX_DCHECK_* 用于内部实现的显然正确断言,理论上不可能失败,加它只是为了 debug 期间辅助排查

fillSpillRuns 的内层循环(每行都执行一次)里用 VELOX_DCHECK 而非 VELOX_CHECK,避免 release 构建中每行多一次分支,对亿级行场景有实际性能影响。


9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// createOrGetSpillRun:绝大多数情况 spillRun 已存在
if (FOLLY_UNLIKELY(!spillRuns_.contains(id))) {
spillRuns_.emplace(id, SpillRun(*memory::spillMemoryPool()));
}

// getOutputWithSpill:几乎每行都需要复制,到达 batch 边界是例外
if (FOLLY_UNLIKELY(isEndOfBatch)) {
gatherCopy(...);
}

// 最后一批输出大多非空
if (FOLLY_LIKELY(outputSize != 0)) {
gatherCopy(...);
}

FOLLY_LIKELY/UNLIKELY 不仅是编译器分支预测提示,更是可执行的注释——它明确告诉读者”作者预期这条路径是 hot/cold 的”。这比写 // rarely happens 注释更可靠,因为注释会过期,而这行代码会随着逻辑一起维护。


9.7 TestValue::adjust:无侵入式测试注入

1
2
3
4
5
6
7
8
// SpillerBase 构造函数
TestValue::adjust("facebook::velox::exec::SpillerBase", this);

// HashBuild::addInput 入口
TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);

// HashBuild::reclaim 入口
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

TestValue::adjust 在生产代码中是空操作(编译器会完全优化掉)。在测试中,可以向特定注入点注册回调,在精确时机修改状态或注入错误,无需任何条件编译。

配套的 TestScopedSpillInjection 更进一步:

1
2
3
4
5
6
7
8
// 测试代码:
TestScopedSpillInjection injection(20, ".*", 10);
// poolName 匹配 ".*" 时,testingTriggerSpill() 有 20% 概率返回 true,最多触发 10 次

// 生产代码:
if (testingTriggerSpill(pool_->name())) {
spill(); // 测试强制触发 spill
}

这使得 spill 的边界条件测试(”在恰好第 3 次 addInput 时触发 spill”)变得非常精确,而生产代码无任何额外开销。


9.8 NanosecondTimer:RAII 计时的最小实现

1
2
3
4
5
6
7
uint64_t execTimeNs{0};
{
NanosecondTimer timer(&execTimeNs); // 构造时记录起始时间
// ... 被测代码 ...
} // 析构时写入 execTimeNs

spillStats_->spillFillTimeNanos.fetch_add(execTimeNs, std::memory_order_relaxed);

几个值得注意的细节:

① 先积累,再 fetch_add:不在块内直接 fetch_add,而是先存到局部变量,块结束后一次 atomic 操作。减少 atomic 操作次数,也使代码结构更清晰。

**② memory_order_relaxed**:spill 统计不需要与其他内存操作同步,用 relaxed 避免不必要的内存屏障。

③ 一个特殊处理

1
2
// NOTE: Always set a non-zero sort time to avoid flakiness in tests which check sort time.
updateSpillSortTime(std::max<uint64_t>(1, sortTimeNs));

即使排序耗时 0 纳秒(几乎不可能但理论上存在),也写入 1,防止测试误判”排序未发生”。这是一个小而精的防御性设计。


9.9 SpillPartitionId:把所有接口设施配齐

SpillPartitionId 是一个需要在多种容器中使用的值类型,代码为它配齐了所有标准设施:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1. 比较运算符:支持 std::map(有序容器)
bool operator<(const SpillPartitionId& other) const;
bool operator>(const SpillPartitionId& other) const {
return other < *this; // 复用 < 实现 >,不重复逻辑
}
bool operator==(const SpillPartitionId& other) const = default; // C++20 default

// 2. std::hash 特化:支持 folly::F14FastMap/Set
namespace std {
template <>
struct hash<SpillPartitionId> {
uint32_t operator()(const SpillPartitionId& id) const {
return std::hash<uint32_t>()(id.encodedId()); // 直接哈希底层整数,O(1)
}
};
}

// 3. fmt::formatter 特化:支持 fmt::format("{}", id)
template <>
struct fmt::formatter<SpillPartitionId> : formatter<std::string> {
auto format(SpillPartitionId s, format_context& ctx) const {
return formatter<std::string>::format(s.toString(), ctx);
}
};

// 4. operator<< :支持 LOG(INFO) << id
inline std::ostream& operator<<(std::ostream& os, SpillPartitionId id) {
return os << id.toString();
}

operator> 通过调用 operator< 实现而非独立写逻辑,是很好的习惯——减少了两个实现可能不一致的风险。


9.10 withWLock / withRLock:锁的最小化持有

1
2
3
4
5
6
7
8
9
10
// SpillState::appendToPartition
partitionWriters_.withWLock([&](auto& lockedWriters) {
// 只在创建 writer 时持有写锁
if (!lockedWriters.contains(id)) {
lockedWriters.emplace(id, std::make_unique<SpillWriter>(...));
}
}); // 锁在此释放

// 实际写入在锁外进行(不同 partition 的写入完全并发)
return partitionWriter(id)->write(rows, ...);
1
2
3
4
5
6
// testingSpilledFilePaths(只读,用 rlock)
partitionWriters_.withRLock([&](const auto& partitionWriters) {
for (const auto& [id, writer] : partitionWriters) {
// ... 读取
}
});

folly::SynchronizedwithWLock/withRLock lambda 接口使锁的范围在代码结构上可见——lambda 的括号就是临界区的边界,不需要手动 lock_guard

关键设计:写锁只保护”创建 writer”(初始化一次),不保护”写数据”。这是因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突,无需锁保护。最快的锁是不持有锁


9.11 SpillRun::sorted:防止双重排序的状态位

1
2
3
4
5
6
7
8
9
10
11
12
13
struct SpillRun {
SpillRows rows;
uint64_t numBytes{0};
bool sorted{false}; // 是否已排序
};

void SpillerBase::ensureSorted(SpillRun& run) {
if (run.sorted || !needSort()) { // 已排序则跳过
return;
}
// ... 执行排序
run.sorted = true; // 标记,防止重复排序
}

这个 sorted 标志看起来很简单,但它解决了一个微妙的 bug 风险:如果 ensureSorted 被意外调用两次(代码重构时很容易发生),不加保护会导致正确排好序的数据被 re-sort 破坏(稳定排序会保持顺序,但非稳定排序可能打乱)。一个布尔标志,一个 guard,防御性地堵死了这条 bug 路径。


9.12 pool_->release():主动归还,而非等待回收

1
2
3
4
5
6
7
8
9
10
11
12
13
// SortBuffer::noMoreInput()
// Releases the unused memory reservation after processing input.
pool_->release();

// SortBuffer::getOutput()
SCOPE_EXIT {
pool_->release();
};

// HashBuild::ensureTableFits() 中扩容后
if (spiller_->spillTriggered()) {
pool()->release(); // spill 已触发,扩容的预留用不上了,主动归还
}

pool_->release() 的语义是”我承诺不再需要当前预留的超额内存”。Velox 在每个”阶段完成”的节点系统性地调用它,而不是等待 GC 或析构。

这是 push 模型而非 pull 模型:算子主动告诉内存池”我这块不要了”,内存池立即可以分配给其他需要者。在多算子并发的大查询中,这种主动性使内存回转速度快得多。


9.13 succinctBytes():有信息量的日志

1
2
3
4
5
6
7
LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool_->name()
<< ", root pool: " << pool_->root()->name()
<< ", used: " << succinctBytes(pool_->usedBytes())
<< ", reservation: " << succinctBytes(pool_->reservedBytes())
<< ", root pool reservation: "
<< succinctBytes(pool_->root()->reservedBytes());

几个细节值得学习:

**① succinctBytes()**:自动选择合适单位(B/KB/MB/GB),比裸输出字节数可读性高 10 倍。

② 固定的信息层次pool 名 → root pool 名 → 已用 → 已预留 → root 已预留。这个模板在整个 spill 系统中完全一致,运维人员看一眼就知道去哪里找信息,不需要猜测每条日志的格式。

③ WARNING 而非 ERROR:预留失败不是错误,只是触发 spill 的信号。级别选择准确。


9.14 testing* 前缀:测试可观测性的统一约定

1
2
3
4
5
6
// SpillState::testingSpilledFilePaths()
// SpillState::testingSpilledFileIds()
// SpillState::testingNonEmptySpilledPartitionIdSet()
// HashBuild::testingExceededMaxSpillLevelLimit()
// HashProbe::testingHasInputSpiller()
// HashProbe::testingExceededMaxSpillLevelLimit()

所有为测试暴露的内部状态方法,一律以 testing 前缀命名。这个约定的价值:

  • 接口清晰度:任何 testing* 方法,调用者立刻知道”这不是业务接口”
  • grep 友好grep 'testing[A-Z]' 精确列出所有测试专用接口,代码审查时可以专门审视
  • 永不 friend:Velox 明确禁止 friend 声明(CLAUDE.md 有规定),测试访问内部状态只能通过这类 accessor,倒逼设计者只暴露真正需要的状态,而非用 friend 开后门

9.15 constexpr 局部常量:让魔数有名字

1
2
3
4
5
6
7
8
9
10
void SpillerBase::fillSpillRuns(...) {
constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行
// ...
}

std::unique_ptr<SpillerBase::SpillStatus> SpillerBase::writeSpill(...) {
constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K
constexpr int32_t kTargetBatchRows = 64;
// ...
}

这些常量定义在函数内部而非文件级别,有两个好处:

① 最小作用域kHashBatchSize 只在 fillSpillRuns 里有意义,定义在全局或类级别会误导读者以为它有更广泛的语义。

② 紧贴使用处:常量定义和使用之间距离最短,读者不需要跳转文件来理解这个数字的含义。

1 << 18262144 更清楚地表达了”这是 2 的幂次”的意图,同时命名为 kTargetBatchBytes 说明了它是目标(上限)而非精确值。


编码品味小结

技法 核心价值 代表位置
SCOPE_EXIT 清理与触发紧邻,所有 return 路径都覆盖 SortBuffer::getOutput, HashBuild::finishHashBuild
folly::makeGuard 排水阀 异步任务无论成败都被消费,防止资源泄漏 SpillerBase::runSpill
exception_ptr 跨线程传递 后台线程异常装箱,主线程重抛,调用方无感知 SpillerBase::writeSpill / runSpill
“1 + N” 并发 首任务内联,后续并发,单 partition 时零开销 SpillerBase::runSpill
DCHECK 分热路径 release 不付 inner loop 断言代价 fillSpillRuns inner loop
FOLLY_LIKELY/UNLIKELY 可执行的”热度注释”,不会过期 createOrGetSpillRun, getOutputWithSpill
TestValue::adjust 无侵入注入,生产零开销 所有关键函数入口
NanosecondTimer RAII 单行计时,早 return 安全 fillSpillRuns, ensureSorted
配齐值类型接口 <, >, ==, hash, formatter, << 一起提供 SpillPartitionId
withWLock 最小临界区 锁只保护初始化,写操作在锁外 SpillState::appendToPartition
sorted 防御位 防双重排序,用状态堵死隐患 SpillRun / ensureSorted
pool_->release() 主动归还 push 模型快速回转,而非等待析构 noMoreInput, getOutput
succinctBytes() 统一日志格式 固定信息层次,可读,可预期 所有 LOG(WARNING)
testing* 前缀约定 测试接口一眼可辨,禁用 friend 倒逼最小暴露 所有 testing* 方法
constexpr 局部常量 最小作用域,紧贴使用处,有名字 kHashBatchSize, kTargetBatchRows

10. 架构设计解读:优雅背后的工程思考

Velox spill 系统的代码量约 12,000 行,但它的设计并不复杂——真正值得品味的,是每一处决策背后的约束权衡系统性思考。这一章从”为什么要这样设计”出发,梳理其核心设计哲学。


10.1 最根本的约束:OOM 不能是答案

数据库引擎处理大数据时,内存不可能无限大。任何算子都需要回答这个问题:当数据超过可用内存时,怎么办?

最简单的答案是抛出 OOM 异常。但这会让用户的查询白白失败,代价极高。Velox 的选择是:任何时候都不能因为数据量大而失败,必须能完成查询(在 spill 开启的情况下)。

这一承诺对系统设计造成了深远影响:

  • 不能假设数据能放进内存——每一个数据结构都要有”放不下时的出路”
  • 不能只支持一次 spill——必须支持递归 spill(spill 后恢复,恢复时再次 OOM,再次 spill)
  • 不能只在”好时机”才 spill——必须能在任意时刻被打断(内存仲裁 reclaim

这三点决定了架构的基本形态。


10.2 分层:每层只管自己的事

整个 spill 系统被分成 5 层,每层接口极简:

1
2
3
4
5
算子层     → 决定「何时」和「触发哪种」spill
Spiller层 → 决定「如何」把 RowContainer 内容写到磁盘(行→列,分区,排序)
SpillState → 决定「写到哪个文件」
SpillFile → 决定「如何」序列化/反序列化(PrestoPage 格式)
SpillConfig→ 决定「用什么参数」

这种分层带来的核心好处不是”解耦”这么虚的东西,而是每一层可以独立演进

  • 要换序列化格式?只改 SpillFile,上面所有层不感知
  • 要支持新算子 spill?只实现新的 Spiller 子类,底层文件 I/O 不变
  • 要调整内存触发策略?只改算子层的 ensureInputFits,文件写入逻辑不变

这在一个长期演进的系统里价值巨大。HashJoin、HashAggregation、OrderBy 三类算子的 spill 逻辑差异极大,但它们复用了同一套文件 I/O 和统计框架,维护成本大幅降低。


10.3 分区:把”全量恢复”变成”分批恢复”

Spill 最天真的设计是:内存不够时,把所有数据写到一个文件;恢复时,把整个文件读回来。

问题在于:如果数据总量是内存的 10 倍,一次性读回仍然 OOM。

Velox 的解法是按 hash 分区

1
2
3
4
5
数据 → hash(key) → partition 0, 1, 2, ..., 7(8 个分区)

恢复时:
一次只加载 partition 0 → 处理完 → 加载 partition 1 → ...
每次内存中只有 1/8 的数据

这个设计的精妙之处在于:分区和处理逻辑是解耦的。HashBuild 恢复 partition 0 时,它完全复用了正常的 addInputinsertHashTable 路径,没有任何特殊处理。”从 spill 文件恢复” 和 “从上游算子输入” 在代码路径上几乎完全一样。

SpillPartitionId 的 bit 编码设计支持这种分批恢复天然地扩展到多层递归:

1
2
3
Level 0:bit[31:29] = 0, 分区号存在 bit[2:0]  → 8 个 partition
Level 1:bit[31:29] = 1, 分区号存在 bit[5:3] → 每个 Level 0 partition 又有 8 个子 partition
Level 2:...

当 Level 0 的某个 partition 恢复时仍然 OOM,直接用下一层 bit 范围再 spill。恢复路径完全复用,递归深度理论上无限(受 maxSpillLevel 限制)。


10.4 有序与无序的分叉:根据”读回时的需求”决定

Spill 时要不要排序,完全由读回时的需求决定,而不是写入时的方便:

算子 读回时的需求 要不要排序
HashJoin Build 重建 hash 表(插入顺序无关) 不排序(节省 CPU)
HashAggregation 流式归并聚合(需要相同 key 相邻) 排序(按 group key)
OrderBy 多路归并产生有序输出(需要各 run 内有序) 排序(按 sort key)

这个设计的价值在于:对于不需要排序的场景,完全不付排序的代价。HashJoin 是最常见的 spill 场景,它不排序节省了大量 CPU 时间。

而对于需要排序的场景(Aggregation、OrderBy),排序在 spill 时一次性完成,读回时的 N 路归并就是线性扫描,极其高效。如果不在 spill 时排序,读回时就必须全量加载再排序,峰值内存反而更高。


10.5 两阶段 Spill:尊重算子的生命周期

Aggregation 和 OrderBy 各有两个 Spiller(input/output),这不是过度设计,而是对算子生命周期的精确建模:

1
2
Input 阶段:数据持续流入 → 多次触发 spill → hash 表/排序缓冲反复清空重填
Output 阶段:数据已全部处理 → 只可能触发一次 spill → 之后只有读出

两个阶段的需求截然不同:

Input 阶段 spill(AggregationInputSpiller / SortInputSpiller)

  • 需要 hash 分区(支持分批恢复)
  • 需要排序(支持流式归并)
  • 可以多次触发(每次清空 hash 表继续处理输入)

Output 阶段 spill(AggregationOutputSpiller / SortOutputSpiller)

  • 无需分区(只触发一次,全量写出)
  • 无需排序(数据已经就绪,直接写入已有顺序)
  • 只触发一次(之后只读,无需再写)

用同一个 Spiller 处理两个阶段会导致代码复杂化,且无法针对性优化。分开设计使每个 Spiller 的逻辑极度简单。


10.6 SpillRun 切分:在多个约束之间找到平衡点

maxSpillRunRows 参数控制每次 fill-run 的行数,这个设计同时解决了四个互不相关的问题:

1
2
3
4
约束 1:指针数组内存(SpillRun.rows 每行 8 字节,1 亿行 = 800MB)
约束 2:排序辅助内存(大 run 排序 cache miss 严重)
约束 3:PrestoPage 2GB 序列化限制
约束 4:内存仲裁的观测粒度(每 run 之间才能看到内存下降)

一个参数,四个收益,这是典型的”一石多鸟”设计。

值得注意的是,这个设计并没有在代码里写四个 // TODO: 原因X,而是通过一个统一的机制自然地满足了所有约束。这种”机制而非策略”的思维是优雅设计的标志。


10.7 批量行列转换:峰值内存与 I/O 效率的双重优化

extractSpillVector 每次只处理最多 64 行或 256KB,这个”小批量”设计同样服务于两个目标:

峰值内存

1
2
3
不切分时:SpillRun 10 万行 → 物化为一个 10 万行 RowVector → 可能数 GB
切分后: 每次 64 行 RowVector → 几百 KB → prepareForReuse 复用同一块内存
峰值 = 单批大小,与 run 总大小无关

I/O 效率

1
2
3
4
SpillWriter 有 writeBufferSize 写缓冲(默认几 MB)
每次 appendToPartition 追加到缓冲,满了才刷盘
64 行 ≈ 几十 KB → 多次 append 后缓冲满 → 一次大 write()
这比每 10 万行一次大 write() 的 I/O 效率相当,但内存峰值小得多

64 这个数字也不是随意的——它使得每列 FlatVector 的数据(64 × sizeof(int64) = 512B)恰好能放入 L1 cache,extractColumn 的内层循环具有最优的 cache 利用率。


10.8 主动预留 vs. 被动仲裁:两道防线

Velox 的内存触发有两条路径,共同构成防 OOM 的双重保险:

1
2
3
4
5
6
7
8
防线 1 — 主动预留(proactive):
ensureInputFits → maybeReserve(targetIncrementBytes)
├─ 成功:继续处理(预留了足够空间)
└─ 失败:[内存仲裁在此可能已触发 reclaim]

防线 2 — 被动仲裁(reactive):
内存仲裁器观测到系统内存紧张 → 调用 reclaim()
└─ 强制 spill 当前算子

两者的分工:

  • 防线 1 发生在算子自己的处理循环中,知道”我即将需要多少内存”,可以精确预留
  • 防线 2 发生在任意时刻,无需算子配合,是最后的兜底手段

ReclaimableSectionGuard 是连接两者的桥梁——算子在调用 maybeReserve 前设置它,告诉仲裁器”现在可以来抢我的内存”。这样即使 maybeReserve 本身触发了全局内存整理,算子也能安全地被 spill。


10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口

HashProbe 侧 spill 时,行数据在 RowVectorPtr 里,根本不经过 RowContainer。如果强制所有 spill 都走 RowContainer → extractSpill → 磁盘 的路径,就需要先把 RowVector 插入 RowContainer 再提取出来,白白多一次转换。

NoRowContainerSpiller 的存在体现了一个原则:抽象应该覆盖真实场景,而不是把真实场景削足适履。它直接接受 RowVectorPtr,跳过整个 fill/extract 流程,只保留写文件这一核心功能。

同样的思路也体现在 SortOutputSpiller 上:sortedRows_ 已经是排好序的行指针数组,如果再经过 fillSpillRuns → 按 hash 分区 → sort 这个流程,不仅多余还会破坏已有的顺序。它重写接口,接受外部已排序的 SpillRows,直接写入。


10.10 HashJoinBridge:异步协调的最小化接口

Build 和 Probe 在不同线程运行,需要协调三件事:

  1. Build 完成 → 通知 Probe 开始
  2. Probe 完成 → 通知 Build 恢复 spill partition
  3. Build/Probe 任意一方 spill → 另一方配合

HashJoinBridge 用三个接口覆盖了全部场景:

1
2
3
setHashTable(table, spillPartitionSet) // Build → Probe:我建好了,这些 partition 被 spill 了
probeFinished() // Probe → Build:我探测完了,请处理下一个 spill partition
appendSpilledHashTablePartitions(set) // Probe reclaim → Build:我 spill 了你的 hash 表

最优雅的设计是:这三个接口覆盖了所有递归层级,无论是第 0 层还是第 3 层递归,调用的接口完全一样。递归深度的增加不需要任何新的协调原语。

对比一下如果没有这种设计,每一层递归都要单独管理 future/promise 对,代码复杂度会指数级增长。


10.11 DictionaryVector wrap:零拷贝分区

Probe 侧 spillInput 需要将一个 batch 的行按 partition 分发到多个 spill 文件。最直接的做法是按 partition 拷贝行数据,但这意味着每行数据被读一次、写一次——对于宽行(含字符串列)代价极高。

Velox 的解法是 DictionaryVector

1
2
3
4
5
6
原始 input:[row0, row1, row2, row3, row4]
partition 0 的 indices buffer:[1, 3]
partition 1 的 indices buffer:[0, 2, 4]

wrap(2, partition0_buffer, input) → DictionaryVector{indices=[1,3], base=input}
↓ 序列化时才真正读取 row1, row3 的数据

这使得”分区”这个操作的代价只是分配 indices buffer 和一次 hash 计算,不涉及任何数据拷贝。真正的 I/O 推迟到 appendToPartition → SpillWriter::write → 序列化,此时数据只被读一次、写一次。

这是”懒求值”(lazy evaluation)思想在 spill 路径中的具体应用。


10.12 小结:设计模式归纳

回顾整个 spill 系统,可以归纳出几个贯穿始终的设计模式:

① 约束驱动设计(Constraint-Driven Design)
每个设计决策都能追溯到一个具体约束:SpillRun 切分 → PrestoPage 2GB 限制;64 行批次 → L1 cache;DictionaryVector → 零拷贝。没有无目的的”为了灵活性而灵活性”。

② 机制 vs. 策略分离(Mechanism vs. Policy Separation)
SpillerBase 提供机制(fill/sort/write 的流程),各子类通过 needSort()、重写 extractSpill() 等钩子注入策略。算子层决定”何时 spill”,Spiller 层决定”如何 spill”,文件层决定”如何存储”。

③ 路径复用(Path Reuse)
HashBuild 恢复 spill partition 时走的是和正常处理输入完全相同的 addInput 路径。HashProbe restore 轮也走同一套 addInput → spillInput → probe 路径。不为”恢复”写特殊逻辑,而是让恢复成为正常路径的自然一部分。这减少了测试覆盖的难度,也降低了特殊路径引入 bug 的风险。

④ 懒求值(Lazy Evaluation)
数据尽可能晚才物化:行指针不拷贝数据(SpillRun)→ DictionaryVector 不拷贝行(probe 分区)→ lazy 列在 spill 前才 loadedVector()。每一层都只在真正需要时才付出代价。

⑤ 尊重生命周期(Lifecycle Awareness)
Input Spiller 和 Output Spiller 的分离不是技术必要性,而是对算子语义的尊重:input 阶段和 output 阶段的资源需求、触发频率、分区需求都不同,强行合并会产生大量条件分支,拆开则各自极简。


这套设计的优雅,不在于它的聪明,而在于它的诚实——每个决策都清楚地知道自己在解决什么约束,不做多余的事,也不留下欠债。