1. 1. Velox Spiller
    1. 1.1. 1. 整体架构
    2. 1.2. 2. 基础设施层详解
      1. 1.2.1. 2.1 SpillConfig — 配置层
      2. 1.2.2. 2.2 SpillStats — 统计层
      3. 1.2.3. 2.3 SpillFile — 文件 I/O 层
        1. 1.2.3.1. SpillFileInfo — 元数据
        2. 1.2.3.2. SpillWriter — 写路径
        3. 1.2.3.3. SpillReadFile — 读路径
      4. 1.2.4. 2.4 SpillPartitionId — 层级分区标识
      5. 1.2.5. 2.5 SpillState — 分区状态管理
      6. 1.2.6. 2.6 SpillPartition / SpillPartitionSet — 读回路径
      7. 1.2.7. 2.7 读流层:SpillMergeStream / BatchStream
    3. 1.3. 3. Spiller 层详解
      1. 1.3.1. 3.1 SpillerBase — 抽象基类
      2. 1.3.2. 3.2 SpillRun — 内存中的 spill 缓冲
      3. 1.3.3. 3.3 fillSpillRuns — 数据填充
      4. 1.3.4. 3.4 SpillRun 为何切分 / maxSpillRunRows 的作用
      5. 1.3.5. 3.5 行列转换:extractSpill 与 extractSpillVector
        1. 1.3.5.1. extractSpillVector — 控制每批大小
        2. 1.3.5.2. extractSpill — 行式 → 列式核心转换
      6. 1.3.6. 3.6 RowContainer 行布局与列提取原理
      7. 1.3.7. 3.7 Accumulator 中间态提取
      8. 1.3.8. 3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes)
      9. 1.3.9. 3.9 runSpill — 并发写入
      10. 1.3.10. 3.10 finishSpill — 收尾
      11. 1.3.11. 3.11 各子类 Spiller 详解
        1. 1.3.11.1. HashBuildSpiller
        2. 1.3.11.2. AggregationInputSpiller
        3. 1.3.11.3. AggregationOutputSpiller
        4. 1.3.11.4. SortInputSpiller
        5. 1.3.11.5. SortOutputSpiller
        6. 1.3.11.6. NoRowContainerSpiller / MergeSpiller
      12. 1.3.12. 各子类对比
    4. 1.4. 4. HashJoin Spill 实现
      1. 1.4.1. 4.1 整体流程
      2. 1.4.2. 4.2 HashBuildSpiller
      3. 1.4.3. 4.3 setupSpiller
      4. 1.4.4. 4.4 ensureInputFits — 内存压力检测
      5. 1.4.5. 4.5 spillInput — 正在 spill 时的直通路径
      6. 1.4.6. 4.6 finishHashBuild — 收尾与传递
      7. 1.4.7. 4.7 reclaim — 内存仲裁触发路径
      8. 1.4.8. 4.8 postHashBuildProcess / setupSpillInput — 递归恢复
      9. 1.4.9. 4.9 processSpillInput — 从 spill 文件重建
      10. 1.4.10. 4.10 Probe 侧 Spill 详解
        1. 1.4.10.1. HashBuildResult — Build/Probe 信息传递载体
        2. 1.4.10.2. 路径 1:input spill(探测侧主路径)
        3. 1.4.10.3. 路径 2:restore 轮的 probe(从 spill 文件读取 probe 行)
        4. 1.4.10.4. 路径 3:output spill(内存仲裁触发 reclaim)
        5. 1.4.10.5. prepareForSpillRestore — 多轮恢复的关键
        6. 1.4.10.6. Probe 侧 Spill 全生命周期
    5. 1.5. 5. HashAggregation Spill 实现
      1. 1.5.1. 5.1 两种 Spiller
      2. 1.5.2. 5.2 GroupingSet::spill() — input spill
      3. 1.5.3. 5.3 GroupingSet::spill(rowIterator) — output spill
      4. 1.5.4. 5.4 reclaim 触发路径
      5. 1.5.5. 5.5 getOutputWithSpill — 归并输出
      6. 1.5.6. 5.6 mergeNextWithAggregates — 有序归并聚合
      7. 1.5.7. 5.7 mergeNextWithoutAggregates — distinct 归并
    6. 1.6. 6. OrderBy Spill 实现
      1. 1.6.1. 6.1 两种 Spiller
      2. 1.6.2. 6.2 SortBuffer::spillInput
      3. 1.6.3. 6.3 SortBuffer::spillOutput
      4. 1.6.4. 6.4 SortBuffer::finishSpill
      5. 1.6.5. 6.5 SortBuffer::getOutputWithSpill
    7. 1.7. 7. 关键设计决策总结
      1. 1.7.1. 7.1 有序 vs. 无序 Spill
      2. 1.7.2. 7.2 两阶段 Spill(Input/Output Spiller 分离)
      3. 1.7.3. 7.3 内存预留机制(proactive spill)
      4. 1.7.4. 7.4 HashBuild 多线程 Spill 协调
      5. 1.7.5. 7.5 递归 Spill(HashJoin 专属)
      6. 1.7.6. 7.6 probed flag Spill(Right/Full Outer Join)
    8. 1.8. 8. 递归 Spill 流程图
    9. 1.9. 9. C++ 编码品味:值得学习的细节
      1. 1.9.1. 9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑
      2. 1.9.2. 9.2 folly::makeGuard:异步任务的"排水阀"
      3. 1.9.3. 9.3 跨线程异常传递:exception_ptr 模式
      4. 1.9.4. 9.4 "1 + N" 并发策略:第一个任务不切线程
      5. 1.9.5. 9.5 CHECK vs DCHECK:按热度分配断言代价
      6. 1.9.6. 9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码
      7. 1.9.7. 9.7 TestValue::adjust:无侵入式测试注入
      8. 1.9.8. 9.8 NanosecondTimer:RAII 计时的最小实现
      9. 1.9.9. 9.9 SpillPartitionId:把所有接口设施配齐
      10. 1.9.10. 9.10 withWLock / withRLock:锁的最小化持有
      11. 1.9.11. 9.11 SpillRun::sorted:防止双重排序的状态位
      12. 1.9.12. 9.12 pool_->release():主动归还,而非等待回收
      13. 1.9.13. 9.13 succinctBytes():有信息量的日志
      14. 1.9.14. 9.14 testing* 前缀:测试可观测性的统一约定
      15. 1.9.15. 9.15 constexpr 局部常量:让魔数有名字
      16. 1.9.16. 编码品味小结
    10. 1.10. 10. 架构设计解读:优雅背后的工程思考
      1. 1.10.1. 10.1 最根本的约束:OOM 不能是答案
      2. 1.10.2. 10.2 分层:每层只管自己的事
      3. 1.10.3. 10.3 分区:把"全量恢复"变成"分批恢复"
      4. 1.10.4. 10.4 有序与无序的分叉:根据"读回时的需求"决定
      5. 1.10.5. 10.5 两阶段 Spill:尊重算子的生命周期
      6. 1.10.6. 10.6 SpillRun 切分:在多个约束之间找到平衡点
      7. 1.10.7. 10.7 批量行列转换:峰值内存与 I/O 效率的双重优化
      8. 1.10.8. 10.8 主动预留 vs. 被动仲裁:两道防线
      9. 1.10.9. 10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口
      10. 1.10.10. 10.10 HashJoinBridge:异步协调的最小化接口
      11. 1.10.11. 10.11 DictionaryVector wrap:零拷贝分区
      12. 1.10.12. 10.12 小结:设计模式归纳

Velox Spiller

VELOX · QUERYEXECUTION2025-11-30

Velox Spiller

2025-11-30#Velox #QueryExecution

1. 整体架构

Velox 的 spill 系统是一个分层架构,每层职责明确:

算子层 HashBuild / GroupingSet(HashAggregation) / SortBuffer触发:ensureInputFits / ensureOutputFits / reclaim()Spiller 层 SpillerBase 及子类从 RowContainer 提取行 → 按 hash 分区 → 排序 → 批量写入子类:HashBuild / AggregationInput / AggregationOutput / Sort* / NoRowContainer / Merge状态管理层 SpillState管理每个分区的 SpillWriter,追踪已 spill 分区集合分区管理层 SpillPartitionId / SpillPartition / SpillPartitionSet层级分区 ID,持有 SpillFiles,提供有序 / 无序读取接口文件 I/O 层 SpillWriter / SpillReadFileRowVector ↔ PrestoPage 序列化 / 反序列化,管理多文件切分配置与统计层 SpillConfig / SpillStats全局配置,原子统计计数器
Fig. Spiller 六层架构:算子触发 → Spiller 抽取分区排序 → State / Partition 管理 → 文件 I/O

核心设计理念

  • 分区化:spill 数据按 hash key 分区,使得每次只需将一个分区加载回内存,避免全量恢复
  • 递归 spill:若恢复某分区后仍 OOM,可对该分区再次 spill(最多 maxSpillLevel 层)
  • 有序与无序分离:sort-based 算子(Aggregation/OrderBy)spill 时保序,以支持归并输出;HashJoin 无需排序
  • 两阶段 spill:Aggregation/OrderBy 区分 input 阶段与 output 阶段,各用不同的 Spiller

2. 基础设施层详解

2.1 SpillConfig — 配置层

文件:velox/common/base/SpillConfig.h

SpillConfig 是所有 spill 行为的控制中枢。

struct SpillConfig {
  // 回调函数(避免与具体 Task/Driver 实现耦合)
  GetSpillDirectoryPathCB   getSpillDirPathCb;         // 惰性获取 spill 目录
  UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb; // 更新全局 quota 并检查是否超限

  // 文件参数
  std::string fileNamePrefix;          // spill 文件名前缀
  uint64_t    maxFileSize;             // 单文件大小上限,超过后自动 rotate
  uint64_t    writeBufferSize;         // 写缓冲区,攒够再 flush
  uint64_t    readBufferSize;          // 读缓冲区,支持 async 双缓冲预读

  // 执行参数
  folly::Executor* executor;           // 非 null 时 spill 写入在后台线程并发执行

  // 内存触发门槛
  int32_t  minSpillableReservationPct;      // 可 spill reservation 不低于 used 的 X%
  int32_t  spillableReservationGrowthPct;   // 一次 reserve 扩容 used 的 X%
  uint64_t maxSpillRunRows;                 // 单 run 最多行数(防止 OOM)
  uint64_t writerFlushThresholdSize;        // TableWriter flush 门槛

  // 分区参数(决定 spill 时的并行度)
  uint8_t  startPartitionBit;    // hash 分区起始 bit(e.g. 29)
  uint8_t  numPartitionBits;     // 每层分区 bit 数(e.g. 3 → 8 分区)
  int32_t  maxSpillLevel;        // 最大递归 spill 层数,-1 表示不限

  // 归并参数
  uint32_t numMaxMergeFiles;     // 归并时最多同时 open 的文件数(防 FD 耗尽)

  // 排序优化
  std::optional<PrefixSortConfig> prefixSortConfig; // 若设置则用 PrefixSort 代替 TimSort

  // 其他
  common::CompressionKind compressionKind;  // 压缩方式
  std::string fileCreateConfig;             // 传递给底层 FileSystem 的创建选项
  uint32_t windowMinReadBatchRows;          // Window 算子读取 batch 最小行数
};

关键计算方法

// 返回当前 bit offset 对应的 spill level
int32_t spillLevel(uint8_t startBitOffset) const;

// 判断是否已超过最大 spill 层数
bool exceedSpillLevelLimit(uint8_t startBitOffset) const;

递归 spill 时,每下一层 startPartitionBit 向右移 numPartitionBits 位, spillLevel 就是 (startBitOffset - config.startPartitionBit) / numPartitionBits


2.2 SpillStats — 统计层

文件:velox/exec/SpillStats.h

所有字段都是 std::atomic,线程安全,支持跨线程并发更新。

struct SpillStats {
  // 计数
  std::atomic_uint64_t spillRuns;           // spill 触发次数
  std::atomic_uint64_t spilledRows;         // spill 的行数
  std::atomic_uint64_t spilledInputBytes;   // spill 前内存中的原始字节数
  std::atomic_uint64_t spilledBytes;        // 写到磁盘的字节数(含压缩)
  std::atomic_uint32_t spilledPartitions;   // spill 的分区数
  std::atomic_uint64_t spilledFiles;        // 生成的 spill 文件数
  std::atomic_uint64_t spillMaxLevelExceededCount; // 超过最大 spill 层数次数

  // 写路径时间(纳秒)
  std::atomic_uint64_t spillFillTimeNanos;           // 填充 SpillRun(哈希分区)
  std::atomic_uint64_t spillSortTimeNanos;           // 排序
  std::atomic_uint64_t spillExtractVectorTimeNanos;  // 从 RowContainer 提取 RowVector
  std::atomic_uint64_t spillSerializationTimeNanos;  // PrestoPage 序列化
  std::atomic_uint64_t spillFlushTimeNanos;          // 压缩 + 拷贝到写缓冲
  std::atomic_uint64_t spillWriteTimeNanos;          // 写到文件系统
  std::atomic_uint64_t spillWrites;                  // write() 调用次数

  // 读路径时间(纳秒)
  std::atomic_uint64_t spillReadBytes;
  std::atomic_uint64_t spillReads;
  std::atomic_uint64_t spillReadTimeNanos;
  std::atomic_uint64_t spillDeserializationTimeNanos;
  IoStats ioStats;
};

每个算子持有自己的 SpillStats 对象(通过 spillStats_.get() 传递),同时每次更新都调用对应的 updateGlobal*() 函数汇总到进程级全局统计(globalSpillStats()),供 query profile 使用。


2.3 SpillFile — 文件 I/O 层

文件:velox/exec/SpillFile.h/.cpp

SpillFileInfo — 元数据

struct SpillFileInfo {
  uint32_t id;                              // 单调递增的文件 ID
  RowTypePtr type;                          // 数据类型
  std::string path;                         // 文件路径
  uint64_t size;                            // 文件字节数
  std::vector<SpillSortKey> sortingKeys;    // 排序键(无序 spill 则为空)
  common::CompressionKind compressionKind;  // 压缩方式
};
using SpillFiles = std::vector<SpillFileInfo>;

SpillWriter — 写路径

继承 SerializedPageFileWriter(底层用 PrestoPage 格式)。

构造参数:
  type, sortingKeys            : 数据类型与排序键
  compressionKind              : 压缩方式
  pathPrefix                   : 文件路径前缀,实际路径为 prefix + "_" + fileId
  targetFileSize               : 单文件目标大小
  writeBufferSize              : 写缓冲区大小
  updateAndCheckSpillLimitCb   : quota 回调

