Velox HashJoin

总体架构

HashJoin 由三个核心组件构成:

1
2
3
Build Pipeline (N drivers)          Probe Pipeline (M drivers)
HashBuild × N ──────bridge────── HashProbe × M
HashJoinBridge

Bridge 是核心枢纽,负责 build/probe 之间的所有同步,以及 spill 迭代的协调。


一、HashJoinBridge

文件: velox/exec/HashJoinBridge.h / HashJoinBridge.cpp

Bridge 是 build 和 probe 两个 pipeline 之间唯一的通信通道。

关键数据成员(HashJoinBridge.h:159-203):

1
2
3
4
5
6
7
uint32_t numBuilders_;                              // build driver 数量
std::optional<HashBuildResult> buildResult_; // 合并后的 hash table
std::optional<SpillPartitionId> restoringSpillPartitionId_; // 当前在 restore 的 spill 分区
std::vector<SpillPartition*> restoringSpillShards_; // 按 builder 数量拆分后的 shards
IterableSpillPartitionSet spillPartitionSet_; // 待处理的 spill 分区栈
std::vector<ContinuePromise> promises_; // 等待 table 的 probe 协程
bool probeStarted_; // probe 是否已启动

核心接口:

方法 调用方 作用
setHashTable() Last build driver 提交合并好的 hash table,唤醒所有等待的 probe
tableOrFuture() Probe driver 获取 hash table,不就绪则挂起等
probeFinished() Last probe driver 通知 probe 结束,Bridge 调度下一个 spill 分区
spillInputOrFuture() Build driver (restore) 获取分配的 spill shard,不就绪则挂起等
appendSpilledHashTablePartitions() Probe driver 向 Bridge 追加新 spill 分区

二、HashBuild — Build 阶段

文件: velox/exec/HashBuild.h / HashBuild.cpp

2.1 状态机(HashBuild.h:43-57)

1
2
kRunning → kWaitForBuild → kWaitForProbe → (kRunning 循环) → kFinish
(spill restore)
状态 含义
kRunning 正在接收并处理输入行
kYield 处理完 spill 数据后主动让出 CPU
kWaitForBuild 非 last driver,等待 last driver 完成 table 合并
kWaitForProbe 等待 probe 完成,才能开始 restore 下一个 spill 分区
kFinish 完成

2.2 核心流程

(1)initialize() (HashBuild.cpp:123)

1
2
3
setupCachedHashTable() → 若命中缓存直接返回
setupTable() → 创建 HashTable<IgnoreNull>
setupSpiller() → 若可 spill 则创建 HashBuildSpiller

(2)addInput() (HashBuild.cpp:442)

  • 调用 ensureInputFits() 做内存预留(不够则触发 spill)
  • 调用 computeSpillPartitions() 计算每行属于哪个 spill 分区
  • 被标记为 spilling 的分区行 → 调 spillInput() 写盘
  • 其余行 → table_->groupProbe() 或直接插入 lookup_

(3)finishHashBuild() — 多 driver 汇聚(HashBuild.cpp:808)

这里是多 driver 并行 build 的核心:

1
2
3
4
5
6
7
8
allPeersFinished()?
├── false(非 last driver)→ kWaitForBuild,挂起
└── true(last driver)→
① 遍历所有 peer build driver,收集各自的部分 table
② ensureTableFits(totalNumRows) // 预留内存做 merge
③ table_->prepareJoinTable(otherTables, executor) // 并行 merge
④ joinBridge_->setHashTable(mergedTable, spillPartitions)
⑤ 唤醒所有等待的 probe driver

关键设计: 每个 build driver 各自持有一个 HashTable,last driver 负责 merge 所有 partial table,然后一次性交给 bridge。非 last driver 在 merge 完成后由 last driver 通过 stateCleared_ 标记释放其数据所有权。

(4)postHashBuildProcess() → Spill Restore 循环(HashBuild.cpp:1029)

1
2
3
4
5
6
7
8
while (true):
shard = joinBridge_->spillInputOrFuture()
if (empty) → kFinish
if (waiting) → kWaitForProbe

