Velox Spiller
Velox Spiller
1. 整体架构
Velox 的 spill 系统是一个分层架构,每层职责明确:
核心设计理念:
- 分区化:spill 数据按 hash key 分区,使得每次只需将一个分区加载回内存,避免全量恢复
- 递归 spill:若恢复某分区后仍 OOM,可对该分区再次 spill(最多
maxSpillLevel层) - 有序与无序分离:sort-based 算子(Aggregation/OrderBy)spill 时保序,以支持归并输出;HashJoin 无需排序
- 两阶段 spill:Aggregation/OrderBy 区分 input 阶段与 output 阶段,各用不同的 Spiller
2. 基础设施层详解
2.1 SpillConfig — 配置层
文件:velox/common/base/SpillConfig.h
SpillConfig 是所有 spill 行为的控制中枢。
struct SpillConfig {
// 回调函数(避免与具体 Task/Driver 实现耦合)
GetSpillDirectoryPathCB getSpillDirPathCb; // 惰性获取 spill 目录
UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb; // 更新全局 quota 并检查是否超限
// 文件参数
std::string fileNamePrefix; // spill 文件名前缀
uint64_t maxFileSize; // 单文件大小上限,超过后自动 rotate
uint64_t writeBufferSize; // 写缓冲区,攒够再 flush
uint64_t readBufferSize; // 读缓冲区,支持 async 双缓冲预读
// 执行参数
folly::Executor* executor; // 非 null 时 spill 写入在后台线程并发执行
// 内存触发门槛
int32_t minSpillableReservationPct; // 可 spill reservation 不低于 used 的 X%
int32_t spillableReservationGrowthPct; // 一次 reserve 扩容 used 的 X%
uint64_t maxSpillRunRows; // 单 run 最多行数(防止 OOM)
uint64_t writerFlushThresholdSize; // TableWriter flush 门槛
// 分区参数(决定 spill 时的并行度)
uint8_t startPartitionBit; // hash 分区起始 bit(e.g. 29)
uint8_t numPartitionBits; // 每层分区 bit 数(e.g. 3 → 8 分区)
int32_t maxSpillLevel; // 最大递归 spill 层数,-1 表示不限
// 归并参数
uint32_t numMaxMergeFiles; // 归并时最多同时 open 的文件数(防 FD 耗尽)
// 排序优化
std::optional<PrefixSortConfig> prefixSortConfig; // 若设置则用 PrefixSort 代替 TimSort
// 其他
common::CompressionKind compressionKind; // 压缩方式
std::string fileCreateConfig; // 传递给底层 FileSystem 的创建选项
uint32_t windowMinReadBatchRows; // Window 算子读取 batch 最小行数
};
关键计算方法:
// 返回当前 bit offset 对应的 spill level
int32_t spillLevel(uint8_t startBitOffset) const;
// 判断是否已超过最大 spill 层数
bool exceedSpillLevelLimit(uint8_t startBitOffset) const;
递归 spill 时,每下一层 startPartitionBit 向右移 numPartitionBits 位,
spillLevel 就是 (startBitOffset - config.startPartitionBit) / numPartitionBits。
2.2 SpillStats — 统计层
文件:velox/exec/SpillStats.h
所有字段都是 std::atomic,线程安全,支持跨线程并发更新。
struct SpillStats {
// 计数
std::atomic_uint64_t spillRuns; // spill 触发次数
std::atomic_uint64_t spilledRows; // spill 的行数
std::atomic_uint64_t spilledInputBytes; // spill 前内存中的原始字节数
std::atomic_uint64_t spilledBytes; // 写到磁盘的字节数(含压缩)
std::atomic_uint32_t spilledPartitions; // spill 的分区数
std::atomic_uint64_t spilledFiles; // 生成的 spill 文件数
std::atomic_uint64_t spillMaxLevelExceededCount; // 超过最大 spill 层数次数
// 写路径时间(纳秒)
std::atomic_uint64_t spillFillTimeNanos; // 填充 SpillRun(哈希分区)
std::atomic_uint64_t spillSortTimeNanos; // 排序
std::atomic_uint64_t spillExtractVectorTimeNanos; // 从 RowContainer 提取 RowVector
std::atomic_uint64_t spillSerializationTimeNanos; // PrestoPage 序列化
std::atomic_uint64_t spillFlushTimeNanos; // 压缩 + 拷贝到写缓冲
std::atomic_uint64_t spillWriteTimeNanos; // 写到文件系统
std::atomic_uint64_t spillWrites; // write() 调用次数
// 读路径时间(纳秒)
std::atomic_uint64_t spillReadBytes;
std::atomic_uint64_t spillReads;
std::atomic_uint64_t spillReadTimeNanos;
std::atomic_uint64_t spillDeserializationTimeNanos;
IoStats ioStats;
};
每个算子持有自己的 SpillStats 对象(通过 spillStats_.get() 传递),同时每次更新都调用对应的 updateGlobal*() 函数汇总到进程级全局统计(globalSpillStats()),供 query profile 使用。
2.3 SpillFile — 文件 I/O 层
文件:velox/exec/SpillFile.h/.cpp
SpillFileInfo — 元数据
struct SpillFileInfo {
uint32_t id; // 单调递增的文件 ID
RowTypePtr type; // 数据类型
std::string path; // 文件路径
uint64_t size; // 文件字节数
std::vector<SpillSortKey> sortingKeys; // 排序键(无序 spill 则为空)
common::CompressionKind compressionKind; // 压缩方式
};
using SpillFiles = std::vector<SpillFileInfo>;
SpillWriter — 写路径
继承 SerializedPageFileWriter(底层用 PrestoPage 格式)。
构造参数:
type, sortingKeys : 数据类型与排序键
compressionKind : 压缩方式
pathPrefix : 文件路径前缀,实际路径为 prefix + "_" + fileId
targetFileSize : 单文件目标大小
writeBufferSize : 写缓冲区大小
updateAndCheckSpillLimitCb : quota 回调
关键流程:
write(rows, ranges) → 序列化 RowVector 片段,写入缓冲
finishFile() → 强制 flush + 关闭当前文件,记录 SpillFileInfo
finish() → 收尾所有文件,返回 SpillFiles 列表
文件切分逻辑(父类 SerializedPageFileWriter 实现):
每次 write 后,若当前文件大小 >= targetFileSize → 自动 finishFile()
每次 updateFileStats() 回调会调用 updateAndCheckSpillLimitCb,若全局 spill 字节超过 query limit 则抛 VELOX_SPILL_LIMIT_EXCEEDED 异常。
SpillReadFile — 读路径
create(fileInfo, bufferSize, pool, stats) → 打开文件,设置读缓冲
nextBatch(batch) → 反序列化下一 batch RowVector
→ 若文件系统支持 async IO,双缓冲预读
2.4 SpillPartitionId — 层级分区标识
文件:velox/exec/Spill.h:277
这是支持递归 spill 的核心数据结构。用单个 uint32_t 编码完整的层级路径:
Bit 布局(低位在前):
bits [2:0] : Level 1 分区号(最多 8 个)
bits [5:3] : Level 2 分区号
bits [8:6] : Level 3 分区号
bits [11:9] : Level 4 分区号
bits [28:12] : 未使用
bits [31:29] : 当前所在层级(0-3)
常量:
kMaxSpillLevel = 3 (最多 4 层,层级编号 0~3)
kMaxPartitionBits = 3 (每层最多 8 分区)
构造方式:
SpillPartitionId(uint32_t partitionNumber); // 创建 level 0 分区
SpillPartitionId(SpillPartitionId parent, uint32_t n); // 创建子分区
有序性(operator<):
排序规则:
1. 相同父节点的子分区按分区号从小到大排列
2. 不同层级时,按层级路径字典序比较
3. 浅层节点排在其子节点前面(DFS 前序)
示例:p_0 < p_1 < p_2_0 < p_2_1 < p_2_2 < p_3
这个有序性使 SpillPartitionSet(std::map<SpillPartitionId, ...>)能以 DFS 前序遍历,确保处理完父 partition 后再处理其子 partition(递归 spill 场景)。
2.5 SpillState — 分区状态管理
文件:velox/exec/Spill.h:581, Spill.cpp:131
每个 Spiller 持有一个 SpillState,负责管理所有分区的写入器:
class SpillState {
// 已 spill 分区的 ID 集合(用于快速判断某 partition 是否已开始 spill)
SpillPartitionIdSet spilledPartitionIdSet_;
// partition -> SpillWriter 的映射,folly::Synchronized 保证线程安全
// 不同 partition 可以并发写入(不同线程写各自的 partition)
folly::Synchronized<SpillPartitionWriterSet> partitionWriters_;
};
**核心方法 appendToPartition**:
uint64_t SpillState::appendToPartition(
const SpillPartitionId& id, const RowVectorPtr& rows) {
// 1. 惰性获取 spill 目录(每次调用都通过 callback,目录可能在首次写时才创建)
auto spillDir = getSpillDirPathCb_();
// 2. 若该 partition 还没有 writer,在 wLock 内创建(路径格式:dir/prefix-spill-encodedId)
partitionWriters_.withWLock([&](auto& lockedWriters) {
if (!lockedWriters.contains(id)) {
lockedWriters.emplace(id, std::make_unique<SpillWriter>(
rows->type(), sortingKeys_,
compressionKind_,
fmt::format("{}/{}-spill-{}", spillDir, fileNamePrefix_, id.encodedId()),
targetFileSize_, writeBufferSize_, ...));
}
});
// 3. 验证序列化大小不超过 2GB(PrestoPage 限制)
validateSpillBytesSize(rows->estimateFlatSize());
updateSpilledInputBytes(bytes);
// 4. 写入(不持有 wLock,不同 partition 并发写互不干扰)
IndexRange range{0, rows->size()};
return partitionWriter(id)->write(rows, folly::Range<IndexRange*>(&range, 1));
}
关键设计:partitionWriters_ 只在创建 writer 时加 wLock,实际 write() 不加锁——因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突。
2.6 SpillPartition / SpillPartitionSet — 读回路径
文件:velox/exec/Spill.h:457
class SpillPartition {
SpillPartitionId id_;
SpillFiles files_; // 该 partition 的所有文件
uint64_t size_; // 总字节数
// 无序读(HashJoin 用):逐文件顺序消费,不保序
std::unique_ptr<UnorderedStreamReader<BatchStream>> createUnorderedReader(...);
// 有序读(Aggregation/OrderBy 用):N 路败者树归并,保序
std::unique_ptr<TreeOfLosers<SpillMergeStream>> createOrderedReader(...);
// 均分文件列表为 N 个 shard(并行算子使用)
std::vector<std::unique_ptr<SpillPartition>> split(int numShards);
};
// SpillPartitionSet 是按 SpillPartitionId 有序排列的 map
using SpillPartitionSet = std::map<SpillPartitionId, std::unique_ptr<SpillPartition>>;
createOrderedReader 的防 FD 耗尽机制:
若 files_.size() > numMaxMergeFiles(且 numMaxMergeFiles >= 2):
1. 将文件分组,每组最多 numMaxMergeFiles 个
2. 对每组文件创建 TreeOfLosers 做局部归并,写出中间文件
3. 递归,直到文件数 <= numMaxMergeFiles
4. 最终对剩余文件建 TreeOfLosers 返回
这避免了同时打开 O(所有文件) 个文件描述符导致的 OOM 或 FD 耗尽。
2.7 读流层:SpillMergeStream / BatchStream
文件:velox/exec/Spill.h:56
BatchStream — 无序批次流(HashJoin 用):
接口:nextBatch(batch) → bool
实现:FileSpillBatchStream - 单文件
ConcatFilesSpillBatchStream - 多文件顺序串联
SpillMergeStream — 有序合并流(Aggregation/OrderBy 用):
接口:hasData() / currentIndex() / current() / pop()
实现:FileSpillMergeStream - 单文件
ConcatFilesSpillMergeStream - 多文件顺序串联(文件间已有序)
关键方法 compare():按 sortingKeys_ 逐列比较当前行,供 TreeOfLosers 使用
关键方法 pop():消费当前行;若当前 batch 用完,调用 nextBatch() 加载下一批
TreeOfLosers<SpillMergeStream> 是 N 路归并排序的败者树实现,next() 每次返回所有流中最小的行所在的流。
3. Spiller 层详解
3.1 SpillerBase — 抽象基类
文件:velox/exec/Spiller.h:29, Spiller.cpp:31
class SpillerBase {
protected:
RowContainer* const container_; // 持有行数据的容器(nullptr 表示无容器)
folly::Executor* const executor_; // 非 null 时并发写 spill
const HashBitRange bits_; // hash 分区 bit 范围
const RowTypePtr rowType_;
const uint64_t maxSpillRunRows_; // 单 run 最大行数
const std::optional<SpillPartitionId> parentId_; // 递归 spill 时的父 ID
bool finalized_{false}; // finishSpill 后置 true,不允许再写
SpillState state_; // 管理所有 partition writer
folly::F14FastMap<SpillPartitionId, SpillRun> spillRuns_; // 内存中的行缓冲
// 纯虚方法(子类决定行为差异)
virtual bool needSort() const = 0; // 是否在写前排序
virtual std::string type() const = 0; // 类型名称(日志用)
virtual void extractSpill(folly::Range<char**>, RowVectorPtr&); // 从 container 提取
virtual void runSpill(bool lastRun); // 执行一次 spill run 的写入
};
3.2 SpillRun — 内存中的 spill 缓冲
struct SpillRun {
SpillRows rows; // char* 指针数组,指向 RowContainer 内部的行
uint64_t numBytes{0}; // 所有行的估算总字节数(通过 rowSize() 累加)
bool sorted{false}; // 是否已排序(避免同一 run 被重复排序)
void clear() {
rows.clear();
rows.shrink_to_fit(); // 主动归还内存,spill 后立即回收指针数组
numBytes = 0;
sorted = false;
}
};
SpillRun::rows 存的是 RowContainer 内部的原始行指针(char*),不拷贝行数据本身,仅持有指针。真正的行数据仍在 RowContainer 的 Arena 内存中,直到 extractSpill 时才物化为列式 RowVector。每个分区(SpillPartitionId)对应一个 SpillRun,存在 spillRuns_ map 中。
3.3 fillSpillRuns — 数据填充
这是 spill 流程的第一步,负责扫描 RowContainer、计算 hash、将行指针按分区放入对应 SpillRun。
bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) {
constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行
bool lastRun{false};
uint64_t totalRows{0};
for (;;) {
// 1. 从 RowContainer 按迭代器顺序取下一批行指针(迭代器原地推进)
const auto numRows =
container_->listRows(iterator, kHashBatchSize, rows.data());
if (numRows == 0) { lastRun = true; break; } // 全部取完,这是最后一轮
auto rowSet = folly::Range<char**>(rows.data(), numRows);
// 2. 计算 hash(逐列迭代,第 2 列起 combine=true,追加 XOR 到已有 hash)
if (!isSinglePartition) {
for (auto i = 0; i < container_->keyTypes().size(); ++i) {
container_->hash(i, rowSet, /*combine=*/i > 0, hashes.data());
}
}
// 3. 按 hash 将行指针分配到对应分区的 SpillRun
for (auto i = 0; i < numRows; ++i) {
const auto partitionNum =
isSinglePartition ? 0 : bits_.partition(hashes[i]);
auto& spillRun = createOrGetSpillRun(SpillPartitionId(partitionNum));
spillRun.rows.push_back(rows[i]);
spillRun.numBytes += container_->rowSize(rows[i]); // 估算行字节数
}
totalRows += numRows;
// 4. 达到 maxSpillRunRows_ 上限时暂停,进入 runSpill,之后再继续
if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) break;
}
markSeenPartitionsSpilled(); // 标记本次出现的所有 partition 为已 spill
return lastRun;
}
hash() 的多列迭代细节:第 i=0 列 combine=false,直接写 hashes[];后续列 combine=true,将新 hash 值 XOR 到已有结果上。这使多列 hash 只需遍历一次行集合,CPU 友好。
3.4 SpillRun 为何切分 / maxSpillRunRows 的作用
spill() 的主循环是 do { fillSpillRuns; runSpill; } while (!lastRun)。maxSpillRunRows_ 控制每轮 fillSpillRuns 最多处理多少行,从而将整个大 RowContainer 的 spill 分成多个小 run 依次执行。这样做有四个原因:
1. 指针数组本身的内存代价
SpillRun::rows 是 std::vector<char*>,每个元素 8 字节。若 RowContainer 有 1 亿行,不切分时仅指针数组就需要 800 MB。maxSpillRunRows_ 通常设为数十万行,指针数组只占几 MB。
2. 排序的内存代价(仅 sorted spill)
ensureSorted 对 run.rows 原地排序(TimSort / PrefixSort),比较函数需要间接访问行数据。run.rows.size() 越大,排序所需的辅助内存和 cache miss 越高。切小 run 使每次排序在 cache 中完成。
3. 分步释放,配合内存仲裁
每轮 runSpill 结束后调用 run.clear()(含 shrink_to_fit()),立即归还指针数组内存。内存仲裁器在两轮 run 之间可观察到内存已下降,有机会停止 spill(若目标内存已释放够了)。若一次性塞满所有行,仲裁器只能在整个 spill 完成后才能看到效果。
4. PrestoPage 序列化限制
SpillState::appendToPartition 调用 validateSpillBytesSize,检查单次写入的估算字节数不超过 2 GB(std::numeric_limits<int32_t>::max())——这是 PrestoPage 协议的字段宽度限制。切分 run 使每次写入远小于这个上限,避免序列化失败。
maxSpillRunRows_ 的取值:
HashBuildSpiller → spillConfig->maxSpillRunRows(默认数十万行)
AggregationInputSpiller → spillConfig->maxSpillRunRows
AggregationOutputSpiller → spillConfig->maxSpillRunRows
SortInputSpiller → spillConfig->maxSpillRunRows
SortOutputSpiller → spillConfig->maxSpillRunRows
NoRowContainerSpiller → 0(无上限,直接 appendToPartition,无 RowContainer)
3.5 行列转换:extractSpill 与 extractSpillVector
spill 的核心转换:行式 RowContainer → 列式 RowVector。这发生在 writeSpill → extractSpillVector → extractSpill 调用链中。
extractSpillVector — 控制每批大小
int64_t SpillerBase::extractSpillVector(
SpillRows& rows,
int32_t maxRows, // 上限:64 行
int64_t maxBytes, // 上限:256 KB(估算)
RowVectorPtr& spillVector,
size_t& nextBatchIndex) { // 游标,指向 rows[] 中下一个未处理的行
// 1. 确定本批行数:不超过 maxRows,且估算总字节不超过 maxBytes
int32_t numRows = 0;
int64_t bytes = 0;
auto limit = std::min(rows.size() - nextBatchIndex, (size_t)maxRows);
for (; numRows < limit; ++numRows) {
bytes += container_->rowSize(rows[nextBatchIndex + numRows]);
if (bytes > maxBytes) {
++numRows; // 超了也要包含这一行(至少保证提取 1 行,避免死循环)
break;
}
}
// 2. 真正执行行 → 列转换
extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector);
nextBatchIndex += numRows;
return bytes;
}
rowSize() 的计算:
uint32_t RowContainer::rowSize(const char* row) const {
return fixedRowSize_ +
(rowSizeOffset_
? *reinterpret_cast<const uint32_t*>(row + rowSizeOffset_)
: 0);
}
fixedRowSize_ 是编译时确定的固定部分大小;rowSizeOffset_ 指向行尾的一个 uint32_t 字段,记录该行的变长部分(out-of-line 字符串)总字节数,两者相加为该行的完整内存占用估算。
extractSpill — 行式 → 列式核心转换
void SpillerBase::extractSpill(
folly::Range<char**> rows, // 本批 char* 指针
RowVectorPtr& resultPtr) {
// 1. 分配或复用 RowVector
if (resultPtr == nullptr) {
resultPtr = BaseVector::create<RowVector>(
rowType_, rows.size(), memory::spillMemoryPool());
} else {
resultPtr->prepareForReuse(); // 复用已有分配,避免重复 malloc
resultPtr->resize(rows.size());
}
auto* result = resultPtr.get();
// 2. 提取各普通列(key + dependent 列)
for (auto i = 0; i < container_->columnTypes().size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}
// 3. 提取各 accumulator 列(聚合中间状态)
const auto& accumulators = container_->accumulators();
for (auto i = 0; i < accumulators.size(); ++i) {
accumulators[i].extractForSpill(
rows, result->childAt(i + container_->columnTypes().size()));
}
}
spillMemoryPool() 是专用的 spill 内存池,与算子的 operator pool 独立,防止 spill 写入过程中分配的临时内存被算子内存统计。
3.6 RowContainer 行布局与列提取原理
理解行列转换,需要先了解 RowContainer 的行内存布局:
extractColumn(rows[], numRows, columnIndex, result) 的工作:
1. 确定该列在行内的 byte offset(通过 RowColumn 对象,编译时固定)
2. 读取 null bit 填充 result 的 null bitmap
3. 对每行读取 offset 处的值:
- 固定宽度列:直接 memcpy 4/8 字节
- StringView 列:读取 ptr+size,若数据是 inline(≤12B)直接复制;
否则从 HashStringAllocator 的堆上读取实际字符串,写入 result 的 string buffer
关键性能特征:
- 同一列的 offset 在所有行中相同(
columnAt(i)编译时固定),无需解析行 header - 变长列(
StringView)在行中只存引用,实际字符串在HashStringAllocator堆上;提取时需要一次间接访问,可能有 cache miss freezeAndExecute在 spill 期间冻结HashStringAllocator,防止其他线程并发修改变长数据
3.7 Accumulator 中间态提取
对于聚合算子,extractForSpill 提取的不是最终聚合结果,而是中间状态(partial intermediate):
void Accumulator::extractForSpill(folly::Range<char**> groups, VectorPtr& result) const {
spillExtractFunction_(groups, result);
// 实际调用:aggregate->extractAccumulators(groups.data(), groups.size(), &result)
}
extractAccumulators 是 Aggregate 虚函数,每种聚合函数自己实现。例如:
sum(int64)→ 提取int64partial sumavg(double)→ 提取(sum: double, count: int64)row structarray_agg(T)→ 提取序列化的数组(out-of-line 存储,需要 copy)
恢复(unspill)时,updateRow 调用 aggregate->addSingleGroupIntermediateResults,将磁盘读回的中间状态与内存中的行合并:
void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
mergeSelection_.setValid(input.currentIndex(), true);
for (auto i = 0; i < aggregates_.size(); ++i) {
mergeArgs_[0] = input.current().childAt(i + keyChannels_.size());
// 将 spill 文件中的中间状态合并到 row 的 accumulator
aggregates_[i].function->addSingleGroupIntermediateResults(
row, mergeSelection_, mergeArgs_, false);
}
}
这实现了"流式归并聚合":相同 key 的行按排好的顺序依次到来,逐行 merge accumulator,避免将整个 group 缓存在内存中。
3.8 writeSpill 内的 Batch 切分(kTargetBatchRows / kTargetBatchBytes)
即使在同一个 SpillRun 内部,writeSpill 也不是一次性将所有行物化为 RowVector,而是每次 extractSpillVector 只处理 最多 64 行或 256 KB(whichever first):
SpillRun.rows = [ptr0, ptr1, ..., ptr_N] // N 可能有数十万
↓
writeSpill 循环:
batch 0: [ptr0 .. ptr63] → extractSpill → RowVector(64行) → appendToPartition
batch 1: [ptr64 .. ptr127] → extractSpill → RowVector(64行) → appendToPartition
...
batch K: [ptr_last_start .. ptr_N] → extractSpill → RowVector(M行) → appendToPartition
为什么要切成 64 行/256KB 的小 batch?
① RowVector 内存峰值控制
每次 extractSpill 分配(或复用)一个 RowVector,其内存由 spillMemoryPool 分配。单批 64 行的 RowVector 内存占用可控(通常几 KB 到几百 KB),而 prepareForReuse() 使相邻批次复用同一块内存,峰值内存 ≈ 单批大小,而非整个 run 大小。
② 与 SpillWriter 写缓冲的配合
SpillWriter 内部有 writeBufferSize 的写缓冲,每次 appendToPartition → write() 将 PrestoPage 数据追加到缓冲;缓冲满时才真正调用文件系统 write()(刷盘)。小批次使每次 write() 调用的数据大小贴近 writeBufferSize,I/O 效率高。
③ "至少 1 行"的保证防止死循环
if (bytes > maxBytes) {
++numRows; // 即使超了也把这一行纳入
break;
}
若某行本身 > 256 KB(如包含超大字符串),不加这一行就会无限循环(numRows=0 → written 不推进)。++numRows 确保每次循环至少推进一行。
④ 64 行是何依据?
64 行是经验值,使得:
- 一个 RowVector 的列式存储(
FlatVector<int64>= 64×8=512B)可以完全装入 L1 cache extractColumn的内层循环(每列 64 次写入)具有良好的向量化潜力- 与 SpillMergeStream 归并读取的 batch 大小相配(读时也是小批量迭代,减少归并树的重平衡次数)
3.9 runSpill — 并发写入
void SpillerBase::runSpill(bool lastRun) {
spillStats_->spillRuns.fetch_add(1);
std::vector<std::shared_ptr<AsyncSource<SpillStatus>>> writes;
for (const auto& [id, spillRun] : spillRuns_) {
if (spillRun.rows.empty()) continue;
writes.push_back(
memory::createAsyncMemoryReclaimTask<SpillStatus>(
[partitionId = id, this]() { return writeSpill(partitionId); }));
// 第 2 个及之后的任务提交到 executor(后台线程),第 1 个在当前线程执行
if ((writes.size() > 1) && executor_ != nullptr) {
executor_->add([source = writes.back()]() { source->prepare(); });
}
}
// 折叠守卫:无论成功失败,确保所有后台任务都被 move()(drain)
auto sync = folly::makeGuard([&]() {
for (auto& write : writes) { try { write->move(); } catch (...) {} }
});
// 主线程阻塞等待所有任务(含第 1 个本线程任务)
for (auto& write : writes) {
results.push_back(write->move());
}
// 检查错误 + 清空 SpillRun(归还指针数组内存)
for (auto& result : results) {
if (result->error) std::rethrow_exception(result->error);
spillRuns_.at(result->partitionId).clear();
if (needSort()) {
state_.finishFile(result->partitionId); // sorted spill:每 run 一个文件
}
}
}
并发策略细节:
- 第 1 个分区的写任务在调用线程上通过
write->move()触发(AsyncSource::prepare()从未被调用,move()内联执行) - 第 2+ 个分区提交到
executor_(后台 CPU 线程池)并发执行 createAsyncMemoryReclaimTask确保任务在内存仲裁器视角下是"可取消的回收任务",若系统 OOM 时仲裁器能感知到这些任务正在运行
state_.finishFile(partitionId) 调用 SpillWriter::finishFile(),flush 当前文件并记录 SpillFileInfo,下次 appendToPartition 会创建新文件。这使每个 sorted run 独立成文,后续归并时文件间已有序。
3.10 finishSpill — 收尾
void SpillerBase::finishSpill(SpillPartitionSet& partitionSet) {
finalizeSpill(); // finalized_ = true,之后不再允许写入
for (const auto& partitionId : state_.spilledPartitionIdSet()) {
// 递归 spill 时,要将本层 partition ID 包装进父 ID 形成完整路径
auto wholeId = parentId_.has_value()
? SpillPartitionId(parentId_.value(), partitionId.partitionNumber())
: partitionId;
// state_.finish(partitionId) 调用 SpillWriter::finish(),
// 关闭所有文件,返回 SpillFiles(文件路径、大小、排序键等元数据)
if (partitionSet.count(wholeId) == 0) {
partitionSet.emplace(wholeId,
std::make_unique<SpillPartition>(wholeId, state_.finish(partitionId)));
} else {
// 多个 Spiller(多线程 HashBuild)的文件合并到同一 partition
partitionSet[wholeId]->addFiles(state_.finish(partitionId));
}
}
}
partitionSet 是 std::map<SpillPartitionId, ...>,按 SpillPartitionId 有序排列(DFS 前序),确保递归 spill 场景下父 partition 先于子 partition 被处理。
3.11 各子类 Spiller 详解
HashBuildSpiller
needSort : false(hash join 恢复时重建 hash 表,顺序无关)
HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits)(分区存储)
targetFileSize: spillConfig->maxFileSize
maxSpillRunRows: spillConfig->maxSpillRunRows
parentId_ : restoringPartitionId_(递归时指定父 partition)
差异点——重写 extractSpill 附加 probed flag 列:
void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
// 先提取普通列(key + dependent)
for (auto i = 0; i < container_->columnTypes().size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}
// 若是 right/full join,额外提取每行的 probed 标志位
if (spillProbeFlag_) {
auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
for (auto i = 0; i < rows.size(); ++i) {
flagVector->set(i, container_->hasProbedFlag(rows[i]));
// hasProbedFlag 直接读行内 probedFlagOffset_ 处的 bit
}
}
}
spill() 重载1(spillTriggered_=true + 全量 spill):
void HashBuildSpiller::spill() {
spillTriggered_ = true;
SpillerBase::spill(nullptr); // fillSpillRuns(nullptr) → 从头扫全部行
}
spill(partitionId, spillVector) 重载2(直通路径,已 triggered 后):
void HashBuildSpiller::spill(const SpillPartitionId& id, const RowVectorPtr& vec) {
// 无需经过 RowContainer,直接 appendToPartition
if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
state_.appendToPartition(id, vec);
}
AggregationInputSpiller
needSort : true(按所有 group key 排序,支持流式归并聚合)
HashBitRange : [startPartitionBit, startPartitionBit + numPartitionBits)
targetFileSize: uint64_t::max(文件大小不限,由 maxFileSize 配置控制)
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys : 所有 key 列,升序,nulls last
继承默认 extractSpill:提取 key 列 + 各 accumulator 的 extractAccumulators 中间状态。
每次 runSpill 后(needSort=true),调用 state_.finishFile(id) 关闭当前文件——每个 run 的数据写到独立的 sorted 文件。后续 createOrderedReader 用败者树对所有文件做 N 路归并,还原全局有序流。
关键约束:rows->stringAllocator().freezeAndExecute([&]() { inputSpiller_->spill(); }) 在 spill 期间冻结 HashStringAllocator,防止 accumulator 在 spill 时并发分配/释放变长内存。
AggregationOutputSpiller
needSort : false(hash 表 scan 顺序即为输出顺序,不需要重排)
HashBitRange : {}(空,单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows
spill(startRowIter) 从 rowIterator 指定的偏移开始扫描(支持从已输出的位置继续):
void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
state_.setPartitionSpilled(SpillPartitionId(0)); // 单分区
SpillerBase::spill(&startRowIter); // 从 startRowIter 开始迭代
}
重写 runSpill:在最后一次 run 后强制 finishFile:
void AggregationOutputSpiller::runSpill(bool lastRun) {
SpillerBase::runSpill(lastRun);
if (lastRun) {
for (const auto& [id, _] : spillRuns_) {
state_.finishFile(id); // 确保最后的数据落盘
}
}
}
SortInputSpiller
needSort : true(按 sort key 排序)
HashBitRange : {}(空,单 partition 0,OrderBy 无 hash 分区)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows
sortingKeys : 所有 sort 列,对应 CompareFlags(方向 + nulls 位置)
spillerStoreType_ 把 sort 列排在最前面,这样归并时只需比较前 N 列(减少 cache miss)。
spill() 直接调用 SpillerBase::spill(nullptr),无参数——从 RowContainer 头部开始扫全部行。
SortOutputSpiller
needSort : false(调用方已传入排好序的 SpillRows)
HashBitRange : {}(单 partition 0)
targetFileSize: uint64_t::max
maxSpillRunRows: spillConfig->maxSpillRunRows
接口与其他 spiller 不同——接受外部已排序的行指针数组:
void SortOutputSpiller::spill(SpillRows& rows) {
auto& run = createOrGetSpillRun(SpillPartitionId(0));
// 直接接管外部已排序的行指针(sortedRows_ 的剩余部分)
run.rows = SpillRows(rows.begin(), rows.end(), run.rows.get_allocator());
for (const auto* row : rows) run.numBytes += container_->rowSize(row);
markSeenPartitionsSpilled();
runSpill(/*lastRun=*/true); // 一次性完成,不分多 run
}
重写 runSpill:执行完后立即 finishFile,因为这是唯一一次写入。
NoRowContainerSpiller / MergeSpiller
needSort : false
container_ : nullptr(无 RowContainer)
maxSpillRunRows: 0(不使用)
无 RowContainer,通过 spill(partitionId, spillVector) 直接将外部 RowVector 写入磁盘,跳过整个 fillSpillRuns → runSpill → extractSpill 流程:
void NoRowContainerSpiller::spill(
const SpillPartitionId& id, const RowVectorPtr& spillVector) {
if (!state_.isPartitionSpilled(id)) state_.setPartitionSpilled(id);
state_.appendToPartition(id, spillVector); // 直接写磁盘
}
MergeSpiller 是 NoRowContainerSpiller 的子类,额外携带 sortingKeys(用于 hash join probe 侧的归并 spill,需要保留排序信息以便后续 probe 侧归并读取)。
各子类对比
| 子类 | needSort | HashBitRange | container_ | 特殊行为 |
|---|---|---|---|---|
HashBuildSpiller |
false | 多分区 | ✓ | 重写 extractSpill:附加 probed flag 列;双重 spill() 入口 |
AggregationInputSpiller |
true | 多分区 | ✓ | 每 run 对应独立 sorted 文件;freeze allocator |
AggregationOutputSpiller |
false | 单分区(空) | ✓ | 从迭代器偏移开始;重写 runSpill 收尾 |
SortInputSpiller |
true | 单分区(空) | ✓ | sort 列前置;每 run 独立 sorted 文件 |
SortOutputSpiller |
false | 单分区(空) | ✓ | 接受外部已排序 SpillRows;重写 runSpill |
NoRowContainerSpiller |
false | 多分区 | nullptr | 直接 appendToPartition,无 fill/extract 流程 |
MergeSpiller |
false | 多分区 | nullptr | 同上,额外保留 sortingKeys |
4. HashJoin Spill 实现
4.1 整体流程
HashJoin spill 只发生在 Build 侧(HashBuild),主要用于处理 build 表过大的情况。Probe 侧配合 build 侧,读取对应的 spill 分区数据。
4.2 HashBuildSpiller
class HashBuildSpiller : public SpillerBase {
const bool spillProbeFlag_; // 是否需要 spill probed 标记(right/full join)
bool spillTriggered_{false}; // 一旦触发 spill,后续所有输入直接 spill
// spill() 重载1:spill RowContainer 中的所有行(内存仲裁触发)
void spill() {
spillTriggered_ = true;
SpillerBase::spill(nullptr); // → fillSpillRuns → runSpill → writeSpill
}
// spill() 重载2:spill 一个 partition 的输入向量(spillTriggered 后的直通路径)
void spill(const SpillPartitionId& partitionId, const RowVectorPtr& spillVector) {
VELOX_CHECK(spillTriggered_);
if (!state_.isPartitionSpilled(partitionId)) {
state_.setPartitionSpilled(partitionId);
}
state_.appendToPartition(partitionId, spillVector);
}
};
**重写 extractSpill**:对于 right/full outer join,需要在 spill 数据中附加一列 bool probed_flag,记录该行是否已被 probe 侧命中:
void HashBuildSpiller::extractSpill(folly::Range<char**> rows, RowVectorPtr& resultPtr) {
// 提取普通列
for (auto i = 0; i < types.size(); ++i) {
container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i));
}
// 若需要 probed flag,从 RowContainer 的标记位提取
if (spillProbeFlag_) {
auto* flagVector = result->childAt(types.size())->asFlatVector<bool>();
for (auto i = 0; i < rows.size(); ++i) {
flagVector->set(i, container_->hasProbedFlag(rows[i]));
}
}
}
4.3 setupSpiller
在 HashBuild 初始化时和每次 restore 时调用:
void HashBuild::setupSpiller(SpillPartition* spillPartition) {
if (!canSpill()) return;
// 首次调用:构造 spillType_(在普通列后可能追加 bool 列用于 probed flag)
if (spillType_ == nullptr) {
spillType_ = hashJoinTableSpillType(tableType_, joinType_);
if (needProbedFlagSpill_) {
spillProbedFlagChannel_ = spillType_->size() - 1;
// 初始化 false 常量向量(build 侧 spill 时所有行未被 probe)
spillProbedFlagVector_ = std::make_shared<ConstantVector<bool>>(
pool(), 0, /*isNull=*/false, BOOLEAN(), false);
}
}
uint8_t startPartitionBit = config->startPartitionBit;
if (spillPartition != nullptr) {
// Restore 场景:从 spill 文件中读取数据重建 hash 表
spillInputReader_ = spillPartition->createUnorderedReader(
config->readBufferSize, pool(), spillStats_.get());
restoringPartitionId_ = spillPartition->id();
// 递归 spill:bit offset 右移一层
startPartitionBit = partitionBitOffset(
spillPartition->id(), startPartitionBit, numPartitionBits) + numPartitionBits;
// 若超过最大 spill 层数,禁用本轮 spill(只能尽力处理)
if (config->exceedSpillLevelLimit(startPartitionBit)) {
exceededMaxSpillLevelLimit_ = true;
return;
}
}
spiller_ = std::make_unique<HashBuildSpiller>(
joinType_, restoringPartitionId_,
table_->rows(), spillType_,
HashBitRange(startPartitionBit, startPartitionBit + config->numPartitionBits),
config, spillStats_.get());
}
4.4 ensureInputFits — 内存压力检测
每次 addInput 前调用,是 proactive spill 的入口:
void HashBuild::ensureInputFits(RowVectorPtr& input) {
if (!canSpill() || spiller_ == nullptr || spiller_->spillTriggered()) return;
// 估算本批 input 引起的内存增量
const auto tableIncrementBytes = table_->hashTableSizeIncrease(input->size());
const auto rowContainerIncrementBytes = rows->sizeIncrement(input->size(), ...);
const auto incrementBytes = rowContainerIncrementBytes + tableIncrementBytes;
// 若当前预留充足,直接返回
if (availableReservationBytes >= minReservationBytes) {
if (freeRows > input->size() && outOfLineFreeBytes >= flatBytes) return;
if (pool()->availableReservation() > 2 * incrementBytes) return;
}
// 尝试扩容 reservation(扩容本身可能触发内存仲裁,进而触发 spill)
const auto targetIncrementBytes = std::max<int64_t>(
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);
{
Operator::ReclaimableSectionGuard guard(this); // 标记为可被仲裁
if (pool()->maybeReserve(targetIncrementBytes)) {
if (spiller_->spillTriggered()) {
// 扩容过程中仲裁器触发了 spill,不需要这份预留了
pool()->release();
}
return;
}
}
// maybeReserve 失败(OOM),后续仲裁器会调用 reclaim()
}
4.5 spillInput — 正在 spill 时的直通路径
一旦 spiller_->spillTriggered() 为 true,所有新输入不再进入 hash 表,而是直接按 partition 分发到 spill 文件:
void HashBuild::spillInput(const RowVectorPtr& input) {
if (!canSpill() || spiller_ == nullptr || !spiller_->spillTriggered()
|| !activeRows_.hasSelections()) return;
// 1. 重新计算所有行的 hash(与 addInput 中的计算可能重复,代价可接受)
computeSpillPartitions(input); // 结果存入 spillPartitions_[row]
// 2. 按 partition 分组,并从 activeRows_ 中移除(防止重复插入 hash 表)
for (auto row = 0; row < numInput; ++row) {
if (!activeRows_.isValid(row)) continue;
activeRows_.setValid(row, false); // 从 active 中移除
rawSpillInputIndicesBuffers_[spillPartitions_[row]][numSpillInputs_[partition]++] = row;
}
// 3. 为非 spill 原始输入构建 spillChildVectors_(key + dependent + probed_flag)
maybeSetupSpillChildVectors(input);
// 4. 按 partition 逐批 spill
for (uint32_t partition = 0; partition < numSpillInputs_.size(); ++partition) {
spillPartition(partition, numSpillInputs_[partition],
spillInputIndicesBuffers_[partition], input);
}
}
4.6 finishHashBuild — 收尾与传递
所有 build 线程完成后,最后一个线程执行:
bool HashBuild::finishHashBuild() {
// ...(等待所有 peer 线程完成)
SpillPartitionSet spillPartitions;
// 收集所有 peer 的 spill 文件(多线程 build 的各自 spill 合并)
for (auto* build : otherBuilds) {
if (build->spiller_ != nullptr) {
build->spiller_->finishSpill(spillPartitions);
}
}
if (spiller_ != nullptr) {
spiller_->finishSpill(spillPartitions);
removeEmptyPartitions(spillPartitions);
}
// 构建 hash 表(只包含未 spill 的行)
table_->prepareJoinTable(std::move(otherTables), ...);
// 注册 tableSpillFunc:允许 join bridge 在 probe 后 spill hash 表
HashJoinTableSpillFunc tableSpillFunc;
if (canReclaim()) {
tableSpillFunc = [hashBitRange = spiller_->hashBits(), ...](auto table) {
return spillHashJoinTable(table, ...);
};
}
// 将 hash 表 + spill partitions 一起传给 join bridge
joinBridge_->setHashTable(table, std::move(spillPartitions),
joinHasNullKeys_, std::move(tableSpillFunc));
}
4.7 reclaim — 内存仲裁触发路径
内存仲裁器调用 reclaim() 强制 spill:
void HashBuild::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
// 状态检查:只有 kRunning/kWaitForBuild/kYield 状态且不在非可回收区才能 spill
if (nonReclaimableState()) {
++stats.numNonReclaimableAttempts;
return;
}
// 暂停所有 peer driver(task->pauseRequested() 保证)
const std::vector<Operator*> operators =
task->findPeerOperators(pipelineId, this);
// 检查所有 peer 都可被回收
for (auto* op : operators) {
if (static_cast<HashBuild*>(op)->nonReclaimableState()) {
++stats.numNonReclaimableAttempts;
return;
}
}
// 收集所有 peer 的 spiller,统一 spill 整个 hash 表
std::vector<HashBuildSpiller*> spillers;
for (auto* op : operators) {
spillers.push_back(static_cast<HashBuild*>(op)->spiller_.get());
}
spillHashJoinTable(spillers, config); // 核心:spill 所有 peer 的 hash 表
// 清空 hash 表(已 spill,不再需要)+ 释放 memory pool reservation
for (auto* op : operators) {
static_cast<HashBuild*>(op)->table_->clear(true);
static_cast<HashBuild*>(op)->pool()->release();
}
}
spillHashJoinTable 内部对每个 spiller 调用 spiller->spill()(即 HashBuildSpiller::spill()),将所有行按 hash 分区写到磁盘。
4.8 postHashBuildProcess / setupSpillInput — 递归恢复
Build 完成后,询问 join bridge 是否有 spill 数据需要 restore:
void HashBuild::postHashBuildProcess() {
if (!canSpill()) {
setState(State::kFinish);
return;
}
// 向 join bridge 请求下一个要恢复的 spill partition
auto spillInput = joinBridge_->spillInputOrFuture(&future_);
if (!spillInput.has_value()) {
setState(State::kWaitForProbe); // 等待 probe 侧完成并传递 spill partition
return;
}
setupSpillInput(std::move(spillInput.value()));
}
void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
if (spillInput.spillPartition == nullptr) {
setState(State::kFinish); // 没有更多 spill partition,结束
return;
}
// 重置所有状态,为本次 restore 重新初始化
table_.reset(); spiller_.reset(); spillInputReader_.reset();
restoringPartitionId_.reset();
// 列顺序重置(spill 文件中列已按 key + dependent 排好)
std::iota(keyChannels_.begin(), keyChannels_.end(), 0);
std::iota(dependentChannels_.begin(), dependentChannels_.end(), keyChannels_.size());
setupTable();
setupSpiller(spillInput.spillPartition.get()); // 建立新的 spiller(可能是递归层)
processSpillInput(); // 开始读取并重建
}
4.9 processSpillInput — 从 spill 文件重建
void HashBuild::processSpillInput() {
while (spillInputReader_->nextBatch(spillInput_)) {
// 复用 addInput 路径:重新 hash + 插入 hash 表(或再次 spill 到下一层)
addInput(std::move(spillInput_));
if (!isRunning()) return;
if (shouldYield()) {
state_ = State::kYield;
future_ = ContinueFuture{folly::Unit{}};
return;
}
}
noMoreInputInternal(); // → finishHashBuild → postHashBuildProcess(循环)
}
这形成了递归循环:processSpillInput → addInput → ensureInputFits → [spill] → noMoreInput → finishHashBuild → postHashBuildProcess → setupSpillInput → processSpillInput。
4.10 Probe 侧 Spill 详解
Probe 侧(HashProbe)的 spill 涉及三条路径,通过 HashJoinBridge 与 build 侧协调:
HashBuildResult — Build/Probe 信息传递载体
struct HashBuildResult {
std::shared_ptr<BaseHashTable> table; // 刚建好的 hash 表
std::optional<SpillPartitionId> restoredPartitionId; // 非 null 表示本次是 restore 轮
SpillPartitionIdSet spillPartitionIds; // 本轮 build 中被 spill 的 partition ID
bool hasNullKeys;
};
spillPartitionIds 非空 ⟺ build 侧触发了 spill,probe 侧必须同步将对应行 spill 到磁盘。
restoredPartitionId 非 null ⟺ 本次 hash 表是从 spill 文件 restore 得来的,probe 侧需要从对应的 spill 文件中读取之前被 spill 的 probe 行。
路径 1:input spill(探测侧主路径)
触发时机:asyncWaitForHashTable 拿到 HashBuildResult 后,发现 spillPartitionIds 非空。
void HashProbe::asyncWaitForHashTable() {
auto hashBuildResult = joinBridge_->tableOrFuture(&future_);
table_ = hashBuildResult->table;
initializeResultIter();
// 1. 若本次是 restore 轮,打开对应 partition 的 probe spill 文件
maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId);
// 2. 若 build 侧有 spill partition,初始化 inputSpiller_
maybeSetupInputSpiller(hashBuildResult->spillPartitionIds);
checkMaxSpillLevel(hashBuildResult->restoredPartitionId);
}
**maybeSetupInputSpiller**:
void HashProbe::maybeSetupInputSpiller(
const SpillPartitionIdSet& spillPartitionIds) {
spillInputPartitionIds_ = spillPartitionIds;
if (spillInputPartitionIds_.empty()) return;
// 计算 bit offset(与 build 侧一致,保证 hash 分区对齐)
const auto bitOffset = partitionBitOffset(
*spillInputPartitionIds_.begin(),
spillConfig()->startPartitionBit,
spillConfig()->numPartitionBits);
// NoRowContainerSpiller:无 RowContainer,直接将外部 RowVector 写到磁盘
inputSpiller_ = std::make_unique<NoRowContainerSpiller>(
probeType_,
restoringPartitionId_, // 递归时携带父 partition ID
HashBitRange(bitOffset, bitOffset + spillConfig()->numPartitionBits),
spillConfig(),
spillStats_.get());
// 只 spill build 侧已 spill 的那些 partition(其余 partition 可以正常 probe)
inputSpiller_->setPartitionsSpilled(spillInputPartitionIds_);
// SpillPartitionFunction:计算每行属于哪个 partition(与 build 侧使用相同的 hash bit 范围)
spillPartitionFunction_ = std::make_unique<SpillPartitionFunction>(
SpillPartitionIdLookup(spillInputPartitionIds_, ...),
probeType_, keyChannels_);
}
**spillInput**(在 addInput 中调用):
void HashProbe::spillInput(RowVectorPtr& input) {
const auto numInputRows = input->size();
// 预分配各 partition 的 indices buffer(复用避免重复 malloc)
prepareInputIndicesBuffers(numInputRows,
inputSpiller_->state().spilledPartitionIdSet());
// 为每行计算所属 partition
spillPartitionFunction_->partition(*input, spillPartitions_);
vector_size_t numNonSpillingInput = 0;
for (auto row = 0; row < numInputRows; ++row) {
const auto& partitionId = spillPartitions_[row];
if (!inputSpiller_->state().isPartitionSpilled(partitionId)) {
// 该 partition 未被 build 侧 spill,行留在内存继续 probe
rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row;
} else {
// 该 partition 已被 build 侧 spill,行需要 spill 到磁盘
rawSpillInputIndicesBuffers_.at(partitionId)
[numSpillInputs_.at(partitionId)++] = row;
}
}
if (numNonSpillingInput == numInputRows) return; // 无行需要 spill
// 强制加载所有 lazy 列(spill 序列化需要实际数据)
for (int32_t i = 0; i < input->childrenSize(); ++i) {
input->childAt(i)->loadedVector();
}
// 按 partition 分批 spill(wrap 创建 DictionaryVector,不拷贝数据,只共享 indices)
for (const auto& [partitionId, numRows] : numSpillInputs_) {
if (numRows == 0) continue;
inputSpiller_->spill(
partitionId,
wrap(numRows, spillInputIndicesBuffers_.at(partitionId), input));
}
if (numNonSpillingInput == 0) {
input = nullptr; // 所有行都 spill 了,无需继续 probe
} else {
input = wrap(numNonSpillingInput, nonSpillInputIndicesBuffer_, input);
// 只保留未 spill 的行继续 probe
}
}
关键设计:wrap 使用 DictionaryVector,不拷贝行数据,只是 indices 引用,spill 写入时才通过 loadedVector() 强制物化。这样同一个 input batch 里属于不同 partition 的行可以零拷贝地分发到各自的 spill 文件。
**noMoreInputInternal**(探测完成,收尾 input spill):
void HashProbe::noMoreInputInternal() {
noMoreSpillInput_ = true;
if (!spillInputPartitionIds_.empty()) {
// 关闭 inputSpiller_,将所有分区的 SpillFiles 收集到 inputSpillPartitionSet_
inputSpiller_->finishSpill(inputSpillPartitionSet_);
// 注:NoRowContainerSpiller 不排序,spillSortTimeNanos 永远为 0
}
// 等待所有 peer probe 线程完成(防止一个线程先收尾而另一个还在写同一 partition)
if (!operatorCtx_->task()->allPeersFinished(...)) {
setState(ProbeOperatorState::kWaitForPeers);
return;
}
lastProber_ = true; // 最后一个完成的 probe 线程负责通知 build 侧
}
路径 2:restore 轮的 probe(从 spill 文件读取 probe 行)
当 build 侧从某个 spill partition 重建 hash 表后,probe 侧必须把之前 spill 到磁盘的对应 probe 行读回重新 probe。
**maybeSetupSpillInputReader**:
void HashProbe::maybeSetupSpillInputReader(
const std::optional<SpillPartitionId>& restoredPartitionId) {
if (!restoredPartitionId.has_value()) return;
// 从 inputSpillPartitionSet_ 中取出对应 partition 的 SpillPartition
auto iter = inputSpillPartitionSet_.find(restoredPartitionId.value());
auto partition = std::move(iter->second);
restoringPartitionId_ = restoredPartitionId;
// 创建无序读取器(probe 行之间无顺序要求,顺序读即可)
spillInputReader_ = partition->createUnorderedReader(
spillConfig_->readBufferSize, pool(), spillStats_.get());
inputSpillPartitionSet_.erase(iter);
}
**addSpillInput**(在 isBlocked 的 kRunning 状态时轮询调用):
void HashProbe::addSpillInput() {
if (input_ != nullptr || noMoreSpillInput_) return;
if (!spillInputReader_->nextBatch(input_)) {
// spill 文件读完,走正常的 noMoreInput 流程
noMoreInputInternal();
return;
}
// 复用 addInput 路径(包含 spillInput 检查,支持递归 spill)
addInput(std::move(input_));
}
路径 3:output spill(内存仲裁触发 reclaim)
触发条件:probe 正在处理某批 input 时,输出过大导致内存仲裁器调用 reclaim()。
**reclaim**:
void HashProbe::reclaim(uint64_t, memory::MemoryReclaimer::Stats& stats) {
if (exceededMaxSpillLevelLimit_ || nonReclaimableState()) {
++stats.numNonReclaimableAttempts; return;
}
const auto probeOps = findPeerOperators();
bool hasMoreProbeInput = false;
for (auto* probeOp : probeOps) {
if (probeOp->nonReclaimableState()) { ++stats.numNonReclaimableAttempts; return; }
hasMoreProbeInput |= !probeOp->noMoreSpillInput_;
}
// 步骤 1:并发将所有 peer 的 pending input 产出的 output 先 spill 到磁盘
spillOutput(probeOps);
// 步骤 2:若还有 probe input 未处理,spill hash 表(释放 build 侧内存)
SpillPartitionSet spillPartitionSet;
if (hasMoreProbeInput) {
spillPartitionSet = spillHashJoinTable(
table_, restoringPartitionId_, tableSpillHashBits_,
joinNode_, spillConfig(), spillStats_.get());
}
// 步骤 3:通知各 probe 算子设置 inputSpiller_(后续输入按新 partitions spill)
const auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
for (auto* probeOp : probeOps) {
probeOp->clearBuffers();
if (!spillPartitionSet.empty()) {
probeOp->maybeSetupInputSpiller(spillPartitionIdSet);
}
probeOp->pool()->release();
}
// 步骤 4:清空 hash 表 + 将 spill partitions 注册到 join bridge(供 build 侧后续恢复)
table_->clear(true);
if (!spillPartitionIdSet.empty()) {
joinBridge_->appendSpilledHashTablePartitions(std::move(spillPartitionSet));
}
}
spillOutput(单个算子):
void HashProbe::spillOutput() {
if (input_ == nullptr && !needLastProbe()) return;
// 用 NoRowContainerSpiller 将 pending input 产生的所有 output 写到磁盘(单一 partition 0)
auto outputSpiller = std::make_unique<NoRowContainerSpiller>(
outputType_, std::nullopt, HashBitRange{}, spillConfig(), spillStats_.get());
outputSpiller->setPartitionsSpilled({SpillPartitionId(0)});
// 调用 getOutputInternal(toSpillOutput=true):产出 output 但直接 spill 而非返回
for (;;) {
auto output = getOutputInternal(/*toSpillOutput=*/true);
if (output != nullptr) {
for (int32_t i = 0; i < output->childrenSize(); ++i) {
output->childAt(i)->loadedVector(); // 强制物化 lazy 列
}
outputSpiller->spill(SpillPartitionId(0), output);
continue;
}
if (input_ == nullptr) break; // 当前 input 批次已处理完
// right semi join 特殊:input_ 不为 null 但 output 为 null 属于正常状态
break;
}
// 收尾:将 spill 文件信息存入 spillOutputPartitionSet_(供后续读回)
outputSpiller->finishSpill(spillOutputPartitionSet_);
removeEmptyPartitions(spillOutputPartitionSet_);
}
读回阶段:下次 getOutput 调用时,maybeReadSpillOutput() 将磁盘数据读回:
bool HashProbe::maybeReadSpillOutput() {
if (spillOutputReader_ == nullptr) {
maybeSetupSpillOutputReader(); // 首次:打开 UnorderedStreamReader
}
if (spillOutputReader_ == nullptr) return false;
if (!spillOutputReader_->nextBatch(output_)) {
spillOutputReader_.reset(); // 读完,重置
return false;
}
return true; // 将读回的 output 直接返回给调用方
}
output spill 特殊注意:getOutputInternal(toSpillOutput=true) 被调用时,若发现 input_ 为 null(即没有 pending input),会提前返回 null——这保证了 reclaim 路径不会误进入"probe 完成逻辑"(如 prepareForSpillRestore),避免状态混乱。
prepareForSpillRestore — 多轮恢复的关键
当一轮 probe(含当前 hash 表和对应 spill 文件的 probe 行)全部完成后,最后一个 probe 线程(lastProber_)负责启动下一轮恢复:
void HashProbe::prepareForSpillRestore() {
// 重置本轮相关状态
noMoreSpillInput_ = false;
if (lastProber_) {
table_->clear(true); // 清空当前 hash 表,释放内存
}
table_.reset();
inputSpiller_.reset();
spillInputReader_.reset();
restoringPartitionId_.reset();
spillInputPartitionIds_.clear();
spillOutputReader_.reset();
if (!lastProber_) return;
// 通知 join bridge:probe 侧已完成,请准备下一个 spill partition
joinBridge_->probeFinished();
// bridge 内部:从 spillPartitionSets_ 取下一个 partition,均分为 N shard,
// 放入 restoringSpillShards_,build 侧 spillInputOrFuture() 返回各自的 shard
// 唤醒等待的 peer probe 线程(之前在 kWaitForPeers 状态阻塞)
wakeupPeerOperators();
lastProber_ = false;
}
Probe 侧 Spill 全生命周期
asyncWaitForHashTable()
│
├─ spillPartitionIds 非空 → maybeSetupInputSpiller()
│ 创建 NoRowContainerSpiller,标记需要 spill 的 partition 集合
│
├─ restoredPartitionId 非 null → maybeSetupSpillInputReader()
│ 打开 UnorderedStreamReader,顺序读取之前 spill 的 probe 行
│
addInput(input)
│
└─ needToSpillInput()? → spillInput(input)
按 hash 分 partition:
spilled partition → DictionaryVector wrap → inputSpiller_.spill()
non-spilled partition → 继续正常 probe(无拷贝)
│
getOutput() → ensureOutputFits()
│
├─ maybeReadSpillOutput():读取上次 spillOutput 写到磁盘的 output batch
│
└─ 内存不足 → reclaim():
1. spillOutput(probeOps) 并发 spill 各 probe 的 pending output
2. spillHashJoinTable(table) 将 hash 表 spill 到磁盘
3. maybeSetupInputSpiller(ids) 各 probe 设置新 inputSpiller_
4. appendSpilledHashTablePartitions() → 通知 bridge
│
isBlocked(kRunning) + spillInputReader_ 非 null → addSpillInput()
│ 逐 batch 从 spill 文件读取 probe 行 → addInput() → spillInput() → probe
│
noMoreInputInternal()
│
├─ inputSpiller_.finishSpill(inputSpillPartitionSet_)
└─ allPeersFinished() → lastProber_ = true
│
▼
hasMoreSpillData()? → prepareForSpillRestore()
│ joinBridge_->probeFinished()
│ wakeupPeerOperators()
▼
asyncWaitForHashTable() ← 循环(等待 build 侧重建 hash 表)
│
└─ 所有 spill partition 处理完 → setState(kFinish)
5. HashAggregation Spill 实现
5.1 两种 Spiller
HashAggregation 的 spill 逻辑集中在 GroupingSet 中,区分两个阶段:
inputSpiller_ (AggregationInputSpiller):
- 在 input 处理阶段触发(hash 表太大)
- needSort = true:按 group key 排序
- 分区:按 hash(group key)
- 目的:spill 后 hash 表清空,腾出内存继续处理输入
outputSpiller_ (AggregationOutputSpiller):
- 在 output 处理阶段触发(输出时内存不足)
- needSort = false:已是输出顺序(hash 表 scan 顺序),不需要重排
- 无分区(HashBitRange 为空)
- 目的:将剩余输出行先 spill 到磁盘,之后边读边输出
两者互斥:同一个 GroupingSet 要么用 inputSpiller_,要么用 outputSpiller_,不会同时存在。
5.2 GroupingSet::spill() — input spill
由 HashAggregation::reclaim() 或 ensureInputFits 调用:
void GroupingSet::spill() {
if (table_ == nullptr || table_->numDistinct() == 0) return;
auto* rows = table_->rows();
VELOX_CHECK_NULL(outputSpiller_);
// 首次 spill:初始化 inputSpiller_
if (inputSpiller_ == nullptr) {
// sortingKeys:按所有 group key 列排序(升序,nulls 在末尾)
const auto sortingKeys = SpillState::makeSortingKeys(
std::vector<CompareFlags>(rows->keyTypes().size()));
inputSpiller_ = std::make_unique<AggregationInputSpiller>(
rows,
makeSpillType(), // key 列 + accumulator 列的类型
HashBitRange(startPartitionBit, startPartitionBit + numPartitionBits),
sortingKeys,
spillConfig_,
spillStats_);
}
// 冻结 HashStringAllocator:spill 可能多线程执行,防止 accumulator 并发分配
rows->stringAllocator().freezeAndExecute([&]() {
inputSpiller_->spill();
});
// distinct 聚合:记录本次 spill 生成的文件数(用于后续归并去重)
if (isDistinct() && numDistinctSpillFilesPerPartition_.empty()) {
// 记录每个 partition 的文件数,文件 ID < 该数的是 distinct 文件
for (int partition = 0; partition < maxPartitions; ++partition) {
numDistinctSpillFilesPerPartition_[partition] =
inputSpiller_->state().numFinishedFiles(SpillPartitionId(partition));
}
}
// 清空 hash 表(所有行已 spill 到磁盘)
table_->clear(/*freeTable=*/true);
}
makeSpillType() 生成 spill 类型:
RowTypePtr GroupingSet::makeSpillType() const {
std::vector<TypePtr> types;
// Group key 列
for (auto& hasher : hashers_) types.push_back(hasher->type());
// Accumulator 的中间状态类型(不是最终结果类型)
for (auto& accumulator : accumulators(false)) {
types.push_back(accumulator.spillType());
}
return ROW(names, types);
}
5.3 GroupingSet::spill(rowIterator) — output spill
由 HashAggregation::reclaim() 在 output 阶段调用:
void GroupingSet::spill(const RowContainerIterator& rowIterator) {
VELOX_CHECK(!hasSpilled()); // 确保没有 inputSpiller_
auto* rows = table_->rows();
outputSpiller_ = std::make_unique<AggregationOutputSpiller>(
rows, makeSpillType(), spillConfig_, spillStats_);
rows->stringAllocator().freezeAndExecute([&]() {
outputSpiller_->spill(rowIterator); // 从 rowIterator 指定的位置开始 spill
});
table_->clear(/*freeTable=*/true);
}
AggregationOutputSpiller::spill(rowIterator) 内部:
void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) {
// 只标记单一分区(partition 0),无 hash 分区
state_.setPartitionSpilled(SpillPartitionId(0));
SpillerBase::spill(&startRowIter); // 从 rowIterator 位置开始迭代
}
AggregationOutputSpiller::runSpill 重写:每次 run 后立即 finishFile(保证每个 run 独立一个文件,便于后续归并)。
5.4 reclaim 触发路径
void HashAggregation::reclaim(uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
// 1. 先尝试 compact(清理 accumulator 的字符串存储),代价低
const auto compactedBytes = groupingSet_->compact();
if (compactedBytes >= targetBytes) return;
if (!canSpill()) { /* ... */ return; }
if (groupingSet_->hasSpilled()) {
if (isOutputProcessing) {
// 已在输出阶段且已 spill:无法再次 spill,报告无法回收
return;
}
}
if (isOutputProcessing) {
// 在输出阶段:spill 从 resultIterator_ 位置起的剩余行
groupingSet_->spill(resultIterator_);
} else {
// 在输入阶段:spill 所有行(hash 表全量 dump)
groupingSet_->spill();
}
}
5.5 getOutputWithSpill — 归并输出
当 groupingSet_->hasSpilled() 为 true 时,getOutput 路径切换为:
bool GroupingSet::getOutputWithSpill(int32_t maxOutputRows,
int32_t maxOutputBytes,
const RowVectorPtr& result) {
if (outputSpillPartition_ == -1) {
// 首次进入:初始化归并结构
mergeRows_ = std::make_unique<RowContainer>(keyTypes, ...); // 临时 RowContainer
initializeAggregates(*mergeRows_, false);
// 将所有 spill 文件信息收集到 spillPartitionSet_
inputSpiller_->finishSpill(spillPartitionSet_);
// 或:outputSpiller_->finishSpill(spillPartitionSet_);
removeEmptyPartitions(spillPartitionSet_);
prepareNextSpillPartitionOutput(); // 打开第一个 partition 的 TreeOfLosers
}
// 逐 partition 归并输出
return mergeNext(maxOutputRows, maxOutputBytes, result);
}
bool GroupingSet::prepareNextSpillPartitionOutput() {
merge_ = nullptr;
if (spillPartitionSet_.empty()) return false;
auto it = spillPartitionSet_.begin();
outputSpillPartition_ = it->first.partitionNumber();
// createOrderedReader:构建 N 路败者树(含文件数超限时的预归并)
merge_ = it->second->createOrderedReader(*spillConfig_, pool_, spillStats_);
spillPartitionSet_.erase(it);
return true;
}
5.6 mergeNextWithAggregates — 有序归并聚合
bool GroupingSet::mergeNextWithAggregates(
int32_t maxOutputRows, int32_t maxOutputBytes, const RowVectorPtr& result) {
bool nextKeyIsEqual{false};
for (;;) {
// 从败者树取下一行(带 equal 标志:与上一行 key 是否相同)
auto [stream, equal] = merge_->nextWithEquals();
if (stream == nullptr) {
// 当前 partition 耗尽:输出已积累的行
extractSpillResult(result);
if (result->size() > 0) return true;
// 换下一个 partition
if (!prepareNextSpillPartitionOutput()) return false;
continue;
}
if (!nextKeyIsEqual) {
// 新的 group key:在 mergeRows_ 中创建新行,初始化 accumulators
mergeState_ = mergeRows_->newRow();
initializeRow(*stream, mergeState_); // 复制 key 列 + 初始化 accumulator
}
// 用当前行更新 accumulator(partial merge)
updateRow(*stream, mergeState_);
nextKeyIsEqual = equal;
stream->pop();
// 达到行数/字节数上限:输出
if (!nextKeyIsEqual && (mergeRows_->numRows() >= maxOutputRows
|| mergeRowBytes() >= maxOutputBytes)) {
extractSpillResult(result);
return true;
}
}
}
关键:merge_->nextWithEquals() 利用已排序的特性,保证相同 key 的行连续出现,从而可以在流式归并的同时完成聚合(无需将整个 group 缓存在内存中)。
5.7 mergeNextWithoutAggregates — distinct 归并
对于 SELECT DISTINCT 场景,spill 文件分为两类:
- distinct 文件(文件 ID <
numDistinctSpillFilesPerPartition_):已输出过的 distinct 行 - input 文件(文件 ID >= 阈值):新输入的行
归并时:若某个相同 key 的 run 中包含来自 distinct 文件的行,则跳过该 run(因为已经输出过);否则输出该 run 中的一行。
6. OrderBy Spill 实现
6.1 两种 Spiller
OrderBy 的 spill 通过 SortBuffer 实现,同样区分两个阶段:
inputSpiller_ (SortInputSpiller):
- input 阶段:RowContainer 行数过多时触发
- needSort = true:按 sort key 排序后写入
- 无 hash 分区(HashBitRange 为空)
- 写入一个 partition(partition 0)
outputSpiller_ (SortOutputSpiller):
- output 阶段:已完成内排序,输出时内存不足
- needSort = false:外部传入已排序的 SpillRows
- 接收 sortedRows_ 中未输出部分,直接写入
6.2 SortBuffer::spillInput
void SortBuffer::spillInput() {
if (inputSpiller_ == nullptr) {
// sortingKeys:将所有 sort column + 其余列按 sort 顺序排列
const auto sortingKeys = SpillState::makeSortingKeys(sortCompareFlags_);
inputSpiller_ = std::make_unique<SortInputSpiller>(
data_.get(), // RowContainer
spillerStoreType_, // sort 列在前,其余列在后的类型
sortingKeys,
spillConfig_,
spillStats_);
}
inputSpiller_->spill(); // fillSpillRuns → sort → writeSpill → 写磁盘
data_->clear(); // 清空 RowContainer,腾出内存
}
spillerStoreType_ 的列顺序重排(构造时完成):
// sort 列排在最前面(归并时只需比较前几列)
for (auto i = 0; i < numSortKeys; ++i) {
sorted_types.push_back(input->childAt(sortColumnIndices[i])->type());
sorted_names.push_back(input->nameOf(sortColumnIndices[i]));
}
// 其余列追加在后面
for (auto i = 0; i < input->size(); ++i) {
if (!isSortKey(i)) {
sorted_types.push_back(input->childAt(i)->type());
sorted_names.push_back(input->nameOf(i));
}
}
这样 spill 文件中每行的前 N 列就是排序键,归并比较时只需读取前 N 列,节省反序列化开销。
6.3 SortBuffer::spillOutput
void SortBuffer::spillOutput() {
if (hasSpilled()) return; // 已经 spill 过
if (numOutputRows_ == sortedRows_.size()) return; // 全部输出完了
outputSpiller_ = std::make_unique<SortOutputSpiller>(
data_.get(), spillerStoreType_, spillConfig_, spillStats_);
// 将 sortedRows_ 中未输出的部分([numOutputRows_, end))直接传给 outputSpiller_
auto spillRows = SpillerBase::SpillRows(
sortedRows_.begin() + numOutputRows_, sortedRows_.end(),
*memory::spillMemoryPool());
outputSpiller_->spill(spillRows);
data_->clear();
sortedRows_.clear(); sortedRows_.shrink_to_fit();
finishSpill(); // output spill 只触发一次,立即收尾
}
SortOutputSpiller::spill(SpillRows) 内部:
void SortOutputSpiller::spill(SpillRows& rows) {
auto& spillRun = createOrGetSpillRun(SpillPartitionId(0));
spillRun.rows = SpillRows(rows.begin(), rows.end(), ...);
// 计算 numBytes
for (const auto* row : rows) spillRun.numBytes += container_->rowSize(row);
markSeenPartitionsSpilled();
runSpill(true); // rows 已按 sort key 有序,跳过排序直接写入
}
SortOutputSpiller::runSpill 重写:在父类 runSpill 后立即调用 finishFile(保证文件封尾)。
6.4 SortBuffer::finishSpill
void SortBuffer::finishSpill() {
// inputSpiller_ 和 outputSpiller_ 互斥,只有一个会被初始化
if (inputSpiller_ != nullptr) {
inputSpiller_->finishSpill(spillPartitionSet_);
} else {
outputSpiller_->finishSpill(spillPartitionSet_);
}
// OrderBy 无 hash 分区,只有一个 partition
VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
}
6.5 SortBuffer::getOutputWithSpill
void SortBuffer::prepareOutputWithSpill() {
if (spillMerger_ != nullptr) return; // 已初始化
VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
// 创建 TreeOfLosers:归并所有 spill 文件(可能包含内存中剩余行)
spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader(
*spillConfig_, pool(), spillStats_);
spillPartitionSet_.clear();
}
void SortBuffer::getOutputWithSpill() {
int32_t outputRow = 0, outputSize = 0;
while (outputRow + outputSize < output_->size()) {
SpillMergeStream* stream = spillMerger_->next();
VELOX_CHECK_NOT_NULL(stream);
spillSources_[outputSize] = &stream->current();
spillSourceRows_[outputSize] = stream->currentIndex(&isEndOfBatch);
++outputSize;
if (isEndOfBatch) {
// batch 到边界时先复制出来,再 pop 触发加载下一 batch
gatherCopy(output_.get(), outputRow, outputSize,
spillSources_, spillSourceRows_, columnMap_);
outputRow += outputSize; outputSize = 0;
}
stream->pop();
}
// 处理最后一批
if (outputSize != 0) {
gatherCopy(output_.get(), outputRow, outputSize, ...);
}
numOutputRows_ += output_->size();
}
gatherCopy 使用 columnMap_ 将 spill 文件中的列顺序(sort 列在前)映射回原始输出列顺序。
7. 关键设计决策总结
7.1 有序 vs. 无序 Spill
| 算子 | Spill 类型 | 原因 |
|---|---|---|
| HashJoin Build | 无序 | 恢复时重建 hash 表,顺序无关 |
| HashAggregation Input | 有序(按 group key) | 支持流式归并聚合,避免全量恢复 |
| HashAggregation Output | 无序(hash 表 scan 顺序) | 已是输出顺序,不需要重排 |
| OrderBy Input | 有序(按 sort key) | 支持多路归并产生有序输出 |
| OrderBy Output | 有序(已排序传入) | 同上 |
7.2 两阶段 Spill(Input/Output Spiller 分离)
Aggregation 和 OrderBy 都支持两阶段 spill:
- Input 阶段:按 hash 分区 spill,允许多次触发;之后 hash 表清空继续处理输入
- Output 阶段:无分区,只触发一次;将剩余行 spill 后用归并输出
两阶段互斥,通过 inputSpiller_/outputSpiller_ 非空判断。
7.3 内存预留机制(proactive spill)
触发链:ensureInputFits → maybeReserve → 触发内存仲裁 → reclaim → spill
关键参数:
minSpillableReservationPct 确保始终有足够"可 spill"预留空间
spillableReservationGrowthPct 每次扩容幅度,平衡内存利用率与 spill 频率
设计意图:通过 proactive reservation,在真正 OOM 之前触发 spill,避免在不可中断的关键路径上被动 OOM。
7.4 HashBuild 多线程 Spill 协调
所有 peer HashBuild 线程共享同一个 joinBridge
仲裁器触发 reclaim 时:
1. 暂停所有 peer driver(task->pause())
2. 检查所有 peer 均可回收
3. 统一 spill 所有 peer 的 hash 表
4. 清空所有 peer 的表 + 释放预留
这确保所有线程的 hash 表碎片被一次性清理,而不是只清理一个线程的。
7.5 递归 Spill(HashJoin 专属)
初次 spill:bit [32:29] 用于分区 → 8 个 spill partitions
第一次递归:bit [28:26] 用于分区 → 8 个子 partitions
最多 maxSpillLevel 层(默认 3 层)
终止条件:
1. 达到 maxSpillLevel
2. 某 partition 能完全装入内存(不再 spill)
3. partition 数据为空
7.6 probed flag Spill(Right/Full Outer Join)
对于 right join / full outer join,hash 表中每行有一个 probed 标记位,记录该行是否已与 probe 行匹配。Spill 时需要将此标记一同写入磁盘(作为额外的 bool 列),恢复时读回并设置回 RowContainer 的对应行。
8. 递归 Spill 流程图
9. C++ 编码品味:值得学习的细节
这一章从代码本身出发,梳理 Velox spill 系统在 C++ 工程实践上值得学习的具体技法。每一条都有对应的代码位置。
9.1 SCOPE_EXIT:让清理逻辑紧贴触发逻辑
// HashBuild::finishHashBuild
SCOPE_EXIT {
// 无论函数从哪个 return 路径退出,都必须唤醒等待的 peer
peers.clear();
for (auto& promise : promises) {
promise.setValue();
}
};
// SortBuffer::getOutput
SCOPE_EXIT {
pool_->release(); // 每次 getOutput 返回后都归还未用预留
};
SCOPE_EXIT(底层是 folly::ScopeGuard)比 try-catch 清晰得多:清理代码与触发它的上下文紧邻,阅读者一眼就能看到"这个函数结束时会做什么",而不需要去找 finally 块或析构函数。
对比写法:
// ❌ 反例:try-catch 把清理和逻辑分离
try {
doWork();
pool_->release(); // 容易漏掉 early return 路径
} catch (...) {
pool_->release();
throw;
}
// ✅ SCOPE_EXIT:写一次,所有路径都覆盖
SCOPE_EXIT { pool_->release(); };
doWork();
9.2 folly::makeGuard:异步任务的"排水阀"
// SpillerBase::runSpill
auto sync = folly::makeGuard([&]() {
for (auto& write : writes) {
try {
write->move(); // 强制消费所有 pending 任务
} catch (const std::exception&) {
// 清理路径不能 throw
}
}
});
// 主线程消费结果(可能 throw)
for (auto& write : writes) {
results.push_back(write->move()); // 若某个任务失败,这里 throw
}
// 若 results 循环 throw,guard 析构时仍会 drain 剩余任务
这里的 makeGuard 扮演"排水阀"角色:无论主逻辑成功还是抛异常,都确保所有后台异步任务被 move()(消费)掉,避免后台任务持有的资源(文件句柄、内存)泄漏。
关键细节:guard 内部的 catch (const std::exception&) 是有意的——清理路径本身不能 throw,第一个错误已经在 results 循环里被捕获了。
9.3 跨线程异常传递:exception_ptr 模式
多线程 spill 时,后台线程的异常无法直接传播到主线程。Velox 使用 std::exception_ptr 将异常"装箱":
// 后台线程(writeSpill)
try {
// ... 做实际工作
return std::make_unique<SpillStatus>(id, written, nullptr); // 成功
} catch (const std::exception&) {
// 捕获任何异常,装箱为 exception_ptr
return std::make_unique<SpillStatus>(id, 0, std::current_exception());
}
// 主线程(runSpill)
for (auto& result : results) {
if (result->error != nullptr) {
std::rethrow_exception(result->error); // 在主线程重新抛出
}
// ... 处理正常结果
}
结果对象 SpillStatus 的定义简洁体现了这个模式:
struct SpillStatus {
SpillPartitionId partitionId;
uint32_t rowsWritten;
std::exception_ptr error; // nullptr 表示成功,否则携带异常
};
这使得多线程错误处理与单线程代码风格一致——主线程的调用者只需检查返回值或 catch 异常,不需要感知线程。
9.4 "1 + N" 并发策略:第一个任务不切线程
for (const auto& [id, spillRun] : spillRuns_) {
writes.push_back(
memory::createAsyncMemoryReclaimTask<SpillStatus>(
[partitionId = id, this]() { return writeSpill(partitionId); }));
// 只有第 2 个及之后的任务才提交到后台 executor
if ((writes.size() > 1) && executor_ != nullptr) {
executor_->add([source = writes.back()]() { source->prepare(); });
}
}
// 主线程顺序 move(),第一个任务在这里内联执行
for (auto& write : writes) {
results.push_back(write->move()); // 若未 prepare(),move() 内联执行 lambda
}
设计精妙处:AsyncSource::move() 的语义是"如果还没执行就内联执行,已经执行就等待结果"。第一个任务从未被 prepare(),所以在 move() 调用时内联执行于主线程。
好处:
- 若只有一个 partition(常见情况),完全不产生线程切换开销
- 主线程始终有工作做,不会空等
- N 个后台线程 + 主线程并发,吞吐量最大化
HashProbe::spillOutput 对多个 probe 算子也用了完全一样的模式。
9.5 CHECK vs DCHECK:按热度分配断言代价
// 生产代码的不变式用 VELOX_CHECK(release 也执行)
VELOX_CHECK(!finalized_); // finishSpill 之后不能再写
VELOX_CHECK_EQ(numWritten, run.rows.size()); // 写出的行数必须对得上
// 热路径上的显然正确断言用 VELOX_DCHECK(只在 debug 执行)
VELOX_DCHECK_GE(partitionNum, 0); // hash 分区号必然 >= 0(inner loop)
VELOX_DCHECK(isPartitionSpilled(id)); // appendToPartition 前必须已设置
原则:
VELOX_CHECK_*用于外部可观察的不变式,违反时意味着调用方 bug 或状态异常,即使在 release 也要发现VELOX_DCHECK_*用于内部实现的显然正确断言,理论上不可能失败,加它只是为了 debug 期间辅助排查
在 fillSpillRuns 的内层循环(每行都执行一次)里用 VELOX_DCHECK 而非 VELOX_CHECK,避免 release 构建中每行多一次分支,对亿级行场景有实际性能影响。
9.6 FOLLY_LIKELY / FOLLY_UNLIKELY:把预期写进代码
// createOrGetSpillRun:绝大多数情况 spillRun 已存在
if (FOLLY_UNLIKELY(!spillRuns_.contains(id))) {
spillRuns_.emplace(id, SpillRun(*memory::spillMemoryPool()));
}
// getOutputWithSpill:几乎每行都需要复制,到达 batch 边界是例外
if (FOLLY_UNLIKELY(isEndOfBatch)) {
gatherCopy(...);
}
// 最后一批输出大多非空
if (FOLLY_LIKELY(outputSize != 0)) {
gatherCopy(...);
}
FOLLY_LIKELY/UNLIKELY 不仅是编译器分支预测提示,更是可执行的注释——它明确告诉读者"作者预期这条路径是 hot/cold 的"。这比写 // rarely happens 注释更可靠,因为注释会过期,而这行代码会随着逻辑一起维护。
9.7 TestValue::adjust:无侵入式测试注入
// SpillerBase 构造函数
TestValue::adjust("facebook::velox::exec::SpillerBase", this);
// HashBuild::addInput 入口
TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);
// HashBuild::reclaim 入口
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
TestValue::adjust 在生产代码中是空操作(编译器会完全优化掉)。在测试中,可以向特定注入点注册回调,在精确时机修改状态或注入错误,无需任何条件编译。
配套的 TestScopedSpillInjection 更进一步:
// 测试代码:
TestScopedSpillInjection injection(20, ".*", 10);
// poolName 匹配 ".*" 时,testingTriggerSpill() 有 20% 概率返回 true,最多触发 10 次
// 生产代码:
if (testingTriggerSpill(pool_->name())) {
spill(); // 测试强制触发 spill
}
这使得 spill 的边界条件测试("在恰好第 3 次 addInput 时触发 spill")变得非常精确,而生产代码无任何额外开销。
9.8 NanosecondTimer:RAII 计时的最小实现
uint64_t execTimeNs{0};
{
NanosecondTimer timer(&execTimeNs); // 构造时记录起始时间
// ... 被测代码 ...
} // 析构时写入 execTimeNs
spillStats_->spillFillTimeNanos.fetch_add(execTimeNs, std::memory_order_relaxed);
几个值得注意的细节:
① 先积累,再 fetch_add:不在块内直接 fetch_add,而是先存到局部变量,块结束后一次 atomic 操作。减少 atomic 操作次数,也使代码结构更清晰。
**② memory_order_relaxed**:spill 统计不需要与其他内存操作同步,用 relaxed 避免不必要的内存屏障。
③ 一个特殊处理:
// NOTE: Always set a non-zero sort time to avoid flakiness in tests which check sort time.
updateSpillSortTime(std::max<uint64_t>(1, sortTimeNs));
即使排序耗时 0 纳秒(几乎不可能但理论上存在),也写入 1,防止测试误判"排序未发生"。这是一个小而精的防御性设计。
9.9 SpillPartitionId:把所有接口设施配齐
SpillPartitionId 是一个需要在多种容器中使用的值类型,代码为它配齐了所有标准设施:
// 1. 比较运算符:支持 std::map(有序容器)
bool operator<(const SpillPartitionId& other) const;
bool operator>(const SpillPartitionId& other) const {
return other < *this; // 复用 < 实现 >,不重复逻辑
}
bool operator==(const SpillPartitionId& other) const = default; // C++20 default
// 2. std::hash 特化:支持 folly::F14FastMap/Set
namespace std {
template <>
struct hash<SpillPartitionId> {
uint32_t operator()(const SpillPartitionId& id) const {
return std::hash<uint32_t>()(id.encodedId()); // 直接哈希底层整数,O(1)
}
};
}
// 3. fmt::formatter 特化:支持 fmt::format("{}", id)
template <>
struct fmt::formatter<SpillPartitionId> : formatter<std::string> {
auto format(SpillPartitionId s, format_context& ctx) const {
return formatter<std::string>::format(s.toString(), ctx);
}
};
// 4. operator<< :支持 LOG(INFO) << id
inline std::ostream& operator<<(std::ostream& os, SpillPartitionId id) {
return os << id.toString();
}
operator> 通过调用 operator< 实现而非独立写逻辑,是很好的习惯——减少了两个实现可能不一致的风险。
9.10 withWLock / withRLock:锁的最小化持有
// SpillState::appendToPartition
partitionWriters_.withWLock([&](auto& lockedWriters) {
// 只在创建 writer 时持有写锁
if (!lockedWriters.contains(id)) {
lockedWriters.emplace(id, std::make_unique<SpillWriter>(...));
}
}); // 锁在此释放
// 实际写入在锁外进行(不同 partition 的写入完全并发)
return partitionWriter(id)->write(rows, ...);
// testingSpilledFilePaths(只读,用 rlock)
partitionWriters_.withRLock([&](const auto& partitionWriters) {
for (const auto& [id, writer] : partitionWriters) {
// ... 读取
}
});
folly::Synchronized 的 withWLock/withRLock lambda 接口使锁的范围在代码结构上可见——lambda 的括号就是临界区的边界,不需要手动 lock_guard。
关键设计:写锁只保护"创建 writer"(初始化一次),不保护"写数据"。这是因为每个 partition 只有一个 writer,不同 partition 的写入天然不冲突,无需锁保护。最快的锁是不持有锁。
9.11 SpillRun::sorted:防止双重排序的状态位
struct SpillRun {
SpillRows rows;
uint64_t numBytes{0};
bool sorted{false}; // 是否已排序
};
void SpillerBase::ensureSorted(SpillRun& run) {
if (run.sorted || !needSort()) { // 已排序则跳过
return;
}
// ... 执行排序
run.sorted = true; // 标记,防止重复排序
}
这个 sorted 标志看起来很简单,但它解决了一个微妙的 bug 风险:如果 ensureSorted 被意外调用两次(代码重构时很容易发生),不加保护会导致正确排好序的数据被 re-sort 破坏(稳定排序会保持顺序,但非稳定排序可能打乱)。一个布尔标志,一个 guard,防御性地堵死了这条 bug 路径。
9.12 pool_->release():主动归还,而非等待回收
// SortBuffer::noMoreInput()
// Releases the unused memory reservation after processing input.
pool_->release();
// SortBuffer::getOutput()
SCOPE_EXIT {
pool_->release();
};
// HashBuild::ensureTableFits() 中扩容后
if (spiller_->spillTriggered()) {
pool()->release(); // spill 已触发,扩容的预留用不上了,主动归还
}
pool_->release() 的语义是"我承诺不再需要当前预留的超额内存"。Velox 在每个"阶段完成"的节点系统性地调用它,而不是等待 GC 或析构。
这是 push 模型而非 pull 模型:算子主动告诉内存池"我这块不要了",内存池立即可以分配给其他需要者。在多算子并发的大查询中,这种主动性使内存回转速度快得多。
9.13 succinctBytes():有信息量的日志
LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool_->name()
<< ", root pool: " << pool_->root()->name()
<< ", used: " << succinctBytes(pool_->usedBytes())
<< ", reservation: " << succinctBytes(pool_->reservedBytes())
<< ", root pool reservation: "
<< succinctBytes(pool_->root()->reservedBytes());
几个细节值得学习:
**① succinctBytes()**:自动选择合适单位(B/KB/MB/GB),比裸输出字节数可读性高 10 倍。
② 固定的信息层次:pool 名 → root pool 名 → 已用 → 已预留 → root 已预留。这个模板在整个 spill 系统中完全一致,运维人员看一眼就知道去哪里找信息,不需要猜测每条日志的格式。
③ WARNING 而非 ERROR:预留失败不是错误,只是触发 spill 的信号。级别选择准确。
9.14 testing* 前缀:测试可观测性的统一约定
// SpillState::testingSpilledFilePaths()
// SpillState::testingSpilledFileIds()
// SpillState::testingNonEmptySpilledPartitionIdSet()
// HashBuild::testingExceededMaxSpillLevelLimit()
// HashProbe::testingHasInputSpiller()
// HashProbe::testingExceededMaxSpillLevelLimit()
所有为测试暴露的内部状态方法,一律以 testing 前缀命名。这个约定的价值:
- 接口清晰度:任何
testing*方法,调用者立刻知道"这不是业务接口" - grep 友好:
grep 'testing[A-Z]'精确列出所有测试专用接口,代码审查时可以专门审视 - 永不
friend:Velox 明确禁止friend声明(CLAUDE.md 有规定),测试访问内部状态只能通过这类 accessor,倒逼设计者只暴露真正需要的状态,而非用friend开后门
9.15 constexpr 局部常量:让魔数有名字
void SpillerBase::fillSpillRuns(...) {
constexpr int32_t kHashBatchSize = 4096; // 每次从 RowContainer 取多少行
// ...
}
std::unique_ptr<SpillerBase::SpillStatus> SpillerBase::writeSpill(...) {
constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K
constexpr int32_t kTargetBatchRows = 64;
// ...
}
这些常量定义在函数内部而非文件级别,有两个好处:
① 最小作用域:kHashBatchSize 只在 fillSpillRuns 里有意义,定义在全局或类级别会误导读者以为它有更广泛的语义。
② 紧贴使用处:常量定义和使用之间距离最短,读者不需要跳转文件来理解这个数字的含义。
1 << 18 比 262144 更清楚地表达了"这是 2 的幂次"的意图,同时命名为 kTargetBatchBytes 说明了它是目标(上限)而非精确值。
编码品味小结
| 技法 | 核心价值 | 代表位置 |
|---|---|---|
SCOPE_EXIT |
清理与触发紧邻,所有 return 路径都覆盖 | SortBuffer::getOutput, HashBuild::finishHashBuild |
folly::makeGuard 排水阀 |
异步任务无论成败都被消费,防止资源泄漏 | SpillerBase::runSpill |
exception_ptr 跨线程传递 |
后台线程异常装箱,主线程重抛,调用方无感知 | SpillerBase::writeSpill / runSpill |
| "1 + N" 并发 | 首任务内联,后续并发,单 partition 时零开销 | SpillerBase::runSpill |
DCHECK 分热路径 |
release 不付 inner loop 断言代价 | fillSpillRuns inner loop |
FOLLY_LIKELY/UNLIKELY |
可执行的"热度注释",不会过期 | createOrGetSpillRun, getOutputWithSpill |
TestValue::adjust |
无侵入注入,生产零开销 | 所有关键函数入口 |
NanosecondTimer RAII |
单行计时,早 return 安全 | fillSpillRuns, ensureSorted |
| 配齐值类型接口 | <, >, ==, hash, formatter, << 一起提供 |
SpillPartitionId |
withWLock 最小临界区 |
锁只保护初始化,写操作在锁外 | SpillState::appendToPartition |
sorted 防御位 |
防双重排序,用状态堵死隐患 | SpillRun / ensureSorted |
pool_->release() 主动归还 |
push 模型快速回转,而非等待析构 | noMoreInput, getOutput |
succinctBytes() 统一日志格式 |
固定信息层次,可读,可预期 | 所有 LOG(WARNING) |
testing* 前缀约定 |
测试接口一眼可辨,禁用 friend 倒逼最小暴露 | 所有 testing* 方法 |
constexpr 局部常量 |
最小作用域,紧贴使用处,有名字 | kHashBatchSize, kTargetBatchRows |
10. 架构设计解读:优雅背后的工程思考
Velox spill 系统的代码量约 12,000 行,但它的设计并不复杂——真正值得品味的,是每一处决策背后的约束权衡和系统性思考。这一章从"为什么要这样设计"出发,梳理其核心设计哲学。
10.1 最根本的约束:OOM 不能是答案
数据库引擎处理大数据时,内存不可能无限大。任何算子都需要回答这个问题:当数据超过可用内存时,怎么办?
最简单的答案是抛出 OOM 异常。但这会让用户的查询白白失败,代价极高。Velox 的选择是:任何时候都不能因为数据量大而失败,必须能完成查询(在 spill 开启的情况下)。
这一承诺对系统设计造成了深远影响:
- 不能假设数据能放进内存——每一个数据结构都要有"放不下时的出路"
- 不能只支持一次 spill——必须支持递归 spill(spill 后恢复,恢复时再次 OOM,再次 spill)
- 不能只在"好时机"才 spill——必须能在任意时刻被打断(内存仲裁 reclaim)
这三点决定了架构的基本形态。
10.2 分层:每层只管自己的事
整个 spill 系统被分成 5 层,每层接口极简:
算子层 → 决定「何时」和「触发哪种」spill
Spiller层 → 决定「如何」把 RowContainer 内容写到磁盘(行→列,分区,排序)
SpillState → 决定「写到哪个文件」
SpillFile → 决定「如何」序列化/反序列化(PrestoPage 格式)
SpillConfig→ 决定「用什么参数」
这种分层带来的核心好处不是"解耦"这么虚的东西,而是每一层可以独立演进:
- 要换序列化格式?只改 SpillFile,上面所有层不感知
- 要支持新算子 spill?只实现新的 Spiller 子类,底层文件 I/O 不变
- 要调整内存触发策略?只改算子层的
ensureInputFits,文件写入逻辑不变
这在一个长期演进的系统里价值巨大。HashJoin、HashAggregation、OrderBy 三类算子的 spill 逻辑差异极大,但它们复用了同一套文件 I/O 和统计框架,维护成本大幅降低。
10.3 分区:把"全量恢复"变成"分批恢复"
Spill 最天真的设计是:内存不够时,把所有数据写到一个文件;恢复时,把整个文件读回来。
问题在于:如果数据总量是内存的 10 倍,一次性读回仍然 OOM。
Velox 的解法是按 hash 分区:
数据 → hash(key) → partition 0, 1, 2, ..., 7(8 个分区)
恢复时:
一次只加载 partition 0 → 处理完 → 加载 partition 1 → ...
每次内存中只有 1/8 的数据
这个设计的精妙之处在于:分区和处理逻辑是解耦的。HashBuild 恢复 partition 0 时,它完全复用了正常的 addInput → insertHashTable 路径,没有任何特殊处理。"从 spill 文件恢复" 和 "从上游算子输入" 在代码路径上几乎完全一样。
SpillPartitionId 的 bit 编码设计支持这种分批恢复天然地扩展到多层递归:
Level 0:bit[31:29] = 0, 分区号存在 bit[2:0] → 8 个 partition
Level 1:bit[31:29] = 1, 分区号存在 bit[5:3] → 每个 Level 0 partition 又有 8 个子 partition
Level 2:...
当 Level 0 的某个 partition 恢复时仍然 OOM,直接用下一层 bit 范围再 spill。恢复路径完全复用,递归深度理论上无限(受 maxSpillLevel 限制)。
10.4 有序与无序的分叉:根据"读回时的需求"决定
Spill 时要不要排序,完全由读回时的需求决定,而不是写入时的方便:
| 算子 | 读回时的需求 | 要不要排序 |
|---|---|---|
| HashJoin Build | 重建 hash 表(插入顺序无关) | 不排序(节省 CPU) |
| HashAggregation | 流式归并聚合(需要相同 key 相邻) | 排序(按 group key) |
| OrderBy | 多路归并产生有序输出(需要各 run 内有序) | 排序(按 sort key) |
这个设计的价值在于:对于不需要排序的场景,完全不付排序的代价。HashJoin 是最常见的 spill 场景,它不排序节省了大量 CPU 时间。
而对于需要排序的场景(Aggregation、OrderBy),排序在 spill 时一次性完成,读回时的 N 路归并就是线性扫描,极其高效。如果不在 spill 时排序,读回时就必须全量加载再排序,峰值内存反而更高。
10.5 两阶段 Spill:尊重算子的生命周期
Aggregation 和 OrderBy 各有两个 Spiller(input/output),这不是过度设计,而是对算子生命周期的精确建模:
Input 阶段:数据持续流入 → 多次触发 spill → hash 表/排序缓冲反复清空重填
Output 阶段:数据已全部处理 → 只可能触发一次 spill → 之后只有读出
两个阶段的需求截然不同:
Input 阶段 spill(AggregationInputSpiller / SortInputSpiller):
- 需要 hash 分区(支持分批恢复)
- 需要排序(支持流式归并)
- 可以多次触发(每次清空 hash 表继续处理输入)
Output 阶段 spill(AggregationOutputSpiller / SortOutputSpiller):
- 无需分区(只触发一次,全量写出)
- 无需排序(数据已经就绪,直接写入已有顺序)
- 只触发一次(之后只读,无需再写)
用同一个 Spiller 处理两个阶段会导致代码复杂化,且无法针对性优化。分开设计使每个 Spiller 的逻辑极度简单。
10.6 SpillRun 切分:在多个约束之间找到平衡点
maxSpillRunRows 参数控制每次 fill-run 的行数,这个设计同时解决了四个互不相关的问题:
约束 1:指针数组内存(SpillRun.rows 每行 8 字节,1 亿行 = 800MB)
约束 2:排序辅助内存(大 run 排序 cache miss 严重)
约束 3:PrestoPage 2GB 序列化限制
约束 4:内存仲裁的观测粒度(每 run 之间才能看到内存下降)
一个参数,四个收益,这是典型的"一石多鸟"设计。
值得注意的是,这个设计并没有在代码里写四个 // TODO: 原因X,而是通过一个统一的机制自然地满足了所有约束。这种"机制而非策略"的思维是优雅设计的标志。
10.7 批量行列转换:峰值内存与 I/O 效率的双重优化
extractSpillVector 每次只处理最多 64 行或 256KB,这个"小批量"设计同样服务于两个目标:
峰值内存:
不切分时:SpillRun 10 万行 → 物化为一个 10 万行 RowVector → 可能数 GB
切分后: 每次 64 行 RowVector → 几百 KB → prepareForReuse 复用同一块内存
峰值 = 单批大小,与 run 总大小无关
I/O 效率:
SpillWriter 有 writeBufferSize 写缓冲(默认几 MB)
每次 appendToPartition 追加到缓冲,满了才刷盘
64 行 ≈ 几十 KB → 多次 append 后缓冲满 → 一次大 write()
这比每 10 万行一次大 write() 的 I/O 效率相当,但内存峰值小得多
64 这个数字也不是随意的——它使得每列 FlatVector 的数据(64 × sizeof(int64) = 512B)恰好能放入 L1 cache,extractColumn 的内层循环具有最优的 cache 利用率。
10.8 主动预留 vs. 被动仲裁:两道防线
Velox 的内存触发有两条路径,共同构成防 OOM 的双重保险:
防线 1 — 主动预留(proactive):
ensureInputFits → maybeReserve(targetIncrementBytes)
├─ 成功:继续处理(预留了足够空间)
└─ 失败:[内存仲裁在此可能已触发 reclaim]
防线 2 — 被动仲裁(reactive):
内存仲裁器观测到系统内存紧张 → 调用 reclaim()
└─ 强制 spill 当前算子
两者的分工:
- 防线 1 发生在算子自己的处理循环中,知道"我即将需要多少内存",可以精确预留
- 防线 2 发生在任意时刻,无需算子配合,是最后的兜底手段
ReclaimableSectionGuard 是连接两者的桥梁——算子在调用 maybeReserve 前设置它,告诉仲裁器"现在可以来抢我的内存"。这样即使 maybeReserve 本身触发了全局内存整理,算子也能安全地被 spill。
10.9 NoRowContainerSpiller:让接口适应现实,而非让现实适应接口
HashProbe 侧 spill 时,行数据在 RowVectorPtr 里,根本不经过 RowContainer。如果强制所有 spill 都走 RowContainer → extractSpill → 磁盘 的路径,就需要先把 RowVector 插入 RowContainer 再提取出来,白白多一次转换。
NoRowContainerSpiller 的存在体现了一个原则:抽象应该覆盖真实场景,而不是把真实场景削足适履。它直接接受 RowVectorPtr,跳过整个 fill/extract 流程,只保留写文件这一核心功能。
同样的思路也体现在 SortOutputSpiller 上:sortedRows_ 已经是排好序的行指针数组,如果再经过 fillSpillRuns → 按 hash 分区 → sort 这个流程,不仅多余还会破坏已有的顺序。它重写接口,接受外部已排序的 SpillRows,直接写入。
10.10 HashJoinBridge:异步协调的最小化接口
Build 和 Probe 在不同线程运行,需要协调三件事:
- Build 完成 → 通知 Probe 开始
- Probe 完成 → 通知 Build 恢复 spill partition
- Build/Probe 任意一方 spill → 另一方配合
HashJoinBridge 用三个接口覆盖了全部场景:
setHashTable(table, spillPartitionSet) // Build → Probe:我建好了,这些 partition 被 spill 了
probeFinished() // Probe → Build:我探测完了,请处理下一个 spill partition
appendSpilledHashTablePartitions(set) // Probe reclaim → Build:我 spill 了你的 hash 表
最优雅的设计是:这三个接口覆盖了所有递归层级,无论是第 0 层还是第 3 层递归,调用的接口完全一样。递归深度的增加不需要任何新的协调原语。
对比一下如果没有这种设计,每一层递归都要单独管理 future/promise 对,代码复杂度会指数级增长。
10.11 DictionaryVector wrap:零拷贝分区
Probe 侧 spillInput 需要将一个 batch 的行按 partition 分发到多个 spill 文件。最直接的做法是按 partition 拷贝行数据,但这意味着每行数据被读一次、写一次——对于宽行(含字符串列)代价极高。
Velox 的解法是 DictionaryVector:
原始 input:[row0, row1, row2, row3, row4]
partition 0 的 indices buffer:[1, 3]
partition 1 的 indices buffer:[0, 2, 4]
wrap(2, partition0_buffer, input) → DictionaryVector{indices=[1,3], base=input}
↓ 序列化时才真正读取 row1, row3 的数据
这使得"分区"这个操作的代价只是分配 indices buffer 和一次 hash 计算,不涉及任何数据拷贝。真正的 I/O 推迟到 appendToPartition → SpillWriter::write → 序列化,此时数据只被读一次、写一次。
这是"懒求值"(lazy evaluation)思想在 spill 路径中的具体应用。
10.12 小结:设计模式归纳
回顾整个 spill 系统,可以归纳出几个贯穿始终的设计模式:
① 约束驱动设计(Constraint-Driven Design) 每个设计决策都能追溯到一个具体约束:SpillRun 切分 → PrestoPage 2GB 限制;64 行批次 → L1 cache;DictionaryVector → 零拷贝。没有无目的的"为了灵活性而灵活性"。
② 机制 vs. 策略分离(Mechanism vs. Policy Separation)
SpillerBase 提供机制(fill/sort/write 的流程),各子类通过 needSort()、重写 extractSpill() 等钩子注入策略。算子层决定"何时 spill",Spiller 层决定"如何 spill",文件层决定"如何存储"。
③ 路径复用(Path Reuse)
HashBuild 恢复 spill partition 时走的是和正常处理输入完全相同的 addInput 路径。HashProbe restore 轮也走同一套 addInput → spillInput → probe 路径。不为"恢复"写特殊逻辑,而是让恢复成为正常路径的自然一部分。这减少了测试覆盖的难度,也降低了特殊路径引入 bug 的风险。
④ 懒求值(Lazy Evaluation)
数据尽可能晚才物化:行指针不拷贝数据(SpillRun)→ DictionaryVector 不拷贝行(probe 分区)→ lazy 列在 spill 前才 loadedVector()。每一层都只在真正需要时才付出代价。
⑤ 尊重生命周期(Lifecycle Awareness) Input Spiller 和 Output Spiller 的分离不是技术必要性,而是对算子语义的尊重:input 阶段和 output 阶段的资源需求、触发频率、分区需求都不同,强行合并会产生大量条件分支,拆开则各自极简。
这套设计的优雅,不在于它的聪明,而在于它的诚实——每个决策都清楚地知道自己在解决什么约束,不做多余的事,也不留下欠债。