关键流程:
  write(rows, ranges)   → 序列化 RowVector 片段,写入缓冲
  finishFile()          → 强制 flush + 关闭当前文件,记录 SpillFileInfo
  finish()              → 收尾所有文件,返回 SpillFiles 列表

文件切分逻辑(父类 SerializedPageFileWriter 实现):
  每次 write 后,若当前文件大小 >= targetFileSize → 自动 finishFile()

每次 updateFileStats() 回调会调用 updateAndCheckSpillLimitCb,若全局 spill 字节超过 query limit 则抛 VELOX_SPILL_LIMIT_EXCEEDED 异常。

SpillReadFile — 读路径

create(fileInfo, bufferSize, pool, stats) → 打开文件,设置读缓冲
nextBatch(batch)                          → 反序列化下一 batch RowVector
                                          → 若文件系统支持 async IO,双缓冲预读

2.4 SpillPartitionId — 层级分区标识

文件:velox/exec/Spill.h:277

这是支持递归 spill 的核心数据结构。用单个 uint32_t 编码完整的层级路径:

Bit 布局(低位在前):
  bits [2:0]   : Level 1 分区号(最多 8 个)
  bits [5:3]   : Level 2 分区号
  bits [8:6]   : Level 3 分区号
  bits [11:9]  : Level 4 分区号
  bits [28:12] : 未使用
  bits [31:29] : 当前所在层级(0-3)

常量:
  kMaxSpillLevel  = 3   (最多 4 层,层级编号 0~3)
  kMaxPartitionBits = 3 (每层最多 8 分区)

构造方式

SpillPartitionId(uint32_t partitionNumber);           // 创建 level 0 分区
SpillPartitionId(SpillPartitionId parent, uint32_t n); // 创建子分区

有序性operator<):

排序规则:
  1. 相同父节点的子分区按分区号从小到大排列
  2. 不同层级时,按层级路径字典序比较
  3. 浅层节点排在其子节点前面(DFS 前序)

示例:p_0 < p_1 < p_2_0 < p_2_1 < p_2_2 < p_3

这个有序性使 SpillPartitionSetstd::map<SpillPartitionId, ...>)能以 DFS 前序遍历,确保处理完父 partition 后再处理其子 partition(递归 spill 场景)。


2.5 SpillState — 分区状态管理

文件:velox/exec/Spill.h:581, Spill.cpp:131

每个 Spiller 持有一个 SpillState,负责管理所有分区的写入器:

class SpillState {
  // 已 spill 分区的 ID 集合(用于快速判断某 partition 是否已开始 spill)
  SpillPartitionIdSet spilledPartitionIdSet_;

  // partition -> SpillWriter 的映射,folly::Synchronized 保证线程安全
  // 不同 partition 可以并发写入(不同线程写各自的 partition)
  folly::Synchronized<SpillPartitionWriterSet> partitionWriters_;
};

**核心方法 appendToPartition**:

uint64_t SpillState::appendToPartition(
    const SpillPartitionId& id, const RowVectorPtr& rows) {

  // 1. 惰性获取 spill 目录(每次调用都通过 callback,目录可能在首次写时才创建)
  auto spillDir = getSpillDirPathCb_();

  // 2. 若该 partition 还没有 writer,在 wLock 内创建(路径格式:dir/prefix-spill-encodedId)
  partitionWriters_.withWLock([&](auto& lockedWriters) {
    if (!lockedWriters.contains(id)) {
      lockedWriters.emplace(id, std::make_unique<SpillWriter>(
          rows->type(), sortingKeys_,
          compressionKind_,
          fmt::format("{}/{}-spill-{}", spillDir, fileNamePrefix_, id.encodedId()),
          targetFileSize_, writeBufferSize_, ...));
    }
  });

  // 3. 验证序列化大小不超过 2GB(PrestoPage 限制)
  validateSpillBytesSize(rows->estimateFlatSize());
  updateSpilledInputBytes(bytes);

  // 4. 写入(不持有 wLock,不同 partition 并发写互不干扰)
  IndexRange range{0, rows->size()};
  return partitionWriter(id)->write(rows, folly::Range<IndexRange*>(&range, 1));
}

关键设计partitionWriters_ 只在创建 writer 时加 wLock,实际 write() 不加锁——因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突。


2.6 SpillPartition / SpillPartitionSet — 读回路径

文件:velox/exec/Spill.h:457

class SpillPartition {
  SpillPartitionId id_;
  SpillFiles files_;      // 该 partition 的所有文件
  uint64_t size_;         // 总字节数

  // 无序读(HashJoin 用):逐文件顺序消费,不保序
  std::unique_ptr<UnorderedStreamReader<BatchStream>> createUnorderedReader(...);

  // 有序读(Aggregation/OrderBy 用):N 路败者树归并,保序
  std::unique_ptr<TreeOfLosers<SpillMergeStream>> createOrderedReader(...);

  // 均分文件列表为 N 个 shard(并行算子使用)
  std::vector<std::unique_ptr<SpillPartition>> split(int numShards);
};

// SpillPartitionSet 是按 SpillPartitionId 有序排列的 map
using SpillPartitionSet = std::map<SpillPartitionId, std::unique_ptr<SpillPartition>>;

createOrderedReader 的防 FD 耗尽机制

若 files_.size() > numMaxMergeFiles(且 numMaxMergeFiles >= 2):
  1. 将文件分组,每组最多 numMaxMergeFiles 个
  2. 对每组文件创建 TreeOfLosers 做局部归并,写出中间文件
  3. 递归,直到文件数 <= numMaxMergeFiles
  4. 最终对剩余文件建 TreeOfLosers 返回

这避免了同时打开 O(所有文件) 个文件描述符导致的 OOM 或 FD 耗尽。

2.7 读流层:SpillMergeStream / BatchStream

文件:velox/exec/Spill.h:56

BatchStream — 无序批次流(HashJoin 用):

接口:nextBatch(batch) → bool
实现:FileSpillBatchStream        - 单文件
      ConcatFilesSpillBatchStream - 多文件顺序串联

SpillMergeStream — 有序合并流(Aggregation/OrderBy 用):

接口:hasData() / currentIndex() / current() / pop()
实现:FileSpillMergeStream        - 单文件
      ConcatFilesSpillMergeStream - 多文件顺序串联(文件间已有序)

关键方法 compare():按 sortingKeys_ 逐列比较当前行,供 TreeOfLosers 使用
关键方法 pop():消费当前行;若当前 batch 用完,调用 nextBatch() 加载下一批

TreeOfLosers<SpillMergeStream> 是 N 路归并排序的败者树实现,next() 每次返回所有流中最小的行所在的流。


3. Spiller 层详解

3.1 SpillerBase — 抽象基类

文件:velox/exec/Spiller.h:29, Spiller.cpp:31

class SpillerBase {
 protected:
  RowContainer* const container_;    // 持有行数据的容器(nullptr 表示无容器)
  folly::Executor* const executor_;  // 非 null 时并发写 spill
  const HashBitRange bits_;          // hash 分区 bit 范围
  const RowTypePtr rowType_;
  const uint64_t maxSpillRunRows_;   // 单 run 最大行数
  const std::optional<SpillPartitionId> parentId_; // 递归 spill 时的父 ID

  bool finalized_{false};            // finishSpill 后置 true,不允许再写
  SpillState state_;                 // 管理所有 partition writer
  folly::F14FastMap<SpillPartitionId, SpillRun> spillRuns_; // 内存中的行缓冲

  // 纯虚方法(子类决定行为差异)
  virtual bool needSort() const = 0;         // 是否在写前排序
  virtual std::string type() const = 0;      // 类型名称(日志用)
  virtual void extractSpill(folly::Range<char**>, RowVectorPtr&); // 从 container 提取
  virtual void runSpill(bool lastRun);        // 执行一次 spill run 的写入
};

3.2 SpillRun — 内存中的 spill 缓冲

struct SpillRun {
  SpillRows rows;         // char* 指针数组,指向 RowContainer 内部的行
  uint64_t numBytes{0};   // 所有行的估算总字节数(通过 rowSize() 累加)
  bool sorted{false};     // 是否已排序(避免同一 run 被重复排序)

  void clear() {
    rows.clear();
    rows.shrink_to_fit();  // 主动归还内存,spill 后立即回收指针数组
    numBytes = 0;
    sorted = false;
  }
};

SpillRun::rows 存的是 RowContainer 内部的原始行指针(char*),不拷贝行数据本身,仅持有指针。真正的行数据仍在 RowContainer 的 Arena 内存中,直到 extractSpill 时才物化为列式 RowVector。每个分区(SpillPartitionId)对应一个 SpillRun,存在 spillRuns_ map 中。


3.3 fillSpillRuns — 数据填充

这是 spill 流程的第一步,负责扫描 RowContainer、计算 hash、将行指针按分区放入对应 SpillRun。

bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) {
  constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行

  bool lastRun{false};
  uint64_t totalRows{0};

  for (;;) {
    // 1. 从 RowContainer 按迭代器顺序取下一批行指针(迭代器原地推进)
    const auto numRows =
        container_->listRows(iterator, kHashBatchSize, rows.data());
    if (numRows == 0) { lastRun = true; break; }  // 全部取完,这是最后一轮

    auto rowSet = folly::Range<char**>(rows.data(), numRows);

    // 2. 计算 hash(逐列迭代,第 2 列起 combine=true,追加 XOR 到已有 hash)
    if (!isSinglePartition) {
      for (auto i = 0; i < container_->keyTypes().size(); ++i) {
        container_->hash(i, rowSet, /*combine=*/i > 0, hashes.data());
      }
    }

    // 3. 按 hash 将行指针分配到对应分区的 SpillRun
    for (auto i = 0; i < numRows; ++i) {
      const auto partitionNum =
          isSinglePartition ? 0 : bits_.partition(hashes[i]);
      auto& spillRun = createOrGetSpillRun(SpillPartitionId(partitionNum));
      spillRun.rows.push_back(rows[i]);
      spillRun.numBytes += container_->rowSize(rows[i]); // 估算行字节数
    }

    totalRows += numRows;
    // 4. 达到 maxSpillRunRows_ 上限时暂停,进入 runSpill,之后再继续
    if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) break;
  }

  markSeenPartitionsSpilled(); // 标记本次出现的所有 partition 为已 spill
  return lastRun;
}

hash() 的多列迭代细节:第 i=0combine=false,直接写 hashes[];后续列 combine=true,将新 hash 值 XOR 到已有结果上。这使多列 hash 只需遍历一次行集合,CPU 友好。


3.4 SpillRun 为何切分 / maxSpillRunRows 的作用

spill() 的主循环是 do { fillSpillRuns; runSpill; } while (!lastRun)maxSpillRunRows_ 控制每轮 fillSpillRuns 最多处理多少行,从而将整个大 RowContainer 的 spill 分成多个小 run 依次执行。这样做有四个原因:

1. 指针数组本身的内存代价

SpillRun::rowsstd::vector<char*>,每个元素 8 字节。若 RowContainer 有 1 亿行,不切分时仅指针数组就需要 800 MBmaxSpillRunRows_ 通常设为数十万行,指针数组只占几 MB。

2. 排序的内存代价(仅 sorted spill)

ensureSortedrun.rows 原地排序(TimSort / PrefixSort),比较函数需要间接访问行数据。run.rows.size() 越大,排序所需的辅助内存和 cache miss 越高。切小 run 使每次排序在 cache 中完成。

3. 分步释放,配合内存仲裁

每轮 runSpill 结束后调用 run.clear()(含 shrink_to_fit()),立即归还指针数组内存。内存仲裁器在两轮 run 之间可观察到内存已下降,有机会停止 spill(若目标内存已释放够了)。若一次性塞满所有行,仲裁器只能在整个 spill 完成后才能看到效果。

4. PrestoPage 序列化限制

SpillState::appendToPartition 调用 validateSpillBytesSize,检查单次写入的估算字节数不超过 2 GBstd::numeric_limits<int32_t>::max())——这是 PrestoPage 协议的字段宽度限制。切分 run 使每次写入远小于这个上限,避免序列化失败。

maxSpillRunRows_ 的取值:
  HashBuildSpiller          → spillConfig->maxSpillRunRows(默认数十万行)
  AggregationInputSpiller   → spillConfig->maxSpillRunRows
  AggregationOutputSpiller  → spillConfig->maxSpillRunRows
  SortInputSpiller          → spillConfig->maxSpillRunRows
  SortOutputSpiller         → spillConfig->maxSpillRunRows
  NoRowContainerSpiller     → 0(无上限,直接 appendToPartition,无 RowContainer)

3.5 行列转换:extractSpill 与 extractSpillVector

spill 的核心转换:行式 RowContainer → 列式 RowVector。这发生在 writeSpill → extractSpillVector → extractSpill 调用链中。

extractSpillVector — 控制每批大小

int64_t SpillerBase::extractSpillVector(
    SpillRows& rows,
    int32_t maxRows,      // 上限:64 行
    int64_t maxBytes,     // 上限:256 KB(估算)
    RowVectorPtr& spillVector,
    size_t& nextBatchIndex) {   // 游标,指向 rows[] 中下一个未处理的行

  // 1. 确定本批行数:不超过 maxRows,且估算总字节不超过 maxBytes
  int32_t numRows = 0;
  int64_t bytes = 0;
  auto limit = std::min(rows.size() - nextBatchIndex, (size_t)maxRows);

  for (; numRows < limit; ++numRows) {
    bytes += container_->rowSize(rows[nextBatchIndex + numRows]);
    if (bytes > maxBytes) {
      ++numRows;  // 超了也要包含这一行(至少保证提取 1 行,避免死循环)
      break;
    }
  }

  // 2. 真正执行行 → 列转换
  extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector);
  nextBatchIndex += numRows;
  return bytes;
}

rowSize() 的计算

uint32_t RowContainer::rowSize(const char* row) const {
  return fixedRowSize_ +
      (rowSizeOffset_
           ? *reinterpret_cast<const uint32_t*>(row + rowSizeOffset_)
           : 0);
}

fixedRowSize_ 是编译时确定的固定部分大小;rowSizeOffset_ 指向行尾的一个 uint32_t 字段,记录该行的变长部分(out-of-line 字符串)总字节数,两者相加为该行的完整内存占用估算。

extractSpill — 行式 → 列式核心转换