setupSpillInput(shard) // 重置 table + spiller,创建 spillInputReader
processSpillInput() // 逐 batch 读取 → addInput() → 重新 build
noMoreInput() // → 回到 finishHashBuild(),提交新 table

三、HashProbe — Probe 阶段

文件: velox/exec/HashProbe.h / HashProbe.cpp

3.1 状态机(ProbeOperatorState.h:34-53)

1
2
kWaitForBuild → kRunning → kWaitForPeers → kFinish
↑ spill restore 时循环回来

3.2 核心流程

(1)asyncWaitForHashTable() (HashProbe.cpp:434)

  • joinBridge_->tableOrFuture() 取 hash table
  • 若未就绪:挂起,等 setHashTable() 触发
  • 就绪后:
    • maybeSetupInputSpiller() — 若 build 有 spill 分区,则建 probe 端 spiller
    • maybeSetupSpillInputReader() — 若当前是 restore 模式,建 spillInputReader
    • 若 table 为空且 join 类型允许短路 → 直接调 noMoreInput()

(2)addInput() (HashProbe.cpp:703)

1
2
3
4
5
6
if (needToSpillInput()):
spillInput() // 将属于 spill 分区的 probe 行写盘

table_->prepareForJoinProbe()
table_->joinProbe(*lookup_) // 核心:hash 查找,填充 lookup_->hits
resultIter_->reset(*lookup_) // 初始化结果迭代器

(3)getOutput() / getOutputInternal()

1
2
3
4
listJoinResults()     // 从 resultIter_ 批量取命中行
evalFilter() // 若有 join filter 则过滤
fillOutput() // 组装输出 batch(probe 列 + build 列)
// RIGHT/FULL join:标记已匹配的 build 行(setProbedFlag)

(4)Build 端输出(RIGHT/FULL join)— getBuildSideOutput() (HashProbe.cpp:897)

只有 last probe driver 处理,负责输出 build 端未匹配的行:

1
2
3
4
getAndIncrementUnclaimedRowContainerId()  // 认领 row container
table_->listNotProbedRows() // RIGHT/FULL: 未被探测的 build 行
// 或 listProbedRows() // RIGHT SEMI FILTER
// 或 listAllRows() // RIGHT SEMI PROJECT

(5)noMoreInputInternal() — Peer 同步(HashProbe.cpp:1822)

1
2
3
4
5
6
7
allPeersFinished()?
├── false(非 last)→ kWaitForPeers,等 last 唤醒
└── true(last)→
① 设 lastProber_ = true
② getBuildSideOutput() 输出 build 端行(RIGHT/FULL)
③ joinBridge_->probeFinished() // 通知 bridge
④ wakeupPeerOperators() // 唤醒等待的 peer probe driver

joinBridge_->probeFinished() 做了什么:

  • spillPartitionSet_ 弹出下一个 spill 分区
  • partition->split(numBuilders_) 拆成 N 个 shard
  • 清空 buildResult_(通知 build driver 可以开始 restore)
  • 唤醒所有等待的 build driver

四、Spill 机制

4.1 触发路径

1
2
内存仲裁器 → HashBuild::reclaim()
└── spiller_->spill(partitionId) // 把某分区的 rows 写盘

addInput() 时检测:spiller_->spillTriggered(partitionId) → 后续该分区的行直接 spill,不入 table。

4.2 整体迭代(以 1 个 spill 分区为例)

1
2
3
4
5
6
7
8
9
10
Round 1 (正常):
Build: addInput → [分区0写盘] → setHashTable(table, spillPartitions={0})
Probe: 取 table → 探测 → [分区0的 probe 行写盘] → probeFinished()

Round 2 (Restore):
Bridge: 弹出分区0,split → 2 shards(假设 2 build drivers)
Build: 各取 1 shard → setupSpillInput → processSpillInput
→ 重新 build → setHashTable(newTable, spillPartitions={})
Probe: 取 newTable → 从 probe 端 spill 读取分区0的 probe 行 → 探测
→ probeFinished()(无更多分区)→ 结束

