1. 整体架构 Velox 的 spill 系统是一个分层架构,每层职责明确:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ┌─────────────────────────────────────────────────────────────────┐ │ 算子层 HashBuild / GroupingSet(HashAggregation) / SortBuffer │ │ 触发时机:ensureInputFits / ensureOutputFits / reclaim() │ ├─────────────────────────────────────────────────────────────────┤ │ Spiller 层 SpillerBase 及子类 │ │ 职责:从 RowContainer 提取行 → 按 hash 分区 → 排序 → 批量写入 │ │ 子类:HashBuildSpiller / AggregationInputSpiller / │ │ AggregationOutputSpiller / SortInputSpiller / │ │ SortOutputSpiller / NoRowContainerSpiller / MergeSpiller │ ├─────────────────────────────────────────────────────────────────┤ │ 状态管理层 SpillState │ │ 职责:管理每个分区的 SpillWriter,追踪已 spill 分区集合 │ ├─────────────────────────────────────────────────────────────────┤ │ 分区管理层 SpillPartitionId / SpillPartition / SpillPartitionSet│ │ 职责:层级分区 ID,持有 SpillFiles,提供有序/无序读取接口 │ ├─────────────────────────────────────────────────────────────────┤ │ 文件 I/O 层 SpillWriter / SpillReadFile │ │ 职责:RowVector ↔ PrestoPage 序列化/反序列化,管理多文件切分 │ ├─────────────────────────────────────────────────────────────────┤ │ 配置与统计层 SpillConfig / SpillStats │ │ 职责:全局配置,原子统计计数器 │ └─────────────────────────────────────────────────────────────────┘
核心设计理念 :
分区化 :spill 数据按 hash key 分区,使得每次只需将一个分区加载回内存,避免全量恢复
递归 spill :若恢复某分区后仍 OOM,可对该分区再次 spill(最多 maxSpillLevel 层)
有序与无序分离 :sort-based 算子(Aggregation/OrderBy)spill 时保序,以支持归并输出;HashJoin 无需排序
两阶段 spill :Aggregation/OrderBy 区分 input 阶段与 output 阶段,各用不同的 Spiller
2. 基础设施层详解 2.1 SpillConfig — 配置层 文件:velox/common/base/SpillConfig.h
SpillConfig 是所有 spill 行为的控制中枢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 struct SpillConfig { GetSpillDirectoryPathCB getSpillDirPathCb; UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb; std::string fileNamePrefix; uint64_t maxFileSize; uint64_t writeBufferSize; uint64_t readBufferSize; folly::Executor* executor; int32_t minSpillableReservationPct; int32_t spillableReservationGrowthPct; uint64_t maxSpillRunRows; uint64_t writerFlushThresholdSize; uint8_t startPartitionBit; uint8_t numPartitionBits; int32_t maxSpillLevel; uint32_t numMaxMergeFiles; std::optional<PrefixSortConfig> prefixSortConfig; common::CompressionKind compressionKind; std::string fileCreateConfig; uint32_t windowMinReadBatchRows; };
关键计算方法 :
1 2 3 4 5 int32_t spillLevel (uint8_t startBitOffset) const ;bool exceedSpillLevelLimit (uint8_t startBitOffset) const ;
递归 spill 时,每下一层 startPartitionBit 向右移 numPartitionBits 位,spillLevel 就是 (startBitOffset - config.startPartitionBit) / numPartitionBits。
2.2 SpillStats — 统计层 文件:velox/exec/SpillStats.h
所有字段都是 std::atomic,线程安全,支持跨线程并发更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 struct SpillStats { std::atomic_uint64_t spillRuns; std::atomic_uint64_t spilledRows; std::atomic_uint64_t spilledInputBytes; std::atomic_uint64_t spilledBytes; std::atomic_uint32_t spilledPartitions; std::atomic_uint64_t spilledFiles; std::atomic_uint64_t spillMaxLevelExceededCount; std::atomic_uint64_t spillFillTimeNanos; std::atomic_uint64_t spillSortTimeNanos; std::atomic_uint64_t spillExtractVectorTimeNanos; std::atomic_uint64_t spillSerializationTimeNanos; std::atomic_uint64_t spillFlushTimeNanos; std::atomic_uint64_t spillWriteTimeNanos; std::atomic_uint64_t spillWrites; std::atomic_uint64_t spillReadBytes; std::atomic_uint64_t spillReads; std::atomic_uint64_t spillReadTimeNanos; std::atomic_uint64_t spillDeserializationTimeNanos; IoStats ioStats; };
每个算子持有自己的 SpillStats 对象(通过 spillStats_.get() 传递),同时每次更新都调用对应的 updateGlobal*() 函数汇总到进程级全局统计(globalSpillStats()),供 query profile 使用。
2.3 SpillFile — 文件 I/O 层 文件:velox/exec/SpillFile.h/.cpp
SpillFileInfo — 元数据 1 2 3 4 5 6 7 8 9 struct SpillFileInfo { uint32_t id; RowTypePtr type; std::string path; uint64_t size; std::vector<SpillSortKey> sortingKeys; common::CompressionKind compressionKind; }; using SpillFiles = std::vector<SpillFileInfo>;
SpillWriter — 写路径 继承 SerializedPageFileWriter(底层用 PrestoPage 格式)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 构造参数: type, sortingKeys : 数据类型与排序键 compressionKind : 压缩方式 pathPrefix : 文件路径前缀,实际路径为 prefix + "_" + fileId targetFileSize : 单文件目标大小 writeBufferSize : 写缓冲区大小 updateAndCheckSpillLimitCb : quota 回调 关键流程: write(rows, ranges) → 序列化 RowVector 片段,写入缓冲 finishFile() → 强制 flush + 关闭当前文件,记录 SpillFileInfo finish() → 收尾所有文件,返回 SpillFiles 列表 文件切分逻辑(父类 SerializedPageFileWriter 实现): 每次 write 后,若当前文件大小 >= targetFileSize → 自动 finishFile()
每次 updateFileStats() 回调会调用 updateAndCheckSpillLimitCb,若全局 spill 字节超过 query limit 则抛 VELOX_SPILL_LIMIT_EXCEEDED 异常。
SpillReadFile — 读路径 1 2 3 create(fileInfo, bufferSize, pool, stats) → 打开文件,设置读缓冲 nextBatch(batch) → 反序列化下一 batch RowVector → 若文件系统支持 async IO,双缓冲预读
2.4 SpillPartitionId — 层级分区标识 文件:velox/exec/Spill.h:277
这是支持递归 spill 的核心数据结构。用单个 uint32_t 编码完整的层级路径:
1 2 3 4 5 6 7 8 9 10 11 Bit 布局(低位在前): bits [2:0] : Level 1 分区号(最多 8 个) bits [5:3] : Level 2 分区号 bits [8:6] : Level 3 分区号 bits [11:9] : Level 4 分区号 bits [28:12] : 未使用 bits [31:29] : 当前所在层级(0-3) 常量: kMaxSpillLevel = 3 (最多 4 层,层级编号 0~3) kMaxPartitionBits = 3 (每层最多 8 分区)
构造方式 :
1 2 SpillPartitionId (uint32_t partitionNumber); SpillPartitionId (SpillPartitionId parent, uint32_t n);
有序性 (operator<):
1 2 3 4 5 6 排序规则: 1. 相同父节点的子分区按分区号从小到大排列 2. 不同层级时,按层级路径字典序比较 3. 浅层节点排在其子节点前面(DFS 前序) 示例:p_0 < p_1 < p_2_0 < p_2_1 < p_2_2 < p_3
这个有序性使 SpillPartitionSet(std::map<SpillPartitionId, ...>)能以 DFS 前序遍历,确保处理完父 partition 后再处理其子 partition(递归 spill 场景)。
2.5 SpillState — 分区状态管理 文件:velox/exec/Spill.h:581, Spill.cpp:131
每个 Spiller 持有一个 SpillState,负责管理所有分区的写入器:
1 2 3 4 5 6 7 8 class SpillState { SpillPartitionIdSet spilledPartitionIdSet_; folly::Synchronized<SpillPartitionWriterSet> partitionWriters_; };
**核心方法 appendToPartition**:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 uint64_t SpillState::appendToPartition ( const SpillPartitionId& id, const RowVectorPtr& rows) { auto spillDir = getSpillDirPathCb_ (); partitionWriters_.withWLock ([&](auto & lockedWriters) { if (!lockedWriters.contains (id)) { lockedWriters.emplace (id, std::make_unique <SpillWriter>( rows->type (), sortingKeys_, compressionKind_, fmt::format("{}/{}-spill-{}" , spillDir, fileNamePrefix_, id.encodedId ()), targetFileSize_, writeBufferSize_, ...)); } }); validateSpillBytesSize (rows->estimateFlatSize ()); updateSpilledInputBytes (bytes); IndexRange range{0 , rows->size ()}; return partitionWriter (id)->write (rows, folly::Range <IndexRange*>(&range, 1 )); }
关键设计 :partitionWriters_ 只在创建 writer 时加 wLock,实际 write() 不加锁——因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突。
2.6 SpillPartition / SpillPartitionSet — 读回路径 文件:velox/exec/Spill.h:457
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class SpillPartition { SpillPartitionId id_; SpillFiles files_; uint64_t size_; std::unique_ptr<UnorderedStreamReader<BatchStream>> createUnorderedReader (...); std::unique_ptr<TreeOfLosers<SpillMergeStream>> createOrderedReader (...); std::vector<std::unique_ptr<SpillPartition>> split (int numShards); }; using SpillPartitionSet = std::map<SpillPartitionId, std::unique_ptr<SpillPartition>>;
createOrderedReader 的防 FD 耗尽机制 :
1 2 3 4 5 6 7 若 files_.size() > numMaxMergeFiles(且 numMaxMergeFiles >= 2): 1. 将文件分组,每组最多 numMaxMergeFiles 个 2. 对每组文件创建 TreeOfLosers 做局部归并,写出中间文件 3. 递归,直到文件数 <= numMaxMergeFiles 4. 最终对剩余文件建 TreeOfLosers 返回 这避免了同时打开 O(所有文件) 个文件描述符导致的 OOM 或 FD 耗尽。
2.7 读流层:SpillMergeStream / BatchStream 文件:velox/exec/Spill.h:56
BatchStream — 无序批次流(HashJoin 用):
1 2 3 接口:nextBatch(batch) → bool 实现:FileSpillBatchStream - 单文件 ConcatFilesSpillBatchStream - 多文件顺序串联
SpillMergeStream — 有序合并流(Aggregation/OrderBy 用):
1 2 3 4 5 6 接口:hasData() / currentIndex() / current() / pop() 实现:FileSpillMergeStream - 单文件 ConcatFilesSpillMergeStream - 多文件顺序串联(文件间已有序) 关键方法 compare():按 sortingKeys_ 逐列比较当前行,供 TreeOfLosers 使用 关键方法 pop():消费当前行;若当前 batch 用完,调用 nextBatch() 加载下一批
TreeOfLosers<SpillMergeStream> 是 N 路归并排序的败者树实现,next() 每次返回所有流中最小的行所在的流。
3. Spiller 层详解 3.1 SpillerBase — 抽象基类 文件:velox/exec/Spiller.h:29, Spiller.cpp:31
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class SpillerBase { protected : RowContainer* const container_; folly::Executor* const executor_; const HashBitRange bits_; const RowTypePtr rowType_; const uint64_t maxSpillRunRows_; const std::optional<SpillPartitionId> parentId_; bool finalized_{false }; SpillState state_; folly::F14FastMap<SpillPartitionId, SpillRun> spillRuns_; virtual bool needSort () const = 0 ; virtual std::string type () const = 0 ; virtual void extractSpill (folly::Range<char **>, RowVectorPtr&) ; virtual void runSpill (bool lastRun) ; };
3.2 SpillRun — 内存中的 spill 缓冲 1 2 3 4 5 6 7 8 9 10 11 12 struct SpillRun { SpillRows rows; uint64_t numBytes{0 }; bool sorted{false }; void clear () { rows.clear (); rows.shrink_to_fit (); numBytes = 0 ; sorted = false ; } };
SpillRun::rows 存的是 RowContainer 内部的原始行指针(char*),不拷贝行数据本身 ,仅持有指针。真正的行数据仍在 RowContainer 的 Arena 内存中,直到 extractSpill 时才物化为列式 RowVector。每个分区(SpillPartitionId)对应一个 SpillRun,存在 spillRuns_ map 中。
3.3 fillSpillRuns — 数据填充 这是 spill 流程的第一步,负责扫描 RowContainer、计算 hash、将行指针按分区放入对应 SpillRun。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 bool SpillerBase::fillSpillRuns (RowContainerIterator* iterator) { constexpr int32_t kHashBatchSize = 4096 ; bool lastRun{false }; uint64_t totalRows{0 }; for (;;) { const auto numRows = container_->listRows (iterator, kHashBatchSize, rows.data ()); if (numRows == 0 ) { lastRun = true ; break ; } auto rowSet = folly::Range <char **>(rows.data (), numRows); if (!isSinglePartition) { for (auto i = 0 ; i < container_->keyTypes ().size (); ++i) { container_->hash (i, rowSet, i > 0 , hashes.data ()); } } for (auto i = 0 ; i < numRows; ++i) { const auto partitionNum = isSinglePartition ? 0 : bits_.partition (hashes[i]); auto & spillRun = createOrGetSpillRun (SpillPartitionId (partitionNum)); spillRun.rows.push_back (rows[i]); spillRun.numBytes += container_->rowSize (rows[i]); } totalRows += numRows; if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) break ; } markSeenPartitionsSpilled (); return lastRun; }
hash() 的多列迭代细节:第 i=0 列 combine=false,直接写 hashes[];后续列 combine=true,将新 hash 值 XOR 到已有结果上。这使多列 hash 只需遍历一次行集合,CPU 友好。
3.4 SpillRun 为何切分 / maxSpillRunRows 的作用 spill() 的主循环是 do { fillSpillRuns; runSpill; } while (!lastRun)。maxSpillRunRows_ 控制每轮 fillSpillRuns 最多处理多少行,从而将整个大 RowContainer 的 spill 分成多个小 run 依次执行。这样做有四个原因:
1. 指针数组本身的内存代价
SpillRun::rows 是 std::vector<char*>,每个元素 8 字节。若 RowContainer 有 1 亿行,不切分时仅指针数组就需要 800 MB 。maxSpillRunRows_ 通常设为数十万行,指针数组只占几 MB。
2. 排序的内存代价(仅 sorted spill)
ensureSorted 对 run.rows 原地排序(TimSort / PrefixSort),比较函数需要间接访问行数据。run.rows.size() 越大,排序所需的辅助内存和 cache miss 越高。切小 run 使每次排序在 cache 中完成。
3. 分步释放,配合内存仲裁
每轮 runSpill 结束后调用 run.clear()(含 shrink_to_fit()),立即归还指针数组内存。内存仲裁器在两轮 run 之间可观察到内存已下降,有机会停止 spill(若目标内存已释放够了)。若一次性塞满所有行,仲裁器只能在整个 spill 完成后才能看到效果。
4. PrestoPage 序列化限制
SpillState::appendToPartition 调用 validateSpillBytesSize,检查单次写入的估算字节数不超过 2 GB (std::numeric_limits<int32_t>::max())——这是 PrestoPage 协议的字段宽度限制。切分 run 使每次写入远小于这个上限,避免序列化失败。
1 2 3 4 5 6 7 maxSpillRunRows_ 的取值: HashBuildSpiller → spillConfig->maxSpillRunRows(默认数十万行) AggregationInputSpiller → spillConfig->maxSpillRunRows AggregationOutputSpiller → spillConfig->maxSpillRunRows SortInputSpiller → spillConfig->maxSpillRunRows SortOutputSpiller → spillConfig->maxSpillRunRows NoRowContainerSpiller → 0(无上限,直接 appendToPartition,无 RowContainer)
spill 的核心转换:行式 RowContainer → 列式 RowVector 。这发生在 writeSpill → extractSpillVector → extractSpill 调用链中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 int64_t SpillerBase::extractSpillVector ( SpillRows& rows, int32_t maxRows, int64_t maxBytes, RowVectorPtr& spillVector, size_t & nextBatchIndex) { int32_t numRows = 0 ; int64_t bytes = 0 ; auto limit = std::min (rows.size () - nextBatchIndex, (size_t )maxRows); for (; numRows < limit; ++numRows) { bytes += container_->rowSize (rows[nextBatchIndex + numRows]); if (bytes > maxBytes) { ++numRows; break ; } } extractSpill (folly::Range (&rows[nextBatchIndex], numRows), spillVector); nextBatchIndex += numRows; return bytes; }
rowSize() 的计算 :
1 2 3 4 5 6 uint32_t RowContainer::rowSize (const char * row) const { return fixedRowSize_ + (rowSizeOffset_ ? *reinterpret_cast <const uint32_t *>(row + rowSizeOffset_) : 0 ); }
fixedRowSize_ 是编译时确定的固定部分大小;rowSizeOffset_ 指向行尾的一个 uint32_t 字段,记录该行的变长部分(out-of-line 字符串)总字节数,两者相加为该行的完整内存占用估算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void SpillerBase::extractSpill ( folly::Range<char **> rows, RowVectorPtr& resultPtr) { if (resultPtr == nullptr ) { resultPtr = BaseVector::create <RowVector>( rowType_, rows.size (), memory::spillMemoryPool ()); } else { resultPtr->prepareForReuse (); resultPtr->resize (rows.size ()); } auto * result = resultPtr.get (); for (auto i = 0 ; i < container_->columnTypes ().size (); ++i) { container_->extractColumn (rows.data (), rows.size (), i, result->childAt (i)); } const auto & accumulators = container_->accumulators (); for (auto i = 0 ; i < accumulators.size (); ++i) { accumulators[i].extractForSpill ( rows, result->childAt (i + container_->columnTypes ().size ())); } }
spillMemoryPool() 是专用的 spill 内存池,与算子的 operator pool 独立,防止 spill 写入过程中分配的临时内存被算子内存统计。
3.6 RowContainer 行布局与列提取原理 理解行列转换,需要先了解 RowContainer 的行内存布局:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 每行在 Arena 中的内存布局(低地址 → 高地址): ┌──────────────────────────────────────────────────────┐ │ normalizedKey (可选,8 字节) │ ← 排序优化前缀 ├──────────────────────────────────────────────────────┤ │ null bits(每列 1 bit,对齐到 8 位) │ ← 列 nullable 标志 ├──────────────────────────────────────────────────────┤ │ 固定宽度列值(int32/int64/float/double 等,inline 存储)│ │ 变长列引用(StringView = 8B ptr + 8B size,实际数据 │ │ 通过 HashStringAllocator out-of-line 存储)│ ├──────────────────────────────────────────────────────┤ │ accumulator 固定部分(各 aggregate 状态,inline 存储) │ ← 每个 agg 对齐存放 ├──────────────────────────────────────────────────────┤ │ rowSizeOffset(uint32_t,变长部分总字节数) │ ← 仅含变长列时存在 ├──────────────────────────────────────────────────────┤ │ probed flag(1 bit,仅 right/full join) │ └──────────────────────────────────────────────────────┘
extractColumn(rows[], numRows, columnIndex, result) 的工作:
1 2 3 4 5 6 1. 确定该列在行内的 byte offset(通过 RowColumn 对象,编译时固定) 2. 读取 null bit 填充 result 的 null bitmap 3. 对每行读取 offset 处的值: - 固定宽度列:直接 memcpy 4/8 字节 - StringView 列:读取 ptr+size,若数据是 inline(≤12B)直接复制; 否则从 HashStringAllocator 的堆上读取实际字符串,写入 result 的 string buffer
关键性能特征 :
同一列的 offset 在所有行中相同(columnAt(i) 编译时固定),无需解析行 header
变长列(StringView)在行中只存引用,实际字符串在 HashStringAllocator 堆上;提取时需要一次间接访问,可能有 cache miss
freezeAndExecute 在 spill 期间冻结 HashStringAllocator,防止其他线程并发修改变长数据
3.7 Accumulator 中间态提取 对于聚合算子,extractForSpill 提取的不是最终聚合结果,而是中间状态(partial intermediate) :
1 2 3 4 void Accumulator::extractForSpill (folly::Range<char **> groups, VectorPtr& result) const { spillExtractFunction_ (groups, result); }
extractAccumulators 是 Aggregate 虚函数,每种聚合函数自己实现。例如:
sum(int64) → 提取 int64 partial sum
avg(double) → 提取 (sum: double, count: int64) row struct
array_agg(T) → 提取序列化的数组(out-of-line 存储,需要 copy)
恢复(unspill)时,updateRow 调用 aggregate->addSingleGroupIntermediateResults,将磁盘读回的中间状态与内存中的行合并:
1 2 3 4 5 6 7 8 9 void GroupingSet::updateRow (SpillMergeStream& input, char * row) { mergeSelection_.setValid (input.currentIndex (), true ); for (auto i = 0 ; i < aggregates_.size (); ++i) { mergeArgs_[0 ] = input.current ().childAt (i + keyChannels_.size ()); aggregates_[i].function->addSingleGroupIntermediateResults ( row, mergeSelection_, mergeArgs_, false ); } }
这实现了”流式归并聚合”:相同 key 的行按排好的顺序依次到来,逐行 merge accumulator,避免将整个 group 缓存在内存中。
3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes) 即使在同一个 SpillRun 内部,writeSpill 也不是一次性将所有行物化为 RowVector,而是每次 extractSpillVector 只处理 最多 64 行或 256 KB (whichever first):
1 2 3 4 5 6 7 SpillRun.rows = [ptr0, ptr1, ..., ptr_N] // N 可能有数十万 ↓ writeSpill 循环: batch 0: [ptr0 .. ptr63] → extractSpill → RowVector(64行) → appendToPartition batch 1: [ptr64 .. ptr127] → extractSpill → RowVector(64行) → appendToPartition ... batch K: [ptr_last_start .. ptr_N] → extractSpill → RowVector(M行) → appendToPartition
为什么要切成 64 行/256KB 的小 batch?
① RowVector 内存峰值控制
每次 extractSpill 分配(或复用)一个 RowVector,其内存由 spillMemoryPool 分配。单批 64 行的 RowVector 内存占用可控(通常几 KB 到几百 KB),而 prepareForReuse() 使相邻批次复用同一块内存,峰值内存 ≈ 单批大小,而非整个 run 大小。
② 与 SpillWriter 写缓冲的配合
SpillWriter 内部有 writeBufferSize 的写缓冲,每次 appendToPartition → write() 将 PrestoPage 数据追加到缓冲;缓冲满时才真正调用文件系统 write()(刷盘)。小批次使每次 write() 调用的数据大小贴近 writeBufferSize,I/O 效率高。
③ “至少 1 行”的保证防止死循环
1 2 3 4 if (bytes > maxBytes) { ++numRows; break ; }
若某行本身 > 256 KB(如包含超大字符串),不加这一行就会无限循环(numRows=0 → written 不推进)。++numRows 确保每次循环至少推进一行。
④ 64 行是何依据?
64 行是经验值,使得:
一个 RowVector 的列式存储(FlatVector<int64> = 64×8=512B)可以完全装入 L1 cache
extractColumn 的内层循环(每列 64 次写入)具有良好的向量化潜力
与 SpillMergeStream 归并读取的 batch 大小相配(读时也是小批量迭代,减少归并树的重平衡次数)
3.9 runSpill — 并发写入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void SpillerBase::runSpill (bool lastRun) { spillStats_->spillRuns.fetch_add (1 ); std::vector<std::shared_ptr<AsyncSource<SpillStatus>>> writes; for (const auto & [id, spillRun] : spillRuns_) { if (spillRun.rows.empty ()) continue ; writes.push_back ( memory::createAsyncMemoryReclaimTask <SpillStatus>( [partitionId = id, this ]() { return writeSpill (partitionId); })); if ((writes.size () > 1 ) && executor_ != nullptr ) { executor_->add ([source = writes.back ()]() { source->prepare (); }); } } auto sync = folly::makeGuard ([&]() { for (auto & write : writes) { try { write->move (); } catch (...) {} } }); for (auto & write : writes) { results.push_back (write->move ()); } for (auto & result : results) { if (result->error) std::rethrow_exception (result->error); spillRuns_.at (result->partitionId).clear (); if (needSort ()) { state_.finishFile (result->partitionId); } } }
并发策略细节 :
第 1 个分区的写任务在调用线程 上通过 write->move() 触发(AsyncSource::prepare() 从未被调用,move() 内联执行)
第 2+ 个分区提交到 executor_(后台 CPU 线程池)并发执行
createAsyncMemoryReclaimTask 确保任务在内存仲裁器视角下是”可取消的回收任务”,若系统 OOM 时仲裁器能感知到这些任务正在运行
state_.finishFile(partitionId) 调用 SpillWriter::finishFile(),flush 当前文件并记录 SpillFileInfo,下次 appendToPartition 会创建新文件。这使每个 sorted run 独立成文,后续归并时文件间已有序。
3.10 finishSpill — 收尾 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void SpillerBase::finishSpill (SpillPartitionSet& partitionSet) { finalizeSpill (); for (const auto & partitionId : state_.spilledPartitionIdSet ()) { auto wholeId = parentId_.has_value () ? SpillPartitionId (parentId_.value (), partitionId.partitionNumber ()) : partitionId; if (partitionSet.count (wholeId) == 0 ) { partitionSet.emplace (wholeId, std::make_unique <SpillPartition>(wholeId, state_.finish (partitionId))); } else { partitionSet[wholeId]->addFiles (state_.finish (partitionId)); } } }
partitionSet 是 std::map<SpillPartitionId, ...>,按 SpillPartitionId 有序排列(DFS 前序),确保递归 spill 场景下父 partition 先于子 partition 被处理。
3.11 各子类 Spiller 详解 HashBuildSpiller 1 2 3 4 5 needSort : false(hash join 恢复时重建 hash 表,顺序无关) HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits)(分区存储) targetFileSize: spillConfig->maxFileSize maxSpillRunRows: spillConfig->maxSpillRunRows parentId_ : restoringPartitionId_(递归时指定父 partition)
差异点——重写 extractSpill 附加 probed flag 列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void HashBuildSpiller::extractSpill (folly::Range<char **> rows, RowVectorPtr& resultPtr) { for (auto i = 0 ; i < container_->columnTypes ().size (); ++i) { container_->extractColumn (rows.data (), rows.size (), i, result->childAt (i)); } if (spillProbeFlag_) { auto * flagVector = result->childAt (types.size ())->asFlatVector <bool >(); for (auto i = 0 ; i < rows.size (); ++i) { flagVector->set (i, container_->hasProbedFlag (rows[i])); } } }
spill() 重载1(spillTriggered_=true + 全量 spill):
1 2 3 4 void HashBuildSpiller::spill () { spillTriggered_ = true ; SpillerBase::spill (nullptr ); }
spill(partitionId, spillVector) 重载2(直通路径,已 triggered 后):
1 2 3 4 5 void HashBuildSpiller::spill (const SpillPartitionId& id, const RowVectorPtr& vec) { if (!state_.isPartitionSpilled (id)) state_.setPartitionSpilled (id); state_.appendToPartition (id, vec); }
1 2 3 4 5 needSort : true(按所有 group key 排序,支持流式归并聚合) HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits) targetFileSize: uint64_t::max(文件大小不限,由 maxFileSize 配置控制) maxSpillRunRows: spillConfig->maxSpillRunRows sortingKeys : 所有 key 列,升序,nulls last
继承默认 extractSpill:提取 key 列 + 各 accumulator 的 extractAccumulators 中间状态。
每次 runSpill 后(needSort=true),调用 state_.finishFile(id) 关闭当前文件——每个 run 的数据写到独立的 sorted 文件 。后续 createOrderedReader 用败者树对所有文件做 N 路归并,还原全局有序流。
关键约束:rows->stringAllocator().freezeAndExecute([&]() { inputSpiller_->spill(); }) 在 spill 期间冻结 HashStringAllocator,防止 accumulator 在 spill 时并发分配/释放变长内存。
AggregationOutputSpiller 1 2 3 4 needSort : false(hash 表 scan 顺序即为输出顺序,不需要重排) HashBitRange : {}(空,单 partition 0) targetFileSize: uint64_t::max maxSpillRunRows: spillConfig->maxSpillRunRows
spill(startRowIter) 从 rowIterator 指定的偏移开始扫描(支持从已输出的位置继续):
1 2 3 4 void AggregationOutputSpiller::spill (const RowContainerIterator& startRowIter) { state_.setPartitionSpilled (SpillPartitionId (0 )); SpillerBase::spill (&startRowIter); }
重写 runSpill:在最后一次 run 后强制 finishFile:
1 2 3 4 5 6 7 8 void AggregationOutputSpiller::runSpill (bool lastRun) { SpillerBase::runSpill (lastRun); if (lastRun) { for (const auto & [id, _] : spillRuns_) { state_.finishFile (id); } } }
1 2 3 4 5 needSort : true(按 sort key 排序) HashBitRange : {}(空,单 partition 0,OrderBy 无 hash 分区) targetFileSize: uint64_t::max maxSpillRunRows: spillConfig->maxSpillRunRows sortingKeys : 所有 sort 列,对应 CompareFlags(方向 + nulls 位置)
spillerStoreType_ 把 sort 列排在最前面,这样归并时只需比较前 N 列(减少 cache miss)。
spill() 直接调用 SpillerBase::spill(nullptr),无参数——从 RowContainer 头部开始扫全部行。
SortOutputSpiller 1 2 3 4 needSort : false(调用方已传入排好序的 SpillRows) HashBitRange : {}(单 partition 0) targetFileSize: uint64_t::max maxSpillRunRows: spillConfig->maxSpillRunRows
接口与其他 spiller 不同——接受外部已排序的行指针数组:
1 2 3 4 5 6 7 8 void SortOutputSpiller::spill (SpillRows& rows) { auto & run = createOrGetSpillRun (SpillPartitionId (0 )); run.rows = SpillRows (rows.begin (), rows.end (), run.rows.get_allocator ()); for (const auto * row : rows) run.numBytes += container_->rowSize (row); markSeenPartitionsSpilled (); runSpill (true ); }
重写 runSpill:执行完后立即 finishFile,因为这是唯一一次写入。
NoRowContainerSpiller / MergeSpiller 1 2 3 needSort : false container_ : nullptr(无 RowContainer) maxSpillRunRows: 0(不使用)
无 RowContainer,通过 spill(partitionId, spillVector) 直接将外部 RowVector 写入磁盘,跳过整个 fillSpillRuns → runSpill → extractSpill 流程:
1 2 3 4 5 void NoRowContainerSpiller::spill ( const SpillPartitionId& id, const RowVectorPtr& spillVector) { if (!state_.isPartitionSpilled (id)) state_.setPartitionSpilled (id); state_.appendToPartition (id, spillVector); }
MergeSpiller 是 NoRowContainerSpiller 的子类,额外携带 sortingKeys(用于 hash join probe 侧的归并 spill,需要保留排序信息以便后续 probe 侧归并读取)。
各子类对比
子类
needSort
HashBitRange
container_
特殊行为
HashBuildSpiller
false
多分区
✓
重写 extractSpill:附加 probed flag 列;双重 spill() 入口
AggregationInputSpiller
true
多分区
✓
每 run 对应独立 sorted 文件;freeze allocator
AggregationOutputSpiller
false
单分区(空)
✓
从迭代器偏移开始;重写 runSpill 收尾
SortInputSpiller
true
单分区(空)
✓
sort 列前置;每 run 独立 sorted 文件
SortOutputSpiller
false
单分区(空)
✓
接受外部已排序 SpillRows;重写 runSpill
NoRowContainerSpiller
false
多分区
nullptr
直接 appendToPartition,无 fill/extract 流程
MergeSpiller
false
多分区
nullptr
同上,额外保留 sortingKeys
4. HashJoin Spill 实现 4.1 整体流程 HashJoin spill 只发生在 Build 侧(HashBuild),主要用于处理 build 表过大的情况。Probe 侧配合 build 侧,读取对应的 spill 分区数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ┌─────────────────────────────────────────────────────────────┐ │ Build 阶段 │ │ addInput → ensureInputFits → [内存不足] → reclaim │ │ ↓ spill triggered 后 │ │ addInput → spillInput → 直接 spill 到对应 partition │ │ ↓ noMoreInput │ │ finishHashBuild → finishSpill → joinBridge.setHashTable │ │ ↓ │ │ postHashBuildProcess → waitForProbe / setupSpillInput │ ├─────────────────────────────────────────────────────────────┤ │ Probe 阶段 │ │ 探测内存中的 hash 表 + 同步 spill 对应分区的 probe 侧数据 │ │ joinBridge.setSpillPartitions → build 侧 setupSpillInput │ ├─────────────────────────────────────────────────────────────┤ │ Restore 阶段(递归,每个 spill partition 一轮) │ │ setupSpillInput → processSpillInput → addInput(重建表) │ │ → finishHashBuild → 新一轮 probe / 再次 spill(递归) │ └─────────────────────────────────────────────────────────────┘
4.2 HashBuildSpiller 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class HashBuildSpiller : public SpillerBase { const bool spillProbeFlag_; bool spillTriggered_{false }; void spill () { spillTriggered_ = true ; SpillerBase::spill (nullptr ); } void spill (const SpillPartitionId& partitionId, const RowVectorPtr& spillVector) { VELOX_CHECK (spillTriggered_); if (!state_.isPartitionSpilled (partitionId)) { state_.setPartitionSpilled (partitionId); } state_.appendToPartition (partitionId, spillVector); } };
**重写 extractSpill**:对于 right/full outer join,需要在 spill 数据中附加一列 bool probed_flag,记录该行是否已被 probe 侧命中:
1 2 3 4 5 6 7 8 9 10 11 12 13 void HashBuildSpiller::extractSpill (folly::Range<char **> rows, RowVectorPtr& resultPtr) { for (auto i = 0 ; i < types.size (); ++i) { container_->extractColumn (rows.data (), rows.size (), i, result->childAt (i)); } if (spillProbeFlag_) { auto * flagVector = result->childAt (types.size ())->asFlatVector <bool >(); for (auto i = 0 ; i < rows.size (); ++i) { flagVector->set (i, container_->hasProbedFlag (rows[i])); } } }
4.3 setupSpiller 在 HashBuild 初始化时和每次 restore 时调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void HashBuild::setupSpiller (SpillPartition* spillPartition) { if (!canSpill ()) return ; if (spillType_ == nullptr ) { spillType_ = hashJoinTableSpillType (tableType_, joinType_); if (needProbedFlagSpill_) { spillProbedFlagChannel_ = spillType_->size () - 1 ; spillProbedFlagVector_ = std::make_shared<ConstantVector<bool >>( pool (), 0 , false , BOOLEAN (), false ); } } uint8_t startPartitionBit = config->startPartitionBit; if (spillPartition != nullptr ) { spillInputReader_ = spillPartition->createUnorderedReader ( config->readBufferSize, pool (), spillStats_.get ()); restoringPartitionId_ = spillPartition->id (); startPartitionBit = partitionBitOffset ( spillPartition->id (), startPartitionBit, numPartitionBits) + numPartitionBits; if (config->exceedSpillLevelLimit (startPartitionBit)) { exceededMaxSpillLevelLimit_ = true ; return ; } } spiller_ = std::make_unique <HashBuildSpiller>( joinType_, restoringPartitionId_, table_->rows (), spillType_, HashBitRange (startPartitionBit, startPartitionBit + config->numPartitionBits), config, spillStats_.get ()); }
每次 addInput 前调用,是 proactive spill 的入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void HashBuild::ensureInputFits (RowVectorPtr& input) { if (!canSpill () || spiller_ == nullptr || spiller_->spillTriggered ()) return ; const auto tableIncrementBytes = table_->hashTableSizeIncrease (input->size ()); const auto rowContainerIncrementBytes = rows->sizeIncrement (input->size (), ...); const auto incrementBytes = rowContainerIncrementBytes + tableIncrementBytes; if (availableReservationBytes >= minReservationBytes) { if (freeRows > input->size () && outOfLineFreeBytes >= flatBytes) return ; if (pool ()->availableReservation () > 2 * incrementBytes) return ; } const auto targetIncrementBytes = std::max <int64_t >( incrementBytes * 2 , currentUsage * spillConfig_->spillableReservationGrowthPct / 100 ); { Operator::ReclaimableSectionGuard guard (this ) ; if (pool ()->maybeReserve (targetIncrementBytes)) { if (spiller_->spillTriggered ()) { pool ()->release (); } return ; } } }
一旦 spiller_->spillTriggered() 为 true,所有新输入不再进入 hash 表,而是直接按 partition 分发到 spill 文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void HashBuild::spillInput (const RowVectorPtr& input) { if (!canSpill () || spiller_ == nullptr || !spiller_->spillTriggered () || !activeRows_.hasSelections ()) return ; computeSpillPartitions (input); for (auto row = 0 ; row < numInput; ++row) { if (!activeRows_.isValid (row)) continue ; activeRows_.setValid (row, false ); rawSpillInputIndicesBuffers_[spillPartitions_[row]][numSpillInputs_[partition]++] = row; } maybeSetupSpillChildVectors (input); for (uint32_t partition = 0 ; partition < numSpillInputs_.size (); ++partition) { spillPartition (partition, numSpillInputs_[partition], spillInputIndicesBuffers_[partition], input); } }
4.6 finishHashBuild — 收尾与传递 所有 build 线程完成后,最后一个线程执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 bool HashBuild::finishHashBuild () { SpillPartitionSet spillPartitions; for (auto * build : otherBuilds) { if (build->spiller_ != nullptr ) { build->spiller_->finishSpill (spillPartitions); } } if (spiller_ != nullptr ) { spiller_->finishSpill (spillPartitions); removeEmptyPartitions (spillPartitions); } table_->prepareJoinTable (std::move (otherTables), ...); HashJoinTableSpillFunc tableSpillFunc; if (canReclaim ()) { tableSpillFunc = [hashBitRange = spiller_->hashBits (), ...](auto table) { return spillHashJoinTable (table, ...); }; } joinBridge_->setHashTable (table, std::move (spillPartitions), joinHasNullKeys_, std::move (tableSpillFunc)); }
4.7 reclaim — 内存仲裁触发路径 内存仲裁器调用 reclaim() 强制 spill:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void HashBuild::reclaim (uint64_t , memory::MemoryReclaimer::Stats& stats) { if (nonReclaimableState ()) { ++stats.numNonReclaimableAttempts; return ; } const std::vector<Operator*> operators = task->findPeerOperators (pipelineId, this ); for (auto * op : operators) { if (static_cast <HashBuild*>(op)->nonReclaimableState ()) { ++stats.numNonReclaimableAttempts; return ; } } std::vector<HashBuildSpiller*> spillers; for (auto * op : operators) { spillers.push_back (static_cast <HashBuild*>(op)->spiller_.get ()); } spillHashJoinTable (spillers, config); for (auto * op : operators) { static_cast <HashBuild*>(op)->table_->clear (true ); static_cast <HashBuild*>(op)->pool ()->release (); } }
spillHashJoinTable 内部对每个 spiller 调用 spiller->spill()(即 HashBuildSpiller::spill()),将所有行按 hash 分区写到磁盘。
4.8 postHashBuildProcess / setupSpillInput — 递归恢复 Build 完成后,询问 join bridge 是否有 spill 数据需要 restore:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void HashBuild::postHashBuildProcess () { if (!canSpill ()) { setState (State::kFinish); return ; } auto spillInput = joinBridge_->spillInputOrFuture (&future_); if (!spillInput.has_value ()) { setState (State::kWaitForProbe); return ; } setupSpillInput (std::move (spillInput.value ())); } void HashBuild::setupSpillInput (HashJoinBridge::SpillInput spillInput) { if (spillInput.spillPartition == nullptr ) { setState (State::kFinish); return ; } table_.reset (); spiller_.reset (); spillInputReader_.reset (); restoringPartitionId_.reset (); std::iota (keyChannels_.begin (), keyChannels_.end (), 0 ); std::iota (dependentChannels_.begin (), dependentChannels_.end (), keyChannels_.size ()); setupTable (); setupSpiller (spillInput.spillPartition.get ()); processSpillInput (); }
1 2 3 4 5 6 7 8 9 10 11 12 13 void HashBuild::processSpillInput () { while (spillInputReader_->nextBatch (spillInput_)) { addInput (std::move (spillInput_)); if (!isRunning ()) return ; if (shouldYield ()) { state_ = State::kYield; future_ = ContinueFuture{folly::Unit{}}; return ; } } noMoreInputInternal (); }
这形成了递归循环:processSpillInput → addInput → ensureInputFits → [spill] → noMoreInput → finishHashBuild → postHashBuildProcess → setupSpillInput → processSpillInput。
4.10 Probe 侧 Spill 详解 Probe 侧(HashProbe)的 spill 涉及三条路径,通过 HashJoinBridge 与 build 侧协调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌─────────────────────────────────────────────────────────────────┐ │ HashJoinBridge — Build/Probe 协调中枢 │ │ │ │ setHashTable(table, spillPartitionSet, ...) │ │ ↓ probe 侧 asyncWaitForHashTable 取到 HashBuildResult │ │ ↓ HashBuildResult.spillPartitionIds 非空 → 触发 input spill │ │ │ │ probeFinished() │ │ ↓ last prober 调用,通知 bridge 准备下一个要恢复的 partition │ │ ↓ bridge 从 spillPartitionSets_ 取下一个 partition │ │ ↓ split 为 N shard → 分发给各 build 线程 │ │ │ │ appendSpilledHashTablePartitions(spillPartitionSet) │ │ ↓ probe 侧 reclaim 时,将被 spill 的 hash table 分区交还 bridge│ └─────────────────────────────────────────────────────────────────┘
HashBuildResult — Build/Probe 信息传递载体 1 2 3 4 5 6 struct HashBuildResult { std::shared_ptr<BaseHashTable> table; std::optional<SpillPartitionId> restoredPartitionId; SpillPartitionIdSet spillPartitionIds; bool hasNullKeys; };
spillPartitionIds 非空 ⟺ build 侧触发了 spill,probe 侧必须同步将对应行 spill 到磁盘。restoredPartitionId 非 null ⟺ 本次 hash 表是从 spill 文件 restore 得来的,probe 侧需要从对应的 spill 文件中读取之前被 spill 的 probe 行。
触发时机 :asyncWaitForHashTable 拿到 HashBuildResult 后,发现 spillPartitionIds 非空。
1 2 3 4 5 6 7 8 9 10 11 12 void HashProbe::asyncWaitForHashTable () { auto hashBuildResult = joinBridge_->tableOrFuture (&future_); table_ = hashBuildResult->table; initializeResultIter (); maybeSetupSpillInputReader (hashBuildResult->restoredPartitionId); maybeSetupInputSpiller (hashBuildResult->spillPartitionIds); checkMaxSpillLevel (hashBuildResult->restoredPartitionId); }
**maybeSetupInputSpiller**:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void HashProbe::maybeSetupInputSpiller ( const SpillPartitionIdSet& spillPartitionIds) { spillInputPartitionIds_ = spillPartitionIds; if (spillInputPartitionIds_.empty ()) return ; const auto bitOffset = partitionBitOffset ( *spillInputPartitionIds_.begin (), spillConfig ()->startPartitionBit, spillConfig ()->numPartitionBits); inputSpiller_ = std::make_unique <NoRowContainerSpiller>( probeType_, restoringPartitionId_, HashBitRange (bitOffset, bitOffset + spillConfig ()->numPartitionBits), spillConfig (), spillStats_.get ()); inputSpiller_->setPartitionsSpilled (spillInputPartitionIds_); spillPartitionFunction_ = std::make_unique <SpillPartitionFunction>( SpillPartitionIdLookup (spillInputPartitionIds_, ...), probeType_, keyChannels_); }
**spillInput**(在 addInput 中调用):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 void HashProbe::spillInput (RowVectorPtr& input) { const auto numInputRows = input->size (); prepareInputIndicesBuffers (numInputRows, inputSpiller_->state ().spilledPartitionIdSet ()); spillPartitionFunction_->partition (*input, spillPartitions_); vector_size_t numNonSpillingInput = 0 ; for (auto row = 0 ; row < numInputRows; ++row) { const auto & partitionId = spillPartitions_[row]; if (!inputSpiller_->state ().isPartitionSpilled (partitionId)) { rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row; } else { rawSpillInputIndicesBuffers_.at (partitionId) [numSpillInputs_.at (partitionId)++] = row; } } if (numNonSpillingInput == numInputRows) return ; for (int32_t i = 0 ; i < input->childrenSize (); ++i) { input->childAt (i)->loadedVector (); } for (const auto & [partitionId, numRows] : numSpillInputs_) { if (numRows == 0 ) continue ; inputSpiller_->spill ( partitionId, wrap (numRows, spillInputIndicesBuffers_.at (partitionId), input)); } if (numNonSpillingInput == 0 ) { input = nullptr ; } else { input = wrap (numNonSpillingInput, nonSpillInputIndicesBuffer_, input); } }
关键设计 :wrap 使用 DictionaryVector,不拷贝行数据,只是 indices 引用,spill 写入时才通过 loadedVector() 强制物化。这样同一个 input batch 里属于不同 partition 的行可以零拷贝地分发到各自的 spill 文件。
**noMoreInputInternal**(探测完成,收尾 input spill):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void HashProbe::noMoreInputInternal () { noMoreSpillInput_ = true ; if (!spillInputPartitionIds_.empty ()) { inputSpiller_->finishSpill (inputSpillPartitionSet_); } if (!operatorCtx_->task ()->allPeersFinished (...)) { setState (ProbeOperatorState::kWaitForPeers); return ; } lastProber_ = true ; }
路径 2:restore 轮的 probe(从 spill 文件读取 probe 行) 当 build 侧从某个 spill partition 重建 hash 表后,probe 侧必须把之前 spill 到磁盘的对应 probe 行读回重新 probe。
**maybeSetupSpillInputReader**:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void HashProbe::maybeSetupSpillInputReader ( const std::optional<SpillPartitionId>& restoredPartitionId) { if (!restoredPartitionId.has_value ()) return ; auto iter = inputSpillPartitionSet_.find (restoredPartitionId.value ()); auto partition = std::move (iter->second); restoringPartitionId_ = restoredPartitionId; spillInputReader_ = partition->createUnorderedReader ( spillConfig_->readBufferSize, pool (), spillStats_.get ()); inputSpillPartitionSet_.erase (iter); }
**addSpillInput**(在 isBlocked 的 kRunning 状态时轮询调用):
1 2 3 4 5 6 7 8 9 10 11 12 void HashProbe::addSpillInput () { if (input_ != nullptr || noMoreSpillInput_) return ; if (!spillInputReader_->nextBatch (input_)) { noMoreInputInternal (); return ; } addInput (std::move (input_)); }
路径 3:output spill(内存仲裁触发 reclaim) 触发条件 :probe 正在处理某批 input 时,输出过大导致内存仲裁器调用 reclaim()。
**reclaim**:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void HashProbe::reclaim (uint64_t , memory::MemoryReclaimer::Stats& stats) { if (exceededMaxSpillLevelLimit_ || nonReclaimableState ()) { ++stats.numNonReclaimableAttempts; return ; } const auto probeOps = findPeerOperators (); bool hasMoreProbeInput = false ; for (auto * probeOp : probeOps) { if (probeOp->nonReclaimableState ()) { ++stats.numNonReclaimableAttempts; return ; } hasMoreProbeInput |= !probeOp->noMoreSpillInput_; } spillOutput (probeOps); SpillPartitionSet spillPartitionSet; if (hasMoreProbeInput) { spillPartitionSet = spillHashJoinTable ( table_, restoringPartitionId_, tableSpillHashBits_, joinNode_, spillConfig (), spillStats_.get ()); } const auto spillPartitionIdSet = toSpillPartitionIdSet (spillPartitionSet); for (auto * probeOp : probeOps) { probeOp->clearBuffers (); if (!spillPartitionSet.empty ()) { probeOp->maybeSetupInputSpiller (spillPartitionIdSet); } probeOp->pool ()->release (); } table_->clear (true ); if (!spillPartitionIdSet.empty ()) { joinBridge_->appendSpilledHashTablePartitions (std::move (spillPartitionSet)); } }
spillOutput(单个算子) :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void HashProbe::spillOutput () { if (input_ == nullptr && !needLastProbe ()) return ; auto outputSpiller = std::make_unique <NoRowContainerSpiller>( outputType_, std::nullopt , HashBitRange{}, spillConfig (), spillStats_.get ()); outputSpiller->setPartitionsSpilled ({SpillPartitionId (0 )}); for (;;) { auto output = getOutputInternal (true ); if (output != nullptr ) { for (int32_t i = 0 ; i < output->childrenSize (); ++i) { output->childAt (i)->loadedVector (); } outputSpiller->spill (SpillPartitionId (0 ), output); continue ; } if (input_ == nullptr ) break ; break ; } outputSpiller->finishSpill (spillOutputPartitionSet_); removeEmptyPartitions (spillOutputPartitionSet_); }
读回阶段 :下次 getOutput 调用时,maybeReadSpillOutput() 将磁盘数据读回:
1 2 3 4 5 6 7 8 9 10 11 12 bool HashProbe::maybeReadSpillOutput () { if (spillOutputReader_ == nullptr ) { maybeSetupSpillOutputReader (); } if (spillOutputReader_ == nullptr ) return false ; if (!spillOutputReader_->nextBatch (output_)) { spillOutputReader_.reset (); return false ; } return true ; }
output spill 特殊注意 :getOutputInternal(toSpillOutput=true) 被调用时,若发现 input_ 为 null(即没有 pending input),会提前返回 null——这保证了 reclaim 路径不会误进入”probe 完成逻辑”(如 prepareForSpillRestore),避免状态混乱。
prepareForSpillRestore — 多轮恢复的关键 当一轮 probe(含当前 hash 表和对应 spill 文件的 probe 行)全部完成后,最后一个 probe 线程(lastProber_)负责启动下一轮恢复:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void HashProbe::prepareForSpillRestore () { noMoreSpillInput_ = false ; if (lastProber_) { table_->clear (true ); } table_.reset (); inputSpiller_.reset (); spillInputReader_.reset (); restoringPartitionId_.reset (); spillInputPartitionIds_.clear (); spillOutputReader_.reset (); if (!lastProber_) return ; joinBridge_->probeFinished (); wakeupPeerOperators (); lastProber_ = false ; }
Probe 侧 Spill 全生命周期 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 asyncWaitForHashTable() │ ├─ spillPartitionIds 非空 → maybeSetupInputSpiller() │ 创建 NoRowContainerSpiller,标记需要 spill 的 partition 集合 │ ├─ restoredPartitionId 非 null → maybeSetupSpillInputReader() │ 打开 UnorderedStreamReader,顺序读取之前 spill 的 probe 行 │ addInput(input) │ └─ needToSpillInput()? → spillInput(input) 按 hash 分 partition: spilled partition → DictionaryVector wrap → inputSpiller_.spill() non-spilled partition → 继续正常 probe(无拷贝) │ getOutput() → ensureOutputFits() │ ├─ maybeReadSpillOutput():读取上次 spillOutput 写到磁盘的 output batch │ └─ 内存不足 → reclaim(): 1. spillOutput(probeOps) 并发 spill 各 probe 的 pending output 2. spillHashJoinTable(table) 将 hash 表 spill 到磁盘 3. maybeSetupInputSpiller(ids) 各 probe 设置新 inputSpiller_ 4. appendSpilledHashTablePartitions() → 通知 bridge │ isBlocked(kRunning) + spillInputReader_ 非 null → addSpillInput() │ 逐 batch 从 spill 文件读取 probe 行 → addInput() → spillInput() → probe │ noMoreInputInternal() │ ├─ inputSpiller_.finishSpill(inputSpillPartitionSet_) └─ allPeersFinished() → lastProber_ = true │ ▼ hasMoreSpillData()? → prepareForSpillRestore() │ joinBridge_->probeFinished() │ wakeupPeerOperators() ▼ asyncWaitForHashTable() ← 循环(等待 build 侧重建 hash 表) │ └─ 所有 spill partition 处理完 → setState(kFinish)
5. HashAggregation Spill 实现 5.1 两种 Spiller HashAggregation 的 spill 逻辑集中在 GroupingSet 中,区分两个阶段:
1 2 3 4 5 6 7 8 9 10 11 inputSpiller_ (AggregationInputSpiller): - 在 input 处理阶段触发(hash 表太大) - needSort = true:按 group key 排序 - 分区:按 hash(group key) - 目的:spill 后 hash 表清空,腾出内存继续处理输入 outputSpiller_ (AggregationOutputSpiller): - 在 output 处理阶段触发(输出时内存不足) - needSort = false:已是输出顺序(hash 表 scan 顺序),不需要重排 - 无分区(HashBitRange 为空) - 目的:将剩余输出行先 spill 到磁盘,之后边读边输出
两者互斥:同一个 GroupingSet 要么用 inputSpiller_,要么用 outputSpiller_,不会同时存在。
由 HashAggregation::reclaim() 或 ensureInputFits 调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void GroupingSet::spill () { if (table_ == nullptr || table_->numDistinct () == 0 ) return ; auto * rows = table_->rows (); VELOX_CHECK_NULL (outputSpiller_); if (inputSpiller_ == nullptr ) { const auto sortingKeys = SpillState::makeSortingKeys ( std::vector <CompareFlags>(rows->keyTypes ().size ())); inputSpiller_ = std::make_unique <AggregationInputSpiller>( rows, makeSpillType (), HashBitRange (startPartitionBit, startPartitionBit + numPartitionBits), sortingKeys, spillConfig_, spillStats_); } rows->stringAllocator ().freezeAndExecute ([&]() { inputSpiller_->spill (); }); if (isDistinct () && numDistinctSpillFilesPerPartition_.empty ()) { for (int partition = 0 ; partition < maxPartitions; ++partition) { numDistinctSpillFilesPerPartition_[partition] = inputSpiller_->state ().numFinishedFiles (SpillPartitionId (partition)); } } table_->clear (true ); }
makeSpillType() 生成 spill 类型:
1 2 3 4 5 6 7 8 9 10 RowTypePtr GroupingSet::makeSpillType () const { std::vector<TypePtr> types; for (auto & hasher : hashers_) types.push_back (hasher->type ()); for (auto & accumulator : accumulators (false )) { types.push_back (accumulator.spillType ()); } return ROW (names, types); }
5.3 GroupingSet::spill(rowIterator) — output spill 由 HashAggregation::reclaim() 在 output 阶段调用:
1 2 3 4 5 6 7 8 9 10 11 12 void GroupingSet::spill (const RowContainerIterator& rowIterator) { VELOX_CHECK (!hasSpilled ()); auto * rows = table_->rows (); outputSpiller_ = std::make_unique <AggregationOutputSpiller>( rows, makeSpillType (), spillConfig_, spillStats_); rows->stringAllocator ().freezeAndExecute ([&]() { outputSpiller_->spill (rowIterator); }); table_->clear (true ); }
AggregationOutputSpiller::spill(rowIterator) 内部:
1 2 3 4 5 void AggregationOutputSpiller::spill (const RowContainerIterator& startRowIter) { state_.setPartitionSpilled (SpillPartitionId (0 )); SpillerBase::spill (&startRowIter); }
AggregationOutputSpiller::runSpill 重写:每次 run 后立即 finishFile(保证每个 run 独立一个文件,便于后续归并)。
5.4 reclaim 触发路径 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void HashAggregation::reclaim (uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) { const auto compactedBytes = groupingSet_->compact (); if (compactedBytes >= targetBytes) return ; if (!canSpill ()) { return ; } if (groupingSet_->hasSpilled ()) { if (isOutputProcessing) { return ; } } if (isOutputProcessing) { groupingSet_->spill (resultIterator_); } else { groupingSet_->spill (); } }
5.5 getOutputWithSpill — 归并输出 当 groupingSet_->hasSpilled() 为 true 时,getOutput 路径切换为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 bool GroupingSet::getOutputWithSpill (int32_t maxOutputRows, int32_t maxOutputBytes, const RowVectorPtr& result) { if (outputSpillPartition_ == -1 ) { mergeRows_ = std::make_unique <RowContainer>(keyTypes, ...); initializeAggregates (*mergeRows_, false ); inputSpiller_->finishSpill (spillPartitionSet_); removeEmptyPartitions (spillPartitionSet_); prepareNextSpillPartitionOutput (); } return mergeNext (maxOutputRows, maxOutputBytes, result); } bool GroupingSet::prepareNextSpillPartitionOutput () { merge_ = nullptr ; if (spillPartitionSet_.empty ()) return false ; auto it = spillPartitionSet_.begin (); outputSpillPartition_ = it->first.partitionNumber (); merge_ = it->second->createOrderedReader (*spillConfig_, pool_, spillStats_); spillPartitionSet_.erase (it); return true ; }
5.6 mergeNextWithAggregates — 有序归并聚合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 bool GroupingSet::mergeNextWithAggregates ( int32_t maxOutputRows, int32_t maxOutputBytes, const RowVectorPtr& result) { bool nextKeyIsEqual{false }; for (;;) { auto [stream, equal] = merge_->nextWithEquals (); if (stream == nullptr ) { extractSpillResult (result); if (result->size () > 0 ) return true ; if (!prepareNextSpillPartitionOutput ()) return false ; continue ; } if (!nextKeyIsEqual) { mergeState_ = mergeRows_->newRow (); initializeRow (*stream, mergeState_); } updateRow (*stream, mergeState_); nextKeyIsEqual = equal; stream->pop (); if (!nextKeyIsEqual && (mergeRows_->numRows () >= maxOutputRows || mergeRowBytes () >= maxOutputBytes)) { extractSpillResult (result); return true ; } } }
关键 :merge_->nextWithEquals() 利用已排序的特性,保证相同 key 的行连续出现,从而可以在流式归并的同时完成聚合(无需将整个 group 缓存在内存中)。
5.7 mergeNextWithoutAggregates — distinct 归并 对于 SELECT DISTINCT 场景,spill 文件分为两类:
distinct 文件 (文件 ID < numDistinctSpillFilesPerPartition_):已输出过的 distinct 行
input 文件 (文件 ID >= 阈值):新输入的行
归并时:若某个相同 key 的 run 中包含来自 distinct 文件的行,则跳过该 run(因为已经输出过);否则输出该 run 中的一行。
6. OrderBy Spill 实现 6.1 两种 Spiller OrderBy 的 spill 通过 SortBuffer 实现,同样区分两个阶段:
1 2 3 4 5 6 7 8 9 10 inputSpiller_ (SortInputSpiller): - input 阶段:RowContainer 行数过多时触发 - needSort = true:按 sort key 排序后写入 - 无 hash 分区(HashBitRange 为空) - 写入一个 partition(partition 0) outputSpiller_ (SortOutputSpiller): - output 阶段:已完成内排序,输出时内存不足 - needSort = false:外部传入已排序的 SpillRows - 接收 sortedRows_ 中未输出部分,直接写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void SortBuffer::spillInput () { if (inputSpiller_ == nullptr ) { const auto sortingKeys = SpillState::makeSortingKeys (sortCompareFlags_); inputSpiller_ = std::make_unique <SortInputSpiller>( data_.get (), spillerStoreType_, sortingKeys, spillConfig_, spillStats_); } inputSpiller_->spill (); data_->clear (); }
spillerStoreType_ 的列顺序重排(构造时完成):
1 2 3 4 5 6 7 8 9 10 11 12 for (auto i = 0 ; i < numSortKeys; ++i) { sorted_types.push_back (input->childAt (sortColumnIndices[i])->type ()); sorted_names.push_back (input->nameOf (sortColumnIndices[i])); } for (auto i = 0 ; i < input->size (); ++i) { if (!isSortKey (i)) { sorted_types.push_back (input->childAt (i)->type ()); sorted_names.push_back (input->nameOf (i)); } }
这样 spill 文件中每行的前 N 列就是排序键,归并比较时只需读取前 N 列,节省反序列化开销。
6.3 SortBuffer::spillOutput 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void SortBuffer::spillOutput () { if (hasSpilled ()) return ; if (numOutputRows_ == sortedRows_.size ()) return ; outputSpiller_ = std::make_unique <SortOutputSpiller>( data_.get (), spillerStoreType_, spillConfig_, spillStats_); auto spillRows = SpillerBase::SpillRows ( sortedRows_.begin () + numOutputRows_, sortedRows_.end (), *memory::spillMemoryPool ()); outputSpiller_->spill (spillRows); data_->clear (); sortedRows_.clear (); sortedRows_.shrink_to_fit (); finishSpill (); }
SortOutputSpiller::spill(SpillRows) 内部:
1 2 3 4 5 6 7 8 9 void SortOutputSpiller::spill (SpillRows& rows) { auto & spillRun = createOrGetSpillRun (SpillPartitionId (0 )); spillRun.rows = SpillRows (rows.begin (), rows.end (), ...); for (const auto * row : rows) spillRun.numBytes += container_->rowSize (row); markSeenPartitionsSpilled (); runSpill (true ); }
SortOutputSpiller::runSpill 重写:在父类 runSpill 后立即调用 finishFile(保证文件封尾)。
6.4 SortBuffer::finishSpill 1 2 3 4 5 6 7 8 9 10 void SortBuffer::finishSpill () { if (inputSpiller_ != nullptr ) { inputSpiller_->finishSpill (spillPartitionSet_); } else { outputSpiller_->finishSpill (spillPartitionSet_); } VELOX_CHECK_EQ (spillPartitionSet_.size (), 1 ); }
6.5 SortBuffer::getOutputWithSpill 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void SortBuffer::prepareOutputWithSpill () { if (spillMerger_ != nullptr ) return ; VELOX_CHECK_EQ (spillPartitionSet_.size (), 1 ); spillMerger_ = spillPartitionSet_.begin ()->second->createOrderedReader ( *spillConfig_, pool (), spillStats_); spillPartitionSet_.clear (); } void SortBuffer::getOutputWithSpill () { int32_t outputRow = 0 , outputSize = 0 ; while (outputRow + outputSize < output_->size ()) { SpillMergeStream* stream = spillMerger_->next (); VELOX_CHECK_NOT_NULL (stream); spillSources_[outputSize] = &stream->current (); spillSourceRows_[outputSize] = stream->currentIndex (&isEndOfBatch); ++outputSize; if (isEndOfBatch) { gatherCopy (output_.get (), outputRow, outputSize, spillSources_, spillSourceRows_, columnMap_); outputRow += outputSize; outputSize = 0 ; } stream->pop (); } if (outputSize != 0 ) { gatherCopy (output_.get (), outputRow, outputSize, ...); } numOutputRows_ += output_->size (); }
gatherCopy 使用 columnMap_ 将 spill 文件中的列顺序(sort 列在前)映射回原始输出列顺序。
7. 关键设计决策总结 7.1 有序 vs. 无序 Spill
算子
Spill 类型
原因
HashJoin Build
无序
恢复时重建 hash 表,顺序无关
HashAggregation Input
有序(按 group key)
支持流式归并聚合,避免全量恢复
HashAggregation Output
无序(hash 表 scan 顺序)
已是输出顺序,不需要重排
OrderBy Input
有序(按 sort key)
支持多路归并产生有序输出
OrderBy Output
有序(已排序传入)
同上
Aggregation 和 OrderBy 都支持两阶段 spill:
Input 阶段 :按 hash 分区 spill,允许多次触发;之后 hash 表清空继续处理输入
Output 阶段 :无分区,只触发一次;将剩余行 spill 后用归并输出
两阶段互斥,通过 inputSpiller_/outputSpiller_ 非空判断。
7.3 内存预留机制(proactive spill) 1 2 3 4 5 触发链:ensureInputFits → maybeReserve → 触发内存仲裁 → reclaim → spill 关键参数: minSpillableReservationPct 确保始终有足够"可 spill"预留空间 spillableReservationGrowthPct 每次扩容幅度,平衡内存利用率与 spill 频率
设计意图:通过 proactive reservation,在真正 OOM 之前触发 spill,避免在不可中断的关键路径上被动 OOM。
7.4 HashBuild 多线程 Spill 协调 1 2 3 4 5 6 7 8 所有 peer HashBuild 线程共享同一个 joinBridge 仲裁器触发 reclaim 时: 1. 暂停所有 peer driver(task->pause()) 2. 检查所有 peer 均可回收 3. 统一 spill 所有 peer 的 hash 表 4. 清空所有 peer 的表 + 释放预留 这确保所有线程的 hash 表碎片被一次性清理,而不是只清理一个线程的。
7.5 递归 Spill(HashJoin 专属) 1 2 3 4 5 6 7 8 初次 spill:bit [32:29] 用于分区 → 8 个 spill partitions 第一次递归:bit [28:26] 用于分区 → 8 个子 partitions 最多 maxSpillLevel 层(默认 3 层) 终止条件: 1. 达到 maxSpillLevel 2. 某 partition 能完全装入内存(不再 spill) 3. partition 数据为空
7.6 probed flag Spill(Right/Full Outer Join) 对于 right join / full outer join,hash 表中每行有一个 probed 标记位,记录该行是否已与 probe 行匹配。Spill 时需要将此标记一同写入磁盘(作为额外的 bool 列),恢复时读回并设置回 RowContainer 的对应行。
8. 递归 Spill 流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 HashBuild 初始化 │ ▼ addInput(循环) │ ▼ ensureInputFits │ 内存不足? ┌─YES──────────────────────────────────────────────────────┐ │ reclaim() / maybeReserve 触发仲裁 │ │ spillHashJoinTable(spillers) │ │ ├─ spiller->spill() 对每个 partition │ │ │ fillSpillRuns → sort(no) → writeSpill → 磁盘 │ │ └─ table_->clear() │ │ spillTriggered_ = true │ └──────────────────────────────────────────────────────────┘ │ spillTriggered? ├─YES: spillInput(input) → 直接按 partition 写到 spill 文件 └─NO : insertIntoHashTable(input) │ ▼ noMoreInput → finishHashBuild │ ▼ 所有 peer 完成 → 合并 spill files → prepareJoinTable(只含未 spill 行) │ ▼ joinBridge_.setHashTable(table, spillPartitions) │ ▼ Probe 侧开始 probe + 同步 spill 对应的 probe 行 │ ▼ postHashBuildProcess │ 有 spill partition? ├─NO : setState(kFinish) ──────────────────── END └─YES: setupSpillInput(nextSpillPartition) │ ▼ processSpillInput │ ▼ addInput(从 spill 文件读取,复用 addInput 路径) │ 内存不足? ──YES──► 同上(递归 spill,bit offset +numPartitionBits) │ ▼ finishHashBuild → joinBridge(递归) │ └──────────► 下一个 spill partition(循环直至所有 partition 处理完)
9. C++ 编码品味:值得学习的细节 这一章从代码本身出发,梳理 Velox spill 系统在 C++ 工程实践上值得学习的具体技法。每一条都有对应的代码位置。
9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑 1 2 3 4 5 6 7 8 SCOPE_EXIT { peers.clear (); for (auto & promise : promises) { promise.setValue (); } };
1 2 3 4 SCOPE_EXIT { pool_->release (); };
SCOPE_EXIT(底层是 folly::ScopeGuard)比 try-catch 清晰得多:清理代码与触发它的上下文紧邻,阅读者一眼就能看到”这个函数结束时会做什么”,而不需要去找 finally 块或析构函数。
对比写法 :
1 2 3 4 5 6 7 8 9 10 11 12 try { doWork (); pool_->release (); } catch (...) { pool_->release (); throw ; } SCOPE_EXIT { pool_->release (); }; doWork ();
9.2 folly::makeGuard:异步任务的”排水阀” 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 auto sync = folly::makeGuard ([&]() { for (auto & write : writes) { try { write->move (); } catch (const std::exception&) { } } }); for (auto & write : writes) { results.push_back (write->move ()); }
这里的 makeGuard 扮演”排水阀”角色:无论主逻辑成功还是抛异常,都确保所有后台异步任务被 move()(消费)掉,避免后台任务持有的资源(文件句柄、内存)泄漏。
关键细节:guard 内部的 catch (const std::exception&) 是有意的——清理路径本身不能 throw,第一个错误已经在 results 循环里被捕获了。
9.3 跨线程异常传递:exception_ptr 模式 多线程 spill 时,后台线程的异常无法直接传播到主线程。Velox 使用 std::exception_ptr 将异常”装箱”:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 try { return std::make_unique <SpillStatus>(id, written, nullptr ); } catch (const std::exception&) { return std::make_unique <SpillStatus>(id, 0 , std::current_exception ()); } for (auto & result : results) { if (result->error != nullptr ) { std::rethrow_exception (result->error); } }
结果对象 SpillStatus 的定义简洁体现了这个模式:
1 2 3 4 5 struct SpillStatus { SpillPartitionId partitionId; uint32_t rowsWritten; std::exception_ptr error; };
这使得多线程错误处理与单线程代码风格一致——主线程的调用者只需检查返回值或 catch 异常,不需要感知线程。
9.4 “1 + N” 并发策略:第一个任务不切线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for (const auto & [id, spillRun] : spillRuns_) { writes.push_back ( memory::createAsyncMemoryReclaimTask <SpillStatus>( [partitionId = id, this ]() { return writeSpill (partitionId); })); if ((writes.size () > 1 ) && executor_ != nullptr ) { executor_->add ([source = writes.back ()]() { source->prepare (); }); } } for (auto & write : writes) { results.push_back (write->move ()); }
设计精妙处 :AsyncSource::move() 的语义是”如果还没执行就内联执行,已经执行就等待结果”。第一个任务从未被 prepare(),所以在 move() 调用时内联执行于主线程。
好处:
若只有一个 partition(常见情况),完全不产生线程切换开销
主线程始终有工作做,不会空等
N 个后台线程 + 主线程并发,吞吐量最大化
HashProbe::spillOutput 对多个 probe 算子也用了完全一样的模式。
9.5 CHECK vs DCHECK:按热度分配断言代价 1 2 3 4 5 6 7 VELOX_CHECK (!finalized_); VELOX_CHECK_EQ (numWritten, run.rows.size ()); VELOX_DCHECK_GE (partitionNum, 0 ); VELOX_DCHECK (isPartitionSpilled (id));
原则:
VELOX_CHECK_* 用于外部可观察的不变式 ,违反时意味着调用方 bug 或状态异常,即使在 release 也要发现
VELOX_DCHECK_* 用于内部实现的显然正确断言 ,理论上不可能失败,加它只是为了 debug 期间辅助排查
在 fillSpillRuns 的内层循环(每行都执行一次)里用 VELOX_DCHECK 而非 VELOX_CHECK,避免 release 构建中每行多一次分支,对亿级行场景有实际性能影响。
9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (FOLLY_UNLIKELY (!spillRuns_.contains (id))) { spillRuns_.emplace (id, SpillRun (*memory::spillMemoryPool ())); } if (FOLLY_UNLIKELY (isEndOfBatch)) { gatherCopy (...); } if (FOLLY_LIKELY (outputSize != 0 )) { gatherCopy (...); }
FOLLY_LIKELY/UNLIKELY 不仅是编译器分支预测提示,更是可执行的注释 ——它明确告诉读者”作者预期这条路径是 hot/cold 的”。这比写 // rarely happens 注释更可靠,因为注释会过期,而这行代码会随着逻辑一起维护。
9.7 TestValue::adjust:无侵入式测试注入 1 2 3 4 5 6 7 8 TestValue::adjust ("facebook::velox::exec::SpillerBase" , this ); TestValue::adjust ("facebook::velox::exec::HashBuild::addInput" , this ); TestValue::adjust ("facebook::velox::exec::HashBuild::reclaim" , this );
TestValue::adjust 在生产代码中是空操作(编译器会完全优化掉)。在测试中,可以向特定注入点注册回调,在精确时机修改状态或注入错误,无需任何条件编译。
配套的 TestScopedSpillInjection 更进一步:
1 2 3 4 5 6 7 8 TestScopedSpillInjection injection (20 , ".*" , 10 ) ;if (testingTriggerSpill (pool_->name ())) { spill (); }
这使得 spill 的边界条件测试 (”在恰好第 3 次 addInput 时触发 spill”)变得非常精确,而生产代码无任何额外开销。
9.8 NanosecondTimer:RAII 计时的最小实现 1 2 3 4 5 6 7 uint64_t execTimeNs{0 };{ NanosecondTimer timer (&execTimeNs) ; } spillStats_->spillFillTimeNanos.fetch_add (execTimeNs, std::memory_order_relaxed);
几个值得注意的细节:
① 先积累,再 fetch_add :不在块内直接 fetch_add,而是先存到局部变量,块结束后一次 atomic 操作。减少 atomic 操作次数,也使代码结构更清晰。
**② memory_order_relaxed**:spill 统计不需要与其他内存操作同步,用 relaxed 避免不必要的内存屏障。
③ 一个特殊处理 :
1 2 updateSpillSortTime (std::max <uint64_t >(1 , sortTimeNs));
即使排序耗时 0 纳秒(几乎不可能但理论上存在),也写入 1,防止测试误判”排序未发生”。这是一个小而精的防御性设计。
9.9 SpillPartitionId:把所有接口设施配齐 SpillPartitionId 是一个需要在多种容器中使用的值类型,代码为它配齐了所有标准设施:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 bool operator <(const SpillPartitionId& other) const ;bool operator >(const SpillPartitionId& other) const { return other < *this ; } bool operator ==(const SpillPartitionId& other) const = default ; namespace std {template <>struct hash <SpillPartitionId> { uint32_t operator () (const SpillPartitionId& id) const { return std::hash <uint32_t >()(id.encodedId ()); } }; } template <>struct fmt ::formatter<SpillPartitionId> : formatter<std::string> { auto format (SpillPartitionId s, format_context& ctx) const { return formatter<std::string>::format(s.toString (), ctx); } }; inline std::ostream& operator <<(std::ostream& os, SpillPartitionId id) { return os << id.toString (); }
operator> 通过调用 operator< 实现而非独立写逻辑,是很好的习惯——减少了两个实现可能不一致的风险。
9.10 withWLock / withRLock:锁的最小化持有 1 2 3 4 5 6 7 8 9 10 partitionWriters_.withWLock ([&](auto & lockedWriters) { if (!lockedWriters.contains (id)) { lockedWriters.emplace (id, std::make_unique <SpillWriter>(...)); } }); return partitionWriter (id)->write (rows, ...);
1 2 3 4 5 6 partitionWriters_.withRLock ([&](const auto & partitionWriters) { for (const auto & [id, writer] : partitionWriters) { } });
folly::Synchronized 的 withWLock/withRLock lambda 接口使锁的范围在代码结构上可见——lambda 的括号就是临界区的边界,不需要手动 lock_guard。
关键设计:写锁只保护”创建 writer”(初始化一次),不保护”写数据”。这是因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突,无需锁保护。最快的锁是不持有锁 。
9.11 SpillRun::sorted:防止双重排序的状态位 1 2 3 4 5 6 7 8 9 10 11 12 13 struct SpillRun { SpillRows rows; uint64_t numBytes{0 }; bool sorted{false }; }; void SpillerBase::ensureSorted (SpillRun& run) { if (run.sorted || !needSort ()) { return ; } run.sorted = true ; }
这个 sorted 标志看起来很简单,但它解决了一个微妙的 bug 风险:如果 ensureSorted 被意外调用两次(代码重构时很容易发生),不加保护会导致正确排好序的数据被 re-sort 破坏(稳定排序会保持顺序,但非稳定排序可能打乱)。一个布尔标志,一个 guard,防御性地堵死了这条 bug 路径。
9.12 pool_->release():主动归还,而非等待回收 1 2 3 4 5 6 7 8 9 10 11 12 13 pool_->release (); SCOPE_EXIT { pool_->release (); }; if (spiller_->spillTriggered ()) { pool ()->release (); }
pool_->release() 的语义是”我承诺不再需要当前预留的超额内存”。Velox 在每个”阶段完成”的节点系统性地调用它,而不是等待 GC 或析构。
这是 push 模型 而非 pull 模型 :算子主动告诉内存池”我这块不要了”,内存池立即可以分配给其他需要者。在多算子并发的大查询中,这种主动性使内存回转速度快得多。
9.13 succinctBytes():有信息量的日志 1 2 3 4 5 6 7 LOG (WARNING) << "Failed to reserve " << succinctBytes (targetIncrementBytes) << " for memory pool " << pool_->name () << ", root pool: " << pool_->root ()->name () << ", used: " << succinctBytes (pool_->usedBytes ()) << ", reservation: " << succinctBytes (pool_->reservedBytes ()) << ", root pool reservation: " << succinctBytes (pool_->root ()->reservedBytes ());
几个细节值得学习:
**① succinctBytes()**:自动选择合适单位(B/KB/MB/GB),比裸输出字节数可读性高 10 倍。
② 固定的信息层次 :pool 名 → root pool 名 → 已用 → 已预留 → root 已预留。这个模板在整个 spill 系统中完全一致,运维人员看一眼就知道去哪里找信息,不需要猜测每条日志的格式。
③ WARNING 而非 ERROR :预留失败不是错误,只是触发 spill 的信号。级别选择准确。
9.14 testing* 前缀:测试可观测性的统一约定
所有为测试暴露的内部状态方法,一律以 testing 前缀命名。这个约定的价值:
接口清晰度 :任何 testing* 方法,调用者立刻知道”这不是业务接口”
grep 友好 :grep 'testing[A-Z]' 精确列出所有测试专用接口,代码审查时可以专门审视
永不 friend:Velox 明确禁止 friend 声明(CLAUDE.md 有规定),测试访问内部状态只能通过这类 accessor,倒逼设计者 只暴露真正需要的状态 ,而非用 friend 开后门
9.15 constexpr 局部常量:让魔数有名字 1 2 3 4 5 6 7 8 9 10 void SpillerBase::fillSpillRuns (...) { constexpr int32_t kHashBatchSize = 4096 ; } std::unique_ptr<SpillerBase::SpillStatus> SpillerBase::writeSpill (...) { constexpr int32_t kTargetBatchBytes = 1 << 18 ; constexpr int32_t kTargetBatchRows = 64 ; }
这些常量定义在函数内部 而非文件级别,有两个好处:
① 最小作用域 :kHashBatchSize 只在 fillSpillRuns 里有意义,定义在全局或类级别会误导读者以为它有更广泛的语义。
② 紧贴使用处 :常量定义和使用之间距离最短,读者不需要跳转文件来理解这个数字的含义。
1 << 18 比 262144 更清楚地表达了”这是 2 的幂次”的意图,同时命名为 kTargetBatchBytes 说明了它是目标 (上限)而非精确值。
编码品味小结
技法
核心价值
代表位置
SCOPE_EXIT
清理与触发紧邻,所有 return 路径都覆盖
SortBuffer::getOutput, HashBuild::finishHashBuild
folly::makeGuard 排水阀
异步任务无论成败都被消费,防止资源泄漏
SpillerBase::runSpill
exception_ptr 跨线程传递
后台线程异常装箱,主线程重抛,调用方无感知
SpillerBase::writeSpill / runSpill
“1 + N” 并发
首任务内联,后续并发,单 partition 时零开销
SpillerBase::runSpill
DCHECK 分热路径
release 不付 inner loop 断言代价
fillSpillRuns inner loop
FOLLY_LIKELY/UNLIKELY
可执行的”热度注释”,不会过期
createOrGetSpillRun, getOutputWithSpill
TestValue::adjust
无侵入注入,生产零开销
所有关键函数入口
NanosecondTimer RAII
单行计时,早 return 安全
fillSpillRuns, ensureSorted
配齐值类型接口
<, >, ==, hash, formatter, << 一起提供
SpillPartitionId
withWLock 最小临界区
锁只保护初始化,写操作在锁外
SpillState::appendToPartition
sorted 防御位
防双重排序,用状态堵死隐患
SpillRun / ensureSorted
pool_->release() 主动归还
push 模型快速回转,而非等待析构
noMoreInput, getOutput
succinctBytes() 统一日志格式
固定信息层次,可读,可预期
所有 LOG(WARNING)
testing* 前缀约定
测试接口一眼可辨,禁用 friend 倒逼最小暴露
所有 testing* 方法
constexpr 局部常量
最小作用域,紧贴使用处,有名字
kHashBatchSize, kTargetBatchRows
10. 架构设计解读:优雅背后的工程思考 Velox spill 系统的代码量约 12,000 行,但它的设计并不复杂——真正值得品味的,是每一处决策背后的约束权衡 和系统性思考 。这一章从”为什么要这样设计”出发,梳理其核心设计哲学。
10.1 最根本的约束:OOM 不能是答案 数据库引擎处理大数据时,内存不可能无限大。任何算子都需要回答这个问题:当数据超过可用内存时,怎么办?
最简单的答案是抛出 OOM 异常。但这会让用户的查询白白失败,代价极高。Velox 的选择是:任何时候都不能因为数据量大而失败,必须能完成查询 (在 spill 开启的情况下)。
这一承诺对系统设计造成了深远影响:
不能假设数据能放进内存——每一个数据结构都要有”放不下时的出路”
不能只支持一次 spill——必须支持递归 spill (spill 后恢复,恢复时再次 OOM,再次 spill)
不能只在”好时机”才 spill——必须能在任意时刻被打断(内存仲裁 reclaim )
这三点决定了架构的基本形态。
10.2 分层:每层只管自己的事 整个 spill 系统被分成 5 层,每层接口极简:
1 2 3 4 5 算子层 → 决定「何时」和「触发哪种」spill Spiller层 → 决定「如何」把 RowContainer 内容写到磁盘(行→列,分区,排序) SpillState → 决定「写到哪个文件」 SpillFile → 决定「如何」序列化/反序列化(PrestoPage 格式) SpillConfig→ 决定「用什么参数」
这种分层带来的核心好处不是”解耦”这么虚的东西,而是每一层可以独立演进 :
要换序列化格式?只改 SpillFile,上面所有层不感知
要支持新算子 spill?只实现新的 Spiller 子类,底层文件 I/O 不变
要调整内存触发策略?只改算子层的 ensureInputFits,文件写入逻辑不变
这在一个长期演进的系统里价值巨大。HashJoin、HashAggregation、OrderBy 三类算子的 spill 逻辑差异极大,但它们复用了同一套文件 I/O 和统计框架,维护成本大幅降低。
10.3 分区:把”全量恢复”变成”分批恢复” Spill 最天真的设计是:内存不够时,把所有数据写到一个文件;恢复时,把整个文件读回来。
问题在于:如果数据总量是内存的 10 倍,一次性读回仍然 OOM。
Velox 的解法是按 hash 分区 :
1 2 3 4 5 数据 → hash(key) → partition 0, 1, 2, ..., 7(8 个分区) 恢复时: 一次只加载 partition 0 → 处理完 → 加载 partition 1 → ... 每次内存中只有 1/8 的数据
这个设计的精妙之处在于:分区和处理逻辑是解耦的 。HashBuild 恢复 partition 0 时,它完全复用了正常的 addInput → insertHashTable 路径,没有任何特殊处理。”从 spill 文件恢复” 和 “从上游算子输入” 在代码路径上几乎完全一样。
SpillPartitionId 的 bit 编码设计支持这种分批恢复天然地扩展到多层递归:
1 2 3 Level 0:bit[31:29] = 0, 分区号存在 bit[2:0] → 8 个 partition Level 1:bit[31:29] = 1, 分区号存在 bit[5:3] → 每个 Level 0 partition 又有 8 个子 partition Level 2:...
当 Level 0 的某个 partition 恢复时仍然 OOM,直接用下一层 bit 范围再 spill。恢复路径完全复用,递归深度理论上无限(受 maxSpillLevel 限制)。
10.4 有序与无序的分叉:根据”读回时的需求”决定 Spill 时要不要排序,完全由读回时的需求 决定,而不是写入时的方便:
算子
读回时的需求
要不要排序
HashJoin Build
重建 hash 表(插入顺序无关)
不排序 (节省 CPU)
HashAggregation
流式归并聚合(需要相同 key 相邻)
排序 (按 group key)
OrderBy
多路归并产生有序输出(需要各 run 内有序)
排序 (按 sort key)
这个设计的价值在于:对于不需要排序的场景,完全不付排序的代价 。HashJoin 是最常见的 spill 场景,它不排序节省了大量 CPU 时间。
而对于需要排序的场景(Aggregation、OrderBy),排序在 spill 时一次性完成,读回时的 N 路归并就是线性扫描,极其高效。如果不在 spill 时排序,读回时就必须全量加载再排序,峰值内存反而更高。
10.5 两阶段 Spill:尊重算子的生命周期 Aggregation 和 OrderBy 各有两个 Spiller(input/output),这不是过度设计,而是对算子生命周期 的精确建模:
1 2 Input 阶段:数据持续流入 → 多次触发 spill → hash 表/排序缓冲反复清空重填 Output 阶段:数据已全部处理 → 只可能触发一次 spill → 之后只有读出
两个阶段的需求截然不同:
Input 阶段 spill(AggregationInputSpiller / SortInputSpiller) :
需要 hash 分区(支持分批恢复)
需要排序(支持流式归并)
可以多次触发(每次清空 hash 表继续处理输入)
Output 阶段 spill(AggregationOutputSpiller / SortOutputSpiller) :
无需分区(只触发一次,全量写出)
无需排序(数据已经就绪,直接写入已有顺序)
只触发一次(之后只读,无需再写)
用同一个 Spiller 处理两个阶段会导致代码复杂化,且无法针对性优化。分开设计使每个 Spiller 的逻辑极度简单。
10.6 SpillRun 切分:在多个约束之间找到平衡点 maxSpillRunRows 参数控制每次 fill-run 的行数,这个设计同时解决了四个互不相关的问题:
1 2 3 4 约束 1:指针数组内存(SpillRun.rows 每行 8 字节,1 亿行 = 800MB) 约束 2:排序辅助内存(大 run 排序 cache miss 严重) 约束 3:PrestoPage 2GB 序列化限制 约束 4:内存仲裁的观测粒度(每 run 之间才能看到内存下降)
一个参数,四个收益,这是典型的”一石多鸟”设计。
值得注意的是,这个设计并没有在代码里写四个 // TODO: 原因X,而是通过一个统一的机制自然地满足了所有约束。这种”机制而非策略”的思维是优雅设计的标志。
10.7 批量行列转换:峰值内存与 I/O 效率的双重优化 extractSpillVector 每次只处理最多 64 行或 256KB,这个”小批量”设计同样服务于两个目标:
峰值内存 :
1 2 3 不切分时:SpillRun 10 万行 → 物化为一个 10 万行 RowVector → 可能数 GB 切分后: 每次 64 行 RowVector → 几百 KB → prepareForReuse 复用同一块内存 峰值 = 单批大小,与 run 总大小无关
I/O 效率 :
1 2 3 4 SpillWriter 有 writeBufferSize 写缓冲(默认几 MB) 每次 appendToPartition 追加到缓冲,满了才刷盘 64 行 ≈ 几十 KB → 多次 append 后缓冲满 → 一次大 write() 这比每 10 万行一次大 write() 的 I/O 效率相当,但内存峰值小得多
64 这个数字也不是随意的——它使得每列 FlatVector 的数据(64 × sizeof(int64) = 512B)恰好能放入 L1 cache,extractColumn 的内层循环具有最优的 cache 利用率。
10.8 主动预留 vs. 被动仲裁:两道防线 Velox 的内存触发有两条路径,共同构成防 OOM 的双重保险:
1 2 3 4 5 6 7 8 防线 1 — 主动预留(proactive): ensureInputFits → maybeReserve(targetIncrementBytes) ├─ 成功:继续处理(预留了足够空间) └─ 失败:[内存仲裁在此可能已触发 reclaim] 防线 2 — 被动仲裁(reactive): 内存仲裁器观测到系统内存紧张 → 调用 reclaim() └─ 强制 spill 当前算子
两者的分工:
防线 1 发生在算子自己的处理循环中,知道”我即将需要多少内存”,可以精确预留
防线 2 发生在任意时刻,无需算子配合,是最后的兜底手段
ReclaimableSectionGuard 是连接两者的桥梁——算子在调用 maybeReserve 前设置它,告诉仲裁器”现在可以来抢我的内存”。这样即使 maybeReserve 本身触发了全局内存整理,算子也能安全地被 spill。
10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口 HashProbe 侧 spill 时,行数据在 RowVectorPtr 里,根本不经过 RowContainer。如果强制所有 spill 都走 RowContainer → extractSpill → 磁盘 的路径,就需要先把 RowVector 插入 RowContainer 再提取出来,白白多一次转换。
NoRowContainerSpiller 的存在体现了一个原则:抽象应该覆盖真实场景,而不是把真实场景削足适履 。它直接接受 RowVectorPtr,跳过整个 fill/extract 流程,只保留写文件这一核心功能。
同样的思路也体现在 SortOutputSpiller 上:sortedRows_ 已经是排好序的行指针数组,如果再经过 fillSpillRuns → 按 hash 分区 → sort 这个流程,不仅多余还会破坏已有的顺序。它重写接口,接受外部已排序的 SpillRows,直接写入。
10.10 HashJoinBridge:异步协调的最小化接口 Build 和 Probe 在不同线程运行,需要协调三件事:
Build 完成 → 通知 Probe 开始
Probe 完成 → 通知 Build 恢复 spill partition
Build/Probe 任意一方 spill → 另一方配合
HashJoinBridge 用三个接口覆盖了全部场景:
1 2 3 setHashTable (table, spillPartitionSet) probeFinished () appendSpilledHashTablePartitions (set)
最优雅的设计是:这三个接口覆盖了所有递归层级 ,无论是第 0 层还是第 3 层递归,调用的接口完全一样。递归深度的增加不需要任何新的协调原语。
对比一下如果没有这种设计,每一层递归都要单独管理 future/promise 对,代码复杂度会指数级增长。
10.11 DictionaryVector wrap:零拷贝分区 Probe 侧 spillInput 需要将一个 batch 的行按 partition 分发到多个 spill 文件。最直接的做法是按 partition 拷贝行数据,但这意味着每行数据被读一次、写一次——对于宽行(含字符串列)代价极高。
Velox 的解法是 DictionaryVector:
1 2 3 4 5 6 原始 input:[row0, row1, row2, row3, row4] partition 0 的 indices buffer:[1, 3] partition 1 的 indices buffer:[0, 2, 4] wrap(2, partition0_buffer, input) → DictionaryVector{indices=[1,3], base=input} ↓ 序列化时才真正读取 row1, row3 的数据
这使得”分区”这个操作的代价只是分配 indices buffer 和一次 hash 计算,不涉及任何数据拷贝。真正的 I/O 推迟到 appendToPartition → SpillWriter::write → 序列化,此时数据只被读一次、写一次。
这是”懒求值”(lazy evaluation)思想在 spill 路径中的具体应用。
10.12 小结:设计模式归纳 回顾整个 spill 系统,可以归纳出几个贯穿始终的设计模式:
① 约束驱动设计(Constraint-Driven Design) 每个设计决策都能追溯到一个具体约束:SpillRun 切分 → PrestoPage 2GB 限制;64 行批次 → L1 cache;DictionaryVector → 零拷贝。没有无目的的”为了灵活性而灵活性”。
② 机制 vs. 策略分离(Mechanism vs. Policy Separation) SpillerBase 提供机制(fill/sort/write 的流程),各子类通过 needSort()、重写 extractSpill() 等钩子注入策略。算子层决定”何时 spill”,Spiller 层决定”如何 spill”,文件层决定”如何存储”。
③ 路径复用(Path Reuse) HashBuild 恢复 spill partition 时走的是和正常处理输入完全相同的 addInput 路径。HashProbe restore 轮也走同一套 addInput → spillInput → probe 路径。不为”恢复”写特殊逻辑,而是让恢复成为正常路径的自然一部分。这减少了测试覆盖的难度,也降低了特殊路径引入 bug 的风险。
④ 懒求值(Lazy Evaluation) 数据尽可能晚才物化:行指针不拷贝数据(SpillRun)→ DictionaryVector 不拷贝行(probe 分区)→ lazy 列在 spill 前才 loadedVector()。每一层都只在真正需要时才付出代价。
⑤ 尊重生命周期(Lifecycle Awareness) Input Spiller 和 Output Spiller 的分离不是技术必要性,而是对算子语义的尊重:input 阶段和 output 阶段的资源需求、触发频率、分区需求都不同,强行合并会产生大量条件分支,拆开则各自极简。
这套设计的优雅,不在于它的聪明,而在于它的诚实 ——每个决策都清楚地知道自己在解决什么约束,不做多余的事,也不留下欠债。