void SpillerBase::extractSpill(
    folly::Range<char**> rows,   // 本批 char* 指针
    RowVectorPtr& resultPtr) {

  // 1. 分配或复用 RowVector
  if (resultPtr == nullptr) {
    resultPtr = BaseVector::create<RowVector>(
        rowType_, rows.size(), memory::spillMemoryPool());
  } else {
    resultPtr->prepareForReuse();  // 复用已有分配,避免重复 malloc
    resultPtr->resize(rows.size());
  }

  auto* result = resultPtr.get();

  // 2. 提取各普通列(key + dependent 列)
  for (auto i = 0; i < container_->columnTypes().size(); ++i) {
    container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
  }

  // 3. 提取各 accumulator 列(聚合中间状态)
  const auto& accumulators = container_->accumulators();
  for (auto i = 0; i < accumulators.size(); ++i) {
    accumulators[i].extractForSpill(
        rows, result->childAt(i + container_->columnTypes().size()));
  }
}

spillMemoryPool() 是专用的 spill 内存池,与算子的 operator pool 独立,防止 spill 写入过程中分配的临时内存被算子内存统计。


3.6 RowContainer 行布局与列提取原理

理解行列转换,需要先了解 RowContainer 的行内存布局:

低地址 ↓normalizedKey (可选, 8 字节)← 排序优化前缀null bits (每列 1 bit, 对齐到 8 位)← 列 nullable 标志固定宽度列值 (int32/64, float/double … inline)变长列引用 StringView = 8B ptr + 8B size (数据 out-of-line)accumulator 固定部分 (各 aggregate 状态, inline)← 每个 agg 对齐存放rowSizeOffset (uint32_t, 变长部分总字节数)← 仅含变长列时存在probed flag (1 bit, 仅 right / full join)高地址 ↑
Fig. RowContainer 行布局:normalizedKey 前缀 → null bits → 列值 → accumulator → 元信息

extractColumn(rows[], numRows, columnIndex, result) 的工作:

1. 确定该列在行内的 byte offset(通过 RowColumn 对象,编译时固定)
2. 读取 null bit 填充 result 的 null bitmap
3. 对每行读取 offset 处的值:
   - 固定宽度列:直接 memcpy 4/8 字节
   - StringView 列:读取 ptr+size,若数据是 inline(≤12B)直接复制;
     否则从 HashStringAllocator 的堆上读取实际字符串,写入 result 的 string buffer

关键性能特征

  • 同一列的 offset 在所有行中相同(columnAt(i) 编译时固定),无需解析行 header
  • 变长列(StringView)在行中只存引用,实际字符串在 HashStringAllocator 堆上;提取时需要一次间接访问,可能有 cache miss
  • freezeAndExecute 在 spill 期间冻结 HashStringAllocator,防止其他线程并发修改变长数据

3.7 Accumulator 中间态提取

对于聚合算子,extractForSpill 提取的不是最终聚合结果,而是中间状态(partial intermediate)

void Accumulator::extractForSpill(folly::Range<char**> groups, VectorPtr& result) const {
  spillExtractFunction_(groups, result);
  // 实际调用:aggregate->extractAccumulators(groups.data(), groups.size(), &result)
}

extractAccumulatorsAggregate 虚函数,每种聚合函数自己实现。例如:

  • sum(int64) → 提取 int64 partial sum
  • avg(double) → 提取 (sum: double, count: int64) row struct
  • array_agg(T) → 提取序列化的数组(out-of-line 存储,需要 copy)

恢复(unspill)时,updateRow 调用 aggregate->addSingleGroupIntermediateResults,将磁盘读回的中间状态与内存中的行合并:

void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
  mergeSelection_.setValid(input.currentIndex(), true);
  for (auto i = 0; i < aggregates_.size(); ++i) {
    mergeArgs_[0] = input.current().childAt(i + keyChannels_.size());
    // 将 spill 文件中的中间状态合并到 row 的 accumulator
    aggregates_[i].function->addSingleGroupIntermediateResults(
        row, mergeSelection_, mergeArgs_, false);
  }
}

这实现了"流式归并聚合":相同 key 的行按排好的顺序依次到来,逐行 merge accumulator,避免将整个 group 缓存在内存中。


3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes)

即使在同一个 SpillRun 内部,writeSpill 也不是一次性将所有行物化为 RowVector,而是每次 extractSpillVector 只处理 最多 64 行或 256 KB(whichever first):

SpillRun.rows = [ptr0, ptr1, ..., ptr_N]   // N 可能有数十万
                    ↓
writeSpill 循环:
  batch 0: [ptr0  .. ptr63]   → extractSpill → RowVector(64行) → appendToPartition
  batch 1: [ptr64 .. ptr127]  → extractSpill → RowVector(64行) → appendToPartition
  ...
  batch K: [ptr_last_start .. ptr_N]        → extractSpill → RowVector(M行) → appendToPartition

为什么要切成 64 行/256KB 的小 batch?

① RowVector 内存峰值控制

每次 extractSpill 分配(或复用)一个 RowVector,其内存由 spillMemoryPool 分配。单批 64 行的 RowVector 内存占用可控(通常几 KB 到几百 KB),而 prepareForReuse() 使相邻批次复用同一块内存,峰值内存 ≈ 单批大小,而非整个 run 大小。

② 与 SpillWriter 写缓冲的配合

SpillWriter 内部有 writeBufferSize 的写缓冲,每次 appendToPartition → write() 将 PrestoPage 数据追加到缓冲;缓冲满时才真正调用文件系统 write()(刷盘)。小批次使每次 write() 调用的数据大小贴近 writeBufferSize,I/O 效率高。

③ "至少 1 行"的保证防止死循环

if (bytes > maxBytes) {
  ++numRows;  // 即使超了也把这一行纳入
  break;
}

若某行本身 > 256 KB(如包含超大字符串),不加这一行就会无限循环(numRows=0 → written 不推进)。++numRows 确保每次循环至少推进一行。

④ 64 行是何依据?

64 行是经验值,使得:

  • 一个 RowVector 的列式存储(FlatVector<int64> = 64×8=512B)可以完全装入 L1 cache
  • extractColumn 的内层循环(每列 64 次写入)具有良好的向量化潜力
  • 与 SpillMergeStream 归并读取的 batch 大小相配(读时也是小批量迭代,减少归并树的重平衡次数)

3.9 runSpill — 并发写入

void SpillerBase::runSpill(bool lastRun) {
  spillStats_->spillRuns.fetch_add(1);

  std::vector<std::shared_ptr<AsyncSource<SpillStatus>>> writes;
  for (const auto& [id, spillRun] : spillRuns_) {
    if (spillRun.rows.empty()) continue;
    writes.push_back(
        memory::createAsyncMemoryReclaimTask<SpillStatus>(
            [partitionId = id, this]() { return writeSpill(partitionId); }));
    // 第 2 个及之后的任务提交到 executor(后台线程),第 1 个在当前线程执行
    if ((writes.size() > 1) && executor_ != nullptr) {
      executor_->add([source = writes.back()]() { source->prepare(); });
    }
  }

  // 折叠守卫:无论成功失败,确保所有后台任务都被 move()(drain)
  auto sync = folly::makeGuard([&]() {
    for (auto& write : writes) { try { write->move(); } catch (...) {} }
  });

  // 主线程阻塞等待所有任务(含第 1 个本线程任务)
  for (auto& write : writes) {
    results.push_back(write->move());
  }

  // 检查错误 + 清空 SpillRun(归还指针数组内存)
  for (auto& result : results) {
    if (result->error) std::rethrow_exception(result->error);
    spillRuns_.at(result->partitionId).clear();
    if (needSort()) {
      state_.finishFile(result->partitionId); // sorted spill:每 run 一个文件
    }
  }
}

并发策略细节

  • 第 1 个分区的写任务在调用线程上通过 write->move() 触发(AsyncSource::prepare() 从未被调用,move() 内联执行)
  • 第 2+ 个分区提交到 executor_(后台 CPU 线程池)并发执行
  • createAsyncMemoryReclaimTask 确保任务在内存仲裁器视角下是"可取消的回收任务",若系统 OOM 时仲裁器能感知到这些任务正在运行

state_.finishFile(partitionId) 调用 SpillWriter::finishFile(),flush 当前文件并记录 SpillFileInfo,下次 appendToPartition 会创建新文件。这使每个 sorted run 独立成文,后续归并时文件间已有序。


3.10 finishSpill — 收尾

void SpillerBase::finishSpill(SpillPartitionSet& partitionSet) {
  finalizeSpill();  // finalized_ = true,之后不再允许写入

  for (const auto& partitionId : state_.spilledPartitionIdSet()) {
    // 递归 spill 时,要将本层 partition ID 包装进父 ID 形成完整路径
    auto wholeId = parentId_.has_value()
        ? SpillPartitionId(parentId_.value(), partitionId.partitionNumber())
        : partitionId;

    // state_.finish(partitionId) 调用 SpillWriter::finish(),
    // 关闭所有文件,返回 SpillFiles(文件路径、大小、排序键等元数据)
    if (partitionSet.count(wholeId) == 0) {
      partitionSet.emplace(wholeId,
          std::make_unique<SpillPartition>(wholeId, state_.finish(partitionId)));
    } else {
      // 多个 Spiller(多线程 HashBuild)的文件合并到同一 partition
      partitionSet[wholeId]->addFiles(state_.finish(partitionId));
    }
  }
}

partitionSetstd::map<SpillPartitionId, ...>,按 SpillPartitionId 有序排列(DFS 前序),确保递归 spill 场景下父 partition 先于子 partition 被处理。


3.11 各子类 Spiller 详解

HashBuildSpiller

needSort      : false(hash join 恢复时重建 hash 表,顺序无关)
HashBitRange  : [startPartitionBit, startPartitionBit + numPartitionBits)(分区存储)
targetFileSize: spillConfig->maxFileSize
maxSpillRunRows: spillConfig->maxSpillRunRows
parentId_     : restoringPartitionId_(递归时指定父 partition)

差异点——重写 extractSpill 附加 probed flag 列:

void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
  // 先提取普通列(key + dependent)
  for (auto i = 0; i < container_->columnTypes().size(); ++i) {
    container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
  }
  // 若是 right/full join,额外提取每行的 probed 标志位
  if (spillProbeFlag_) {
    auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
    for (auto i = 0; i < rows.size(); ++i) {
      flagVector->set(i, container_->hasProbedFlag(rows[i]));
      // hasProbedFlag 直接读行内 probedFlagOffset_ 处的 bit
    }
  }
}

spill() 重载1(spillTriggered_=true + 全量 spill):

void HashBuildSpiller::spill() {
  spillTriggered_ = true;
  SpillerBase::spill(nullptr);  // fillSpillRuns(nullptr) → 从头扫全部行
}

spill(partitionId, spillVector) 重载2(直通路径,已 triggered 后):

void HashBuildSpiller::spill(const SpillPartitionId& id, const RowVectorPtr& vec) {
  // 无需经过 RowContainer,直接 appendToPartition
  if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
  state_.appendToPartition(id, vec);
}

AggregationInputSpiller

needSort      : true(按所有 group key 排序,支持流式归并聚合)
HashBitRange  : [startPartitionBit, startPartitionBit + numPartitionBits)
targetFileSize: uint64_t::max(文件大小不限,由 maxFileSize 配置控制)
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys   : 所有 key 列,升序,nulls last

继承默认 extractSpill:提取 key 列 + 各 accumulator 的 extractAccumulators 中间状态。

每次 runSpill 后(needSort=true),调用 state_.finishFile(id) 关闭当前文件——每个 run 的数据写到独立的 sorted 文件。后续 createOrderedReader 用败者树对所有文件做 N 路归并,还原全局有序流。

关键约束:rows->stringAllocator().freezeAndExecute([&]() { inputSpiller_->spill(); }) 在 spill 期间冻结 HashStringAllocator,防止 accumulator 在 spill 时并发分配/释放变长内存。


AggregationOutputSpiller

needSort      : false(hash 表 scan 顺序即为输出顺序,不需要重排)
HashBitRange  : {}(空,单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows

spill(startRowIter)rowIterator 指定的偏移开始扫描(支持从已输出的位置继续):

void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
  state_.setPartitionSpilled(SpillPartitionId(0)); // 单分区
  SpillerBase::spill(&startRowIter);  // 从 startRowIter 开始迭代
}

重写 runSpill:在最后一次 run 后强制 finishFile

void AggregationOutputSpiller::runSpill(bool lastRun) {
  SpillerBase::runSpill(lastRun);
  if (lastRun) {
    for (const auto& [id, _] : spillRuns_) {
      state_.finishFile(id); // 确保最后的数据落盘
    }
  }
}

SortInputSpiller

needSort      : true(按 sort key 排序)
HashBitRange  : {}(空,单 partition 0,OrderBy 无 hash 分区)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys   : 所有 sort 列,对应 CompareFlags(方向 + nulls 位置)

spillerStoreType_ 把 sort 列排在最前面,这样归并时只需比较前 N 列(减少 cache miss)。

spill() 直接调用 SpillerBase::spill(nullptr),无参数——从 RowContainer 头部开始扫全部行。


SortOutputSpiller