4.3 递归 Spill

如果 restore 时内存仍不足,会在更深的 bit range 再次 spill。SpillPartitionId 包含父分区信息,通过 maxSpillLevel 限制递归深度。


五、完整协调时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Build Driver 0  Build Driver 1  HashJoinBridge  Probe Driver 0  Probe Driver 1
│ │ │ │ │
addInput() addInput() │ │ │
│ │ │ tableOrFuture() tableOrFuture()
│ │ │←─ wait ───────┤ │
│ │ │←─ wait ────────────────────────┤
noMoreInput() noMoreInput() │ │ │
│ │ │ │ │
allPeersFinished=false allPeersFinished=true │ │
→ kWaitForBuild merge tables │ │ │
│ setHashTable() ───→│ │ │
│ │ notify() ────────────→ │
│ │ │ ─────────────────────────→
│ kFinish │ │ │
│ spillInputOrFuture()→ kWaitForProbe │ │
│ │ │ addInput() addInput()
│ │ │ getOutput() getOutput()
│ │ │ noMoreInput() noMoreInput()
│ │ │ allPeersFinished=false
│ │ │ allPeersFinished=true
│ │ │←─ probeFinished() ─────────────┤
│ │ set shard │ │
│ spillInputOrFuture()→shard │ │
│ setupSpillInput() │ │ │
│ processSpillInput() │ │ │
│ setHashTable() ──────→│ │ │
│ │ notify()──→ │
│ │ │ addSpillInput() │
│ │ │ getOutput() │
│ │ │ probeFinished()─────→
│ │ spillPartitionSet empty │
│ │ → notify builders │
│ spillInputOrFuture() → empty → kFinish kFinish

六、关键文件索引

文件 位置 作用
velox/exec/HashBuild.h :43 Build 状态机定义
velox/exec/HashBuild.cpp :808 finishHashBuild() — 多 driver 汇聚 + merge
velox/exec/HashBuild.cpp :1029 postHashBuildProcess() — spill restore 循环
velox/exec/HashProbe.cpp :434 asyncWaitForHashTable() — 等待 table
velox/exec/HashProbe.cpp :1822 noMoreInputInternal() — peer 同步
velox/exec/HashJoinBridge.cpp :218 setHashTable() — build 提交 table
velox/exec/HashJoinBridge.cpp :320 probeFinished() — probe 完成,调度 restore
velox/exec/HashJoinBridge.cpp :368 spillInputOrFuture() — build 取 shard

七、多线程编程范式

HashJoin 的并发设计是整个 Velox 执行引擎并发哲学的缩影。它几乎不使用传统的”锁 + 条件变量阻塞线程”模型,而是建立在一套协作式、future 驱动的范式上。下面逐个拆解。

7.1 协作式非阻塞算子模型(Cooperative Non-Blocking)

这是最根本的范式。算子永远不阻塞 OS 线程——没有 cv.wait(),没有 thread.join()

算子通过 isBlocked() 返回一个”阻塞原因 + future”来表达”我要等”:

1
2
// Operator.h:284
virtual BlockingReason isBlocked(ContinueFuture* future) = 0;
1
2
3
4
5
// BlockingReason.h:32
kWaitForJoinBuild, // probe 等 build table
kWaitForJoinProbe, // build 等 probe 完成(spill restore)
kWaitForMemory,
kWaitForArbitration,

当算子返回非 kNotBlocked 时,Driver 不会 park 线程,而是 return StopReason::kBlockDriver.cpp:1379 blockDriver()),把这个 OS 线程还给 executor 线程池去跑别的 driver。等 future 被 fulfill 时,BlockingState::setResume 再把这个 driver 重新调度回线程池。

品味点: 这是典型的 M:N 协程式调度——M 个 driver 跑在 N 个 OS 线程上(N = CPU 核数)。一个 SQL 查询可能有成百上千个 driver,但绝不会创建成百上千个线程。HashJoin 在 build 等 probe、probe 等 build 时全程不占线程,这是它能在高并发下保持低线程数、避免上下文切换风暴的根本原因。对比”一个算子一个线程 + 条件变量”的朴素实现,这个设计在可伸缩性上是质变。

