Velox Task & Driver
一、Task 的设计与实现
1.1 Task 是什么
Task 代表一个查询片段(PlanFragment)的完整执行单元,是 Velox 执行层最顶层的对象,负责:
- 将 PlanFragment 翻译成若干 Pipeline(DriverFactory);
- 创建并管理所有 Driver;
- 管理 Split(数据分片)的分发;
- 维护跨 Driver 共享的桥接结构(JoinBridge、LocalExchange 等);
- 协调终止、暂停、取消等控制流。
定义在 velox/exec/Task.h:43。
1.2 Task 状态机
1 | TaskState(velox/exec/TaskStructs.h:44) |
状态转换核心代码(Task.cpp:2516):
1 | state_ = terminalState; |
1.3 核心数据成员
| 成员 | 类型 | 作用 |
|---|---|---|
drivers_ |
vector<shared_ptr<Driver>> |
拥有所有 Driver(Task.h:1250) |
driverFactories_ |
vector<unique_ptr<DriverFactory>> |
每条 Pipeline 一个,描述如何创建 Driver |
splitGroupStates_ |
unordered_map<uint32_t, SplitGroupState> |
每个 split group 的跨算子共享状态(JoinBridge、LocalExchange、Barrier) |
splitsStates_ |
unordered_map<PlanNodeId, SplitsState> |
每个 source node 的 Split 队列 |
numThreads_ |
int32_t |
当前在线程上运行的 Driver 数量 |
terminateRequested_ |
atomic_bool |
是否请求终止,Driver 会频繁轮询 |
pauseRequested_ |
atomic_bool |
是否请求暂停 |
mutex_ |
timed_mutex |
保护所有状态字段 |
1.4 Task 的执行模式
Task.h:46 定义了两种模式:
- kParallel:通过
Task::start()启动,把 Driver 提交到 Executor 并行执行(生产用途)。 - kSerial:通过
Task::next()在调用线程上单步执行,用于测试或嵌入式查询。
1.5 Task 启动流程
调用链:Task::start() → createDriverFactoriesLocked() → initializePartitionOutput() → createAndStartDrivers()
1 | // Task.cpp:982 — Task::start() |
createAndStartDrivers()(Task.cpp:1070):
1 | // 为 ungrouped execution 创建并入队所有 Driver |
drivers_ 的内存布局(Grouped Execution):
1 | drivers_[0 .. numDriversPerSplitGroup_*concurrentSplitGroups_-1] ← grouped execution 槽位 |
Grouped execution 的槽位在 split group 完成后置 nullptr,新 split group 的 Driver 复用这些槽位。
1.6 Split 分发
1 | Task::addSplit() → addSplitLocked() → SplitsStore::addSplit() |
Driver 通过 Task::getSplitOrFuture()(Task.h:442)从 SplitsStore 取 split。如果没有 split,返回kWaitForSplit,Driver 进入 Blocked 状态。
1.7 allPeersFinished —— Hash Join Build 同步屏障
Task.h:526:所有并发的 build driver 都执行完后,最后一个 driver 收到 true,其他 driver 通过ContinueFuture 阻塞等待。这是 HashJoin 多线程 build 的协调机制。
SplitGroupState::barriers(TaskStructs.h:213)存储这些屏障状态,持有 Driver 的 shared_ptr,形成一个引用环——
该引用环在 allPeersFinished 正常路径或 Task::terminate 异常路径中被清理。
1.8 Task::terminate —— 统一终止路径
Task.cpp:2502 是所有终止的汇聚点:
1 | terminate(terminalState) { |
二、Driver 的设计与实现
2.1 Driver 是什么
Driver 是单条 Pipeline 的单线程执行引擎。每个 Driver 持有一组串联的 Operator,从 source operator
(TableScan、Exchange 等)拉数据,逐级传递给 sink operator(PartitionedOutput、CallbackSink 等)。
定义在 velox/exec/Driver.h:353。
2.2 Driver 的核心组成
1 | Driver |
DriverCtx(Driver.h:231):Driver 的执行上下文,是 Driver 与 Task 之间的接口层。它持有shared_ptr<Task>,这是 Driver → Task 引用的唯一路径。
2.3 ThreadState —— Driver 的状态
Driver.h:97 定义了 ThreadState:
1 | struct ThreadState { |
2.4 Driver 状态流转
1 | 创建后 |
状态说明(对应代码注释 Driver.h:72):
| 状态 | 判断条件 | 进入方式 | 退出方式 |
|---|---|---|---|
| Created | 所有 flag 为 false | Task::createDriversLocked |
enqueue |
| Enqueued | isEnqueued=true |
Driver::enqueue() |
Executor 调度到线程 |
| On Thread | thread 字段非空 |
Task::enter() 返回 kNone |
返回 kBlock/kYield/kPause/kTerminate |
| Blocked | hasBlockingFuture=true |
BlockingState 构造时设置 |
future 兑现 → BlockingState::setResume() → enqueue |
| Suspended | numSuspensions > 0 |
Task::enterSuspended() |
Task::leaveSuspended() |
| Terminated | isTerminated=true |
Task::enter() 返回 kTerminate,或 Task::leave() |
无(终态) |
2.5 Driver::runInternal —— 核心执行循环
Driver.cpp:519 是 Driver 的心脏(详细的执行模型分析见第六章):
1 | runInternal() { |
2.6 blockDriver —— 进入 Blocked 状态
1 | // Driver.cpp:669(声明 Driver.h:669) |
BlockingState::setResume()(Driver.cpp:227):future 兑现时(通过 QueuedImmediateExecutor)记录算子
blocking 时间、hasBlockingFuture = false,若 task 未 pause 则 Driver::enqueue(driver) 重新入队。
2.7 CancelGuard —— 线程安全的清理保证
Driver.h:563,RAII 对象。runInternal 正常结束时调用 notThrown();若发生异常,guard 析构会调用Task::leave() 和 Driver::close(),确保 Driver 干净退出。
2.8 Driver::close —— 正常关闭路径
1 | // Driver.cpp:1063 |
closeOperators()(Driver.cpp:898):关闭所有算子、将 driver 生命周期统计(queued/on-thread/blocked
时间)与算子统计上报到 Task。
三、Task 与 Driver 的交互与生命周期
3.1 引用关系图
1 | Task ──── shared_ptr ────► Driver (drivers_[i]) |
额外的两个 Driver 引用:
- BlockingState 中:
std::shared_ptr<Driver> driver_,在 future 未兑现期间延长 Driver 生命。 - Executor 队列 lambda 中:
[driver]() { Driver::run(driver); }。
3.2 完整生命周期
1 | Task::create() |
3.3 Task::enter / leave —— Driver 线程注册协议
Task::enter(state)(Task.cpp:3341):
1 | 加锁 |
Task::leave(state, driverCb)(Task.cpp:3382):
1 | 加锁,检查 shouldStop(此时可能收到 terminate 请求) |
该设计确保 Driver 的 close() 总在 driver 线程上执行,避免 driver close 与 operator abort 之间的竞态。
3.4 Suspended 状态的用途
当 Driver 需等待外部 IO、内存仲裁决策时,不能简单 block(会丢失调用栈),而是进入 Suspended 状态:
1 | // Task::enterSuspended(): |
3.5 Split Group 并发控制
Grouped Execution 模式下,Task 按 concurrentSplitGroups_ 控制并发处理的 split group 数量。每当一个 split
group 的所有 Driver 完成,Task::removeDriver() → ensureSplitGroupsAreBeingProcessedLocked()
(Task.cpp:1503)为下一个 queued split group 创建新 Driver。
四、TaskDriverOperatorLifecycle.md 中文翻译
以下为
velox/exec/TaskDriverOperatorLifecycle.md的完整翻译。
Task、Driver、Operator 生命周期
摘要:Task 的生命周期长于 Driver 和 Operator。Task 与 Driver 之间存在循环引用,这些循环引用在 Task
释放对 Driver 的引用时被解除。Task::terminate 包含了兜底逻辑,用于在异常或提前终止时清理所有资源。
OutputBufferManager
OutputBuffer 持有对 Task 的引用。这个引用用于在所有输出数据被消费完毕时,通过调用task->setAllOutputConsumed()(从 OutputBuffer::deleteResults 触发)来通知 Task。
该引用在 OutputBuffer 被销毁时释放,销毁发生在两个地方:(1) OutputBufferManager::deleteResults(正常路径);
(2) OutputBufferManager::removeTask(错误路径)。
在 Prestissimo 应用中,OutputBufferManager::deleteResults 由 TaskManager::abortResults 调用,后者由TaskResource::abortResults(挂载在 HTTP DELETE /v1/task/{id}/results/{id} 路由)触发。
OutputBufferManager::removeTask 则由 Task::terminate 调用。
Operator
Operator 不直接持有对 Task 的引用。它通过 DriverCtx 间接访问 Task。
Driver 与 Task
DriverCtx 存储了一个指向 Task 的 shared_ptr。Driver 独占拥有 DriverCtx,并通过 DriverCtx 间接持有对
Task 的引用。Operator 则通过裸指针引用 DriverCtx。
1 | // Driver 侧: |
Driver 对 Task 的引用只有在 Driver 析构时才会释放。
Task 共享拥有所有 Driver:
1 | std::vector<std::shared_ptr<Driver>> drivers_; |
Task 在两个地方存储对 Driver 的引用:(1) drivers_ 成员变量,存储所有 Driver;(2)SplitGroupState.barriers,存储包含 join build 的 Driver(用于 Task::allPeersFinished)。
drivers_ 的引用在以下两处被清理:(1) Task::removeDriver——由 Driver::close 调用;(2) Task::terminate。
barriers 中存储的 Driver 引用在以下情况下被清理:(1) Task::removeDriver 中,通过SplitGroupState::clear();(2) Task::allPeersFinished,在所有 join build 流水线完成时;(3)Task::terminate 中,通过 SplitGroupState::clear()。
Task::removeDriver 和 Task::allPeersFinished 在正常路径(包括 Driver 自身遇到错误并主动关闭的路径)中被调用。Task::terminate 则在错误路径中被调用。
Task 与 Driver 之间存在循环引用:Task 引用 Driver,Driver 引用 Task。这些循环引用在 Task 释放对 Driver 的引用时被解除。
Driver 至少通过 DriverCtx::task 持有一个 Task 引用,直到析构为止。因此,Driver 的生命周期不可能超过 Task 的生命周期。
此外,Driver 独占拥有其所有 Operator,所以 Operator 的生命周期也不可能超过 Driver。总结:Task 的寿命 ≥ Driver
的寿命 ≥ Operator 的寿命。
BlockedState 结构体中也持有对 Driver 的引用。该结构体在 Driver 进入 Blocked 状态时创建,被存储在挂载到阻塞
future 的回调 lambda 中。它仅在 future 兑现且回调执行完毕后才会销毁。详见 BlockingState::setResume。如果 future
永远不兑现,Driver 和 Task 将发生泄漏。
Executor 队列中的 lambda 同样持有对 Driver 的引用:
1 | // static |
Driver 从以下几处被加入 Executor 队列:
Task::start:开始运行;BlockingState::setResume:阻塞 future 兑现后继续运行;Task::ensureSplitGroupsAreBeingProcessedLocked:在 Grouped Execution 中启动新的 split group;Task::resume:此路径目前未被使用;Driver::run被Task::requestYield触发:此路径目前未被使用。
五、架构设计品味、代码品味与多线程编程范式
5.1 架构设计品味
5.1.1 “协作式调度”而非”抢占式线程”
整个设计中最高层次的品味。朴素实现是每个 Pipeline 绑定一个 OS 线程、阻塞在 IO/锁上靠操作系统调度;Velox 反其道而行——
**Driver 不拥有线程,而是被调度到线程池上的一段段”执行片”**:
1 | Driver 不是线程,而是一个可被反复"挂起/恢复"的状态机。 |
收益:
- M 个 Driver 复用 N 个线程(M ≫ N),一台机器可同时承载成千上万个 Pipeline 而不会线程爆炸;
- 阻塞(等 split、等 join build、等下游消费、等内存仲裁)时,Driver 主动让出线程(
kBlock→setResume),线程立刻去跑别的 Driver; - CPU 时间片到期主动
kYield,实现公平性。
本质上是把操作系统的线程调度,下沉为应用层的协程调度。StopReason 枚举就是这套协程调度的”返回码协议”。
5.1.2 单一终止漏斗(terminate as a funnel)
无论是正常完成(kFinished)、用户取消(kCanceled)、外部错误(kAborted)、还是自身算子抛异常(kFailed),
所有路径最终都汇聚到 Task::terminate(terminalState) 这一个函数。资源清理逻辑只写一遍,杜绝”正常路径清理了
A 忘了 B,异常路径清理了 B 忘了 A”的经典 bug。
5.1.3 所有权方向与生命周期的”单调性”
1 | Task ⊇ Driver ⊇ Operator (寿命包含关系) |
这个不变量靠所有权结构强制:Driver 通过 DriverCtx::task 这个 shared_ptr 死死拽住 Task 直到自己析构;
Driver 用 unique_ptr 独占 Operator。于是 Operator 里可以放心地用裸指针访问 DriverCtx、Task,
不需要任何 weak_ptr / 生命周期检查——类型系统已经证明了它们一定还活着。
5.1.4 循环引用的”受控泄漏”
Task ↔ Driver 是明知故犯的循环引用。设计者没有用 weak_ptr 打破它,而是选择在一个确定的点手动切断
(drivers_[i] = nullptr)。原因:执行期间 Task 必须强引用 Driver,Driver 也必须强引用 Task,weak_ptr 会引入
“每次访问都要 lock() 检查”的开销。这里用一个明确的手动切断点,换取运行期的零开销与代码简洁;代价是 lifecycle.md
末尾那句警告——若 future 永不兑现就会泄漏。设计者清楚代价并写进了文档。
5.1.5 窄接口统一异构等待——isBlocked 作为唯一的”挂起-唤醒”抽象
整个执行引擎里,算子要等的外部事件五花八门:等 split(TableScan)、等远程数据(Exchange)、等本地 queue
(LocalExchange)、等 join build(HashProbe)、等内存仲裁、等 RPC……但 Driver 调度循环只认识一种东西:isBlocked 返回的 (BlockingReason, ContinueFuture) 二元组(Operator.h:284)。
这是极有品味的架构收敛:用一个窄接口把所有异构的异步等待归一化。 Driver 不需要知道”等的是什么”,只需要
“reason 非 kNotBlocked 就让出、future 兑现就唤醒”。新增一种等待来源(如后来的 kWaitForRPC)不需要改动调度核心,
只需让对应算子在 isBlocked 里返回新的 reason + future。BlockingReason 枚举(BlockingReason.h)只是诊断/统计用的
标签,对调度逻辑而言所有 reason 等价。详见第八章。
这条与 5.1.1 的”协作式调度”互为表里:5.1.1 讲 Driver 如何让出线程,5.1.5 讲算子如何统一地告诉 Driver”该让出了”。
5.2 代码品味
5.2.1 快路径无锁,慢路径加锁
Task::shouldStop()(Task.cpp:3505)被 Driver 主循环每个算子每轮调用:
1 | StopReason Task::shouldStop() { |
Task.h:1414 注释明确解释了为何这些字段是 atomic:为了让 ThreadSanitizer 不误报,同时表明”在锁外读 0/false 是安全的”。
5.2.2 “Locked 后缀”约定——把锁契约编码进函数名
foo()(自己加锁)与 fooLocked()(调用者必须已持锁)成对出现,把”调用前必须持有 mutex_”这一契约固化进函数签名。
5.2.3 RAII 兜底一切
CancelGuard(Driver.h:563)保证 Driver 无论正常返回还是抛异常都会 Task::leave + close()。promise 的兑现也用folly::makeGuard 包裹。关键细节:promise 的 setValue() 会同步触发 future 回调,回调里可能又抢 mutex_,
所以代码刻意在锁内把 promise/driver swap 到局部变量,出锁后再 setValue/closeByTask,避免”持锁触发回调 → 回调抢同一把锁 → 死锁”。这个”锁内收集、锁外执行副作用”的模式在 Task::leave、Task::terminate 中反复出现。
5.2.4 统计的双保险
Driver.cpp:533 的 on-thread 时间统计,既在 closeOperators() 手动 finalize,又有 scope guard 兜底,并用onThreadStartUs_ = 0 哨兵值防止重复计数。两条路径都覆盖且防双计。
5.2.5 主循环写得极致紧凑
runInternal 用一个 for 配合 i += 2; continue; 实现数据流调度,没有递归、没有 goto(符合 CODING_STYLE)。
注意:扁平循环不仅是风格,更是协作式调度的前提——详见第六章。
5.2.6 延迟报告模式——阻塞在数据操作里发现,在下一轮 isBlocked 报告
多数算子并不在 isBlocked 里现场探测阻塞,而是在 addInput/getOutput 真正干活时就拿到了 future 与 reason,
存进成员变量(blockingReason_ / future_),等下一轮 Driver 循环调用 isBlocked 时才报告出来。CallbackSink(CallbackSink.cpp:20,48,在 addInput 里记录)与 LocalExchange(LocalPartition.cpp:286,275,
在 getOutput 里记录)是对称的两例。
这不是绕路,而是与 Driver 循环顺序严丝合缝:第七章已述,循环里 isBlocked 在 getOutput/addInput
之前调用,所以本轮产生的阻塞天然只能由下一轮的 isBlocked 暴露。把”发现”和”报告”拆成两个时点,是这套调度
节拍下最自然的写法。详见第八章 8.3。
5.2.7 future 组合语义必须匹配等待语义——collectAny vs collectAll
当一个算子同时等多个 future 时,用哪种组合不是风格而是正确性:
folly::collectAny:任一就绪即唤醒。用于”任何一边推进都值得重试”——Exchange同时等数据和等 split
(Exchange.cpp:171)。folly::collectAll:全部就绪才唤醒。用于”必须所有阻塞条件都解除”——LocalPartition扇出到多个下游 queue,
必须等所有满的下游都腾出空间(LocalPartition.cpp:633)。
选错有真实后果:扇出 sink 若用 collectAny 会在仍满的下游上”唤醒→立刻又阻塞”空转;source 若用 collectAll
会在数据已到但 split 未到时白等。详见第八章 8.4。
5.3 涉及的多线程编程范式
| 范式 | 体现 | 文件/行 |
|---|---|---|
| 协作式调度(无栈协程) | Driver 主动返回 StopReason 让出线程,现场压缩成几个标量后可重入 |
runInternal |
| 三种让出语义区分 | Block(等外部事件,需 future)/ Yield(时间片,回队尾)/ Suspend(保留栈,原地等) | 第八章 8.2 |
| Future/Promise 续延 | 阻塞→ContinueFuture,兑现时 thenValue 重新入队 |
BlockingState::setResume Driver.cpp:227 |
| 窄接口统一异步等待 | 异构等待全部归一成 (BlockingReason, ContinueFuture) 二元组 |
Operator::isBlocked Operator.h:284 |
| 唤醒句柄所有权 | 谁掌握”事件就绪”的知情权(queue/exchangeClient/JoinBridge/consumer)谁负责兑现 future | 第八章 8.3 |
| future 组合语义 | collectAny(任一就绪即重试)vs collectAll(全部解除才继续) |
Exchange.cpp:171 / LocalPartition.cpp:633 |
| 状态机驱动的异步 | isBlocked 推进算子内部状态机,跨状态挂起/恢复 |
HashProbe::isBlocked HashProbe.cpp:651 |
| 同步屏障 | join build 多线程汇合,最后一个 fulfill 其余 promise | Task::allPeersFinished Task.h:542 |
| 线程计数 + 等待信号 | numThreads_ 归零时 fulfill threadFinishPromises_ |
allThreadsFinishedLocked Task.cpp:3532 |
| 可重入挂起 | numSuspensions 递归计数,保留调用栈让出线程占用 |
enterSuspended/leaveSuspended Task.cpp:3424 |
| 快慢路径分离(lock elision) | atomic flag 走无锁快路径 | shouldStop Task.cpp:3505 |
| RAII 资源/信号管理 | CancelGuard、makeGuard 兑现 promise | 贯穿 Driver.cpp / Task.cpp |
| 锁内收集、锁外触发副作用 | swap 到局部变量出锁再执行 | Task::leave、Task::terminate |
| 协作式取消 | terminateRequested_ 标志 + 轮询,并桥接 folly::CancellationToken |
Task::terminate、getCancellationToken |
| 单写者所有权消除竞态 | Driver unique_ptr 独占 Operator |
operators_ Driver.h:732 |
一条贯穿全局的主线
让”线程”成为可被任意复用的纯计算资源,让”Driver”成为可被反复挂起/恢复/取消的轻量状态机,并用所有权结构 +
单一终止漏斗 + RAII,把并发正确性从”靠程序员小心”转化为”靠类型与结构保证”。
StopReason 是这套协程调度的指令集,ThreadState 是协程的寄存器现场,Task::mutex_ 是唯一的串行化点,ContinueFuture/Promise 是协程之间的唤醒总线,而 isBlocked 返回的 (BlockingReason, ContinueFuture)
二元组则是算子向调度器递交的统一”挂起申请单”——五花八门的外部等待经它归一,才让上面这套协程模型得以用一份代码服务所有算子。
六、runInternal 的执行模型:不是 Volcano,也不是教科书式 push
准确说法:这是一个由 Driver 集中调度的、单栈帧的、demand-gated(需求门控的)push 执行模型。
第 2.5 / 5.2.5 节用了”主循环”的笼统措辞,本章给出精确语义。
6.1 控制流:Driver 是唯一的调度器
operators_ 是一个扁平数组,索引含义固定:
1 | operators_[0] = source(TableScan / Exchange) ← 最上游 |
关键事实:算子之间从不互相调用。 所有算子方法(isBlocked / needsInput / getOutput / addInput /isFinished / noMoreInput)全部由 Driver 在同一个栈帧里直接调用。算子只是一组被动回调,Driver 是中央调度器。
与 Volcano 的本质区别:
| 维度 | Volcano(经典 pull) | Velox Driver |
|---|---|---|
| 谁驱动执行 | 根算子 next() 递归调用孩子的 next() |
单个 Driver 循环调用所有算子 |
| 调用栈 | 深递归:sink→…→source,每批一层套一层 | 扁平:一个栈帧 + 对算子数组的循环 |
| 算子间耦合 | 算子 A 直接调用算子 B | 算子互不调用,全由 Driver 中转 |
| 数据传递 | 孩子把行返回给父亲(返回值上溯) | 生产者 getOutput() → Driver → 消费者 addInput() |
6.2 调度扫描方向:从 sink 往 source 找”谁能干活”
getStartingOperator()(Driver.cpp:1401)返回 operators_.size() - 1,即从 sink 开始:
1 | int32_t startingOperator = getStartingOperator(); // = N-1,即 sink |
要分清两个方向:
- 调度扫描方向:下游 → 上游(找活干);
- 数据流动方向:上游 → 下游(push)。
i += 2 配合 for 的 --i,净效果是 i+1——回头去看刚收到数据的那个下游算子,能否把它的产出再往前推一步。
于是数据一步步被 push 向 sink。数据是被 push 的,不是被 pull 上来的。
6.3 demand-gated(需求门控)
Driver 在 push 之前先问下游 nextOp->needsInput()。若下游说”不需要”(如 HashBuild 还在攒数据、下游缓冲已满),
Driver 就不会从上游取数据。这是单条 Pipeline 内部的反压机制——push,但每一步都被下游需求信号闸住,
属于 HyPer 一系的 morsel-driven / push-with-backpressure 思路。
跨 Pipeline 的反压则通过
BlockingReason::kWaitForConsumer+ContinueFuture,让整个 Driver 下线等待。
6.4 为什么”扁平循环 + Driver 调度”是协作式调度的前提
深递归的 Volcano 调用栈无法被挂起/恢复;扁平循环可以。 这是把全文串起来的枢纽:
1 | 扁平循环 + Driver 中央调度(无算子间递归) |
若是 Volcano 的 sink.next() → … → source.next() 深递归,想”暂停在第 3 层算子并保存现场”几乎不可能(需冻结整个
C++ 调用栈,这正是有栈协程要解决的难题)。而 Velox 的扁平循环里,return kBlock 直接退出,下次 runInternal
重新进入 for 循环即可恢复。整个 Driver 因此变成一个无栈协程(stackless coroutine)。
6.5 小结
- Driver 是调度器,模型是 push,不是 Volcano 的顶层
next()递归; - push 的同时有下游
needsInput()的需求门控(单 Pipeline 内反压),是 demand-gated push 而非无脑灌; - 选择”Driver 集中调度 + 扁平循环”而非递归 pull,根本目的是让 Driver 可挂起——这是 Velox 整个协作式线程复用模型能成立的地基。
七、runInternal 调度实例:三算子 pipeline 全过程
用一条最经典的流水线 TableScan → FilterProject → PartitionedOutput 来逐步 tracerunInternal(Driver.cpp:519)的调度过程。
1 | operators_[0] = TableScan (source,从 split 读数据) |
7.0 算子接口语义
| 方法 | TableScan | FilterProject | PartitionedOutput |
|---|---|---|---|
isBlocked() |
无 split 可读且 noMoreSplits 未到 → kWaitForSplit |
不阻塞 | OutputBuffer 满 → kWaitForConsumer |
needsInput() |
—(source 无上游) | 缓存空时 true | buffer 未满时 true |
getOutput() |
读出一批;读完返回 null | 处理完输入产出一批;无输入返回 null | 并行模式写 buffer 后返回 null |
addInput() |
—— | 接收一批待处理 | 接收一批待写出 |
isFinished() |
noMoreSplits 且无数据 → true | 收到 noMoreInput 且缓存空 → true | noMoreInput 且全部写出 → true |
7.1 阶段一:冷启动——从 sink 往 source “找活干”
Driver 刚上线,所有算子都还没有数据。内层循环 for (i = 2; i >= 0; --i):
i = 2(sink, PartitionedOutput) → i == N-1,走 else 分支
1 | getOutput(sink) → null // 还没人喂它 |
i = 1(FilterProject) → i < N-1,走 if 分支
1 | nextOp = operators_[2] (sink) |
i = 0(TableScan) → if 分支
1 | nextOp = operators_[1] (FilterProject) |
关键点:冷启动时扫描从 sink 一路走到 source(i: 2→1→0),目的是**自下而上确认”谁此刻能产出数据”**。
最终落到 TableScan 这个真正的数据来源,拿到第一批数据。
7.2 阶段二:数据推进——i += 2 的精妙之处
刚才在 i=0 喂完 FilterProject,i += 2 然后 for 的 --i,净效果 i = 1,回头去看刚收到数据的 FilterProject:
i = 1(FilterProject)再次
1 | nextOp = sink, needsInput? → true |
i = 2(sink)
1 | getOutput(sink) → null // PartitionedOutput 把 batch1' 写进 OutputBuffer,并行模式不返回数据 |
i += 2 的设计意图:一批数据刚喂给 operators_[i+1],下一步立刻回到 operators_[i+1] 看它能否把这批数据继续往下游推。
这样一批数据会被”连续地”沿 pipeline 推到底,而不是每次都从头扫描。数据流向始终是 0 → 1 → 2(push),而扫描指针在局部上下跳动。
7.3 阶段三:稳态循环
batch1 推到底后,i 回到 1,FilterProject 已无缓存数据:
1 | i=1: getOutput(FilterProject) → null → isFinished? false → --i → i=0 |
于是稳定在 i=1 ↔ i=0 ↔ i=2 之间往复,每轮从 TableScan 拉一批、过滤、写出。**注意每个算子调用前循环顶部都会执行task()->shouldStop()(检查终止/暂停/yield)和 op->isBlocked()**——这是协作式调度的轮询点。
7.4 阶段四:阻塞场景——blockDriver 让出线程
场景 A:OutputBuffer 满(下游消费跟不上)
某轮 i=1,准备喂 sink 时:
1 | i=1: nextOp = sink |
blockDriver(Driver.cpp:669)构造 BlockingState(设 hasBlockingFuture=true),返回 kBlock。Driver::run(Driver.cpp:861)收到 kBlock → BlockingState::setResume(blockingState) → driver 下线,线程去跑别的 Driver。
当下游消费了 buffer → future 兑现 → setResume 回调(Driver.cpp:227)→ Driver::enqueue 重新入队 →
再次进入 runInternal,从 startingOperator=2 重新扫描。
场景 B:TableScan 等 split
某轮 i=0,循环顶部:
1 | i=0: op = TableScan |
同样下线,等 Task::addSplit() 兑现 promise 后被唤醒。
这两个就是第六章讲的两类反压:B 是上游缺数据,A 是下游消费慢,都通过同一套
kBlock + ContinueFuture机制把整个 Driver 挂起。
7.5 阶段五:收尾——noMoreInput 链式传播 → close
假设 noMoreSplits 已到,TableScan 读完最后一批。某轮 i=0:
1 | i=0: nextOp=FilterProject, needsInput? true |
重新 i=2(sink)
1 | getOutput(sink) → null; isFinished? false → --i → i=1 |
i=1(FilterProject)
1 | nextOp=sink, needsInput? true |
i=2(sink)
1 | getOutput(sink) → null // PartitionedOutput flush 剩余、标记 buffer 结束 |
kAtEnd 让 Driver::run 直接 return(Driver.cpp:877),driver 彻底退出。close() 里的Task::removeDriver 会触发 checkIfFinishedLocked,若是最后一个 driver 且 output 已被消费,则Task::terminate(kFinished)。
结束信号的传播链很清晰:noMoreInput 沿 pipeline 自上游向下游逐级传递
(TableScan→FilterProject→PartitionedOutput),每个算子先把自己的剩余缓存 flush 完(isFinished 返回 true
前的 getOutput),再把”收工”信号交给下一级。每传一级就 break 重启外层循环,确保从 sink 重新评估全局状态。
7.6 时间线汇总
| 轮次 | i 序列 | 发生的事 |
|---|---|---|
| 冷启动 | 2→1→0 | 自下而上找到 TableScan,读出 batch1,喂 FilterProject |
| 推进 | 1→2 | FilterProject 产出 batch1’,喂 sink,sink 写 buffer |
| 稳态 | 1→0→1→2… | 反复拉取-过滤-写出 |
| 阻塞 A | 1 | sink buffer 满 → kBlock 下线,等消费 |
| 阻塞 B | 0 | TableScan 无 split → kBlock 下线,等 addSplit |
| 收尾 | 0(break)→2→1(break)→2 | noMoreInput 逐级传播 → isFinished → close → kAtEnd |
一句话概括这套调度:指针从 sink 往 source 扫描以”定位能产出数据的最上游算子”,一旦取到数据就靠 i+=2
把它连续推向 sink;每个算子边界都轮询 stop/block,需要等待时整条 Driver 作为无栈协程挂起,被唤醒后从 sink 重新扫描。
八、isBlocked 与三种让出机制:结合典型算子解读
8.1 isBlocked 的角色:声明阻塞意愿 + 交出唤醒句柄
isBlocked 是 Velox 把”异步等待”嵌进协作式调度的核心接口。看签名(Operator.h:284):
1 | /// 返回 kNotBlocked 表示可以继续;否则返回原因,并把 'future' 设为一个在 |
要点:**isBlocked 不是 Driver 探测算子状态的工具,而是算子主动声明”我要等外部事件、请让出线程”并交出唤醒句柄的接口。**
它干两件事:
- **返回
BlockingReason**(BlockingReason.h):语义标签,如kWaitForSplit、kWaitForConsumer、kWaitForProducer、kWaitForJoinBuild、kWaitForArbitration、kWaitForRPC等; - **通过
future出参交出”唤醒句柄”**:这个ContinueFuture是算子与外部世界之间的”门铃”,外部事件就绪时谁兑现它谁就负责唤醒 Driver。
没有这个 future,isBlocked 就只是个布尔探测,Driver 让出后将永远不知道何时回来。正是 future 把”让出”和”恢复”两端接上。
让出 CPU 的动作链(isBlocked 是起点和唤醒源,但”让出”由 Driver 的 return kBlock 完成):
1 | op->isBlocked(&future) → reason != kNotBlocked |
8.2 三种让出线程的方式
| 让出方式 | 触发 | 是否需要唤醒句柄 | 恢复方式 | 语义 |
|---|---|---|---|---|
Block(isBlocked) |
算子等外部事件 | 需要 future | future 兑现 → enqueue | 异步等待的核心 |
Yield(shouldYield) |
CPU 时间片耗尽 | 不需要 | 立即重新 enqueue 到队尾 | 公平性,不是等待 |
Suspend(enterSuspended) |
等 IO / 内存仲裁,保留调用栈 | 不需要(同步等) | leaveSuspended 原地恢复 |
让出线程占用但栈不丢 |
只有 Block 是真正的”挂起-异步唤醒”——它把 Driver 退出栈、压缩成几个标量、靠 future 回调复活,这才是异步编程的关键。
Yield 不是等待(马上回队列重排,防霸占线程);Suspend 保留 C++ 调用栈(不退出 runInternal),用于”必须原地等完”的同步阻塞。
8.3 典型算子的 isBlocked 解读
不同算子等待的”外部事件”不同,isBlocked 的写法因此分化出几种鲜明的模式。
(1) CallbackSink —— sink 的下游反压(延迟报告模式)
CallbackSink(CallbackSink.cpp:48)是 sink 算子,把数据交给 consumer 回调(下游 pipeline 的LocalExchangeQueue 或 OutputBuffer):
1 | void CallbackSink::addInput(RowVectorPtr input) { |
模式:延迟报告。 真正发现”下游满了”是在 addInput 里(拿到 future_ 和 blockingReason_ 存进成员),但要等
下一轮 Driver 循环调用 isBlocked 时才报告出来。原因见第七章:Driver 循环里 isBlocked 在 getOutput/addInput
之前调用,所以这一轮产生的阻塞只能下一轮才暴露。kWaitForConsumer 对应第七章「场景 A:OutputBuffer 满」。
(2) LocalExchange —— 本地交换 source(getOutput 探测 + isBlocked 报告)
LocalExchange(LocalPartition.cpp:275)从本地 LocalExchangeQueue 拉数据:
1 | RowVectorPtr LocalExchange::getOutput() { |
模式:getOutput 探测、isBlocked 报告——同样是延迟报告。 与 CallbackSink 对称:sink 在 addInput 探测下游,
source 在 getOutput 探测上游 queue。等待的事件是”本地上游 producer 还没产出数据”(kWaitForProducer)。
(3) Exchange —— 远程交换 source(isBlocked 里干实活 + collectAny)
Exchange(Exchange.cpp:142)从远程 worker 拉数据,是少数在 isBlocked 里直接干活的算子:
1 | BlockingReason Exchange::isBlocked(ContinueFuture* future) { |
模式:isBlocked 即工作点 + 多 future 用 collectAny 合并。 它不只是检查,还触发异步预取。这里同时等两件事
(数据到达、新 split 到达),用 folly::collectAny——任一就绪即唤醒,因为有任何一边推进都值得 Driver 重新尝试。
(4) Merge / LocalMerge —— 多源归并(kWaitForProducer)
Merge(Merge.cpp:75)要从多个有序 source 做归并排序,必须每个 source 都有数据才能比较:
1 | BlockingReason Merge::isBlocked(ContinueFuture* future) { |
模式:归并算子受限于最慢的 source。 任何一个参与归并的 source 没数据,整个归并都无法推进(否则破坏全局有序性),
所以报 kWaitForProducer 等待该 source。
(5) LocalPartition —— 多下游 sink(collectAll)
LocalPartition(LocalPartition.cpp:630)把数据按分区扇出到 N 个下游 queue,与 Exchange 形成鲜明对比:
1 | BlockingReason LocalPartition::isBlocked(ContinueFuture* future) { |
模式:扇出 sink 用 collectAll 合并。 当多个下游 queue 满时,必须等所有满的下游都腾出空间才能继续推
(否则又会立刻在某个满的 queue 上卡住),所以用 collectAll——全部就绪才唤醒。
(6) HashProbe —— join probe(状态机驱动 + kWaitForJoinBuild)
HashProbe(HashProbe.cpp:651)的 isBlocked 是一个内部状态机的推进器:
1 | BlockingReason HashProbe::isBlocked(ContinueFuture* future) { |
模式:isBlocked 驱动算子状态机。 probe 在 build 完成前处于 kWaitForBuild,它通过asyncWaitForHashTable() 拿到一个挂在 HashJoinBridge 上的 future(与第一章 allPeersFinished 的屏障机制呼应),
建好后才转入 kRunning。
8.4 两种 future 组合语义:collectAny vs collectAll
上面 Exchange 与 LocalPartition 的对比揭示了一个值得记住的设计准则:
| 组合 | 唤醒条件 | 适用场景 | 例子 |
|---|---|---|---|
folly::collectAny |
任一 future 就绪 | 等多个独立来源中任何一个推进就值得重试 | Exchange:等数据 OR 等 split |
folly::collectAll |
全部 future 就绪 | 必须所有阻塞条件都解除才能继续 | LocalPartition:等所有满的下游腾空间 |
选错会有真实后果:扇出 sink 若用 collectAny,会在某个仍满的下游上反复”唤醒→立刻又阻塞”空转;source 若用collectAll,会在数据已到但 split 未到时白白多等。
8.5 设计模式总结
| 算子 | 等待事件 | BlockingReason | future 来源 | 模式 |
|---|---|---|---|---|
| CallbackSink | 下游 consumer 满 | kWaitForConsumer | consumeCb_ 回调 | 延迟报告(addInput 探测) |
| LocalExchange | 本地上游无数据 | kWaitForProducer | queue_->next | 延迟报告(getOutput 探测) |
| Exchange | 远程数据/split | kWaitForSplit/Producer | exchangeClient_ | isBlocked 干活 + collectAny |
| Merge/LocalMerge | 任一归并源无数据 | kWaitForProducer | 各 MergeSource | 受限于最慢 source |
| LocalPartition | 所有满的下游 | kWaitForConsumer 类 | 各下游 queue | 扇出 + collectAll |
| HashProbe | build 完成/peer 完成 | kWaitForJoinBuild 等 | HashJoinBridge | 状态机驱动 |
贯穿这些算子的共同点:
- 算子自报,Driver 不探测——
isBlocked由算子根据自身与外部资源的状态决定返回值; - future 是唯一的复活路径——谁持有”事件就绪”的知情权(queue、exchangeClient、JoinBridge、consumer),谁负责兑现 future;
- 延迟报告是常态——多数算子在
addInput/getOutput里就拿到了 future,存入成员,下一轮isBlocked才报告,这与第七章的 Driver 循环顺序(isBlocked先于数据操作)严丝合缝; - future 组合语义要匹配语义——
collectAny(任一推进即重试)vscollectAll(全部解除才继续)。
这就是 isBlocked 作为”协作式异步关键接口”的全貌:它让每个算子用统一的 (BlockingReason, ContinueFuture) 二元组,把
五花八门的外部等待(split、远程数据、本地 queue、join build、内存仲裁、RPC)翻译成 Driver 能统一处理的”挂起-唤醒”信号。
九、为什么产出统一从 getOutput 发起
观察 runInternal(Driver.cpp)会发现:所有”产出新数据”的动作都从 getOutput() 发起,addInput() 只负责投喂、从不产出。
1 | // 中间算子(Driver.cpp:668 附近) |
addInput 的契约是”收下、缓冲、更新内部状态”,计算与产出一律在 getOutput 里发生。这是刻意的设计决定。
9.1 核心原因:解耦”消费输入速率”与”产生输出速率”
若把计算放进 addInput(纯 push),addInput 就必须能”立即算出结果并推给下游”。但很多算子的输入批次与输出批次不是 1:1 的:
| 算子 | 输入 → 输出映射 | 计算若在 addInput 的问题 |
|---|---|---|
| FilterProject | 1 输入 → 0 或 1 输出(可能全被过滤) | 尚可 |
| Aggregation | N 输入 → 攒到 noMoreInput 后才吐 M 个输出 |
addInput 时根本无输出可推 |
| Unnest | 1 输入行 → 爆炸成很多输出行(可能多批吐出) | addInput 得”产出多批并逐个 push”,控制流失控 |
| HashProbe | 1 probe 批 → 0..N 个输出批(一行匹配多行) | 一次输入要触发多次产出 |
| Limit | N 输入 → 截断后提前结束 | 需在投喂中途叫停 |
getOutput 把”产出”变成一个可被反复询问的动作:”你现在能给一批吗?能就给,给不出返回 null。” 于是:
- **一次
addInput可以对应 0..N 次getOutput**——算子按自己的节奏产出。Aggregation 在addInput里只更新哈希表、
到getOutput才吐结果;Unnest 可以连续多次getOutput把一行的多行逐批吐完; - 输入/输出的批大小、批数量彻底解耦,算子内部可自由缓冲、攒批、spill。
纯 push 做不到这点——它强制输入输出对齐,遇到 N:M 映射就得在 addInput 里递归往下游推,无法在批次边界停下来。
9.2 getOutput 返回 null 是干净的、可让出的信号
getOutput 返回值有明确的双重语义(Operator.h:271):非空 = “这是一批输出”;**null** = “我现在产不出更多了
(需要更多输入,或被外部阻塞,用 isBlocked 区分)”。这个 null 正是 Driver 的调度节拍点:拿到 null 就往上游走喂更多
输入(第七章里 i 往 source 递减)。每次 getOutput 产出一批,就是一个天然的让出/背压/检查取消边界——Driver 在每批
之间跑 shouldStop()/shouldYield()/isBlocked()。计算若埋在 addInput 内部循环里,这些边界就消失了。
9.3 统一性:source / 中间 / 攒批算子,Driver 一视同仁
getOutput 让 Driver 无需区分算子类型:
- TableScan(source)无上游,
addInput永不被调用——数据在getOutput里”凭空”从 split 读出; - FilterProject(中间)在
getOutput里消费缓冲输入产出; - Aggregation(攒批)在
getOutput里吐累积结果。
对 Driver 而言它们完全一样:**”调 getOutput 看有没有货”**。循环里唯一的区分只是 source(i=0)没有 i-1、无需先addInput。这与 5.1.5 的 isBlocked 统一抽象是同一种品味——用统一窄接口(getOutput 产出 / addInput 投喂 /isBlocked 等待 / isFinished 结束)让一份调度代码驱动所有算子。
9.4 与 demand-gated push 的关系:pull 触发,push 传递
1 | 下游 needsInput()=true (需求信号,pull 味道) |
“从 getOutput 发起” = 由下游需求触发的、按批产出。它既不是纯 pull(不是下游递归调上游 next()),也不是纯 push
(不是上游来数据就无脑灌),而是 Driver 居中、以 getOutput 为产出节拍器的 demand-gated push(呼应第六章)。
9.5 小结
把”产出”统一收敛到 getOutput、让 addInput 只投喂,本质是为了:(1) 解耦输入/输出速率(支持 1:N、N:M、延迟产出、
攒批),这是 push 模型做不到的;(2) 让 getOutput 返回 null 成为干净的让出/背压边界;(3) 用统一入口抹平
source / 中间 / 攒批算子差异,让一份 Driver 代码驱动所有算子;(4) 与 demand-gated push 自洽:需求触发 getOutput,getOutput 产出后 addInput 传递。
十、Task 的资源管理:terminate 流程与资源回收
Task 是所有执行期资源的根持有者:memory pool 树、Driver、ExchangeClient、JoinBridge、OutputBuffer、spill 目录、
各类 promise。本章梳理这些资源在 happy path 与异常 path 下如何被有序回收。
10.1 Memory Pool 树:层级、所有权与”刻意保活”
Task 构造时建立内存池树(Task.cpp:710 起):
1 | queryCtx_->pool() (query 级,aggregate) |
所有权要点:
pool_(Task.h:1205)是 task root,aggregate 类型(只聚合不直接分配);childPools_(Task.h:1209,vector<shared_ptr<MemoryPool>>)持有所有 node / operator / connector / exchange 池的所有权;nodePools_(Task.h:1215,map<id, MemoryPool*>)只存裸指针,用于按 plan node 复用 node 池;- Operator / connector 池都是 leaf 或 aggregate child,由
addOperatorPool(Task.cpp:781)等创建后push_back进childPools_。
关键设计——刻意保活(Task.h:1207 注释):
“Keep plan node and operator memory pools alive for the duration of the task to allow for sharing vectors across drivers without copy.”
即:算子内存池的生命周期不随 Driver 结束而结束,而是一直存活到 Task 析构。原因是 Velox 大量使用零拷贝向量共享
(一个 Driver 产出的 RowVector 可能被另一个 Driver 引用),若算子池随 Driver 立即销毁,这些跨 Driver 共享的 buffer
就会悬空。所以 Driver close 只释放算子的逻辑状态(Operator::close),其底层 buffer 所在的内存池对象由childPools_ 兜住,直到 ~Task()。
10.2 Happy Path:自然完成
1 | 某 Driver 的 runInternal() → sink isFinished() → kAtEnd |
checkIfFinishedLocked(Task.cpp:2237)的判定:numFinishedDrivers_ == numTotalDrivers_(或 ungrouped 下输出
pipeline 的 driver 全部完成),且 !hasPartitionedOutput() || partitionedOutputConsumed_(结果已被下游取走)。
注意输出是否被消费由 OutputBuffer 经 setAllOutputConsumed() 异步通知——所以最后一个 driver 完成时若 output 还没被
取走,会先返回 false,待 setAllOutputConsumed 再触发 finish。
10.3 异常 Path:错误 / 取消 / 中止
三种非正常终结都走同一入口,最终汇聚到 terminate:
1 | 算子抛异常 → runInternal catch → task()->setError(eptr) // Driver.cpp:810 |
与 happy path 的关键差异:Driver 此刻可能正在线程上跑。terminate 因此要分两类处理 Driver(见下)。
10.4 terminate:统一清理漏斗(happy/error 共用)
terminate(terminalState)(Task.cpp:2502)是所有终结的汇聚点,分”锁内”与”锁外”两段:
锁内(持 mutex_):
- 若已非 running,直接返回
makeFinishFutureLocked(幂等); state_ = terminalState;记录terminationTimeMs;取消/中止时构造exception_;terminateRequested_ = true(on-thread 的 Driver 下次shouldStop()会看到);numRunningDrivers_ = 0;- 遍历
drivers_,对每个非空 driver 调enterForTerminateLocked(Task.cpp:3367):- 不在线程上 → 返回
kTerminate,driver 被移入offThreadDrivers,driverClosedLocked(); - 在线程上 / 已 pause → 返回
kAlreadyOnThread/kPause,不在这里关,留给 driver 自己的leave路径关;
- 不在线程上 → 返回
- 把
exchangeClients_、barrierFinishPromises_swap 到局部变量;barrierRequested_ = false。
锁外(避免持锁触发回调死锁,见 5.2.3):
taskCompletionNotifier.notify()/stateChangeNotifier.notify()——兑现完成/状态变更 promise;- 对
offThreadDrivers逐个driver->closeByTask()(Driver.cpp:1077,在”已 terminate + 仿在线程”语境下关闭算子); maybeRemoveFromOutputBufferManager()——删除 OutputBuffer(释放它对 Task 的反向引用);- 关闭并清空
exchangeClients——停止远程取数、避免重发请求; - 收集并清空所有
SplitsStore的待发 split promise、处理剩余 remote split; splitGroupState.clear()——清 JoinBridge、LocalExchange、barriers;- 兑现
splitPromises(唤醒等 split 的 driver,让它们感知终止)、bridge->cancel()、关闭preloadingSplits_、兑现barrierPromises; - 返回
makeFinishFuture——调用方 await 该 future 即”所有线程已退出”。
on-thread 的 Driver 怎么收尾? 它们不在 terminate 里关闭,而是:下次循环顶部 shouldStop() 看到terminateRequested_ → 返回 kTerminate → runInternal 退出 → CancelGuard 析构 → Task::leave 检测到kTerminate 且有 driverCb → 在 driver 线程上调 close()(Task.cpp:3415)。这保证 Driver 的 close 永远在 driver
自己的线程上执行,避免与算子的 abort 竞态(见 3.3)。
10.5 最终回收:~Task()
当最后一个引用 Task 的 shared_ptr 释放(drivers_ 已清空解了循环引用、外部持有者也释放后),~Task()
(Task.cpp:452)执行:
1 | removeSpillDirectoryIfExists(); // 删除 spill 目录(若曾创建) Task.cpp:691 |
内存池的销毁顺序很关键:必须先 childPools_.clear()(leaf/node 池)再 pool_.reset()(root),因为 aggregate
父池要求子池先于自己销毁。removeFromTaskList()(从全局运行任务表摘除)通过 SCOPE_EXIT 保证最先注册、最后执行。
10.6 资源回收时机汇总
| 资源 | 释放时机 | 代码位置 |
|---|---|---|
| 算子逻辑状态(哈希表、缓冲行等) | Driver close 时 Operator::close() |
Driver::closeOperators Driver.cpp:898 |
| Task→Driver 的 shared_ptr(解循环引用) | removeDriver(happy)/ terminate(error) |
Task.cpp:1464,2559 |
| OutputBuffer | terminate → maybeRemoveFromOutputBufferManager |
Task.cpp:2590 |
| ExchangeClient | terminate 关闭并清空 |
Task.cpp:2592 |
| JoinBridge / LocalExchange / barriers | terminate → SplitGroupState::clear() |
TaskStructs.h:248 |
| 待发 split promise / preloading split | terminate 兑现 / 关闭 |
Task.cpp:2675,2683 |
| 算子 / node 内存池(buffer 所在) | **~Task() 的 childPools_.clear()**(刻意保活至此) |
Task.cpp:490 |
task root 内存池 pool_ |
**~Task() 的 pool_.reset()**(最后) |
Task.cpp:491 |
| spill 目录 | ~Task() → removeSpillDirectoryIfExists |
Task.cpp:470,691 |
10.7 设计要点
- terminate 是 happy / error 共用的单一漏斗(呼应 5.1.2)——清理逻辑只写一遍,状态码不同而已;
- 内存池与 Driver 生命周期解耦——Driver close 只释放算子逻辑状态,内存池对象保活到
~Task(),支撑跨 Driver 零拷贝向量共享; - on-thread Driver 自己在自己的线程上 close——terminate 只关 off-thread 的,避免 close/abort 竞态;
- 回收顺序受所有权约束——子池先于父池、
childPools_先于pool_、drivers_清空解循环引用后 Task 才可能析构; - 析构期带调试探针——
~Task()用clearStage字符串逐步标记,便于定位 jemalloc 析构崩溃(源码 TODO 注释)。
十一、Future/Promise 管理:避免 Driver 泄漏的设计与实现
Task/Driver 里散布着十来组 promise/future。它们是协作式调度的”唤醒总线”,但也是最危险的地方——一个永不兑现的 future
就能让一个 Driver(及它通过 DriverCtx 拽住的整个 Task)永久泄漏。本章解读这套机制的设计与背后的防泄漏思考。
11.1 为什么 future/promise 在这里特别危险
回顾第三章的引用关系:Task ↔ Driver 是循环引用,靠 drivers_[i] = nullptr 手动切断。但 Driver 的 shared_ptr
**不只存在于 drivers_**,还藏在 future 的回调闭包里:
- BlockingState(
Driver.cpp:184,并行模式):成员std::shared_ptr<Driver> driver_,并把整个state
捕获进setResume的thenValue闭包(Driver.cpp:232); - DriverBlockingState(
Task.cpp:3772,串行模式):闭包捕获driverHolder = driver_->shared_from_this(); - Executor 队列:
[driver]() { Driver::run(driver); }(Driver.cpp:302)。
也就是说,一个阻塞中的 Driver,其生命被它正在等待的那个 future 兜住。推出核心不变量:
每个挂起 Driver 的 future 都必须有一条”终将兑现或出错”的路径。只要有一个 future 可能永不了结,对应的 Driver 和
Task 就会泄漏。 文档TaskDriverOperatorLifecycle.md末尾那句警告正是此意(见第四章)。
这套设计的全部精力,就是保证这个不变量在所有路径(正常、阻塞、暂停、取消、错误、提前终止)下都成立。
11.2 全景:Task/Driver 中的 promise/future
| promise 组 | 等待者 | 兑现者 | 兑现时机 |
|---|---|---|---|
BlockingState / DriverBlockingState |
阻塞的 Driver | 外部事件(split/数据/buildbridge…) | future 兑现 → 重新 enqueue |
SplitsStore::promises_ |
等 split 的 Driver | addSplit / noMoreSplits / terminate |
来 split / 收工 / 终止 |
BarrierState::allPeersFinishedPromises |
join build 非末位 Driver | 末位 Driver(或 terminate) | 屏障完成 |
threadFinishPromises_ |
requestPause / terminate 调用方 | allThreadsFinishedLocked |
numThreads_ 归零 |
resumePromises_ |
pauseRequested 调用方 |
Task::resume |
任务恢复 |
taskCompletionPromises_ |
taskCompletionFuture 等待方 |
terminate |
任务终结 |
stateChangePromises_ |
stateChangeFuture 等待方 |
removeDriver / terminate | 状态变更 |
taskDeletionPromises_ |
taskDeletionFuture 等待方 |
~Task() |
任务析构 |
barrierFinishPromises_ |
requestBarrier 调用方 |
末位 barrier driver / terminate | barrier 完成 |
注意每一行的”兑现者”列都包含一条 terminate / 析构兜底路径——这不是巧合,是 11.4 要讲的设计。
11.3 陷阱一:持锁兑现 promise → 死锁
promise.setValue() 会同步触发挂在 future 上的 continuation,而 continuation 往往要回头抢 Task::mutex_
(例如 BlockingState::setResume 的回调里 std::lock_guard l(task->mutex()),Driver.cpp:236)。若在持锁时setValue,就是”持锁 → 触发回调 → 回调抢同一把锁”的自死锁。
对策是贯穿全代码的**”锁内收集、锁外兑现”**模式。两种写法:
(a) swap 到局部变量,出锁再兑现(allThreadsFinishedLocked,Task.cpp:3532):
1 | std::vector<ContinuePromise> Task::allThreadsFinishedLocked() { |
(b) EventCompletionNotifier——两段式 + 析构兜底(Task.cpp:51):
1 | class EventCompletionNotifier { |
用法(terminate,Task.cpp:2545):锁内 taskCompletionNotifier.activate(std::move(taskCompletionPromises_), ...),
出锁后 taskCompletionNotifier.notify()。即便中途异常提前 return,析构函数也会 notify(),且 active_ 保证不重复兑现。
11.4 陷阱二:future 永不兑现 → Driver 泄漏 → terminate 作”总清算”
这是最致命的陷阱。一个 Driver 阻塞在某个 future 上(等 split、等 consumer、等 join build),如果任务因错误/取消而中止,
那些事件可能永远不会自然发生——split 不会再来、consumer 已死、build 永远完不成。若放任不管,这些 Driver 的BlockingState 闭包会永久持有 Driver→Task,泄漏。
Task::terminate(第十章 10.4)因此承担”promise 总清算”职责,把所有可能让 Driver 永远 block 的 promise 全部兑现
(Task.cpp:2502 起):
1 | // terminate 锁外段,逐一兑现/取消,确保没有 Driver 留在 block 态: |
被唤醒的 Driver 重新上线后,循环顶部 shouldStop() 看到 terminateRequested_ → 返回 kTerminate → 干净退出 →close() → drivers_[i]=nullptr,闭包随 future 兑现而析构,Driver 引用计数归零。**”唤醒”不是为了让它继续干活,而是为了
让它有机会发现该退出、从而释放自己。** 这就是为什么 11.2 表格里每组 promise 都有 terminate 兜底。
对应地,
makeFinishFutureLocked(Task.cpp:2710)在numThreads_ == 0时立即兑现而非挂起——没有线程在跑就
没什么可等的,避免凭空制造一个永不兑现的 future。
11.5 陷阱三:double-resume / 两个线程进同一个 Driver
future 兑现是异步的,可能与 Task 的 pause/terminate 竞争。若处理不当,同一个 Driver 可能被两个线程同时 run。BlockingState::setResume(Driver.cpp:227)有两道防线:
1 | void BlockingState::setResume(std::shared_ptr<BlockingState> state) { |
三个要点:
- 在 Task 锁外注册 resume(
Driver::run里调 setResume,此时已不持锁)——注释(Driver.cpp:863)说明这样
“若 future 已经兑现,也不会有第二个线程进入同一个 Driver”; - pause 检查:若任务正在 pause,不重新 enqueue,把恢复权交给
Task::resume(Task.cpp:1204)——避免 pause
期间 Driver 偷偷回到线程; thenError兜底:folly future 若以异常兑现(包括 promise 被析构导致的 BrokenPromise),thenError把它转成task->setError,绝不让一个 future 静默地烂尾。DriverBlockingState::setDriverFuture(Task.cpp:3808)有完全
对称的thenError分支。
Driver::run 里还有一处微妙竞态处理(Driver.cpp:856):拿到 kBlock 后若发现 task->shouldStop()==kTerminate,
直接 return 而不进 resume 模式——否则会在已终止的任务上挂一个永不兑现的 future。
11.6 可观测性:让泄漏”看得见”
设计者清楚这类泄漏极难调试,于是埋了两个探针:
- **
numBlockedDrivers_**(Driver.cpp:205,BlockingState构造++、析构--):进程级”当前有多少 Driver 处于
block 态”的计数,通过BlockingState::numBlockedDrivers()暴露。泄漏的 Driver 会让这个数永不归零; - **
driversClosedByTask_**(Task.h:1307,vector<weak_ptr<Driver>>):记录被 Task 强行关闭的 Driver。注释
(Task.h:1303)直言其目的——“当 race/bug 导致这些 Driver 被永久持有、进而把 Task 变成僵尸时,用这个 vector 辅助
调试僵尸 Task”。用weak_ptr是为了观测而不延长生命。
11.7 设计哲学总结
| 原则 | 实现 |
|---|---|
| 每个 future 必有兑现路径 | 正常兑现 + terminate 总清算 + thenError 兜底,三重保证 |
| promise 兑现必在锁外 | swap-to-local / EventCompletionNotifier 两段式 |
| 兑现幂等、提前退出也兜底 | EventCompletionNotifier 析构调 notify() + active_ 标志 |
| 唤醒是为了让 Driver 发现该退出 | terminate 唤醒所有等待者,它们上线即见 kTerminate 自行 close |
| 无线程可等就别造 future | makeFinishFutureLocked 在 numThreads_==0 时立即兑现 |
| 防双线程进同一 Driver | setResume 锁外注册 + pause 检查 + run 里的 terminate 竞态短路 |
| 泄漏可观测 | numBlockedDrivers_ 计数 + driversClosedByTask_ 僵尸探针 |
一句话:Velox 不试图”小心翼翼地不泄漏”,而是建立一个强不变量——“任何挂起的 Driver 都被某个 future 兜住,而每个
future 都有正常兑现、terminate 兜底、thenError 转错三条出路之一”——再让 terminate 这个单一漏斗在所有异常路径上强制兑现
一切,从结构上消灭”future 永不兑现”的可能。 这与第五章的”把并发正确性从靠程序员小心转化为靠结构保证”一脉相承。