needSort      : false(调用方已传入排好序的 SpillRows)
HashBitRange  : {}(单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows

接口与其他 spiller 不同——接受外部已排序的行指针数组:

void SortOutputSpiller::spill(SpillRows& rows) {
  auto& run = createOrGetSpillRun(SpillPartitionId(0));
  // 直接接管外部已排序的行指针(sortedRows_ 的剩余部分)
  run.rows = SpillRows(rows.begin(), rows.end(), run.rows.get_allocator());
  for (const auto* row : rows) run.numBytes += container_->rowSize(row);
  markSeenPartitionsSpilled();
  runSpill(/*lastRun=*/true);  // 一次性完成,不分多 run
}

重写 runSpill:执行完后立即 finishFile,因为这是唯一一次写入。


NoRowContainerSpiller / MergeSpiller

needSort      : false
container_    : nullptr(无 RowContainer)
maxSpillRunRows: 0(不使用)

无 RowContainer,通过 spill(partitionId, spillVector) 直接将外部 RowVector 写入磁盘,跳过整个 fillSpillRuns → runSpill → extractSpill 流程:

void NoRowContainerSpiller::spill(
    const SpillPartitionId& id, const RowVectorPtr& spillVector) {
  if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
  state_.appendToPartition(id, spillVector); // 直接写磁盘
}

MergeSpillerNoRowContainerSpiller 的子类,额外携带 sortingKeys(用于 hash join probe 侧的归并 spill,需要保留排序信息以便后续 probe 侧归并读取)。


各子类对比

子类 needSort HashBitRange container_ 特殊行为
HashBuildSpiller false 多分区 重写 extractSpill:附加 probed flag 列;双重 spill() 入口
AggregationInputSpiller true 多分区 每 run 对应独立 sorted 文件;freeze allocator
AggregationOutputSpiller false 单分区(空) 从迭代器偏移开始;重写 runSpill 收尾
SortInputSpiller true 单分区(空) sort 列前置;每 run 独立 sorted 文件
SortOutputSpiller false 单分区(空) 接受外部已排序 SpillRows;重写 runSpill
NoRowContainerSpiller false 多分区 nullptr 直接 appendToPartition,无 fill/extract 流程
MergeSpiller false 多分区 nullptr 同上,额外保留 sortingKeys

4. HashJoin Spill 实现

4.1 整体流程

HashJoin spill 只发生在 Build 侧(HashBuild),主要用于处理 build 表过大的情况。Probe 侧配合 build 侧,读取对应的 spill 分区数据。

Build 阶段addInput → ensureInputFits → [内存不足] → reclaimspill 触发后:addInput → spillInput → 直接 spill 到对应 partitionnoMoreInput → finishHashBuild → finishSpill → joinBridge.setHashTablepostHashBuildProcess → waitForProbe / setupSpillInputProbe 阶段探测内存中的 hash 表 + 同步 spill 对应分区的 probe 侧数据joinBridge.setSpillPartitions → build 侧 setupSpillInputRestore 阶段(递归,每个 spill partition 一轮)setupSpillInput → processSpillInput → addInput(重建表)→ finishHashBuild → 新一轮 probe / 再次 spill(递归)
Fig. HashJoin spill 三阶段:Build 落盘 → Probe 同步分区 → Restore 逐分区递归重建

4.2 HashBuildSpiller

class HashBuildSpiller : public SpillerBase {
  const bool spillProbeFlag_;   // 是否需要 spill probed 标记(right/full join)
  bool spillTriggered_{false};  // 一旦触发 spill,后续所有输入直接 spill

  // spill() 重载1:spill RowContainer 中的所有行(内存仲裁触发)
  void spill() {
    spillTriggered_ = true;
    SpillerBase::spill(nullptr);  // → fillSpillRuns → runSpill → writeSpill
  }

  // spill() 重载2:spill 一个 partition 的输入向量(spillTriggered 后的直通路径)
  void spill(const SpillPartitionId& partitionId, const RowVectorPtr& spillVector) {
    VELOX_CHECK(spillTriggered_);
    if (!state_.isPartitionSpilled(partitionId)) {
      state_.setPartitionSpilled(partitionId);
    }
    state_.appendToPartition(partitionId, spillVector);
  }
};

**重写 extractSpill**:对于 right/full outer join,需要在 spill 数据中附加一列 bool probed_flag,记录该行是否已被 probe 侧命中:

void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
  // 提取普通列
  for (auto i = 0; i < types.size(); ++i) {
    container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
  }
  // 若需要 probed flag,从 RowContainer 的标记位提取
  if (spillProbeFlag_) {
    auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
    for (auto i = 0; i < rows.size(); ++i) {
      flagVector->set(i, container_->hasProbedFlag(rows[i]));
    }
  }
}

4.3 setupSpiller

HashBuild 初始化时和每次 restore 时调用:

void HashBuild::setupSpiller(SpillPartition* spillPartition) {
  if (!canSpill()) return;

  // 首次调用:构造 spillType_(在普通列后可能追加 bool 列用于 probed flag)
  if (spillType_ == nullptr) {
    spillType_ = hashJoinTableSpillType(tableType_, joinType_);
    if (needProbedFlagSpill_) {
      spillProbedFlagChannel_ = spillType_->size() - 1;
      // 初始化 false 常量向量(build 侧 spill 时所有行未被 probe)
      spillProbedFlagVector_ = std::make_shared<ConstantVector<bool>>(
          pool(), 0, /*isNull=*/false, BOOLEAN(), false);
    }
  }

  uint8_t startPartitionBit = config->startPartitionBit;

  if (spillPartition != nullptr) {
    // Restore 场景:从 spill 文件中读取数据重建 hash 表
    spillInputReader_ = spillPartition->createUnorderedReader(
        config->readBufferSize, pool(), spillStats_.get());
    restoringPartitionId_ = spillPartition->id();

    // 递归 spill:bit offset 右移一层
    startPartitionBit = partitionBitOffset(
        spillPartition->id(), startPartitionBit, numPartitionBits) + numPartitionBits;

    // 若超过最大 spill 层数,禁用本轮 spill(只能尽力处理)
    if (config->exceedSpillLevelLimit(startPartitionBit)) {
      exceededMaxSpillLevelLimit_ = true;
      return;
    }
  }

  spiller_ = std::make_unique<HashBuildSpiller>(
      joinType_, restoringPartitionId_,
      table_->rows(), spillType_,
      HashBitRange(startPartitionBit, startPartitionBit + config->numPartitionBits),
      config, spillStats_.get());
}

4.4 ensureInputFits — 内存压力检测

每次 addInput 前调用,是 proactive spill 的入口:

void HashBuild::ensureInputFits(RowVectorPtr& input) {
  if (!canSpill() || spiller_ == nullptr || spiller_->spillTriggered()) return;

  // 估算本批 input 引起的内存增量
  const auto tableIncrementBytes = table_->hashTableSizeIncrease(input->size());
  const auto rowContainerIncrementBytes = rows->sizeIncrement(input->size(), ...);
  const auto incrementBytes = rowContainerIncrementBytes + tableIncrementBytes;

  // 若当前预留充足,直接返回
  if (availableReservationBytes >= minReservationBytes) {
    if (freeRows > input->size() && outOfLineFreeBytes >= flatBytes) return;
    if (pool()->availableReservation() > 2 * incrementBytes) return;
  }

  // 尝试扩容 reservation(扩容本身可能触发内存仲裁,进而触发 spill)
  const auto targetIncrementBytes = std::max<int64_t>(
      incrementBytes * 2,
      currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

  {
    Operator::ReclaimableSectionGuard guard(this); // 标记为可被仲裁
    if (pool()->maybeReserve(targetIncrementBytes)) {
      if (spiller_->spillTriggered()) {
        // 扩容过程中仲裁器触发了 spill,不需要这份预留了
        pool()->release();
      }
      return;
    }
  }
  // maybeReserve 失败(OOM),后续仲裁器会调用 reclaim()
}

4.5 spillInput — 正在 spill 时的直通路径

一旦 spiller_->spillTriggered() 为 true,所有新输入不再进入 hash 表,而是直接按 partition 分发到 spill 文件:

void HashBuild::spillInput(const RowVectorPtr& input) {
  if (!canSpill() || spiller_ == nullptr || !spiller_->spillTriggered()
      || !activeRows_.hasSelections()) return;

  // 1. 重新计算所有行的 hash(与 addInput 中的计算可能重复,代价可接受)
  computeSpillPartitions(input); // 结果存入 spillPartitions_[row]

  // 2. 按 partition 分组,并从 activeRows_ 中移除(防止重复插入 hash 表)
  for (auto row = 0; row < numInput; ++row) {
    if (!activeRows_.isValid(row)) continue;
    activeRows_.setValid(row, false); // 从 active 中移除
    rawSpillInputIndicesBuffers_[spillPartitions_[row]][numSpillInputs_[partition]++] = row;
  }

  // 3. 为非 spill 原始输入构建 spillChildVectors_(key + dependent + probed_flag)
  maybeSetupSpillChildVectors(input);

  // 4. 按 partition 逐批 spill
  for (uint32_t partition = 0; partition < numSpillInputs_.size(); ++partition) {
    spillPartition(partition, numSpillInputs_[partition],
                   spillInputIndicesBuffers_[partition], input);
  }
}

4.6 finishHashBuild — 收尾与传递

所有 build 线程完成后,最后一个线程执行:

bool HashBuild::finishHashBuild() {
  // ...(等待所有 peer 线程完成)

  SpillPartitionSet spillPartitions;

  // 收集所有 peer 的 spill 文件(多线程 build 的各自 spill 合并)
  for (auto* build : otherBuilds) {
    if (build->spiller_ != nullptr) {
      build->spiller_->finishSpill(spillPartitions);
    }
  }
  if (spiller_ != nullptr) {
    spiller_->finishSpill(spillPartitions);
    removeEmptyPartitions(spillPartitions);
  }

  // 构建 hash 表(只包含未 spill 的行)
  table_->prepareJoinTable(std::move(otherTables), ...);

  // 注册 tableSpillFunc:允许 join bridge 在 probe 后 spill hash 表
  HashJoinTableSpillFunc tableSpillFunc;
  if (canReclaim()) {
    tableSpillFunc = [hashBitRange = spiller_->hashBits(), ...](auto table) {
      return spillHashJoinTable(table, ...);
    };
  }

  // 将 hash 表 + spill partitions 一起传给 join bridge
  joinBridge_->setHashTable(table, std::move(spillPartitions),
                            joinHasNullKeys_, std::move(tableSpillFunc));
}

4.7 reclaim — 内存仲裁触发路径

内存仲裁器调用 reclaim() 强制 spill:

void HashBuild::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
  // 状态检查:只有 kRunning/kWaitForBuild/kYield 状态且不在非可回收区才能 spill
  if (nonReclaimableState()) {
    ++stats.numNonReclaimableAttempts;
    return;
  }

  // 暂停所有 peer driver(task->pauseRequested() 保证)
  const std::vector<Operator*> operators =
      task->findPeerOperators(pipelineId, this);

  // 检查所有 peer 都可被回收
  for (auto* op : operators) {
    if (static_cast<HashBuild*>(op)->nonReclaimableState()) {
      ++stats.numNonReclaimableAttempts;
      return;
    }
  }

  // 收集所有 peer 的 spiller,统一 spill 整个 hash 表
  std::vector<HashBuildSpiller*> spillers;
  for (auto* op : operators) {
    spillers.push_back(static_cast<HashBuild*>(op)->spiller_.get());
  }
  spillHashJoinTable(spillers, config); // 核心:spill 所有 peer 的 hash 表

  // 清空 hash 表(已 spill,不再需要)+ 释放 memory pool reservation
  for (auto* op : operators) {
    static_cast<HashBuild*>(op)->table_->clear(true);
    static_cast<HashBuild*>(op)->pool()->release();
  }
}

spillHashJoinTable 内部对每个 spiller 调用 spiller->spill()(即 HashBuildSpiller::spill()),将所有行按 hash 分区写到磁盘。

4.8 postHashBuildProcess / setupSpillInput — 递归恢复

Build 完成后,询问 join bridge 是否有 spill 数据需要 restore:

void HashBuild::postHashBuildProcess() {
  if (!canSpill()) {
    setState(State::kFinish);
    return;
  }

  // 向 join bridge 请求下一个要恢复的 spill partition
  auto spillInput = joinBridge_->spillInputOrFuture(&future_);
  if (!spillInput.has_value()) {
    setState(State::kWaitForProbe); // 等待 probe 侧完成并传递 spill partition
    return;
  }
  setupSpillInput(std::move(spillInput.value()));
}

void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
  if (spillInput.spillPartition == nullptr) {
    setState(State::kFinish); // 没有更多 spill partition,结束
    return;
  }

  // 重置所有状态,为本次 restore 重新初始化
  table_.reset(); spiller_.reset(); spillInputReader_.reset();
  restoringPartitionId_.reset();

  // 列顺序重置(spill 文件中列已按 key + dependent 排好)
  std::iota(keyChannels_.begin(), keyChannels_.end(), 0);
  std::iota(dependentChannels_.begin(), dependentChannels_.end(), keyChannels_.size());

  setupTable();
  setupSpiller(spillInput.spillPartition.get()); // 建立新的 spiller(可能是递归层)
  processSpillInput();                           // 开始读取并重建
}

4.9 processSpillInput — 从 spill 文件重建

void HashBuild::processSpillInput() {
  while (spillInputReader_->nextBatch(spillInput_)) {
    // 复用 addInput 路径:重新 hash + 插入 hash 表(或再次 spill 到下一层)
    addInput(std::move(spillInput_));
    if (!isRunning()) return;
    if (shouldYield()) {
      state_ = State::kYield;
      future_ = ContinueFuture{folly::Unit{}};
      return;
    }
  }
  noMoreInputInternal(); // → finishHashBuild → postHashBuildProcess(循环)
}

这形成了递归循环:processSpillInput → addInput → ensureInputFits → [spill] → noMoreInput → finishHashBuild → postHashBuildProcess → setupSpillInput → processSpillInput

4.10 Probe 侧 Spill 详解

Probe 侧(HashProbe)的 spill 涉及三条路径,通过 HashJoinBridge 与 build 侧协调:

HashJoinBridge — Build / Probe 协调中枢setHashTable(table, spillPartitionSet, …)probe 侧 asyncWaitForHashTable 取到 HashBuildResultspillPartitionIds 非空 → 触发 input spillprobeFinished()last prober 调用,通知 bridge 准备下一个要恢复的 partition从 spillPartitionSets_ 取下一个 → split 为 N shard 分发给各 build 线程appendSpilledHashTablePartitions(spillPartitionSet)probe 侧 reclaim 时,将被 spill 的 hash table 分区交还 bridge
Fig. HashJoinBridge:setHashTable / probeFinished / append… 三个方法协调 build 与 probe 的 spill

HashBuildResult — Build/Probe 信息传递载体

struct HashBuildResult {
  std::shared_ptr<BaseHashTable> table;                  // 刚建好的 hash 表
  std::optional<SpillPartitionId> restoredPartitionId;   // 非 null 表示本次是 restore 轮
  SpillPartitionIdSet spillPartitionIds;                 // 本轮 build 中被 spill 的 partition ID
  bool hasNullKeys;
};

spillPartitionIds 非空 ⟺ build 侧触发了 spill,probe 侧必须同步将对应行 spill 到磁盘。 restoredPartitionId 非 null ⟺ 本次 hash 表是从 spill 文件 restore 得来的,probe 侧需要从对应的 spill 文件中读取之前被 spill 的 probe 行。


路径 1:input spill(探测侧主路径)

触发时机asyncWaitForHashTable 拿到 HashBuildResult 后,发现 spillPartitionIds 非空。

void HashProbe::asyncWaitForHashTable() {
  auto hashBuildResult = joinBridge_->tableOrFuture(&future_);
  table_ = hashBuildResult->table;
  initializeResultIter();

  // 1. 若本次是 restore 轮,打开对应 partition 的 probe spill 文件
  maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId);

  // 2. 若 build 侧有 spill partition,初始化 inputSpiller_
  maybeSetupInputSpiller(hashBuildResult->spillPartitionIds);
  checkMaxSpillLevel(hashBuildResult->restoredPartitionId);
}

**maybeSetupInputSpiller**:

void HashProbe::maybeSetupInputSpiller(
    const SpillPartitionIdSet& spillPartitionIds) {

  spillInputPartitionIds_ = spillPartitionIds;
  if (spillInputPartitionIds_.empty()) return;

  // 计算 bit offset(与 build 侧一致,保证 hash 分区对齐)
  const auto bitOffset = partitionBitOffset(
      *spillInputPartitionIds_.begin(),
      spillConfig()->startPartitionBit,
      spillConfig()->numPartitionBits);

  // NoRowContainerSpiller:无 RowContainer,直接将外部 RowVector 写到磁盘
  inputSpiller_ = std::make_unique<NoRowContainerSpiller>(
      probeType_,
      restoringPartitionId_,  // 递归时携带父 partition ID
      HashBitRange(bitOffset, bitOffset + spillConfig()->numPartitionBits),
      spillConfig(),
      spillStats_.get());

  // 只 spill build 侧已 spill 的那些 partition(其余 partition 可以正常 probe)
  inputSpiller_->setPartitionsSpilled(spillInputPartitionIds_);

  // SpillPartitionFunction:计算每行属于哪个 partition(与 build 侧使用相同的 hash bit 范围)
  spillPartitionFunction_ = std::make_unique<SpillPartitionFunction>(
      SpillPartitionIdLookup(spillInputPartitionIds_, ...),
      probeType_, keyChannels_);
}

**spillInput**(在 addInput 中调用):

void HashProbe::spillInput(RowVectorPtr& input) {
  const auto numInputRows = input->size();
  // 预分配各 partition 的 indices buffer(复用避免重复 malloc)
  prepareInputIndicesBuffers(numInputRows,
      inputSpiller_->state().spilledPartitionIdSet());

  // 为每行计算所属 partition
  spillPartitionFunction_->partition(*input, spillPartitions_);

  vector_size_t numNonSpillingInput = 0;
  for (auto row = 0; row < numInputRows; ++row) {
    const auto& partitionId = spillPartitions_[row];
    if (!inputSpiller_->state().isPartitionSpilled(partitionId)) {
      // 该 partition 未被 build 侧 spill,行留在内存继续 probe
      rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row;
    } else {
      // 该 partition 已被 build 侧 spill,行需要 spill 到磁盘
      rawSpillInputIndicesBuffers_.at(partitionId)
          [numSpillInputs_.at(partitionId)++] = row;
    }
  }

  if (numNonSpillingInput == numInputRows) return; // 无行需要 spill

  // 强制加载所有 lazy 列(spill 序列化需要实际数据)
  for (int32_t i = 0; i < input->childrenSize(); ++i) {
    input->childAt(i)->loadedVector();
  }

  // 按 partition 分批 spill(wrap 创建 DictionaryVector,不拷贝数据,只共享 indices)
  for (const auto& [partitionId, numRows] : numSpillInputs_) {
    if (numRows == 0) continue;
    inputSpiller_->spill(
        partitionId,
        wrap(numRows, spillInputIndicesBuffers_.at(partitionId), input));
  }

  if (numNonSpillingInput == 0) {
    input = nullptr;     // 所有行都 spill 了,无需继续 probe
  } else {
    input = wrap(numNonSpillingInput, nonSpillInputIndicesBuffer_, input);
    // 只保留未 spill 的行继续 probe
  }
}

关键设计wrap 使用 DictionaryVector,不拷贝行数据,只是 indices 引用,spill 写入时才通过 loadedVector() 强制物化。这样同一个 input batch 里属于不同 partition 的行可以零拷贝地分发到各自的 spill 文件。

**noMoreInputInternal**(探测完成,收尾 input spill):

void HashProbe::noMoreInputInternal() {
  noMoreSpillInput_ = true;
  if (!spillInputPartitionIds_.empty()) {
    // 关闭 inputSpiller_,将所有分区的 SpillFiles 收集到 inputSpillPartitionSet_
    inputSpiller_->finishSpill(inputSpillPartitionSet_);
    // 注:NoRowContainerSpiller 不排序,spillSortTimeNanos 永远为 0
  }

  // 等待所有 peer probe 线程完成(防止一个线程先收尾而另一个还在写同一 partition)
  if (!operatorCtx_->task()->allPeersFinished(...)) {
    setState(ProbeOperatorState::kWaitForPeers);
    return;
  }

  lastProber_ = true; // 最后一个完成的 probe 线程负责通知 build 侧
}

路径 2:restore 轮的 probe(从 spill 文件读取 probe 行)

当 build 侧从某个 spill partition 重建 hash 表后,probe 侧必须把之前 spill 到磁盘的对应 probe 行读回重新 probe。

**maybeSetupSpillInputReader**:

void HashProbe::maybeSetupSpillInputReader(
    const std::optional<SpillPartitionId>& restoredPartitionId) {
  if (!restoredPartitionId.has_value()) return;

  // 从 inputSpillPartitionSet_ 中取出对应 partition 的 SpillPartition
  auto iter = inputSpillPartitionSet_.find(restoredPartitionId.value());
  auto partition = std::move(iter->second);
  restoringPartitionId_ = restoredPartitionId;

  // 创建无序读取器(probe 行之间无顺序要求,顺序读即可)
  spillInputReader_ = partition->createUnorderedReader(
      spillConfig_->readBufferSize, pool(), spillStats_.get());
  inputSpillPartitionSet_.erase(iter);
}

**addSpillInput**(在 isBlockedkRunning 状态时轮询调用):

void HashProbe::addSpillInput() {
  if (input_ != nullptr || noMoreSpillInput_) return;

  if (!spillInputReader_->nextBatch(input_)) {
    // spill 文件读完,走正常的 noMoreInput 流程
    noMoreInputInternal();
    return;
  }

  // 复用 addInput 路径(包含 spillInput 检查,支持递归 spill)
  addInput(std::move(input_));
}

路径 3:output spill(内存仲裁触发 reclaim)

触发条件:probe 正在处理某批 input 时,输出过大导致内存仲裁器调用 reclaim()

**reclaim**:

void HashProbe::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
  if (exceededMaxSpillLevelLimit_ || nonReclaimableState()) {
    ++stats.numNonReclaimableAttempts; return;
  }

  const auto probeOps = findPeerOperators();
  bool hasMoreProbeInput = false;
  for (auto* probeOp : probeOps) {
    if (probeOp->nonReclaimableState()) { ++stats.numNonReclaimableAttempts; return; }
    hasMoreProbeInput |= !probeOp->noMoreSpillInput_;
  }

  // 步骤 1:并发将所有 peer 的 pending input 产出的 output 先 spill 到磁盘
  spillOutput(probeOps);

  // 步骤 2:若还有 probe input 未处理,spill hash 表(释放 build 侧内存)
  SpillPartitionSet spillPartitionSet;
  if (hasMoreProbeInput) {
    spillPartitionSet = spillHashJoinTable(
        table_, restoringPartitionId_, tableSpillHashBits_,
        joinNode_, spillConfig(), spillStats_.get());
  }

  // 步骤 3:通知各 probe 算子设置 inputSpiller_(后续输入按新 partitions spill)
  const auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
  for (auto* probeOp : probeOps) {
    probeOp->clearBuffers();
    if (!spillPartitionSet.empty()) {
      probeOp->maybeSetupInputSpiller(spillPartitionIdSet);
    }
    probeOp->pool()->release();
  }

  // 步骤 4:清空 hash 表 + 将 spill partitions 注册到 join bridge(供 build 侧后续恢复)
  table_->clear(true);
  if (!spillPartitionIdSet.empty()) {
    joinBridge_->appendSpilledHashTablePartitions(std::move(spillPartitionSet));
  }
}

spillOutput(单个算子)

void HashProbe::spillOutput() {
  if (input_ == nullptr && !needLastProbe()) return;

  // 用 NoRowContainerSpiller 将 pending input 产生的所有 output 写到磁盘(单一 partition 0)
  auto outputSpiller = std::make_unique<NoRowContainerSpiller>(
      outputType_, std::nullopt, HashBitRange{}, spillConfig(), spillStats_.get());
  outputSpiller->setPartitionsSpilled({SpillPartitionId(0)});

  // 调用 getOutputInternal(toSpillOutput=true):产出 output 但直接 spill 而非返回
  for (;;) {
    auto output = getOutputInternal(/*toSpillOutput=*/true);
    if (output != nullptr) {
      for (int32_t i = 0; i < output->childrenSize(); ++i) {
        output->childAt(i)->loadedVector(); // 强制物化 lazy 列
      }
      outputSpiller->spill(SpillPartitionId(0), output);
      continue;
    }
    if (input_ == nullptr) break; // 当前 input 批次已处理完
    // right semi join 特殊:input_ 不为 null 但 output 为 null 属于正常状态
    break;
  }

  // 收尾:将 spill 文件信息存入 spillOutputPartitionSet_(供后续读回)
  outputSpiller->finishSpill(spillOutputPartitionSet_);
  removeEmptyPartitions(spillOutputPartitionSet_);
}

读回阶段:下次 getOutput 调用时,maybeReadSpillOutput() 将磁盘数据读回:

bool HashProbe::maybeReadSpillOutput() {
  if (spillOutputReader_ == nullptr) {
    maybeSetupSpillOutputReader(); // 首次:打开 UnorderedStreamReader
  }
  if (spillOutputReader_ == nullptr) return false;

  if (!spillOutputReader_->nextBatch(output_)) {
    spillOutputReader_.reset(); // 读完,重置
    return false;
  }
  return true; // 将读回的 output 直接返回给调用方
}

output spill 特殊注意getOutputInternal(toSpillOutput=true) 被调用时,若发现 input_ 为 null(即没有 pending input),会提前返回 null——这保证了 reclaim 路径不会误进入"probe 完成逻辑"(如 prepareForSpillRestore),避免状态混乱。


prepareForSpillRestore — 多轮恢复的关键

当一轮 probe(含当前 hash 表和对应 spill 文件的 probe 行)全部完成后,最后一个 probe 线程(lastProber_)负责启动下一轮恢复:

void HashProbe::prepareForSpillRestore() {
  // 重置本轮相关状态
  noMoreSpillInput_ = false;
  if (lastProber_) {
    table_->clear(true); // 清空当前 hash 表,释放内存
  }
  table_.reset();
  inputSpiller_.reset();
  spillInputReader_.reset();
  restoringPartitionId_.reset();
  spillInputPartitionIds_.clear();
  spillOutputReader_.reset();

  if (!lastProber_) return;

  // 通知 join bridge:probe 侧已完成,请准备下一个 spill partition
  joinBridge_->probeFinished();
  // bridge 内部:从 spillPartitionSets_ 取下一个 partition,均分为 N shard,
  // 放入 restoringSpillShards_,build 侧 spillInputOrFuture() 返回各自的 shard

  // 唤醒等待的 peer probe 线程(之前在 kWaitForPeers 状态阻塞)
  wakeupPeerOperators();
  lastProber_ = false;
}

Probe 侧 Spill 全生命周期

asyncWaitForHashTable()
    │
    ├─ spillPartitionIds 非空 → maybeSetupInputSpiller()
    │      创建 NoRowContainerSpiller,标记需要 spill 的 partition 集合
    │
    ├─ restoredPartitionId 非 null → maybeSetupSpillInputReader()
    │      打开 UnorderedStreamReader,顺序读取之前 spill 的 probe 行
    │
addInput(input)
    │
    └─ needToSpillInput()? → spillInput(input)
           按 hash 分 partition:
             spilled partition → DictionaryVector wrap → inputSpiller_.spill()
             non-spilled partition → 继续正常 probe(无拷贝)
    │
getOutput() → ensureOutputFits()
    │
    ├─ maybeReadSpillOutput():读取上次 spillOutput 写到磁盘的 output batch
    │
    └─ 内存不足 → reclaim():
           1. spillOutput(probeOps)        并发 spill 各 probe 的 pending output
           2. spillHashJoinTable(table)     将 hash 表 spill 到磁盘
           3. maybeSetupInputSpiller(ids)   各 probe 设置新 inputSpiller_
           4. appendSpilledHashTablePartitions() → 通知 bridge
    │
isBlocked(kRunning) + spillInputReader_ 非 null → addSpillInput()
    │    逐 batch 从 spill 文件读取 probe 行 → addInput() → spillInput() → probe
    │
noMoreInputInternal()
    │
    ├─ inputSpiller_.finishSpill(inputSpillPartitionSet_)
    └─ allPeersFinished() → lastProber_ = true
             │
             ▼
        hasMoreSpillData()? → prepareForSpillRestore()
             │                    joinBridge_->probeFinished()
             │                    wakeupPeerOperators()
             ▼
        asyncWaitForHashTable()  ← 循环(等待 build 侧重建 hash 表)
             │
             └─ 所有 spill partition 处理完 → setState(kFinish)

5. HashAggregation Spill 实现

5.1 两种 Spiller

HashAggregation 的 spill 逻辑集中在 GroupingSet 中,区分两个阶段:

inputSpiller_  (AggregationInputSpiller):
  - 在 input 处理阶段触发(hash 表太大)
  - needSort = true:按 group key 排序
  - 分区:按 hash(group key)
  - 目的:spill 后 hash 表清空,腾出内存继续处理输入

outputSpiller_ (AggregationOutputSpiller):
  - 在 output 处理阶段触发(输出时内存不足)
  - needSort = false:已是输出顺序(hash 表 scan 顺序),不需要重排
  - 无分区(HashBitRange 为空)
  - 目的:将剩余输出行先 spill 到磁盘,之后边读边输出

两者互斥:同一个 GroupingSet 要么用 inputSpiller_,要么用 outputSpiller_,不会同时存在。

5.2 GroupingSet::spill() — input spill

HashAggregation::reclaim()ensureInputFits 调用:

void GroupingSet::spill() {
  if (table_ == nullptr || table_->numDistinct() == 0) return;

  auto* rows = table_->rows();
  VELOX_CHECK_NULL(outputSpiller_);

  // 首次 spill:初始化 inputSpiller_
  if (inputSpiller_ == nullptr) {
    // sortingKeys:按所有 group key 列排序(升序,nulls 在末尾)
    const auto sortingKeys = SpillState::makeSortingKeys(
        std::vector<CompareFlags>(rows->keyTypes().size()));
    inputSpiller_ = std::make_unique<AggregationInputSpiller>(
        rows,
        makeSpillType(),  // key 列 + accumulator 列的类型
        HashBitRange(startPartitionBit, startPartitionBit + numPartitionBits),
        sortingKeys,
        spillConfig_,
        spillStats_);
  }

  // 冻结 HashStringAllocator:spill 可能多线程执行,防止 accumulator 并发分配
  rows->stringAllocator().freezeAndExecute([&]() {
    inputSpiller_->spill();
  });

  // distinct 聚合:记录本次 spill 生成的文件数(用于后续归并去重)
  if (isDistinct() && numDistinctSpillFilesPerPartition_.empty()) {
    // 记录每个 partition 的文件数,文件 ID < 该数的是 distinct 文件
    for (int partition = 0; partition < maxPartitions; ++partition) {
      numDistinctSpillFilesPerPartition_[partition] =
          inputSpiller_->state().numFinishedFiles(SpillPartitionId(partition));
    }
  }

  // 清空 hash 表(所有行已 spill 到磁盘)
  table_->clear(/*freeTable=*/true);
}