7.2 Promise/Future 做跨 Pipeline 握手

Build pipeline 和 probe pipeline 是两组完全独立的 driver,它们之间不共享任何执行栈。耦合只通过 Bridge 上的一组 promise/future 完成:

1
2
using ContinuePromise = VeloxPromise<folly::Unit>;
using ContinueFuture = folly::SemiFuture<folly::Unit>;

消费者(probe)登记 future:

1
2
3
4
5
6
// HashJoinBridge.cpp:301  tableOrFuture()
std::lock_guard<std::mutex> l(mutex_);
if (buildResult_.has_value()) return buildResult_.value(); // 已就绪,直接拿
promises_.emplace_back("HashJoinBridge::tableOrFuture");
*future = promises_.back().getSemiFuture(); // 没就绪,登记后挂起
return std::nullopt;

生产者(build)fulfill:

1
2
3
4
5
6
7
8
// HashJoinBridge.cpp:218  setHashTable()
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
buildResult_ = HashBuildResult(...);
promises = std::move(promises_); // 锁内只搬走 promise
}
notify(std::move(promises)); // 锁外 fulfill —— 关键!

品味点: 注意 notify()锁外执行。fulfill promise 会触发下游 driver 重新入队调度,如果在持锁状态下做,可能引发锁顺序问题甚至死锁。”锁内只搬运状态、锁外做唤醒”是贯穿全代码的纪律。folly::Unit 作为 payload 表示”这是一个纯信号,不传数据”——数据本身(table)已经放在 buildResult_ 里,future 只负责”通知就绪”,信号与数据分离得很干净。

7.3 Last-Peer Barrier(最后一人做归约)

多个 build driver 并行建表后,需要一个 barrier 把它们汇聚。Velox 没有用 std::barrier 或计数信号量,而是用一个**”最后到达者胜出并负责归约”**的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Task.cpp:2304  allPeersFinished()
std::lock_guard<std::timed_mutex> l(mutex_);
auto& state = barriers[planNodeId];
if (++state.numRequested == numPeers) { // 我是最后一个
peers = std::move(state.drivers); // 接管所有 peer 的句柄
promises = std::move(state.allPeersFinishedPromises);
barriers.erase(planNodeId);
return true; // 唯一返回 true 的人
}
// 不是最后一个:登记 promise 后挂起
state.drivers.push_back(callerShared);
state.allPeersFinishedPromises.emplace_back(...);
*future = state.allPeersFinishedPromises.back().getSemiFuture();
return false;

品味点: 这个模式优雅地把”同步点”和”串行归约工作的指派”合二为一。numPeers - 1 个 driver 返回 false 后挂起(不占线程),唯一返回 true 的最后那个 driver 拿到全部 peer 句柄,独自完成 table merge,再 fulfill 所有挂起者的 promise。没有专门的”协调者线程”,协调者是动态选出来的——谁最后到谁干。这避免了”主从线程”的静态分工,天然负载均衡(最后到的往往是最闲的)。

7.4 锁的粒度:临界区只搬指针,重活全在锁外

整个 HashBuild 只有一把 std::mutex mutex_HashBuild.h:326),且绝不在持锁时做重计算

最关键的 table merge:

1
2
3
4
5
6
7
8
// HashBuild.cpp:865  锁内:仅"偷"走 peer 的 unique_ptr
{
std::lock_guard<std::mutex> l(build->mutex_);
otherTables.push_back(std::move(build->table_)); // O(1) 指针搬移
spiller = std::move(build->spiller_);
}
// HashBuild.cpp:933 锁外:真正的 merge(可能耗时数秒)
table_->prepareJoinTable(std::move(otherTables), ..., executor);

品味点: 临界区内全是 std::move 指针级操作,O(1) 完成;prepareJoinTable 这种可能耗时几秒的 hash table 重建放在锁外。这是”锁保护的是数据结构的不变量,不是计算过程“的教科书实践。锁的持有时间趋近于零,多 driver 间几乎无锁竞争。

