Velox HashJoin
总体架构
HashJoin 由三个核心组件构成:
1 | Build Pipeline (N drivers) Probe Pipeline (M drivers) |
Bridge 是核心枢纽,负责 build/probe 之间的所有同步,以及 spill 迭代的协调。
一、HashJoinBridge
文件: velox/exec/HashJoinBridge.h / HashJoinBridge.cpp
Bridge 是 build 和 probe 两个 pipeline 之间唯一的通信通道。
关键数据成员(HashJoinBridge.h:159-203):
1 | uint32_t numBuilders_; // build driver 数量 |
核心接口:
| 方法 | 调用方 | 作用 |
|---|---|---|
setHashTable() |
Last build driver | 提交合并好的 hash table,唤醒所有等待的 probe |
tableOrFuture() |
Probe driver | 获取 hash table,不就绪则挂起等 |
probeFinished() |
Last probe driver | 通知 probe 结束,Bridge 调度下一个 spill 分区 |
spillInputOrFuture() |
Build driver (restore) | 获取分配的 spill shard,不就绪则挂起等 |
appendSpilledHashTablePartitions() |
Probe driver | 向 Bridge 追加新 spill 分区 |
二、HashBuild — Build 阶段
文件: velox/exec/HashBuild.h / HashBuild.cpp
2.1 状态机(HashBuild.h:43-57)
1 | kRunning → kWaitForBuild → kWaitForProbe → (kRunning 循环) → kFinish |
| 状态 | 含义 |
|---|---|
kRunning |
正在接收并处理输入行 |
kYield |
处理完 spill 数据后主动让出 CPU |
kWaitForBuild |
非 last driver,等待 last driver 完成 table 合并 |
kWaitForProbe |
等待 probe 完成,才能开始 restore 下一个 spill 分区 |
kFinish |
完成 |
2.2 核心流程
(1)initialize() (HashBuild.cpp:123)
1 | setupCachedHashTable() → 若命中缓存直接返回 |
(2)addInput() (HashBuild.cpp:442)
- 调用
ensureInputFits()做内存预留(不够则触发 spill) - 调用
computeSpillPartitions()计算每行属于哪个 spill 分区 - 被标记为 spilling 的分区行 → 调
spillInput()写盘 - 其余行 →
table_->groupProbe()或直接插入lookup_
(3)finishHashBuild() — 多 driver 汇聚(HashBuild.cpp:808)
这里是多 driver 并行 build 的核心:
1 | allPeersFinished()? |
关键设计: 每个 build driver 各自持有一个 HashTable,last driver 负责 merge 所有 partial table,然后一次性交给 bridge。非 last driver 在 merge 完成后由 last driver 通过 stateCleared_ 标记释放其数据所有权。
(4)postHashBuildProcess() → Spill Restore 循环(HashBuild.cpp:1029)
1 | while (true): |
三、HashProbe — Probe 阶段
文件: velox/exec/HashProbe.h / HashProbe.cpp
3.1 状态机(ProbeOperatorState.h:34-53)
1 | kWaitForBuild → kRunning → kWaitForPeers → kFinish |
3.2 核心流程
(1)asyncWaitForHashTable() (HashProbe.cpp:434)
- 调
joinBridge_->tableOrFuture()取 hash table - 若未就绪:挂起,等
setHashTable()触发 - 就绪后:
maybeSetupInputSpiller()— 若 build 有 spill 分区,则建 probe 端 spillermaybeSetupSpillInputReader()— 若当前是 restore 模式,建 spillInputReader- 若 table 为空且 join 类型允许短路 → 直接调
noMoreInput()
(2)addInput() (HashProbe.cpp:703)
1 | if (needToSpillInput()): |
(3)getOutput() / getOutputInternal()
1 | listJoinResults() // 从 resultIter_ 批量取命中行 |
(4)Build 端输出(RIGHT/FULL join)— getBuildSideOutput() (HashProbe.cpp:897)
只有 last probe driver 处理,负责输出 build 端未匹配的行:
1 | getAndIncrementUnclaimedRowContainerId() // 认领 row container |
(5)noMoreInputInternal() — Peer 同步(HashProbe.cpp:1822)
1 | allPeersFinished()? |
joinBridge_->probeFinished() 做了什么:
- 从
spillPartitionSet_弹出下一个 spill 分区 - 调
partition->split(numBuilders_)拆成 N 个 shard - 清空
buildResult_(通知 build driver 可以开始 restore) - 唤醒所有等待的 build driver
四、Spill 机制
4.1 触发路径
1 | 内存仲裁器 → HashBuild::reclaim() |
addInput() 时检测:spiller_->spillTriggered(partitionId) → 后续该分区的行直接 spill,不入 table。
4.2 整体迭代(以 1 个 spill 分区为例)
1 | Round 1 (正常): |
4.3 递归 Spill
如果 restore 时内存仍不足,会在更深的 bit range 再次 spill。SpillPartitionId 包含父分区信息,通过 maxSpillLevel 限制递归深度。
五、完整协调时序图
1 | Build Driver 0 Build Driver 1 HashJoinBridge Probe Driver 0 Probe Driver 1 |
六、关键文件索引
| 文件 | 位置 | 作用 |
|---|---|---|
velox/exec/HashBuild.h |
:43 | Build 状态机定义 |
velox/exec/HashBuild.cpp |
:808 | finishHashBuild() — 多 driver 汇聚 + merge |
velox/exec/HashBuild.cpp |
:1029 | postHashBuildProcess() — spill restore 循环 |
velox/exec/HashProbe.cpp |
:434 | asyncWaitForHashTable() — 等待 table |
velox/exec/HashProbe.cpp |
:1822 | noMoreInputInternal() — peer 同步 |
velox/exec/HashJoinBridge.cpp |
:218 | setHashTable() — build 提交 table |
velox/exec/HashJoinBridge.cpp |
:320 | probeFinished() — probe 完成,调度 restore |
velox/exec/HashJoinBridge.cpp |
:368 | spillInputOrFuture() — build 取 shard |
七、多线程编程范式
HashJoin 的并发设计是整个 Velox 执行引擎并发哲学的缩影。它几乎不使用传统的”锁 + 条件变量阻塞线程”模型,而是建立在一套协作式、future 驱动的范式上。下面逐个拆解。
7.1 协作式非阻塞算子模型(Cooperative Non-Blocking)
这是最根本的范式。算子永远不阻塞 OS 线程——没有 cv.wait(),没有 thread.join()。
算子通过 isBlocked() 返回一个”阻塞原因 + future”来表达”我要等”:
1 | // Operator.h:284 |
1 | // BlockingReason.h:32 |
当算子返回非 kNotBlocked 时,Driver 不会 park 线程,而是 return StopReason::kBlock(Driver.cpp:1379 blockDriver()),把这个 OS 线程还给 executor 线程池去跑别的 driver。等 future 被 fulfill 时,BlockingState::setResume 再把这个 driver 重新调度回线程池。
品味点: 这是典型的 M:N 协程式调度——M 个 driver 跑在 N 个 OS 线程上(N = CPU 核数)。一个 SQL 查询可能有成百上千个 driver,但绝不会创建成百上千个线程。HashJoin 在 build 等 probe、probe 等 build 时全程不占线程,这是它能在高并发下保持低线程数、避免上下文切换风暴的根本原因。对比”一个算子一个线程 + 条件变量”的朴素实现,这个设计在可伸缩性上是质变。
7.2 Promise/Future 做跨 Pipeline 握手
Build pipeline 和 probe pipeline 是两组完全独立的 driver,它们之间不共享任何执行栈。耦合只通过 Bridge 上的一组 promise/future 完成:
1 | using ContinuePromise = VeloxPromise<folly::Unit>; |
消费者(probe)登记 future:
1 | // HashJoinBridge.cpp:301 tableOrFuture() |
生产者(build)fulfill:
1 | // HashJoinBridge.cpp:218 setHashTable() |
品味点: 注意
notify()在锁外执行。fulfill promise 会触发下游 driver 重新入队调度,如果在持锁状态下做,可能引发锁顺序问题甚至死锁。”锁内只搬运状态、锁外做唤醒”是贯穿全代码的纪律。folly::Unit作为 payload 表示”这是一个纯信号,不传数据”——数据本身(table)已经放在buildResult_里,future 只负责”通知就绪”,信号与数据分离得很干净。
7.3 Last-Peer Barrier(最后一人做归约)
多个 build driver 并行建表后,需要一个 barrier 把它们汇聚。Velox 没有用 std::barrier 或计数信号量,而是用一个**”最后到达者胜出并负责归约”**的模式:
1 | // Task.cpp:2304 allPeersFinished() |
品味点: 这个模式优雅地把”同步点”和”串行归约工作的指派”合二为一。
numPeers - 1个 driver 返回 false 后挂起(不占线程),唯一返回 true 的最后那个 driver 拿到全部 peer 句柄,独自完成 table merge,再 fulfill 所有挂起者的 promise。没有专门的”协调者线程”,协调者是动态选出来的——谁最后到谁干。这避免了”主从线程”的静态分工,天然负载均衡(最后到的往往是最闲的)。
7.4 锁的粒度:临界区只搬指针,重活全在锁外
整个 HashBuild 只有一把 std::mutex mutex_(HashBuild.h:326),且绝不在持锁时做重计算。
最关键的 table merge:
1 | // HashBuild.cpp:865 锁内:仅"偷"走 peer 的 unique_ptr |
品味点: 临界区内全是
std::move指针级操作,O(1) 完成;prepareJoinTable这种可能耗时几秒的 hash table 重建放在锁外。这是”锁保护的是数据结构的不变量,不是计算过程“的教科书实践。锁的持有时间趋近于零,多 driver 间几乎无锁竞争。
7.5 无锁并行 build + 串行 merge
每个 build driver 独占自己的 table_,addInput() 全程无任何同步——因为没有共享。N 个 driver 真正并行地各建各的表。只有最后的 merge 是串行的(指派给 last driver)。
而且 merge 本身内部还能再并行:
1 | // HashBuild.cpp:933 |
品味点: 这是一个两级并行的精妙设计。第一级:N driver 无锁并行填各自的 partial table(吞吐最大化)。第二级:归约阶段虽然逻辑上串行,但
prepareJoinTable内部又用 executor 把”重算所有 partial table 的 hash 分区”这件事 fork/join 出去。本该是瓶颈的串行段,被二次并行化吃掉了。”先无共享地并行,再把不得不串行的部分内部并行化”——这是处理 reduce 阶段的最佳实践。
7.6 所有权转移用 stateCleared_ 标志位防双重处理
last driver “偷”走 peer 的 table 时,用一个 bool 标志保证幂等:
1 | // HashBuild.cpp:900 |
品味点:
unique_ptr的 move 语义 +stateCleared_标志位 +VELOX_CHECK断言,三者组合让”所有权转移”既安全又自我验证。被偷后的 peer 在close()时再次置位是幂等的。用类型系统(unique_ptr 不可复制)来表达”所有权只能有一个归属”,而不是靠注释约定——让编译器帮你查并发 bug。
7.7 内存仲裁 reclaim 与算子线程的竞争防护
Spill 可能由另一个线程(内存仲裁器)发起,它会回调算子的 reclaim()。这与算子自己的执行线程存在天然竞争。Velox 用两层防护:
- 算子线程先被暂停:仲裁前 Task 会 pause 相关 driver,保证 reclaim 时算子线程已离开 CPU。
NonReclaimableSectionGuard临界区守卫:1
2
3
4
5
6
7
8// Operator.h:489
class NonReclaimableSectionGuard {
NonReclaimableSectionGuard(Operator* op) {
op_->nonReclaimableSection_ = true; // 进入不可回收区
}
~NonReclaimableSectionGuard() { ... } // RAII 自动退出
};
tsan_atomic<bool> nonReclaimableSection_{false}; // tsan 可见,配合 ThreadSanitizer 检测
reclaim() 入口先检查状态,不安全就直接放弃本次回收:
1 | // HashBuild.cpp:1310 |
品味点: 这里有三层品味。其一,用 RAII guard 管理”不可回收区”的进出,绝不会忘记退出(异常安全)。其二,
nonReclaimableSection_用tsan_atomic包装——这个类型在普通构建是裸 bool(零开销),在 ThreadSanitizer 构建下变成带 happens-before 标注的原子量,让 TSan 能验证跨线程访问的正确性。为并发正确性的可验证性买单,但只在测试构建付费。其三,reclaim 失败时选择优雅放弃(return)而非报错——回收是尽力而为的,这次回收不了就等下次,不破坏正确性。
7.8 范式总览
| 范式 | 替代了什么朴素做法 | 收益 |
|---|---|---|
| 协作式非阻塞算子 | 一算子一线程 + cv.wait() |
M:N 调度,线程数 = 核数,无上下文切换风暴 |
| Promise/Future 握手 | 共享标志位 + 轮询/条件变量 | 跨 pipeline 解耦,信号与数据分离 |
| Last-Peer Barrier | std::barrier + 静态协调者 |
同步点与归约指派合一,动态负载均衡 |
| 锁内只搬指针 | 持锁做重计算 | 临界区趋近零,无锁竞争 |
| 无锁并行 + 内部并行 merge | 全局锁保护共享 table | 两级并行,吃掉串行瓶颈 |
| unique_ptr + 标志位转移所有权 | 裸指针 + 注释约定 | 编译器/断言验证并发安全 |
| RAII guard + tsan_atomic | 手动加解锁 + 无验证 | 异常安全 + TSan 可验证、零生产开销 |
一句话总结整体设计哲学:
把”等待”变成 future、把”线程”变成可调度的 driver、把”锁”压缩成指针搬运的瞬间、把”串行归约”二次并行化、把”并发正确性”交给类型系统和 sanitizer 去验证。HashJoin 不是”用锁把并发问题摁住”,而是”通过结构设计让大部分并发冲突根本不存在”。
八、Promise/Future 生命周期管理:如何不泄漏、不卡死
整个 HashJoin(乃至 Velox 执行层)散落着大量 promise/future。这是把双刃剑——它换来了非阻塞调度,但任何一个 promise 被悄悄丢弃、或任何一个 future 永远无人 fulfill,就会造成 driver 永久挂起(hung query),等价于资源泄漏。Velox 用一套系统化的设计来杜绝这两类故障。
先明确两个故障模式:
- 故障 A:promise 被销毁但从未 fulfill —— 等在对应 future 上的 driver 永远等不到信号。
- 故障 B:future 无人满足 —— 通常发生在 task 异常/取消路径,正常的 fulfill 点被跳过。
8.1 底层兜底:folly 的 BrokenPromise 把”挂死”转成”报错”
这是整套设计的基石。ContinuePromise/ContinueFuture 基于 folly:
1 | using ContinuePromise = VeloxPromise<folly::Unit>; |
folly 的语义是:一个 Promise 析构时若尚未 fulfill,会自动给关联的 future 投递一个 BrokenPromise 异常。也就是说——
即使代码因为 bug 把一个 promise 直接丢弃了,等待方也不会无限挂起,而是会”带着错误被唤醒”。挂死(最难排查)被降级成了报错(有栈、有日志、能恢复)。
这是一条至关重要的安全网:它把”故障 A”从”静默死锁”变成了”显式异常”。Velox 的上层设计都建立在这条兜底之上。
8.2 VeloxPromise:给每个 promise 加”出生地”标签
Velox 没有裸用 folly::Promise,而是包了一层 VeloxPromise(VeloxPromise.h:41):
1 | ~VeloxPromise() { |
构造时强制要求传入 context 字符串(如 "HashJoinBridge::tableOrFuture"、"Task::allPeersFinished {taskId}")。
品味点: BrokenPromise 兜底了”不挂死”,但还有个问题——哪个 promise 漏了?分布式执行里几百个 promise,光知道”有人 broken”无济于事。VeloxPromise 在析构时打印 context,等于给每个 promise 烙上”出生地”。一旦出现未 fulfill 的析构,日志直接告诉你是哪条协调路径漏了。**这是把”难以复现的并发泄漏”变成”一行可 grep 的 WARNING”**——可观测性优先于事后调试。注意它只是
LOG(WARNING)而非VELOX_CHECK:因为在 task 取消路径下,promise 经cancel()fulfill 后再析构是正常的,未 fulfill 析构是”可疑”而非”必错”,用 warning 而非 fail 是恰当的分寸。
8.3 提取-加锁 / fulfill-解锁:异常安全的核心 idiom
所有 fulfill 点都遵循同一个两段式结构(以 JoinBridge::cancel() 为例,JoinBridge.cpp:31):
1 | void JoinBridge::cancel() { |
这个写法防了三件事:
- 死锁 ——
setValue()会触发下游 driver 重新入队调度,可能回调进其他持锁路径。在锁外 fulfill 避免锁重入/锁顺序死锁。 - promise 泄漏 —— 一旦
std::move(promises_)执行,这批 promise 的所有权就完全转移到局部promises变量。无论后面发生什么(包括异常),这个局部 vector 析构时,要么已经setValue过,要么触发 BrokenPromise 唤醒等待方。**绝不会”留在 promises_ 里被遗忘”**。 - 重复 fulfill —— 搬走后
promises_已空,后续再调cancel()是幂等的 no-op。
品味点: “把待 fulfill 的 promise 从共享状态搬进栈上局部变量”是关键一招。共享状态里的 promise 容易被遗忘(要靠记得清空),而栈上局部变量一定会析构——C++ 的栈展开机制成了泄漏的最后防线。即使
notify()中途抛异常,剩余 promise 随栈展开析构 → BrokenPromise → 等待方带错误恢复。把生命周期管理”挂靠”到语言保证的 RAII 上,而不是靠程序员的纪律。
8.4 Task 终止:把每一个 promise 源头都抽干
故障 B 主要发生在异常/取消路径——正常的 fulfill 点(如 build 完成调 setHashTable)被跳过了。Velox 的对策是:Task::terminate()(Task.cpp:2502)系统化地遍历每一个 promise 来源并全部 fulfill:
1 | // Task::terminate() 内 |
每一类 promise——任务完成、状态变更、split 输入、JoinBridge 的 build/probe 等待、allPeersFinished 的 barrier 等待——都有对应的抽干语句。bridge->cancel() 内部又走 8.3 的 idiom 抽干 bridge 自己的 promises_。
品味点: 这是”集中式排水口“设计。promise 可以分散地在各 bridge、各 barrier 里产生,但销毁只有一个总闸——
terminate()。它像一个清单一样逐项点名每个 promise 容器。配合 8.2 的 context 日志,如果将来新增了一种 promise 却忘了在 terminate 里抽干,测试时就会看到”Unfulfilled promise … Context: XXX”的告警,直接定位遗漏。新增 promise 源 → 必须在 terminate 加一行,这条不变量靠日志自我执行。
8.5 EventCompletionNotifier:RAII 保证”即使终止过程自己抛异常也 fulfill”
终止过程本身也可能抛异常。如果 fulfill 写成裸循环,异常会跳过剩余 promise。Velox 用一个 RAII 包装兜底(Task.cpp:51):
1 | class EventCompletionNotifier { |
正常路径显式调 notify()(置 active_=false,析构不再重复);若异常跳过了显式 notify(),析构函数会补上。
品味点: 这是 8.3 idiom 的升级版——连”忘记调用 notify”或”notify 前抛异常”都用析构函数兜住。
active_标志让它幂等(显式调过就不再调)。把”必须执行的收尾动作”绑定到对象生命周期,是 C++ 异常安全的标准答案,这里用在了 promise fulfill 上。
8.6 BlockingState:future 在飞行途中,谁保证 driver 不被析构?
还有一个隐蔽的泄漏/悬垂问题:driver 返回 future 后,执行线程离开了,是什么阻止 Driver 对象(及其内存池)在 future resolve 前被销毁? 答案是 BlockingState 持有一个 shared_ptr<Driver>(Driver.h:184):
1 | class BlockingState { |
1 | // Driver.cpp:227 setResume() |
续命链条:
1 | future 回调 → 捕获 [state] → state (shared_ptr<BlockingState>) |
只要 future 没 resolve,回调 lambda 就活着,lambda 捕获的 state 就活着,state->driver_ 就活着——driver 不会被析构。future 一旦 resolve(无论成功 thenValue 还是失败 thenError),lambda 执行完毕、state 析构、driver 引用释放。
品味点: 两点品味。其一,用 shared_ptr 的引用计数把”对象生命周期”和”异步操作生命周期”绑定——异步操作没结束,对象就不会死,从根上消除悬垂。其二,
thenValue+thenError成对出现:这正是对 8.1 BrokenPromise 的接应。promise 被丢弃 → future 带 BrokenPromise resolve → 走thenError→setError→ 整个 task 优雅失败并连锁抽干其余 promise(回到 8.4)。一条 driver 的 future 异常,会触发全 task 的有序拆除,而不是孤立地挂死。错误是会传染的,而这正是设计想要的——宁可让整个 task 带错退出并清理干净,也不要留一个静默挂起的 driver。
8.7 这套设计的分层防御总览
| 层次 | 机制 | 防的是 |
|---|---|---|
| L0 语言/库兜底 | folly BrokenPromise:未 fulfill 的 promise 析构 → future 收到异常 | 把”挂死”降级为”报错” |
| L1 可观测性 | VeloxPromise 析构打印 context | 定位哪条路径漏了 promise |
| L2 fulfill idiom | 锁内 std::move 提取 → 锁外 notify,靠栈上局部变量 RAII |
死锁 / 遗忘 / 重复 fulfill |
| L3 终止排水 | Task::terminate() 逐项抽干每类 promise 源 |
异常/取消路径下的故障 B |
| L4 异常安全 | EventCompletionNotifier 析构兜底 fulfill | 终止过程自身抛异常 |
| L5 生命周期 | BlockingState 持 shared_ptr<Driver> keepalive |
future 飞行途中 driver 被析构(悬垂) |
| L6 错误传染 | thenError → setError → 连锁 terminate |
单点 future 异常 → 全 task 有序拆除 |
一句话总结 promise 生命周期哲学:
不指望程序员”记得 fulfill 每个 promise”——而是层层设防:库层让漏掉的 promise 变成异常而非挂死,包装层标注出生地便于定位,idiom 层把 promise 挂靠到栈 RAII 上,终止层设一个集中排水口点名抽干,再用 shared_ptr 把对象生命周期焊死在异步操作上。最终结果是:任何一个 promise 走失,系统都会”带着错误大声退出并清理干净”,而绝不”安静地挂死”。