makeSpillType() 生成 spill 类型:

RowTypePtr GroupingSet::makeSpillType() const {
  std::vector<TypePtr> types;
  // Group key 列
  for (auto& hasher : hashers_) types.push_back(hasher->type());
  // Accumulator 的中间状态类型(不是最终结果类型)
  for (auto& accumulator : accumulators(false)) {
    types.push_back(accumulator.spillType());
  }
  return ROW(names, types);
}

5.3 GroupingSet::spill(rowIterator) — output spill

HashAggregation::reclaim() 在 output 阶段调用:

void GroupingSet::spill(const RowContainerIterator& rowIterator) {
  VELOX_CHECK(!hasSpilled()); // 确保没有 inputSpiller_

  auto* rows = table_->rows();
  outputSpiller_ = std::make_unique<AggregationOutputSpiller>(
      rows, makeSpillType(), spillConfig_, spillStats_);

  rows->stringAllocator().freezeAndExecute([&]() {
    outputSpiller_->spill(rowIterator); // 从 rowIterator 指定的位置开始 spill
  });
  table_->clear(/*freeTable=*/true);
}

AggregationOutputSpiller::spill(rowIterator) 内部:

void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
  // 只标记单一分区(partition 0),无 hash 分区
  state_.setPartitionSpilled(SpillPartitionId(0));
  SpillerBase::spill(&startRowIter); // 从 rowIterator 位置开始迭代
}

AggregationOutputSpiller::runSpill 重写:每次 run 后立即 finishFile(保证每个 run 独立一个文件,便于后续归并)。

5.4 reclaim 触发路径

void HashAggregation::reclaim(uint64_t targetBytes,
                               memory::MemoryReclaimer::Stats& stats) {
  // 1. 先尝试 compact(清理 accumulator 的字符串存储),代价低
  const auto compactedBytes = groupingSet_->compact();
  if (compactedBytes >= targetBytes) return;

  if (!canSpill()) { /* ... */ return; }

  if (groupingSet_->hasSpilled()) {
    if (isOutputProcessing) {
      // 已在输出阶段且已 spill:无法再次 spill,报告无法回收
      return;
    }
  }

  if (isOutputProcessing) {
    // 在输出阶段:spill 从 resultIterator_ 位置起的剩余行
    groupingSet_->spill(resultIterator_);
  } else {
    // 在输入阶段:spill 所有行(hash 表全量 dump)
    groupingSet_->spill();
  }
}

5.5 getOutputWithSpill — 归并输出

groupingSet_->hasSpilled() 为 true 时,getOutput 路径切换为:

bool GroupingSet::getOutputWithSpill(int32_t maxOutputRows,
                                      int32_t maxOutputBytes,
                                      const RowVectorPtr& result) {
  if (outputSpillPartition_ == -1) {
    // 首次进入:初始化归并结构
    mergeRows_ = std::make_unique<RowContainer>(keyTypes, ...); // 临时 RowContainer
    initializeAggregates(*mergeRows_, false);

    // 将所有 spill 文件信息收集到 spillPartitionSet_
    inputSpiller_->finishSpill(spillPartitionSet_);
    // 或:outputSpiller_->finishSpill(spillPartitionSet_);

    removeEmptyPartitions(spillPartitionSet_);
    prepareNextSpillPartitionOutput(); // 打开第一个 partition 的 TreeOfLosers
  }

  // 逐 partition 归并输出
  return mergeNext(maxOutputRows, maxOutputBytes, result);
}

bool GroupingSet::prepareNextSpillPartitionOutput() {
  merge_ = nullptr;
  if (spillPartitionSet_.empty()) return false;

  auto it = spillPartitionSet_.begin();
  outputSpillPartition_ = it->first.partitionNumber();
  // createOrderedReader:构建 N 路败者树(含文件数超限时的预归并)
  merge_ = it->second->createOrderedReader(*spillConfig_, pool_, spillStats_);
  spillPartitionSet_.erase(it);
  return true;
}

5.6 mergeNextWithAggregates — 有序归并聚合

bool GroupingSet::mergeNextWithAggregates(
    int32_t maxOutputRows, int32_t maxOutputBytes, const RowVectorPtr& result) {

  bool nextKeyIsEqual{false};
  for (;;) {
    // 从败者树取下一行(带 equal 标志:与上一行 key 是否相同)
    auto [stream, equal] = merge_->nextWithEquals();

    if (stream == nullptr) {
      // 当前 partition 耗尽:输出已积累的行
      extractSpillResult(result);
      if (result->size() > 0) return true;
      // 换下一个 partition
      if (!prepareNextSpillPartitionOutput()) return false;
      continue;
    }

    if (!nextKeyIsEqual) {
      // 新的 group key:在 mergeRows_ 中创建新行,初始化 accumulators
      mergeState_ = mergeRows_->newRow();
      initializeRow(*stream, mergeState_); // 复制 key 列 + 初始化 accumulator
    }
    // 用当前行更新 accumulator(partial merge)
    updateRow(*stream, mergeState_);
    nextKeyIsEqual = equal;
    stream->pop();

    // 达到行数/字节数上限:输出
    if (!nextKeyIsEqual && (mergeRows_->numRows() >= maxOutputRows
                            || mergeRowBytes() >= maxOutputBytes)) {
      extractSpillResult(result);
      return true;
    }
  }
}

关键merge_->nextWithEquals() 利用已排序的特性,保证相同 key 的行连续出现,从而可以在流式归并的同时完成聚合(无需将整个 group 缓存在内存中)。

5.7 mergeNextWithoutAggregates — distinct 归并

对于 SELECT DISTINCT 场景,spill 文件分为两类:

  • distinct 文件(文件 ID < numDistinctSpillFilesPerPartition_):已输出过的 distinct 行
  • input 文件(文件 ID >= 阈值):新输入的行

归并时:若某个相同 key 的 run 中包含来自 distinct 文件的行,则跳过该 run(因为已经输出过);否则输出该 run 中的一行。


6. OrderBy Spill 实现

6.1 两种 Spiller

OrderBy 的 spill 通过 SortBuffer 实现,同样区分两个阶段:

inputSpiller_  (SortInputSpiller):
  - input 阶段:RowContainer 行数过多时触发
  - needSort = true:按 sort key 排序后写入
  - 无 hash 分区(HashBitRange 为空)
  - 写入一个 partition(partition 0)

outputSpiller_ (SortOutputSpiller):
  - output 阶段:已完成内排序,输出时内存不足
  - needSort = false:外部传入已排序的 SpillRows
  - 接收 sortedRows_ 中未输出部分,直接写入

6.2 SortBuffer::spillInput

void SortBuffer::spillInput() {
  if (inputSpiller_ == nullptr) {
    // sortingKeys:将所有 sort column + 其余列按 sort 顺序排列
    const auto sortingKeys = SpillState::makeSortingKeys(sortCompareFlags_);
    inputSpiller_ = std::make_unique<SortInputSpiller>(
        data_.get(),         // RowContainer
        spillerStoreType_,   // sort 列在前,其余列在后的类型
        sortingKeys,
        spillConfig_,
        spillStats_);
  }
  inputSpiller_->spill();  // fillSpillRuns → sort → writeSpill → 写磁盘
  data_->clear();           // 清空 RowContainer,腾出内存
}

spillerStoreType_ 的列顺序重排(构造时完成):

// sort 列排在最前面(归并时只需比较前几列)
for (auto i = 0; i < numSortKeys; ++i) {
  sorted_types.push_back(input->childAt(sortColumnIndices[i])->type());
  sorted_names.push_back(input->nameOf(sortColumnIndices[i]));
}
// 其余列追加在后面
for (auto i = 0; i < input->size(); ++i) {
  if (!isSortKey(i)) {
    sorted_types.push_back(input->childAt(i)->type());
    sorted_names.push_back(input->nameOf(i));
  }
}

这样 spill 文件中每行的前 N 列就是排序键,归并比较时只需读取前 N 列,节省反序列化开销。

6.3 SortBuffer::spillOutput

void SortBuffer::spillOutput() {
  if (hasSpilled()) return;                     // 已经 spill 过
  if (numOutputRows_ == sortedRows_.size()) return; // 全部输出完了

  outputSpiller_ = std::make_unique<SortOutputSpiller>(
      data_.get(), spillerStoreType_, spillConfig_, spillStats_);

  // 将 sortedRows_ 中未输出的部分([numOutputRows_, end))直接传给 outputSpiller_
  auto spillRows = SpillerBase::SpillRows(
      sortedRows_.begin() + numOutputRows_, sortedRows_.end(),
      *memory::spillMemoryPool());
  outputSpiller_->spill(spillRows);

  data_->clear();
  sortedRows_.clear(); sortedRows_.shrink_to_fit();
  finishSpill(); // output spill 只触发一次,立即收尾
}

SortOutputSpiller::spill(SpillRows) 内部:

void SortOutputSpiller::spill(SpillRows& rows) {
  auto& spillRun = createOrGetSpillRun(SpillPartitionId(0));
  spillRun.rows = SpillRows(rows.begin(), rows.end(), ...);
  // 计算 numBytes
  for (const auto* row : rows) spillRun.numBytes += container_->rowSize(row);
  markSeenPartitionsSpilled();

  runSpill(true); // rows 已按 sort key 有序,跳过排序直接写入
}

SortOutputSpiller::runSpill 重写:在父类 runSpill 后立即调用 finishFile(保证文件封尾)。

6.4 SortBuffer::finishSpill

void SortBuffer::finishSpill() {
  // inputSpiller_ 和 outputSpiller_ 互斥,只有一个会被初始化
  if (inputSpiller_ != nullptr) {
    inputSpiller_->finishSpill(spillPartitionSet_);
  } else {
    outputSpiller_->finishSpill(spillPartitionSet_);
  }
  // OrderBy 无 hash 分区,只有一个 partition
  VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
}

6.5 SortBuffer::getOutputWithSpill

void SortBuffer::prepareOutputWithSpill() {
  if (spillMerger_ != nullptr) return; // 已初始化

  VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
  // 创建 TreeOfLosers:归并所有 spill 文件(可能包含内存中剩余行)
  spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader(
      *spillConfig_, pool(), spillStats_);
  spillPartitionSet_.clear();
}

void SortBuffer::getOutputWithSpill() {
  int32_t outputRow = 0, outputSize = 0;
  while (outputRow + outputSize < output_->size()) {
    SpillMergeStream* stream = spillMerger_->next();
    VELOX_CHECK_NOT_NULL(stream);

    spillSources_[outputSize] = &stream->current();
    spillSourceRows_[outputSize] = stream->currentIndex(&isEndOfBatch);
    ++outputSize;

    if (isEndOfBatch) {
      // batch 到边界时先复制出来,再 pop 触发加载下一 batch
      gatherCopy(output_.get(), outputRow, outputSize,
                 spillSources_, spillSourceRows_, columnMap_);
      outputRow += outputSize; outputSize = 0;
    }
    stream->pop();
  }
  // 处理最后一批
  if (outputSize != 0) {
    gatherCopy(output_.get(), outputRow, outputSize, ...);
  }
  numOutputRows_ += output_->size();
}

gatherCopy 使用 columnMap_ 将 spill 文件中的列顺序(sort 列在前)映射回原始输出列顺序。


7. 关键设计决策总结

7.1 有序 vs. 无序 Spill

算子 Spill 类型 原因
HashJoin Build 无序 恢复时重建 hash 表,顺序无关
HashAggregation Input 有序(按 group key) 支持流式归并聚合,避免全量恢复
HashAggregation Output 无序(hash 表 scan 顺序) 已是输出顺序,不需要重排
OrderBy Input 有序(按 sort key) 支持多路归并产生有序输出
OrderBy Output 有序(已排序传入) 同上

7.2 两阶段 Spill(Input/Output Spiller 分离)

Aggregation 和 OrderBy 都支持两阶段 spill:

  • Input 阶段:按 hash 分区 spill,允许多次触发;之后 hash 表清空继续处理输入
  • Output 阶段:无分区,只触发一次;将剩余行 spill 后用归并输出

两阶段互斥,通过 inputSpiller_/outputSpiller_ 非空判断。

7.3 内存预留机制(proactive spill)

触发链:ensureInputFits → maybeReserve → 触发内存仲裁 → reclaim → spill

关键参数:
  minSpillableReservationPct     确保始终有足够"可 spill"预留空间
  spillableReservationGrowthPct  每次扩容幅度,平衡内存利用率与 spill 频率

设计意图:通过 proactive reservation,在真正 OOM 之前触发 spill,避免在不可中断的关键路径上被动 OOM。

7.4 HashBuild 多线程 Spill 协调

所有 peer HashBuild 线程共享同一个 joinBridge
仲裁器触发 reclaim 时:
  1. 暂停所有 peer driver(task->pause())
  2. 检查所有 peer 均可回收
  3. 统一 spill 所有 peer 的 hash 表
  4. 清空所有 peer 的表 + 释放预留

这确保所有线程的 hash 表碎片被一次性清理,而不是只清理一个线程的。

7.5 递归 Spill(HashJoin 专属)

初次 spill:bit [32:29] 用于分区 → 8 个 spill partitions
第一次递归:bit [28:26] 用于分区 → 8 个子 partitions
最多 maxSpillLevel 层(默认 3 层)

终止条件:
  1. 达到 maxSpillLevel
  2. 某 partition 能完全装入内存(不再 spill)
  3. partition 数据为空

7.6 probed flag Spill(Right/Full Outer Join)

对于 right join / full outer join,hash 表中每行有一个 probed 标记位,记录该行是否已与 probe 行匹配。Spill 时需要将此标记一同写入磁盘(作为额外的 bool 列),恢复时读回并设置回 RowContainer 的对应行。


8. 递归 Spill 流程图

HashBuild 初始化addInput(循环) → ensureInputFits内存不足?reclaim() / maybeReserve 触发仲裁spillHashJoinTable:每分区 fillSpillRuns→sort→writeSpill→磁盘 · table_->clear() · spillTriggered_=truespillTriggered?YES → spillInput 按 partition 直接写 · NO → insertIntoHashTablenoMoreInput → finishHashBuild所有 peer 完成合并 spill files → prepareJoinTable(只含未 spill 行)joinBridge_.setHashTable(table, spillPartitions)Probe 侧 probe + 同步 spill 对应 probe 行 → postHashBuildProcess有 spill partition?YES → setupSpillInput(next) → processSpillInput → addInput(从 spill 读)内存不足 → 递归 spill(bit offset += numPartitionBits) · NO → setState(kFinish) → END
Fig. HashBuild 递归 spill 全流程:内存不足触发 spill,逐 spill partition 递归恢复直到 kFinish