7.5 无锁并行 build + 串行 merge

每个 build driver 独占自己的 table_addInput() 全程无任何同步——因为没有共享。N 个 driver 真正并行地各建各的表。只有最后的 merge 是串行的(指派给 last driver)。

而且 merge 本身内部还能再并行

1
2
3
4
// HashBuild.cpp:933
table_->prepareJoinTable(
std::move(otherTables), ...,
allowParallelJoinBuild ? queryCtx()->executor() : nullptr); // 传入 executor 做 fork/join

品味点: 这是一个两级并行的精妙设计。第一级:N driver 无锁并行填各自的 partial table(吞吐最大化)。第二级:归约阶段虽然逻辑上串行,但 prepareJoinTable 内部又用 executor 把”重算所有 partial table 的 hash 分区”这件事 fork/join 出去。本该是瓶颈的串行段,被二次并行化吃掉了。”先无共享地并行,再把不得不串行的部分内部并行化”——这是处理 reduce 阶段的最佳实践。

7.6 所有权转移用 stateCleared_ 标志位防双重处理

last driver “偷”走 peer 的 table 时,用一个 bool 标志保证幂等:

1
2
3
4
5
// HashBuild.cpp:900
std::lock_guard<std::mutex> l(build->mutex_);
VELOX_CHECK(!build->stateCleared_, "peer state empty..."); // 断言没被偷过
build->stateCleared_ = true; // 标记已偷
otherTables.push_back(std::move(build->table_)); // 转移所有权

品味点: unique_ptr 的 move 语义 + stateCleared_ 标志位 + VELOX_CHECK 断言,三者组合让”所有权转移”既安全又自我验证。被偷后的 peer 在 close() 时再次置位是幂等的。用类型系统(unique_ptr 不可复制)来表达”所有权只能有一个归属”,而不是靠注释约定——让编译器帮你查并发 bug。

7.7 内存仲裁 reclaim 与算子线程的竞争防护

Spill 可能由另一个线程(内存仲裁器)发起,它会回调算子的 reclaim()。这与算子自己的执行线程存在天然竞争。Velox 用两层防护:

  1. 算子线程先被暂停:仲裁前 Task 会 pause 相关 driver,保证 reclaim 时算子线程已离开 CPU。
  2. NonReclaimableSectionGuard 临界区守卫
    1
    2
    3
    4
    5
    6
    7
    8
    // Operator.h:489
    class NonReclaimableSectionGuard {
    NonReclaimableSectionGuard(Operator* op) {
    op_->nonReclaimableSection_ = true; // 进入不可回收区
    }
    ~NonReclaimableSectionGuard() { ... } // RAII 自动退出
    };
    tsan_atomic<bool> nonReclaimableSection_{false}; // tsan 可见,配合 ThreadSanitizer 检测

reclaim() 入口先检查状态,不安全就直接放弃本次回收:

1
2
3
// HashBuild.cpp:1310
VELOX_CHECK(!nonReclaimableSection_);
if (nonReclaimableState()) return; // 任一 peer 处于临界态则放弃

品味点: 这里有三层品味。其一,用 RAII guard 管理”不可回收区”的进出,绝不会忘记退出(异常安全)。其二,nonReclaimableSection_tsan_atomic 包装——这个类型在普通构建是裸 bool(零开销),在 ThreadSanitizer 构建下变成带 happens-before 标注的原子量,让 TSan 能验证跨线程访问的正确性。为并发正确性的可验证性买单,但只在测试构建付费。其三,reclaim 失败时选择优雅放弃(return)而非报错——回收是尽力而为的,这次回收不了就等下次,不破坏正确性。

7.8 范式总览

范式 替代了什么朴素做法 收益
协作式非阻塞算子 一算子一线程 + cv.wait() M:N 调度,线程数 = 核数,无上下文切换风暴
Promise/Future 握手 共享标志位 + 轮询/条件变量 跨 pipeline 解耦,信号与数据分离
Last-Peer Barrier std::barrier + 静态协调者 同步点与归约指派合一,动态负载均衡
锁内只搬指针 持锁做重计算 临界区趋近零,无锁竞争
无锁并行 + 内部并行 merge 全局锁保护共享 table 两级并行,吃掉串行瓶颈
unique_ptr + 标志位转移所有权 裸指针 + 注释约定 编译器/断言验证并发安全
RAII guard + tsan_atomic 手动加解锁 + 无验证 异常安全 + TSan 可验证、零生产开销

一句话总结整体设计哲学:

把”等待”变成 future、把”线程”变成可调度的 driver、把”锁”压缩成指针搬运的瞬间、把”串行归约”二次并行化、把”并发正确性”交给类型系统和 sanitizer 去验证。HashJoin 不是”用锁把并发问题摁住”,而是”通过结构设计让大部分并发冲突根本不存在”。


八、Promise/Future 生命周期管理:如何不泄漏、不卡死

整个 HashJoin(乃至 Velox 执行层)散落着大量 promise/future。这是把双刃剑——它换来了非阻塞调度,但任何一个 promise 被悄悄丢弃、或任何一个 future 永远无人 fulfill,就会造成 driver 永久挂起(hung query),等价于资源泄漏。Velox 用一套系统化的设计来杜绝这两类故障。

先明确两个故障模式:

  • 故障 A:promise 被销毁但从未 fulfill —— 等在对应 future 上的 driver 永远等不到信号。
  • 故障 B:future 无人满足 —— 通常发生在 task 异常/取消路径,正常的 fulfill 点被跳过。

8.1 底层兜底:folly 的 BrokenPromise 把”挂死”转成”报错”

这是整套设计的基石。ContinuePromise/ContinueFuture 基于 folly:

1
2
using ContinuePromise = VeloxPromise<folly::Unit>;
using ContinueFuture = folly::SemiFuture<folly::Unit>;

folly 的语义是:一个 Promise 析构时若尚未 fulfill,会自动给关联的 future 投递一个 BrokenPromise 异常。也就是说——

即使代码因为 bug 把一个 promise 直接丢弃了,等待方也不会无限挂起,而是会”带着错误被唤醒”。挂死(最难排查)被降级成了报错(有栈、有日志、能恢复)。

这是一条至关重要的安全网:它把”故障 A”从”静默死锁”变成了”显式异常”。Velox 的上层设计都建立在这条兜底之上。

8.2 VeloxPromise:给每个 promise 加”出生地”标签

Velox 没有裸用 folly::Promise,而是包了一层 VeloxPromiseVeloxPromise.h:41):

1
2
3
4
5
6
~VeloxPromise() {
if (!this->isFulfilled()) {
LOG(WARNING) << "PROMISE: Unfulfilled promise is being deleted. Context: "
<< context_;
}
}

构造时强制要求传入 context 字符串(如 "HashJoinBridge::tableOrFuture""Task::allPeersFinished {taskId}")。

品味点: BrokenPromise 兜底了”不挂死”,但还有个问题——哪个 promise 漏了?分布式执行里几百个 promise,光知道”有人 broken”无济于事。VeloxPromise 在析构时打印 context,等于给每个 promise 烙上”出生地”。一旦出现未 fulfill 的析构,日志直接告诉你是哪条协调路径漏了。**这是把”难以复现的并发泄漏”变成”一行可 grep 的 WARNING”**——可观测性优先于事后调试。注意它只是 LOG(WARNING) 而非 VELOX_CHECK:因为在 task 取消路径下,promise 经 cancel() fulfill 后再析构是正常的,未 fulfill 析构是”可疑”而非”必错”,用 warning 而非 fail 是恰当的分寸。

8.3 提取-加锁 / fulfill-解锁:异常安全的核心 idiom