9. C++ 编码品味:值得学习的细节

这一章从代码本身出发,梳理 Velox spill 系统在 C++ 工程实践上值得学习的具体技法。每一条都有对应的代码位置。


9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑

// HashBuild::finishHashBuild
SCOPE_EXIT {
  // 无论函数从哪个 return 路径退出,都必须唤醒等待的 peer
  peers.clear();
  for (auto& promise : promises) {
    promise.setValue();
  }
};
// SortBuffer::getOutput
SCOPE_EXIT {
  pool_->release(); // 每次 getOutput 返回后都归还未用预留
};

SCOPE_EXIT(底层是 folly::ScopeGuard)比 try-catch 清晰得多:清理代码与触发它的上下文紧邻,阅读者一眼就能看到"这个函数结束时会做什么",而不需要去找 finally 块或析构函数。

对比写法

// ❌ 反例:try-catch 把清理和逻辑分离
try {
  doWork();
  pool_->release(); // 容易漏掉 early return 路径
} catch (...) {
  pool_->release();
  throw;
}

// ✅ SCOPE_EXIT:写一次,所有路径都覆盖
SCOPE_EXIT { pool_->release(); };
doWork();

9.2 folly::makeGuard:异步任务的"排水阀"

// SpillerBase::runSpill
auto sync = folly::makeGuard([&]() {
  for (auto& write : writes) {
    try {
      write->move();  // 强制消费所有 pending 任务
    } catch (const std::exception&) {
      // 清理路径不能 throw
    }
  }
});

// 主线程消费结果(可能 throw)
for (auto& write : writes) {
  results.push_back(write->move()); // 若某个任务失败,这里 throw
}
// 若 results 循环 throw,guard 析构时仍会 drain 剩余任务

这里的 makeGuard 扮演"排水阀"角色:无论主逻辑成功还是抛异常,都确保所有后台异步任务被 move()(消费)掉,避免后台任务持有的资源(文件句柄、内存)泄漏。

关键细节:guard 内部的 catch (const std::exception&) 是有意的——清理路径本身不能 throw,第一个错误已经在 results 循环里被捕获了。


9.3 跨线程异常传递:exception_ptr 模式

多线程 spill 时,后台线程的异常无法直接传播到主线程。Velox 使用 std::exception_ptr 将异常"装箱":

// 后台线程(writeSpill)
try {
  // ... 做实际工作
  return std::make_unique<SpillStatus>(id, written, nullptr); // 成功
} catch (const std::exception&) {
  // 捕获任何异常,装箱为 exception_ptr
  return std::make_unique<SpillStatus>(id, 0, std::current_exception());
}

// 主线程(runSpill)
for (auto& result : results) {
  if (result->error != nullptr) {
    std::rethrow_exception(result->error); // 在主线程重新抛出
  }
  // ... 处理正常结果
}

结果对象 SpillStatus 的定义简洁体现了这个模式:

struct SpillStatus {
  SpillPartitionId partitionId;
  uint32_t rowsWritten;
  std::exception_ptr error; // nullptr 表示成功,否则携带异常
};

这使得多线程错误处理与单线程代码风格一致——主线程的调用者只需检查返回值或 catch 异常,不需要感知线程。


9.4 "1 + N" 并发策略:第一个任务不切线程

for (const auto& [id, spillRun] : spillRuns_) {
  writes.push_back(
      memory::createAsyncMemoryReclaimTask<SpillStatus>(
          [partitionId = id, this]() { return writeSpill(partitionId); }));

  // 只有第 2 个及之后的任务才提交到后台 executor
  if ((writes.size() > 1) && executor_ != nullptr) {
    executor_->add([source = writes.back()]() { source->prepare(); });
  }
}

// 主线程顺序 move(),第一个任务在这里内联执行
for (auto& write : writes) {
  results.push_back(write->move()); // 若未 prepare(),move() 内联执行 lambda
}

设计精妙处AsyncSource::move() 的语义是"如果还没执行就内联执行,已经执行就等待结果"。第一个任务从未被 prepare(),所以在 move() 调用时内联执行于主线程。

好处:

  • 若只有一个 partition(常见情况),完全不产生线程切换开销
  • 主线程始终有工作做,不会空等
  • N 个后台线程 + 主线程并发,吞吐量最大化

HashProbe::spillOutput 对多个 probe 算子也用了完全一样的模式。


9.5 CHECK vs DCHECK:按热度分配断言代价

// 生产代码的不变式用 VELOX_CHECK(release 也执行)
VELOX_CHECK(!finalized_);                       // finishSpill 之后不能再写
VELOX_CHECK_EQ(numWritten, run.rows.size());    // 写出的行数必须对得上

// 热路径上的显然正确断言用 VELOX_DCHECK(只在 debug 执行)
VELOX_DCHECK_GE(partitionNum, 0);              // hash 分区号必然 >= 0(inner loop)
VELOX_DCHECK(isPartitionSpilled(id));          // appendToPartition 前必须已设置

原则:

  • VELOX_CHECK_* 用于外部可观察的不变式,违反时意味着调用方 bug 或状态异常,即使在 release 也要发现
  • VELOX_DCHECK_* 用于内部实现的显然正确断言,理论上不可能失败,加它只是为了 debug 期间辅助排查

fillSpillRuns 的内层循环(每行都执行一次)里用 VELOX_DCHECK 而非 VELOX_CHECK,避免 release 构建中每行多一次分支,对亿级行场景有实际性能影响。


9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码

// createOrGetSpillRun:绝大多数情况 spillRun 已存在
if (FOLLY_UNLIKELY(!spillRuns_.contains(id))) {
  spillRuns_.emplace(id, SpillRun(*memory::spillMemoryPool()));
}

// getOutputWithSpill:几乎每行都需要复制,到达 batch 边界是例外
if (FOLLY_UNLIKELY(isEndOfBatch)) {
  gatherCopy(...);
}

// 最后一批输出大多非空
if (FOLLY_LIKELY(outputSize != 0)) {
  gatherCopy(...);
}

FOLLY_LIKELY/UNLIKELY 不仅是编译器分支预测提示,更是可执行的注释——它明确告诉读者"作者预期这条路径是 hot/cold 的"。这比写 // rarely happens 注释更可靠,因为注释会过期,而这行代码会随着逻辑一起维护。


9.7 TestValue::adjust:无侵入式测试注入

// SpillerBase 构造函数
TestValue::adjust("facebook::velox::exec::SpillerBase", this);

// HashBuild::addInput 入口
TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);

// HashBuild::reclaim 入口
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

TestValue::adjust 在生产代码中是空操作(编译器会完全优化掉)。在测试中,可以向特定注入点注册回调,在精确时机修改状态或注入错误,无需任何条件编译。

配套的 TestScopedSpillInjection 更进一步:

// 测试代码:
TestScopedSpillInjection injection(20, ".*", 10);
// poolName 匹配 ".*" 时,testingTriggerSpill() 有 20% 概率返回 true,最多触发 10 次

// 生产代码:
if (testingTriggerSpill(pool_->name())) {
  spill(); // 测试强制触发 spill
}

这使得 spill 的边界条件测试("在恰好第 3 次 addInput 时触发 spill")变得非常精确,而生产代码无任何额外开销。


9.8 NanosecondTimer:RAII 计时的最小实现

uint64_t execTimeNs{0};
{
  NanosecondTimer timer(&execTimeNs);  // 构造时记录起始时间
  // ... 被测代码 ...
} // 析构时写入 execTimeNs

spillStats_->spillFillTimeNanos.fetch_add(execTimeNs, std::memory_order_relaxed);

几个值得注意的细节:

① 先积累,再 fetch_add:不在块内直接 fetch_add,而是先存到局部变量,块结束后一次 atomic 操作。减少 atomic 操作次数,也使代码结构更清晰。

**② memory_order_relaxed**:spill 统计不需要与其他内存操作同步,用 relaxed 避免不必要的内存屏障。

③ 一个特殊处理

// NOTE: Always set a non-zero sort time to avoid flakiness in tests which check sort time.
updateSpillSortTime(std::max<uint64_t>(1, sortTimeNs));

即使排序耗时 0 纳秒(几乎不可能但理论上存在),也写入 1,防止测试误判"排序未发生"。这是一个小而精的防御性设计。


9.9 SpillPartitionId:把所有接口设施配齐

SpillPartitionId 是一个需要在多种容器中使用的值类型,代码为它配齐了所有标准设施:

// 1. 比较运算符:支持 std::map(有序容器)
bool operator<(const SpillPartitionId& other) const;
bool operator>(const SpillPartitionId& other) const {
  return other < *this; // 复用 < 实现 >,不重复逻辑
}
bool operator==(const SpillPartitionId& other) const = default; // C++20 default

// 2. std::hash 特化:支持 folly::F14FastMap/Set
namespace std {
template <>
struct hash<SpillPartitionId> {
  uint32_t operator()(const SpillPartitionId& id) const {
    return std::hash<uint32_t>()(id.encodedId()); // 直接哈希底层整数,O(1)
  }
};
}

// 3. fmt::formatter 特化:支持 fmt::format("{}", id)
template <>
struct fmt::formatter<SpillPartitionId> : formatter<std::string> {
  auto format(SpillPartitionId s, format_context& ctx) const {
    return formatter<std::string>::format(s.toString(), ctx);
  }
};

// 4. operator<< :支持 LOG(INFO) << id
inline std::ostream& operator<<(std::ostream& os, SpillPartitionId id) {
  return os << id.toString();
}

operator> 通过调用 operator< 实现而非独立写逻辑,是很好的习惯——减少了两个实现可能不一致的风险。


9.10 withWLock / withRLock:锁的最小化持有

// SpillState::appendToPartition
partitionWriters_.withWLock([&](auto& lockedWriters) {
  // 只在创建 writer 时持有写锁
  if (!lockedWriters.contains(id)) {
    lockedWriters.emplace(id, std::make_unique<SpillWriter>(...));
  }
}); // 锁在此释放

// 实际写入在锁外进行(不同 partition 的写入完全并发)
return partitionWriter(id)->write(rows, ...);
// testingSpilledFilePaths(只读,用 rlock)
partitionWriters_.withRLock([&](const auto& partitionWriters) {
  for (const auto& [id, writer] : partitionWriters) {
    // ... 读取
  }
});

folly::SynchronizedwithWLock/withRLock lambda 接口使锁的范围在代码结构上可见——lambda 的括号就是临界区的边界,不需要手动 lock_guard

关键设计:写锁只保护"创建 writer"(初始化一次),不保护"写数据"。这是因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突,无需锁保护。最快的锁是不持有锁


9.11 SpillRun::sorted:防止双重排序的状态位

struct SpillRun {
  SpillRows rows;
  uint64_t numBytes{0};
  bool sorted{false}; // 是否已排序
};

void SpillerBase::ensureSorted(SpillRun& run) {
  if (run.sorted || !needSort()) { // 已排序则跳过
    return;
  }
  // ... 执行排序
  run.sorted = true; // 标记,防止重复排序
}

这个 sorted 标志看起来很简单,但它解决了一个微妙的 bug 风险:如果 ensureSorted 被意外调用两次(代码重构时很容易发生),不加保护会导致正确排好序的数据被 re-sort 破坏(稳定排序会保持顺序,但非稳定排序可能打乱)。一个布尔标志,一个 guard,防御性地堵死了这条 bug 路径。


9.12 pool_->release():主动归还,而非等待回收

// SortBuffer::noMoreInput()
// Releases the unused memory reservation after processing input.
pool_->release();

// SortBuffer::getOutput()
SCOPE_EXIT {
  pool_->release();
};

// HashBuild::ensureTableFits() 中扩容后
if (spiller_->spillTriggered()) {
  pool()->release(); // spill 已触发,扩容的预留用不上了,主动归还
}

pool_->release() 的语义是"我承诺不再需要当前预留的超额内存"。Velox 在每个"阶段完成"的节点系统性地调用它,而不是等待 GC 或析构。

这是 push 模型而非 pull 模型:算子主动告诉内存池"我这块不要了",内存池立即可以分配给其他需要者。在多算子并发的大查询中,这种主动性使内存回转速度快得多。


9.13 succinctBytes():有信息量的日志

LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
             << " for memory pool " << pool_->name()
             << ", root pool: " << pool_->root()->name()
             << ", used: " << succinctBytes(pool_->usedBytes())
             << ", reservation: " << succinctBytes(pool_->reservedBytes())
             << ", root pool reservation: "
             << succinctBytes(pool_->root()->reservedBytes());

几个细节值得学习:

**① succinctBytes()**:自动选择合适单位(B/KB/MB/GB),比裸输出字节数可读性高 10 倍。

② 固定的信息层次pool 名 → root pool 名 → 已用 → 已预留 → root 已预留。这个模板在整个 spill 系统中完全一致,运维人员看一眼就知道去哪里找信息,不需要猜测每条日志的格式。

③ WARNING 而非 ERROR:预留失败不是错误,只是触发 spill 的信号。级别选择准确。


9.14 testing* 前缀:测试可观测性的统一约定

// SpillState::testingSpilledFilePaths()
// SpillState::testingSpilledFileIds()
// SpillState::testingNonEmptySpilledPartitionIdSet()
// HashBuild::testingExceededMaxSpillLevelLimit()
// HashProbe::testingHasInputSpiller()
// HashProbe::testingExceededMaxSpillLevelLimit()

所有为测试暴露的内部状态方法,一律以 testing 前缀命名。这个约定的价值:

  • 接口清晰度:任何 testing* 方法,调用者立刻知道"这不是业务接口"
  • grep 友好grep 'testing[A-Z]' 精确列出所有测试专用接口,代码审查时可以专门审视
  • 永不 friend:Velox 明确禁止 friend 声明(CLAUDE.md 有规定),测试访问内部状态只能通过这类 accessor,倒逼设计者只暴露真正需要的状态,而非用 friend 开后门

9.15 constexpr 局部常量:让魔数有名字

void SpillerBase::fillSpillRuns(...) {
  constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行
  // ...
}

std::unique_ptr<SpillerBase::SpillStatus> SpillerBase::writeSpill(...) {
  constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K
  constexpr int32_t kTargetBatchRows = 64;
  // ...
}

这些常量定义在函数内部而非文件级别,有两个好处:

① 最小作用域kHashBatchSize 只在 fillSpillRuns 里有意义,定义在全局或类级别会误导读者以为它有更广泛的语义。