所有 fulfill 点都遵循同一个两段式结构(以 JoinBridge::cancel() 为例,JoinBridge.cpp:31):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void JoinBridge::cancel() {
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
cancelled_ = true;
promises = std::move(promises_); // ① 锁内:把 promise 搬到局部变量
} // (promises_ 被清空)
notify(std::move(promises)); // ② 锁外:逐个 setValue
}

// JoinBridge.cpp:20
static void JoinBridge::notify(std::vector<ContinuePromise> promises) {
for (auto& promise : promises) {
promise.setValue();
}
}

这个写法防了三件事:

  1. 死锁 —— setValue() 会触发下游 driver 重新入队调度,可能回调进其他持锁路径。在锁外 fulfill 避免锁重入/锁顺序死锁。
  2. promise 泄漏 —— 一旦 std::move(promises_) 执行,这批 promise 的所有权就完全转移到局部 promises 变量。无论后面发生什么(包括异常),这个局部 vector 析构时,要么已经 setValue 过,要么触发 BrokenPromise 唤醒等待方。**绝不会”留在 promises_ 里被遗忘”**。
  3. 重复 fulfill —— 搬走后 promises_ 已空,后续再调 cancel() 是幂等的 no-op。

品味点: “把待 fulfill 的 promise 从共享状态搬进栈上局部变量”是关键一招。共享状态里的 promise 容易被遗忘(要靠记得清空),而栈上局部变量一定会析构——C++ 的栈展开机制成了泄漏的最后防线。即使 notify() 中途抛异常,剩余 promise 随栈展开析构 → BrokenPromise → 等待方带错误恢复。把生命周期管理”挂靠”到语言保证的 RAII 上,而不是靠程序员的纪律。

8.4 Task 终止:把每一个 promise 源头都抽干

故障 B 主要发生在异常/取消路径——正常的 fulfill 点(如 build 完成调 setHashTable)被跳过了。Velox 的对策是:Task::terminate()Task.cpp:2502系统化地遍历每一个 promise 来源并全部 fulfill

1
2
3
4
5
6
7
// Task::terminate() 内
taskCompletionNotifier.activate(std::move(taskCompletionPromises_), ...); // 任务完成
stateChangeNotifier.activate(std::move(stateChangePromises_)); // 状态变更
for (auto& splitGroupState : splitGroupStates) splitGroupState.clear(); // split
for (auto& promise : splitPromises) promise.setValue(); // split 等待
for (auto& bridge : oldBridges) bridge->cancel(); // join/其他 bridge
for (auto& barrierPromise : barrierPromises) barrierPromise.setValue(); // barrier 等待

每一类 promise——任务完成、状态变更、split 输入、JoinBridge 的 build/probe 等待、allPeersFinished 的 barrier 等待——都有对应的抽干语句。bridge->cancel() 内部又走 8.3 的 idiom 抽干 bridge 自己的 promises_

品味点: 这是”集中式排水口“设计。promise 可以分散地在各 bridge、各 barrier 里产生,但销毁只有一个总闸——terminate()。它像一个清单一样逐项点名每个 promise 容器。配合 8.2 的 context 日志,如果将来新增了一种 promise 却忘了在 terminate 里抽干,测试时就会看到”Unfulfilled promise … Context: XXX”的告警,直接定位遗漏。新增 promise 源 → 必须在 terminate 加一行,这条不变量靠日志自我执行。

8.5 EventCompletionNotifier:RAII 保证”即使终止过程自己抛异常也 fulfill”

终止过程本身也可能抛异常。如果 fulfill 写成裸循环,异常会跳过剩余 promise。Velox 用一个 RAII 包装兜底(Task.cpp:51):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class EventCompletionNotifier {
public:
~EventCompletionNotifier() { notify(); } // 析构兜底
void activate(std::vector<ContinuePromise> promises, std::function<void()> cb = nullptr) {
active_ = true; callback_ = std::move(cb); promises_ = std::move(promises);
}
void notify() {
if (active_) {
for (auto& p : promises_) p.setValue();
promises_.clear();
if (callback_) callback_();
active_ = false; // 幂等:标记已通知
}
}
};

正常路径显式调 notify()(置 active_=false,析构不再重复);若异常跳过了显式 notify()析构函数会补上