② 紧贴使用处:常量定义和使用之间距离最短,读者不需要跳转文件来理解这个数字的含义。

1 << 18262144 更清楚地表达了"这是 2 的幂次"的意图,同时命名为 kTargetBatchBytes 说明了它是目标(上限)而非精确值。


编码品味小结

技法 核心价值 代表位置
SCOPE_EXIT 清理与触发紧邻,所有 return 路径都覆盖 SortBuffer::getOutput, HashBuild::finishHashBuild
folly::makeGuard 排水阀 异步任务无论成败都被消费,防止资源泄漏 SpillerBase::runSpill
exception_ptr 跨线程传递 后台线程异常装箱,主线程重抛,调用方无感知 SpillerBase::writeSpill / runSpill
"1 + N" 并发 首任务内联,后续并发,单 partition 时零开销 SpillerBase::runSpill
DCHECK 分热路径 release 不付 inner loop 断言代价 fillSpillRuns inner loop
FOLLY_LIKELY/UNLIKELY 可执行的"热度注释",不会过期 createOrGetSpillRun, getOutputWithSpill
TestValue::adjust 无侵入注入,生产零开销 所有关键函数入口
NanosecondTimer RAII 单行计时,早 return 安全 fillSpillRuns, ensureSorted
配齐值类型接口 <, >, ==, hash, formatter, << 一起提供 SpillPartitionId
withWLock 最小临界区 锁只保护初始化,写操作在锁外 SpillState::appendToPartition
sorted 防御位 防双重排序,用状态堵死隐患 SpillRun / ensureSorted
pool_->release() 主动归还 push 模型快速回转,而非等待析构 noMoreInput, getOutput
succinctBytes() 统一日志格式 固定信息层次,可读,可预期 所有 LOG(WARNING)
testing* 前缀约定 测试接口一眼可辨,禁用 friend 倒逼最小暴露 所有 testing* 方法
constexpr 局部常量 最小作用域,紧贴使用处,有名字 kHashBatchSize, kTargetBatchRows

10. 架构设计解读:优雅背后的工程思考

Velox spill 系统的代码量约 12,000 行,但它的设计并不复杂——真正值得品味的,是每一处决策背后的约束权衡系统性思考。这一章从"为什么要这样设计"出发,梳理其核心设计哲学。


10.1 最根本的约束:OOM 不能是答案

数据库引擎处理大数据时,内存不可能无限大。任何算子都需要回答这个问题:当数据超过可用内存时,怎么办?

最简单的答案是抛出 OOM 异常。但这会让用户的查询白白失败,代价极高。Velox 的选择是:任何时候都不能因为数据量大而失败,必须能完成查询(在 spill 开启的情况下)。

这一承诺对系统设计造成了深远影响:

  • 不能假设数据能放进内存——每一个数据结构都要有"放不下时的出路"
  • 不能只支持一次 spill——必须支持递归 spill(spill 后恢复,恢复时再次 OOM,再次 spill)
  • 不能只在"好时机"才 spill——必须能在任意时刻被打断(内存仲裁 reclaim

这三点决定了架构的基本形态。


10.2 分层:每层只管自己的事

整个 spill 系统被分成 5 层,每层接口极简:

算子层     → 决定「何时」和「触发哪种」spill
Spiller层  → 决定「如何」把 RowContainer 内容写到磁盘(行→列,分区,排序)
SpillState → 决定「写到哪个文件」
SpillFile  → 决定「如何」序列化/反序列化(PrestoPage 格式)
SpillConfig→ 决定「用什么参数」

这种分层带来的核心好处不是"解耦"这么虚的东西,而是每一层可以独立演进

  • 要换序列化格式?只改 SpillFile,上面所有层不感知
  • 要支持新算子 spill?只实现新的 Spiller 子类,底层文件 I/O 不变
  • 要调整内存触发策略?只改算子层的 ensureInputFits,文件写入逻辑不变

这在一个长期演进的系统里价值巨大。HashJoin、HashAggregation、OrderBy 三类算子的 spill 逻辑差异极大,但它们复用了同一套文件 I/O 和统计框架,维护成本大幅降低。


10.3 分区:把"全量恢复"变成"分批恢复"

Spill 最天真的设计是:内存不够时,把所有数据写到一个文件;恢复时,把整个文件读回来。

问题在于:如果数据总量是内存的 10 倍,一次性读回仍然 OOM。

Velox 的解法是按 hash 分区

数据 → hash(key) → partition 0, 1, 2, ..., 7(8 个分区)

恢复时:
  一次只加载 partition 0 → 处理完 → 加载 partition 1 → ...
  每次内存中只有 1/8 的数据

这个设计的精妙之处在于:分区和处理逻辑是解耦的。HashBuild 恢复 partition 0 时,它完全复用了正常的 addInputinsertHashTable 路径,没有任何特殊处理。"从 spill 文件恢复" 和 "从上游算子输入" 在代码路径上几乎完全一样。

SpillPartitionId 的 bit 编码设计支持这种分批恢复天然地扩展到多层递归:

Level 0:bit[31:29] = 0, 分区号存在 bit[2:0]  → 8 个 partition
Level 1:bit[31:29] = 1, 分区号存在 bit[5:3]  → 每个 Level 0 partition 又有 8 个子 partition
Level 2:...

当 Level 0 的某个 partition 恢复时仍然 OOM,直接用下一层 bit 范围再 spill。恢复路径完全复用,递归深度理论上无限(受 maxSpillLevel 限制)。


10.4 有序与无序的分叉:根据"读回时的需求"决定

Spill 时要不要排序,完全由读回时的需求决定,而不是写入时的方便:

算子 读回时的需求 要不要排序
HashJoin Build 重建 hash 表(插入顺序无关) 不排序(节省 CPU)
HashAggregation 流式归并聚合(需要相同 key 相邻) 排序(按 group key)
OrderBy 多路归并产生有序输出(需要各 run 内有序) 排序(按 sort key)

这个设计的价值在于:对于不需要排序的场景,完全不付排序的代价。HashJoin 是最常见的 spill 场景,它不排序节省了大量 CPU 时间。

而对于需要排序的场景(Aggregation、OrderBy),排序在 spill 时一次性完成,读回时的 N 路归并就是线性扫描,极其高效。如果不在 spill 时排序,读回时就必须全量加载再排序,峰值内存反而更高。


10.5 两阶段 Spill:尊重算子的生命周期

Aggregation 和 OrderBy 各有两个 Spiller(input/output),这不是过度设计,而是对算子生命周期的精确建模:

Input 阶段:数据持续流入 → 多次触发 spill → hash 表/排序缓冲反复清空重填
Output 阶段:数据已全部处理 → 只可能触发一次 spill → 之后只有读出

两个阶段的需求截然不同:

Input 阶段 spill(AggregationInputSpiller / SortInputSpiller)

  • 需要 hash 分区(支持分批恢复)
  • 需要排序(支持流式归并)
  • 可以多次触发(每次清空 hash 表继续处理输入)

Output 阶段 spill(AggregationOutputSpiller / SortOutputSpiller)

  • 无需分区(只触发一次,全量写出)
  • 无需排序(数据已经就绪,直接写入已有顺序)
  • 只触发一次(之后只读,无需再写)

用同一个 Spiller 处理两个阶段会导致代码复杂化,且无法针对性优化。分开设计使每个 Spiller 的逻辑极度简单。


10.6 SpillRun 切分:在多个约束之间找到平衡点

maxSpillRunRows 参数控制每次 fill-run 的行数,这个设计同时解决了四个互不相关的问题:

约束 1:指针数组内存(SpillRun.rows 每行 8 字节,1 亿行 = 800MB)
约束 2:排序辅助内存(大 run 排序 cache miss 严重)
约束 3:PrestoPage 2GB 序列化限制
约束 4:内存仲裁的观测粒度(每 run 之间才能看到内存下降)

一个参数,四个收益,这是典型的"一石多鸟"设计。

值得注意的是,这个设计并没有在代码里写四个 // TODO: 原因X,而是通过一个统一的机制自然地满足了所有约束。这种"机制而非策略"的思维是优雅设计的标志。


10.7 批量行列转换:峰值内存与 I/O 效率的双重优化

extractSpillVector 每次只处理最多 64 行或 256KB,这个"小批量"设计同样服务于两个目标:

峰值内存

不切分时:SpillRun 10 万行 → 物化为一个 10 万行 RowVector → 可能数 GB
切分后:  每次 64 行 RowVector → 几百 KB → prepareForReuse 复用同一块内存
          峰值 = 单批大小,与 run 总大小无关

I/O 效率

SpillWriter 有 writeBufferSize 写缓冲(默认几 MB)
每次 appendToPartition 追加到缓冲,满了才刷盘
64 行 ≈ 几十 KB → 多次 append 后缓冲满 → 一次大 write()
这比每 10 万行一次大 write() 的 I/O 效率相当,但内存峰值小得多

64 这个数字也不是随意的——它使得每列 FlatVector 的数据(64 × sizeof(int64) = 512B)恰好能放入 L1 cache,extractColumn 的内层循环具有最优的 cache 利用率。


10.8 主动预留 vs. 被动仲裁:两道防线

Velox 的内存触发有两条路径,共同构成防 OOM 的双重保险:

防线 1 — 主动预留(proactive):
  ensureInputFits → maybeReserve(targetIncrementBytes)
    ├─ 成功:继续处理(预留了足够空间)
    └─ 失败:[内存仲裁在此可能已触发 reclaim]

防线 2 — 被动仲裁(reactive):
  内存仲裁器观测到系统内存紧张 → 调用 reclaim()
    └─ 强制 spill 当前算子

两者的分工:

  • 防线 1 发生在算子自己的处理循环中,知道"我即将需要多少内存",可以精确预留
  • 防线 2 发生在任意时刻,无需算子配合,是最后的兜底手段

ReclaimableSectionGuard 是连接两者的桥梁——算子在调用 maybeReserve 前设置它,告诉仲裁器"现在可以来抢我的内存"。这样即使 maybeReserve 本身触发了全局内存整理,算子也能安全地被 spill。


10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口

HashProbe 侧 spill 时,行数据在 RowVectorPtr 里,根本不经过 RowContainer。如果强制所有 spill 都走 RowContainer → extractSpill → 磁盘 的路径,就需要先把 RowVector 插入 RowContainer 再提取出来,白白多一次转换。

NoRowContainerSpiller 的存在体现了一个原则:抽象应该覆盖真实场景,而不是把真实场景削足适履。它直接接受 RowVectorPtr,跳过整个 fill/extract 流程,只保留写文件这一核心功能。

同样的思路也体现在 SortOutputSpiller 上:sortedRows_ 已经是排好序的行指针数组,如果再经过 fillSpillRuns → 按 hash 分区 → sort 这个流程,不仅多余还会破坏已有的顺序。它重写接口,接受外部已排序的 SpillRows,直接写入。


10.10 HashJoinBridge:异步协调的最小化接口

Build 和 Probe 在不同线程运行,需要协调三件事:

  1. Build 完成 → 通知 Probe 开始
  2. Probe 完成 → 通知 Build 恢复 spill partition
  3. Build/Probe 任意一方 spill → 另一方配合

HashJoinBridge 用三个接口覆盖了全部场景:

setHashTable(table, spillPartitionSet) // Build → Probe:我建好了,这些 partition 被 spill 了
probeFinished()                        // Probe → Build:我探测完了,请处理下一个 spill partition
appendSpilledHashTablePartitions(set)  // Probe reclaim → Build:我 spill 了你的 hash 表

最优雅的设计是:这三个接口覆盖了所有递归层级,无论是第 0 层还是第 3 层递归,调用的接口完全一样。递归深度的增加不需要任何新的协调原语。

对比一下如果没有这种设计,每一层递归都要单独管理 future/promise 对,代码复杂度会指数级增长。


10.11 DictionaryVector wrap:零拷贝分区

Probe 侧 spillInput 需要将一个 batch 的行按 partition 分发到多个 spill 文件。最直接的做法是按 partition 拷贝行数据,但这意味着每行数据被读一次、写一次——对于宽行(含字符串列)代价极高。

Velox 的解法是 DictionaryVector

原始 input:[row0, row1, row2, row3, row4]
partition 0 的 indices buffer:[1, 3]
partition 1 的 indices buffer:[0, 2, 4]

wrap(2, partition0_buffer, input) → DictionaryVector{indices=[1,3], base=input}
  ↓ 序列化时才真正读取 row1, row3 的数据

这使得"分区"这个操作的代价只是分配 indices buffer 和一次 hash 计算,不涉及任何数据拷贝。真正的 I/O 推迟到 appendToPartition → SpillWriter::write → 序列化,此时数据只被读一次、写一次。

这是"懒求值"(lazy evaluation)思想在 spill 路径中的具体应用。


10.12 小结:设计模式归纳

回顾整个 spill 系统,可以归纳出几个贯穿始终的设计模式:

① 约束驱动设计(Constraint-Driven Design) 每个设计决策都能追溯到一个具体约束:SpillRun 切分 → PrestoPage 2GB 限制;64 行批次 → L1 cache;DictionaryVector → 零拷贝。没有无目的的"为了灵活性而灵活性"。

② 机制 vs. 策略分离(Mechanism vs. Policy Separation) SpillerBase 提供机制(fill/sort/write 的流程),各子类通过 needSort()、重写 extractSpill() 等钩子注入策略。算子层决定"何时 spill",Spiller 层决定"如何 spill",文件层决定"如何存储"。

③ 路径复用(Path Reuse) HashBuild 恢复 spill partition 时走的是和正常处理输入完全相同的 addInput 路径。HashProbe restore 轮也走同一套 addInput → spillInput → probe 路径。不为"恢复"写特殊逻辑,而是让恢复成为正常路径的自然一部分。这减少了测试覆盖的难度,也降低了特殊路径引入 bug 的风险。

④ 懒求值(Lazy Evaluation) 数据尽可能晚才物化:行指针不拷贝数据(SpillRun)→ DictionaryVector 不拷贝行(probe 分区)→ lazy 列在 spill 前才 loadedVector()。每一层都只在真正需要时才付出代价。

⑤ 尊重生命周期(Lifecycle Awareness) Input Spiller 和 Output Spiller 的分离不是技术必要性,而是对算子语义的尊重:input 阶段和 output 阶段的资源需求、触发频率、分区需求都不同,强行合并会产生大量条件分支,拆开则各自极简。


这套设计的优雅,不在于它的聪明,而在于它的诚实——每个决策都清楚地知道自己在解决什么约束,不做多余的事,也不留下欠债。


Velox Spiller
#Velox #QueryExecution · 2025-11-30