品味点: 这是 8.3 idiom 的升级版——连”忘记调用 notify”或”notify 前抛异常”都用析构函数兜住。active_ 标志让它幂等(显式调过就不再调)。把”必须执行的收尾动作”绑定到对象生命周期,是 C++ 异常安全的标准答案,这里用在了 promise fulfill 上。

8.6 BlockingState:future 在飞行途中,谁保证 driver 不被析构?

还有一个隐蔽的泄漏/悬垂问题:driver 返回 future 后,执行线程离开了,是什么阻止 Driver 对象(及其内存池)在 future resolve 前被销毁? 答案是 BlockingState 持有一个 shared_ptr<Driver>Driver.h:184):

1
2
3
4
5
class BlockingState {
std::shared_ptr<Driver> driver_; // keepalive:把 driver 续命
ContinueFuture future_;
// ...
};
1
2
3
4
5
6
7
8
9
10
11
12
13
// Driver.cpp:227  setResume()
void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
std::move(state->future_)
.via(&exec)
.thenValue([state](auto&&) { // lambda 捕获 state → 续命整条链
// ... 未终止则重新入队 ...
Driver::enqueue(state->driver_);
})
.thenError(folly::tag_t<std::exception>{}, [state](std::exception const& e) {
// future 带 BrokenPromise/其他错误 resolve → 设置 task error
state->driver_->task()->setError(std::current_exception());
});
}

续命链条:

1
2
future 回调  →  捕获 [state]  →  state (shared_ptr<BlockingState>)
└─ driver_ (shared_ptr<Driver>)

只要 future 没 resolve,回调 lambda 就活着,lambda 捕获的 state 就活着,state->driver_ 就活着——driver 不会被析构。future 一旦 resolve(无论成功 thenValue 还是失败 thenError),lambda 执行完毕、state 析构、driver 引用释放。

品味点: 两点品味。其一,用 shared_ptr 的引用计数把”对象生命周期”和”异步操作生命周期”绑定——异步操作没结束,对象就不会死,从根上消除悬垂。其二,thenValue + thenError 成对出现:这正是对 8.1 BrokenPromise 的接应。promise 被丢弃 → future 带 BrokenPromise resolve → 走 thenErrorsetError → 整个 task 优雅失败并连锁抽干其余 promise(回到 8.4)。一条 driver 的 future 异常,会触发全 task 的有序拆除,而不是孤立地挂死。错误是会传染的,而这正是设计想要的——宁可让整个 task 带错退出并清理干净,也不要留一个静默挂起的 driver。

8.7 这套设计的分层防御总览

层次 机制 防的是
L0 语言/库兜底 folly BrokenPromise:未 fulfill 的 promise 析构 → future 收到异常 把”挂死”降级为”报错”
L1 可观测性 VeloxPromise 析构打印 context 定位哪条路径漏了 promise
L2 fulfill idiom 锁内 std::move 提取 → 锁外 notify,靠栈上局部变量 RAII 死锁 / 遗忘 / 重复 fulfill
L3 终止排水 Task::terminate() 逐项抽干每类 promise 源 异常/取消路径下的故障 B
L4 异常安全 EventCompletionNotifier 析构兜底 fulfill 终止过程自身抛异常
L5 生命周期 BlockingState 持 shared_ptr<Driver> keepalive future 飞行途中 driver 被析构(悬垂)
L6 错误传染 thenErrorsetError → 连锁 terminate 单点 future 异常 → 全 task 有序拆除

一句话总结 promise 生命周期哲学:

不指望程序员”记得 fulfill 每个 promise”——而是层层设防:库层让漏掉的 promise 变成异常而非挂死,包装层标注出生地便于定位,idiom 层把 promise 挂靠到栈 RAII 上,终止层设一个集中排水口点名抽干,再用 shared_ptr 把对象生命周期焊死在异步操作上。最终结果是:任何一个 promise 走失,系统都会”带着错误大声退出并清理干净”,而绝不”安静地挂死”。