1. 1. 一、Task 的设计与实现
    1. 1.1. 1.1 Task 是什么
    2. 1.2. 1.2 Task 状态机
    3. 1.3. 1.3 核心数据成员
    4. 1.4. 1.4 Task 的执行模式
    5. 1.5. 1.5 Task 启动流程
    6. 1.6. 1.6 Split 分发
    7. 1.7. 1.7 allPeersFinished —— Hash Join Build 同步屏障
    8. 1.8. 1.8 Task::terminate —— 统一终止路径
  2. 2. 二、Driver 的设计与实现
    1. 2.1. 2.1 Driver 是什么
    2. 2.2. 2.2 Driver 的核心组成
    3. 2.3. 2.3 ThreadState —— Driver 的状态
    4. 2.4. 2.4 Driver 状态流转
    5. 2.5. 2.5 Driver::runInternal —— 核心执行循环
    6. 2.6. 2.6 blockDriver —— 进入 Blocked 状态
    7. 2.7. 2.7 CancelGuard —— 线程安全的清理保证
    8. 2.8. 2.8 Driver::close —— 正常关闭路径
  3. 3. 三、Task 与 Driver 的交互与生命周期
    1. 3.1. 3.1 引用关系图
    2. 3.2. 3.2 完整生命周期
    3. 3.3. 3.3 Task::enter / leave —— Driver 线程注册协议
    4. 3.4. 3.4 Suspended 状态的用途
    5. 3.5. 3.5 Split Group 并发控制
  4. 4. 四、TaskDriverOperatorLifecycle.md 中文翻译
  • Task、Driver、Operator 生命周期
    1. 1. OutputBufferManager
    2. 2. Operator
    3. 3. Driver 与 Task
    4. 4. 五、架构设计品味、代码品味与多线程编程范式
      1. 4.1. 5.1 架构设计品味
        1. 4.1.1. 5.1.1 “协作式调度”而非”抢占式线程”
        2. 4.1.2. 5.1.2 单一终止漏斗(terminate as a funnel)
        3. 4.1.3. 5.1.3 所有权方向与生命周期的”单调性”
        4. 4.1.4. 5.1.4 循环引用的”受控泄漏”
        5. 4.1.5. 5.1.5 窄接口统一异构等待——isBlocked 作为唯一的”挂起-唤醒”抽象
      2. 4.2. 5.2 代码品味
        1. 4.2.1. 5.2.1 快路径无锁,慢路径加锁
        2. 4.2.2. 5.2.2 “Locked 后缀”约定——把锁契约编码进函数名
        3. 4.2.3. 5.2.3 RAII 兜底一切
        4. 4.2.4. 5.2.4 统计的双保险
        5. 4.2.5. 5.2.5 主循环写得极致紧凑
        6. 4.2.6. 5.2.6 延迟报告模式——阻塞在数据操作里发现,在下一轮 isBlocked 报告
        7. 4.2.7. 5.2.7 future 组合语义必须匹配等待语义——collectAny vs collectAll
      3. 4.3. 5.3 涉及的多线程编程范式
        1. 4.3.1. 一条贯穿全局的主线
    5. 5. 六、runInternal 的执行模型:不是 Volcano,也不是教科书式 push
      1. 5.1. 6.1 控制流:Driver 是唯一的调度器
      2. 5.2. 6.2 调度扫描方向:从 sink 往 source 找”谁能干活”
      3. 5.3. 6.3 demand-gated(需求门控)
      4. 5.4. 6.4 为什么”扁平循环 + Driver 调度”是协作式调度的前提
      5. 5.5. 6.5 小结
    6. 6. 七、runInternal 调度实例:三算子 pipeline 全过程
      1. 6.1. 7.0 算子接口语义
      2. 6.2. 7.1 阶段一:冷启动——从 sink 往 source “找活干”
      3. 6.3. 7.2 阶段二:数据推进——i += 2 的精妙之处
      4. 6.4. 7.3 阶段三:稳态循环
      5. 6.5. 7.4 阶段四:阻塞场景——blockDriver 让出线程
        1. 6.5.1. 场景 A:OutputBuffer 满(下游消费跟不上)
        2. 6.5.2. 场景 B:TableScan 等 split
      6. 6.6. 7.5 阶段五:收尾——noMoreInput 链式传播 → close
      7. 6.7. 7.6 时间线汇总
    7. 7. 八、isBlocked 与三种让出机制:结合典型算子解读
      1. 7.1. 8.1 isBlocked 的角色:声明阻塞意愿 + 交出唤醒句柄
      2. 7.2. 8.2 三种让出线程的方式
      3. 7.3. 8.3 典型算子的 isBlocked 解读
        1. 7.3.1. (1) CallbackSink —— sink 的下游反压(延迟报告模式)
        2. 7.3.2. (2) LocalExchange —— 本地交换 source(getOutput 探测 + isBlocked 报告)
        3. 7.3.3. (3) Exchange —— 远程交换 source(isBlocked 里干实活 + collectAny)
        4. 7.3.4. (4) Merge / LocalMerge —— 多源归并(kWaitForProducer)
        5. 7.3.5. (5) LocalPartition —— 多下游 sink(collectAll)
        6. 7.3.6. (6) HashProbe —— join probe(状态机驱动 + kWaitForJoinBuild)
      4. 7.4. 8.4 两种 future 组合语义:collectAny vs collectAll
      5. 7.5. 8.5 设计模式总结
    8. 8. 九、为什么产出统一从 getOutput 发起
      1. 8.1. 9.1 核心原因:解耦”消费输入速率”与”产生输出速率”
      2. 8.2. 9.2 getOutput 返回 null 是干净的、可让出的信号
      3. 8.3. 9.3 统一性:source / 中间 / 攒批算子,Driver 一视同仁
      4. 8.4. 9.4 与 demand-gated push 的关系:pull 触发,push 传递
      5. 8.5. 9.5 小结
    9. 9. 十、Task 的资源管理:terminate 流程与资源回收
      1. 9.1. 10.1 Memory Pool 树:层级、所有权与”刻意保活”
      2. 9.2. 10.2 Happy Path:自然完成
      3. 9.3. 10.3 异常 Path:错误 / 取消 / 中止
      4. 9.4. 10.4 terminate:统一清理漏斗(happy/error 共用)
      5. 9.5. 10.5 最终回收:~Task()
      6. 9.6. 10.6 资源回收时机汇总
      7. 9.7. 10.7 设计要点
    10. 10. 十一、Future/Promise 管理:避免 Driver 泄漏的设计与实现
      1. 10.1. 11.1 为什么 future/promise 在这里特别危险
      2. 10.2. 11.2 全景:Task/Driver 中的 promise/future
      3. 10.3. 11.3 陷阱一:持锁兑现 promise → 死锁
      4. 10.4. 11.4 陷阱二:future 永不兑现 → Driver 泄漏 → terminate 作”总清算”
      5. 10.5. 11.5 陷阱三:double-resume / 两个线程进同一个 Driver
      6. 10.6. 11.6 可观测性:让泄漏”看得见”
      7. 10.7. 11.7 设计哲学总结
  • Velox Task & Driver

    一、Task 的设计与实现

    1.1 Task 是什么

    Task 代表一个查询片段(PlanFragment)的完整执行单元,是 Velox 执行层最顶层的对象,负责:

    • 将 PlanFragment 翻译成若干 Pipeline(DriverFactory);
    • 创建并管理所有 Driver;
    • 管理 Split(数据分片)的分发;
    • 维护跨 Driver 共享的桥接结构(JoinBridge、LocalExchange 等);
    • 协调终止、暂停、取消等控制流。

    定义在 velox/exec/Task.h:43

    1.2 Task 状态机

    1
    2
    3
    4
    5
    6
    7
    TaskState(velox/exec/TaskStructs.h:44)

    kRunning(0) ──── 正常结束 ──────────────────► kFinished(1)
    │ ──── requestCancel() ──────────► kCanceled(2)
    │ ──── requestAbort() ──────────► kAborted(3)
    │ ──── setError() ──────────► kFailed(4)
    └─── 所有路径均通过 Task::terminate(terminalState) 实现

    状态转换核心代码(Task.cpp:2516):

    1
    2
    state_ = terminalState;
    terminateRequested_ = true;

    1.3 核心数据成员

    成员 类型 作用
    drivers_ vector<shared_ptr<Driver>> 拥有所有 Driver(Task.h:1250
    driverFactories_ vector<unique_ptr<DriverFactory>> 每条 Pipeline 一个,描述如何创建 Driver
    splitGroupStates_ unordered_map<uint32_t, SplitGroupState> 每个 split group 的跨算子共享状态(JoinBridge、LocalExchange、Barrier)
    splitsStates_ unordered_map<PlanNodeId, SplitsState> 每个 source node 的 Split 队列
    numThreads_ int32_t 当前在线程上运行的 Driver 数量
    terminateRequested_ atomic_bool 是否请求终止,Driver 会频繁轮询
    pauseRequested_ atomic_bool 是否请求暂停
    mutex_ timed_mutex 保护所有状态字段

    1.4 Task 的执行模式

    Task.h:46 定义了两种模式:

    • kParallel:通过 Task::start() 启动,把 Driver 提交到 Executor 并行执行(生产用途)。
    • kSerial:通过 Task::next() 在调用线程上单步执行,用于测试或嵌入式查询。

    1.5 Task 启动流程

    调用链:Task::start()createDriverFactoriesLocked()initializePartitionOutput()createAndStartDrivers()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // Task.cpp:982 — Task::start()
    void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
    // 1. 通过 LocalPlanner::plan() 将 PlanFragment 切分成 DriverFactory 列表
    createDriverFactoriesLocked(maxDrivers);
    // 2. 初始化 PartitionedOutput buffer(如果有 PartitionedOutputNode)
    initializePartitionOutput();
    // 3. 为 ungrouped execution 创建 Driver,为 grouped execution 按需创建
    createAndStartDrivers(concurrentSplitGroups);
    }

    createAndStartDrivers()Task.cpp:1070):

    1
    2
    3
    4
    5
    6
    7
    // 为 ungrouped execution 创建并入队所有 Driver
    for (auto it = drivers_.end() - numDriversUngrouped_; it != drivers_.end(); ++it) {
    ++numRunningDrivers_;
    Driver::enqueue(*it); // 提交到线程池
    }
    // 如果有 grouped execution,调用 ensureSplitGroupsAreBeingProcessedLocked()
    // 按 concurrentSplitGroups_ 上限,为已有 split 的 group 创建 Driver

    drivers_ 的内存布局(Grouped Execution):

    1
    2
    drivers_[0 .. numDriversPerSplitGroup_*concurrentSplitGroups_-1]   ← grouped execution 槽位
    drivers_[numDriversPerSplitGroup_*concurrentSplitGroups_ ..] ← ungrouped execution Driver

    Grouped execution 的槽位在 split group 完成后置 nullptr,新 split group 的 Driver 复用这些槽位。

    1.6 Split 分发

    1
    2
    Task::addSplit() → addSplitLocked() → SplitsStore::addSplit()
    → 唤醒所有等待 split 的 Driver(通过 ContinuePromise)

    Driver 通过 Task::getSplitOrFuture()Task.h:442)从 SplitsStore 取 split。如果没有 split,返回
    kWaitForSplit,Driver 进入 Blocked 状态。

    1.7 allPeersFinished —— Hash Join Build 同步屏障

    Task.h:526:所有并发的 build driver 都执行完后,最后一个 driver 收到 true,其他 driver 通过
    ContinueFuture 阻塞等待。这是 HashJoin 多线程 build 的协调机制。

    SplitGroupState::barriersTaskStructs.h:213)存储这些屏障状态,持有 Driver 的 shared_ptr,形成一个引用环——
    该引用环在 allPeersFinished 正常路径或 Task::terminate 异常路径中被清理。

    1.8 Task::terminate —— 统一终止路径

    Task.cpp:2502 是所有终止的汇聚点:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    terminate(terminalState) {
    1. 加锁,将 state_ 设置为终态
    2. terminateRequested_ = true(Driver 循环会看到)
    3. numRunningDrivers_ = 0
    4. 对所有 drivers_:
    enterForTerminateLocked() → 不在线程上的 Driver 直接标 isTerminated,
    收集到 offThreadDrivers(需在锁外 closeByTask())
    5. 解锁后:
    offThreadDrivers.closeByTask() // 关闭不在线程的 Driver
    maybeRemoveFromOutputBufferManager() // 清理输出 buffer
    关闭 ExchangeClient
    清理 splitGroupStates(bridges、barriers、localExchanges)
    兑现所有 splitPromises(让等待 split 的 Driver 感知终止)
    cancel 所有 JoinBridge
    6. 返回 future,调用方 await 等待所有线程退出
    }

    二、Driver 的设计与实现

    2.1 Driver 是什么

    Driver 是单条 Pipeline 的单线程执行引擎。每个 Driver 持有一组串联的 Operator,从 source operator
    (TableScan、Exchange 等)拉数据,逐级传递给 sink operator(PartitionedOutput、CallbackSink 等)。

    定义在 velox/exec/Driver.h:353

    2.2 Driver 的核心组成

    1
    2
    3
    4
    5
    6
    Driver
    ├── ctx_: unique_ptr<DriverCtx> // 持有 Task shared_ptr(引用环的一侧)
    ├── operators_: vector<unique_ptr<Operator>> // 独占所有算子
    ├── state_: ThreadState // 线程状态,由 Task::mutex_ 保护
    ├── barrier_: BarrierState // Barrier 处理状态
    └── blockingReason_: BlockingReason // 当前阻塞原因

    DriverCtxDriver.h:231):Driver 的执行上下文,是 Driver 与 Task 之间的接口层。它持有
    shared_ptr<Task>,这是 Driver → Task 引用的唯一路径。

    2.3 ThreadState —— Driver 的状态

    Driver.h:97 定义了 ThreadState

    1
    2
    3
    4
    5
    6
    7
    struct ThreadState {
    atomic<thread::id> thread; // 正在运行的线程 id(非空 = 在线程上)
    atomic<bool> isEnqueued; // 已提交到 Executor 但还未开始运行
    atomic<bool> isTerminated; // 已终止,终态
    tsan_atomic<bool> hasBlockingFuture; // 有待兑现的 future(Blocked 状态)
    atomic<uint32_t> numSuspensions; // Suspended 层数(> 0 = suspended)
    };

    2.4 Driver 状态流转

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
                       创建后

    ┌────▼────┐
    │ Created │ 初始状态:所有 flag 均 false
    └────┬────┘
    │ Driver::enqueue()
    ┌────▼──────┐
    │ Enqueued │ isEnqueued=true,已提交到线程池
    └────┬──────┘
    │ Executor 调度到线程,Task::enter() → kNone
    ┌────▼──────────┐
    ┌──►│ On Thread │ thread 字段被设置,numThreads_++
    │ └──┬──┬──┬──┬──┘
    │ │ │ │ │
    │ kBlock │ kYield │ kPause/kTerminate
    │ │ │ │
    │ ┌───▼─┐│ ┌───▼──────┐
    │ │Blocked││ │Terminated│ isTerminated=true,终态
    │ │hasBlk=││ └──────────┘
    │ │true ││
    │ └───┬───┘│
    future兑现│ │ │ Driver::enqueue() → 重新入队
    setResume() │ │ │
    └──────┘ │
    └───────────┘ (kYield → 重新入队)

    kPause → Off Thread(等待 resume)
    kBlock + Suspended → Suspended(numSuspensions>0,仍在线程)

    状态说明(对应代码注释 Driver.h:72):

    状态 判断条件 进入方式 退出方式
    Created 所有 flag 为 false Task::createDriversLocked enqueue
    Enqueued isEnqueued=true Driver::enqueue() Executor 调度到线程
    On Thread thread 字段非空 Task::enter() 返回 kNone 返回 kBlock/kYield/kPause/kTerminate
    Blocked hasBlockingFuture=true BlockingState 构造时设置 future 兑现 → BlockingState::setResume() → enqueue
    Suspended numSuspensions > 0 Task::enterSuspended() Task::leaveSuspended()
    Terminated isTerminated=true Task::enter() 返回 kTerminate,或 Task::leave() 无(终态)

    2.5 Driver::runInternal —— 核心执行循环

    Driver.cpp:519 是 Driver 的心脏(详细的执行模型分析见第六章):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    runInternal() {
    1. 记录 queue 时间,设置 onThreadStartUs_
    2. Task::enter(state_) // 向 Task 注册"我上线了"
    → kTerminate: 设置 task error,返回
    → kNone: 继续
    3. 安装 CancelGuard(析构时调用 Task::leave + Driver::close)
    4. initializeOperators() // 第一次执行时初始化所有算子
    5. 主循环 for(;;):
    for (i = startingOperator(=sink); i >= 0; --i): // 从 sink 往 source 扫描
    a. task()->shouldStop() // 检查终止/暂停/yield
    b. shouldYield() // CPU 时间片耗尽则返回 kYield
    c. checkUnderArbitration() // 内存仲裁中则 block
    d. op->isBlocked() // 算子阻塞则 blockDriver()
    e. 若下游 (i+1) needsInput:
    op->getOutput() → 有数据 → nextOp->addInput() → i+=2(向下游推进)
    op->getOutput() → null → op->isFinished() → nextOp->noMoreInput()
    f. 若 i 为 sink: getOutput() 返回非空 → kBlock(串行模式)
    isFinished() → close() → kAtEnd
    6. catch → task()->setError(),返回 kAlreadyTerminated
    }

    2.6 blockDriver —— 进入 Blocked 状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Driver.cpp:669(声明 Driver.h:669)
    StopReason blockDriver(...) {
    blockingState = make_shared<BlockingState>(self, move(future), op, reason);
    guard.notThrown(); // 避免 CancelGuard 触发 close
    return StopReason::kBlock;
    }
    // Driver::run() 中:
    case StopReason::kBlock:
    BlockingState::setResume(blockingState); // 注册 future 回调
    return;

    BlockingState::setResume()Driver.cpp:227):future 兑现时(通过 QueuedImmediateExecutor)记录算子
    blocking 时间、hasBlockingFuture = false,若 task 未 pause 则 Driver::enqueue(driver) 重新入队。

    2.7 CancelGuard —— 线程安全的清理保证

    Driver.h:563,RAII 对象。runInternal 正常结束时调用 notThrown();若发生异常,guard 析构会调用
    Task::leave()Driver::close(),确保 Driver 干净退出。

    2.8 Driver::close —— 正常关闭路径

    1
    2
    3
    4
    5
    6
    7
    // Driver.cpp:1063
    void Driver::close() {
    if (!closed_.exchange(true)) { // 幂等
    closeOperators(); // 关闭所有算子,上报统计
    Task::removeDriver(task(), this); // 从 Task 中移除自己
    }
    }

    closeOperators()Driver.cpp:898):关闭所有算子、将 driver 生命周期统计(queued/on-thread/blocked
    时间)与算子统计上报到 Task。


    三、Task 与 Driver 的交互与生命周期

    3.1 引用关系图

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Task ──── shared_ptr ────► Driver (drivers_[i])
    ▲ │
    │ unique_ptr
    │ │
    │ DriverCtx
    │ │
    └─── shared_ptr ────────────┘ (ctx_->task)

    形成循环引用:Task → Driver → DriverCtx → Task

    解除循环引用的时机:
    drivers_[i] = nullptr (Task::removeDriver 或 Task::terminate 中)
    → 此后 Driver 只剩 DriverCtx 持有的 Task 引用
    → Driver 析构后,Task 引用计数归零(若外部也无引用)

    额外的两个 Driver 引用:

    1. BlockingState 中:std::shared_ptr<Driver> driver_,在 future 未兑现期间延长 Driver 生命。
    2. Executor 队列 lambda 中:[driver]() { Driver::run(driver); }

    3.2 完整生命周期

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    Task::create()

    Task::start() / Task::next()
    │ createDriverFactoriesLocked() — LocalPlanner 把 PlanFragment 切成 DriverFactory
    │ initializePartitionOutput() — 初始化输出 buffer
    │ createAndStartDrivers() — 创建 Driver,调用 Driver::enqueue()

    [所有 Driver 在 Executor 上并发运行]

    ├── 正常路径:runInternal() → kAtEnd → Driver::close() → Task::removeDriver()
    │ → checkIfFinishedLocked() → 所有 driver 完成 + output consumed
    │ → Task::terminate(kFinished)

    ├── 错误路径:operator 抛异常 → task()->setError() → Task::terminate(kFailed)
    │ → on-thread Driver 在下次 shouldStop() 看到 kTerminate,退出循环,
    │ CancelGuard 触发 close();off-thread Driver 直接 closeByTask()

    └── 外部取消:requestCancel()/requestAbort() → terminate(kCanceled/kAborted)(同错误路径)

    Task 析构 → pool_ 等资源释放 → taskDeletionPromises_ 被 fulfill

    3.3 Task::enter / leave —— Driver 线程注册协议

    Task::enter(state)Task.cpp:3341):

    1
    2
    3
    4
    5
    6
    加锁
    isEnqueued = false
    若 isTerminated → kAlreadyTerminated
    shouldStopLocked() 检查 terminateRequested_/pauseRequested_/toYield_
    若 kNone: ++numThreads_; state.setThread()
    返回 StopReason

    Task::leave(state, driverCb)Task.cpp:3382):

    1
    2
    3
    4
    5
    6
    7
    加锁,检查 shouldStop(此时可能收到 terminate 请求)
    若 kTerminate 且有 driverCb:
    解锁,调用 driverCb(kTerminate) // 在 driver 线程上关闭 driver
    再加锁
    --numThreads_
    若 numThreads_ == 0 → fulfill threadFinishPromises_(解除 pause/terminate 的等待)
    state.clearThread()

    该设计确保 Driver 的 close() 总在 driver 线程上执行,避免 driver close 与 operator abort 之间的竞态。

    3.4 Suspended 状态的用途

    当 Driver 需等待外部 IO、内存仲裁决策时,不能简单 block(会丢失调用栈),而是进入 Suspended 状态:

    1
    2
    3
    4
    5
    6
    // Task::enterSuspended():
    // --numThreads_(让 Task 认为没有线程在跑,满足 pause 条件)
    // numSuspensions++(Driver 仍持有线程栈)
    // 等待外部事件...
    // Task::leaveSuspended():
    // 若 pause 中,sleep 等待 resume;否则 ++numThreads_,--numSuspensions

    3.5 Split Group 并发控制

    Grouped Execution 模式下,Task 按 concurrentSplitGroups_ 控制并发处理的 split group 数量。每当一个 split
    group 的所有 Driver 完成,Task::removeDriver()ensureSplitGroupsAreBeingProcessedLocked()
    Task.cpp:1503)为下一个 queued split group 创建新 Driver。


    四、TaskDriverOperatorLifecycle.md 中文翻译

    以下为 velox/exec/TaskDriverOperatorLifecycle.md 的完整翻译。

    Task、Driver、Operator 生命周期

    摘要:Task 的生命周期长于 Driver 和 Operator。Task 与 Driver 之间存在循环引用,这些循环引用在 Task
    释放对 Driver 的引用时被解除。Task::terminate 包含了兜底逻辑,用于在异常或提前终止时清理所有资源。

    OutputBufferManager

    OutputBuffer 持有对 Task 的引用。这个引用用于在所有输出数据被消费完毕时,通过调用
    task->setAllOutputConsumed()(从 OutputBuffer::deleteResults 触发)来通知 Task。

    该引用在 OutputBuffer 被销毁时释放,销毁发生在两个地方:(1) OutputBufferManager::deleteResults(正常路径);
    (2) OutputBufferManager::removeTask(错误路径)。

    在 Prestissimo 应用中,OutputBufferManager::deleteResultsTaskManager::abortResults 调用,后者由
    TaskResource::abortResults(挂载在 HTTP DELETE /v1/task/{id}/results/{id} 路由)触发。

    OutputBufferManager::removeTask 则由 Task::terminate 调用。

    Operator

    Operator 不直接持有对 Task 的引用。它通过 DriverCtx 间接访问 Task。

    Driver 与 Task

    DriverCtx 存储了一个指向 Task 的 shared_ptr。Driver 独占拥有 DriverCtx,并通过 DriverCtx 间接持有对
    Task 的引用。Operator 则通过裸指针引用 DriverCtx

    1
    2
    3
    4
    // Driver 侧:
    std::unique_ptr<DriverCtx> ctx_;
    // DriverCtx 侧:
    std::shared_ptr<Task> task;

    Driver 对 Task 的引用只有在 Driver 析构时才会释放

    Task 共享拥有所有 Driver:

    1
    std::vector<std::shared_ptr<Driver>> drivers_;

    Task 在两个地方存储对 Driver 的引用:(1) drivers_ 成员变量,存储所有 Driver;(2)
    SplitGroupState.barriers,存储包含 join build 的 Driver(用于 Task::allPeersFinished)。

    drivers_ 的引用在以下两处被清理:(1) Task::removeDriver——由 Driver::close 调用;(2) Task::terminate

    barriers 中存储的 Driver 引用在以下情况下被清理:(1) Task::removeDriver 中,通过
    SplitGroupState::clear();(2) Task::allPeersFinished,在所有 join build 流水线完成时;(3)
    Task::terminate 中,通过 SplitGroupState::clear()

    Task::removeDriverTask::allPeersFinished 在正常路径(包括 Driver 自身遇到错误并主动关闭的路径)中被调用。
    Task::terminate 则在错误路径中被调用。

    Task 与 Driver 之间存在循环引用:Task 引用 Driver,Driver 引用 Task。这些循环引用在 Task 释放对 Driver 的引用时被解除。
    Driver 至少通过 DriverCtx::task 持有一个 Task 引用,直到析构为止。因此,Driver 的生命周期不可能超过 Task 的生命周期
    此外,Driver 独占拥有其所有 Operator,所以 Operator 的生命周期也不可能超过 Driver。总结:Task 的寿命 ≥ Driver
    的寿命 ≥ Operator 的寿命。

    BlockedState 结构体中也持有对 Driver 的引用。该结构体在 Driver 进入 Blocked 状态时创建,被存储在挂载到阻塞
    future 的回调 lambda 中。它仅在 future 兑现且回调执行完毕后才会销毁。详见 BlockingState::setResume如果 future
    永远不兑现,Driver 和 Task 将发生泄漏。

    Executor 队列中的 lambda 同样持有对 Driver 的引用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // static
    void Driver::enqueue(std::shared_ptr<Driver> driver) {
    // 预期在 Driver 所属 Task 的互斥锁保护下调用。
    driver->enqueueInternal();
    if (closed_) {
    return;
    }
    task()->queryCtx()->executor()->add([driver]() { Driver::run(driver); });
    }

    Driver 从以下几处被加入 Executor 队列:

    • Task::start:开始运行;
    • BlockingState::setResume:阻塞 future 兑现后继续运行;
    • Task::ensureSplitGroupsAreBeingProcessedLocked:在 Grouped Execution 中启动新的 split group;
    • Task::resume:此路径目前未被使用;
    • Driver::runTask::requestYield 触发:此路径目前未被使用。

    五、架构设计品味、代码品味与多线程编程范式

    5.1 架构设计品味

    5.1.1 “协作式调度”而非”抢占式线程”

    整个设计中最高层次的品味。朴素实现是每个 Pipeline 绑定一个 OS 线程、阻塞在 IO/锁上靠操作系统调度;Velox 反其道而行——
    **Driver 不拥有线程,而是被调度到线程池上的一段段”执行片”**:

    1
    2
    Driver 不是线程,而是一个可被反复"挂起/恢复"的状态机。
    线程是稀缺资源(CPUThreadPoolExecutor),Driver 是廉价的逻辑单元。

    收益:

    • M 个 Driver 复用 N 个线程(M ≫ N),一台机器可同时承载成千上万个 Pipeline 而不会线程爆炸;
    • 阻塞(等 split、等 join build、等下游消费、等内存仲裁)时,Driver 主动让出线程kBlocksetResume),线程立刻去跑别的 Driver;
    • CPU 时间片到期主动 kYield,实现公平性。

    本质上是把操作系统的线程调度,下沉为应用层的协程调度StopReason 枚举就是这套协程调度的”返回码协议”。

    5.1.2 单一终止漏斗(terminate as a funnel)

    无论是正常完成(kFinished)、用户取消(kCanceled)、外部错误(kAborted)、还是自身算子抛异常(kFailed),
    所有路径最终都汇聚到 Task::terminate(terminalState) 这一个函数。资源清理逻辑只写一遍,杜绝”正常路径清理了
    A 忘了 B,异常路径清理了 B 忘了 A”的经典 bug。

    5.1.3 所有权方向与生命周期的”单调性”

    1
    Task ⊇ Driver ⊇ Operator   (寿命包含关系)

    这个不变量靠所有权结构强制:Driver 通过 DriverCtx::task 这个 shared_ptr 死死拽住 Task 直到自己析构;
    Driver 用 unique_ptr 独占 Operator。于是 Operator 里可以放心地用裸指针访问 DriverCtx、Task,
    不需要任何 weak_ptr / 生命周期检查——类型系统已经证明了它们一定还活着。

    5.1.4 循环引用的”受控泄漏”

    Task ↔ Driver 是明知故犯的循环引用。设计者没有用 weak_ptr 打破它,而是选择在一个确定的点手动切断
    drivers_[i] = nullptr)。原因:执行期间 Task 必须强引用 Driver,Driver 也必须强引用 Task,weak_ptr 会引入
    “每次访问都要 lock() 检查”的开销。这里用一个明确的手动切断点,换取运行期的零开销与代码简洁;代价是 lifecycle.md
    末尾那句警告——若 future 永不兑现就会泄漏。设计者清楚代价并写进了文档。

    5.1.5 窄接口统一异构等待——isBlocked 作为唯一的”挂起-唤醒”抽象

    整个执行引擎里,算子要等的外部事件五花八门:等 split(TableScan)、等远程数据(Exchange)、等本地 queue
    (LocalExchange)、等 join build(HashProbe)、等内存仲裁、等 RPC……但 Driver 调度循环只认识一种东西
    isBlocked 返回的 (BlockingReason, ContinueFuture) 二元组(Operator.h:284)。

    这是极有品味的架构收敛:用一个窄接口把所有异构的异步等待归一化。 Driver 不需要知道”等的是什么”,只需要
    “reason 非 kNotBlocked 就让出、future 兑现就唤醒”。新增一种等待来源(如后来的 kWaitForRPC)不需要改动调度核心,
    只需让对应算子在 isBlocked 里返回新的 reason + future。BlockingReason 枚举(BlockingReason.h)只是诊断/统计用的
    标签,对调度逻辑而言所有 reason 等价。详见第八章。

    这条与 5.1.1 的”协作式调度”互为表里:5.1.1 讲 Driver 如何让出线程,5.1.5 讲算子如何统一地告诉 Driver”该让出了”。

    5.2 代码品味

    5.2.1 快路径无锁,慢路径加锁

    Task::shouldStop()Task.cpp:3505)被 Driver 主循环每个算子每轮调用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    StopReason Task::shouldStop() {
    if (pauseRequested_) return StopReason::kPause; // atomic 读,无锁
    if (terminateRequested_) return StopReason::kTerminate; // atomic 读,无锁
    if (toYield_) { // 只有 yield 才加锁
    std::lock_guard<std::timed_mutex> l(mutex_);
    return shouldStopLocked();
    }
    return StopReason::kNone; // 绝大多数走到这里
    }

    Task.h:1414 注释明确解释了为何这些字段是 atomic:为了让 ThreadSanitizer 不误报,同时表明”在锁外读 0/false 是安全的”。

    5.2.2 “Locked 后缀”约定——把锁契约编码进函数名

    foo()(自己加锁)与 fooLocked()(调用者必须已持锁)成对出现,把”调用前必须持有 mutex_”这一契约固化进函数签名

    5.2.3 RAII 兜底一切

    CancelGuardDriver.h:563)保证 Driver 无论正常返回还是抛异常都会 Task::leave + close()。promise 的兑现也用
    folly::makeGuard 包裹。关键细节:promise 的 setValue() 会同步触发 future 回调,回调里可能又抢 mutex_
    所以代码刻意在锁内把 promise/driver swap 到局部变量,出锁后再 setValue/closeByTask,避免”持锁触发回调 → 回调抢同一把锁 → 死锁”。这个”锁内收集、锁外执行副作用”的模式在 Task::leaveTask::terminate 中反复出现。

    5.2.4 统计的双保险

    Driver.cpp:533 的 on-thread 时间统计,既在 closeOperators() 手动 finalize,又有 scope guard 兜底,并用
    onThreadStartUs_ = 0 哨兵值防止重复计数。两条路径都覆盖且防双计。

    5.2.5 主循环写得极致紧凑

    runInternal 用一个 for 配合 i += 2; continue; 实现数据流调度,没有递归、没有 goto(符合 CODING_STYLE)。
    注意:扁平循环不仅是风格,更是协作式调度的前提——详见第六章。

    5.2.6 延迟报告模式——阻塞在数据操作里发现,在下一轮 isBlocked 报告

    多数算子并不在 isBlocked 里现场探测阻塞,而是在 addInput/getOutput 真正干活时就拿到了 future 与 reason,
    存进成员变量(blockingReason_ / future_),等下一轮 Driver 循环调用 isBlocked 时才报告出来
    CallbackSinkCallbackSink.cpp:20,48,在 addInput 里记录)与 LocalExchangeLocalPartition.cpp:286,275
    getOutput 里记录)是对称的两例。

    这不是绕路,而是与 Driver 循环顺序严丝合缝:第七章已述,循环里 isBlockedgetOutput/addInput
    之前调用,所以本轮产生的阻塞天然只能由下一轮的 isBlocked 暴露。把”发现”和”报告”拆成两个时点,是这套调度
    节拍下最自然的写法。详见第八章 8.3。

    5.2.7 future 组合语义必须匹配等待语义——collectAny vs collectAll

    当一个算子同时等多个 future 时,用哪种组合不是风格而是正确性

    • folly::collectAny任一就绪即唤醒。用于”任何一边推进都值得重试”——Exchange 同时等数据和等 split
      Exchange.cpp:171)。
    • folly::collectAll全部就绪才唤醒。用于”必须所有阻塞条件都解除”——LocalPartition 扇出到多个下游 queue,
      必须等所有满的下游都腾出空间(LocalPartition.cpp:633)。

    选错有真实后果:扇出 sink 若用 collectAny 会在仍满的下游上”唤醒→立刻又阻塞”空转;source 若用 collectAll
    会在数据已到但 split 未到时白等。详见第八章 8.4。

    5.3 涉及的多线程编程范式

    范式 体现 文件/行
    协作式调度(无栈协程) Driver 主动返回 StopReason 让出线程,现场压缩成几个标量后可重入 runInternal
    三种让出语义区分 Block(等外部事件,需 future)/ Yield(时间片,回队尾)/ Suspend(保留栈,原地等) 第八章 8.2
    Future/Promise 续延 阻塞→ContinueFuture,兑现时 thenValue 重新入队 BlockingState::setResume Driver.cpp:227
    窄接口统一异步等待 异构等待全部归一成 (BlockingReason, ContinueFuture) 二元组 Operator::isBlocked Operator.h:284
    唤醒句柄所有权 谁掌握”事件就绪”的知情权(queue/exchangeClient/JoinBridge/consumer)谁负责兑现 future 第八章 8.3
    future 组合语义 collectAny(任一就绪即重试)vs collectAll(全部解除才继续) Exchange.cpp:171 / LocalPartition.cpp:633
    状态机驱动的异步 isBlocked 推进算子内部状态机,跨状态挂起/恢复 HashProbe::isBlocked HashProbe.cpp:651
    同步屏障 join build 多线程汇合,最后一个 fulfill 其余 promise Task::allPeersFinished Task.h:542
    线程计数 + 等待信号 numThreads_ 归零时 fulfill threadFinishPromises_ allThreadsFinishedLocked Task.cpp:3532
    可重入挂起 numSuspensions 递归计数,保留调用栈让出线程占用 enterSuspended/leaveSuspended Task.cpp:3424
    快慢路径分离(lock elision) atomic flag 走无锁快路径 shouldStop Task.cpp:3505
    RAII 资源/信号管理 CancelGuard、makeGuard 兑现 promise 贯穿 Driver.cpp / Task.cpp
    锁内收集、锁外触发副作用 swap 到局部变量出锁再执行 Task::leaveTask::terminate
    协作式取消 terminateRequested_ 标志 + 轮询,并桥接 folly::CancellationToken Task::terminategetCancellationToken
    单写者所有权消除竞态 Driver unique_ptr 独占 Operator operators_ Driver.h:732

    一条贯穿全局的主线

    让”线程”成为可被任意复用的纯计算资源,让”Driver”成为可被反复挂起/恢复/取消的轻量状态机,并用所有权结构 +
    单一终止漏斗 + RAII,把并发正确性从”靠程序员小心”转化为”靠类型与结构保证”。

    StopReason 是这套协程调度的指令集,ThreadState 是协程的寄存器现场,Task::mutex_ 是唯一的串行化点,
    ContinueFuture/Promise 是协程之间的唤醒总线,而 isBlocked 返回的 (BlockingReason, ContinueFuture)
    二元组则是算子向调度器递交的统一”挂起申请单”——五花八门的外部等待经它归一,才让上面这套协程模型得以用一份代码服务所有算子。


    六、runInternal 的执行模型:不是 Volcano,也不是教科书式 push

    准确说法:这是一个由 Driver 集中调度的、单栈帧的、demand-gated(需求门控的)push 执行模型。
    第 2.5 / 5.2.5 节用了”主循环”的笼统措辞,本章给出精确语义。

    6.1 控制流:Driver 是唯一的调度器

    operators_ 是一个扁平数组,索引含义固定:

    1
    2
    3
    4
    operators_[0]    = source(TableScan / Exchange)   ← 最上游
    ...
    operators_[N-1] = sink(PartitionedOutput / CallbackSink) ← 最下游
    数据流向: 0 ──► 1 ──► 2 ──► ... ──► N-1

    关键事实:算子之间从不互相调用。 所有算子方法(isBlocked / needsInput / getOutput / addInput /
    isFinished / noMoreInput)全部由 Driver 在同一个栈帧里直接调用。算子只是一组被动回调,Driver 是中央调度器。

    与 Volcano 的本质区别:

    维度 Volcano(经典 pull) Velox Driver
    谁驱动执行 根算子 next() 递归调用孩子的 next() 单个 Driver 循环调用所有算子
    调用栈 深递归:sink→…→source,每批一层套一层 扁平:一个栈帧 + 对算子数组的循环
    算子间耦合 算子 A 直接调用算子 B 算子互不调用,全由 Driver 中转
    数据传递 孩子把行返回给父亲(返回值上溯) 生产者 getOutput() → Driver → 消费者 addInput()

    6.2 调度扫描方向:从 sink 往 source 找”谁能干活”

    getStartingOperator()Driver.cpp:1401)返回 operators_.size() - 1,即从 sink 开始

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    int32_t startingOperator = getStartingOperator();          // = N-1,即 sink
    for (;;) {
    for (int32_t i = startingOperator; i >= 0; --i) { // sink → source 递减
    auto* op = operators_[i].get();
    if (i < operators_.size() - 1) {
    Operator* nextOp = operators_[i + 1].get(); // i+1 是 i 的下游
    if (nextOp->needsInput()) { // ← 下游发出"需求信号"
    getOutput(op, intermediateResult); // 从上游 op 取一批
    if (intermediateResult) {
    addInput(nextOp, intermediateResult); // 推给下游
    i += 2; continue; // 回头看下游能否再往前推
    }
    }
    } else {
    // i == sink:串行模式产出结果 / 判断 isFinished
    }
    }
    }

    要分清两个方向

    • 调度扫描方向:下游 → 上游(找活干);
    • 数据流动方向:上游 → 下游(push)。

    i += 2 配合 for 的 --i,净效果是 i+1——回头去看刚收到数据的那个下游算子,能否把它的产出再往前推一步。
    于是数据一步步被 push 向 sink。数据是被 push 的,不是被 pull 上来的。

    6.3 demand-gated(需求门控)

    Driver 在 push 之前先问下游 nextOp->needsInput()。若下游说”不需要”(如 HashBuild 还在攒数据、下游缓冲已满),
    Driver 就不会从上游取数据。这是单条 Pipeline 内部的反压机制——push,但每一步都被下游需求信号闸住,
    属于 HyPer 一系的 morsel-driven / push-with-backpressure 思路。

    跨 Pipeline 的反压则通过 BlockingReason::kWaitForConsumer + ContinueFuture,让整个 Driver 下线等待。

    6.4 为什么”扁平循环 + Driver 调度”是协作式调度的前提

    深递归的 Volcano 调用栈无法被挂起/恢复;扁平循环可以。 这是把全文串起来的枢纽:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    扁平循环 + Driver 中央调度(无算子间递归)


    Driver 现场可压缩成几个标量(curOperatorId_ / blockedOperatorId_ /
    blockingReason_ / traceInput_),可在算子边界随时 return


    StopReason 协议(kBlock/kYield/kPause/kTerminate)得以成立


    协作式调度:M 个 Driver 复用 N 个线程,阻塞即让出

    若是 Volcano 的 sink.next() → … → source.next() 深递归,想”暂停在第 3 层算子并保存现场”几乎不可能(需冻结整个
    C++ 调用栈,这正是有栈协程要解决的难题)。而 Velox 的扁平循环里,return kBlock 直接退出,下次 runInternal
    重新进入 for 循环即可恢复。整个 Driver 因此变成一个无栈协程(stackless coroutine)。

    6.5 小结

    • Driver 是调度器,模型是 push,不是 Volcano 的顶层 next() 递归;
    • push 的同时有下游 needsInput() 的需求门控(单 Pipeline 内反压),是 demand-gated push 而非无脑灌;
    • 选择”Driver 集中调度 + 扁平循环”而非递归 pull,根本目的是让 Driver 可挂起——这是 Velox 整个协作式线程复用模型能成立的地基。

    七、runInternal 调度实例:三算子 pipeline 全过程

    用一条最经典的流水线 TableScan → FilterProject → PartitionedOutput 来逐步 trace
    runInternalDriver.cpp:519)的调度过程。

    1
    2
    3
    4
    operators_[0] = TableScan         (source,从 split 读数据)
    operators_[1] = FilterProject (中间算子,过滤+投影,逐批无状态)
    operators_[2] = PartitionedOutput (sink,写 OutputBuffer)
    N = 3,startingOperator = N-1 = 2

    7.0 算子接口语义

    方法 TableScan FilterProject PartitionedOutput
    isBlocked() 无 split 可读且 noMoreSplits 未到 → kWaitForSplit 不阻塞 OutputBuffer 满 → kWaitForConsumer
    needsInput() —(source 无上游) 缓存空时 true buffer 未满时 true
    getOutput() 读出一批;读完返回 null 处理完输入产出一批;无输入返回 null 并行模式写 buffer 后返回 null
    addInput() —— 接收一批待处理 接收一批待写出
    isFinished() noMoreSplits 且无数据 → true 收到 noMoreInput 且缓存空 → true noMoreInput 且全部写出 → true

    7.1 阶段一:冷启动——从 sink 往 source “找活干”

    Driver 刚上线,所有算子都还没有数据。内层循环 for (i = 2; i >= 0; --i)

    i = 2(sink, PartitionedOutput)i == N-1,走 else 分支

    1
    2
    3
    getOutput(sink) → null   // 还没人喂它
    isFinished()? → false
    continue → --i → i=1

    i = 1(FilterProject)i < N-1,走 if 分支

    1
    2
    3
    4
    5
    6
    nextOp = operators_[2] (sink)
    nextOp->isBlocked()? → 否
    nextOp->needsInput()? → true // sink 要数据
    getOutput(FilterProject) → null // FilterProject 还没收到输入
    → else 分支:isFinished(FilterProject)? → false
    → 既无数据也未结束,不 break,继续 --i → i=0

    i = 0(TableScan) → if 分支

    1
    2
    3
    4
    5
    6
    nextOp = operators_[1] (FilterProject)
    nextOp->needsInput()? → true
    getOutput(TableScan) → batch1 // 终于从 split 读出真实数据!
    intermediateResult 非空:
    addInput(FilterProject, batch1) // ← 数据被 push 给 FilterProject
    i += 2 → i=2; continue → --i → i=1

    关键点:冷启动时扫描从 sink 一路走到 source(i: 2→1→0),目的是**自下而上确认”谁此刻能产出数据”**。
    最终落到 TableScan 这个真正的数据来源,拿到第一批数据。

    7.2 阶段二:数据推进——i += 2 的精妙之处

    刚才在 i=0 喂完 FilterProject,i += 2 然后 for 的 --i,净效果 i = 1,回头去看刚收到数据的 FilterProject:

    i = 1(FilterProject)再次

    1
    2
    3
    4
    nextOp = sink, needsInput? → true
    getOutput(FilterProject) → batch1' // 它有输入了,过滤投影后产出
    addInput(sink, batch1') // ← 数据 push 给 sink
    i += 2 → i=3; continue → --i → i=2

    i = 2(sink)

    1
    2
    3
    getOutput(sink) → null   // PartitionedOutput 把 batch1' 写进 OutputBuffer,并行模式不返回数据
    isFinished()? → false
    continue → --i → i=1

    i += 2 的设计意图:一批数据刚喂给 operators_[i+1],下一步立刻回到 operators_[i+1] 看它能否把这批数据继续往下游推
    这样一批数据会被”连续地”沿 pipeline 推到底,而不是每次都从头扫描。数据流向始终是 0 → 1 → 2(push),而扫描指针在局部上下跳动。

    7.3 阶段三:稳态循环

    batch1 推到底后,i 回到 1,FilterProject 已无缓存数据:

    1
    2
    i=1: getOutput(FilterProject) → null → isFinished? false → --i → i=0
    i=0: getOutput(TableScan) → batch2 → addInput(FilterProject) → 推进...

    于是稳定在 i=1 ↔ i=0 ↔ i=2 之间往复,每轮从 TableScan 拉一批、过滤、写出。**注意每个算子调用前循环顶部都会执行
    task()->shouldStop()(检查终止/暂停/yield)和 op->isBlocked()**——这是协作式调度的轮询点。

    7.4 阶段四:阻塞场景——blockDriver 让出线程

    场景 A:OutputBuffer 满(下游消费跟不上)

    某轮 i=1,准备喂 sink 时:

    1
    2
    3
    4
    i=1: nextOp = sink
    nextOp->isBlocked(&future) → kWaitForConsumer // buffer 满了
    blockingReason_ != kNotBlocked
    return blockDriver(self, /*blockedOperatorId=*/i+1=2, future, ...)

    blockDriverDriver.cpp:669)构造 BlockingState(设 hasBlockingFuture=true),返回 kBlock
    Driver::runDriver.cpp:861)收到 kBlock → BlockingState::setResume(blockingState)driver 下线,线程去跑别的 Driver

    当下游消费了 buffer → future 兑现 → setResume 回调(Driver.cpp:227)→ Driver::enqueue 重新入队 →
    再次进入 runInternalstartingOperator=2 重新扫描

    场景 B:TableScan 等 split

    某轮 i=0,循环顶部:

    1
    2
    3
    i=0: op = TableScan
    op->isBlocked(&future) → kWaitForSplit // 无 split 且 noMoreSplits 未到
    return blockDriver(self, /*blockedOperatorId=*/0, future, ...)

    同样下线,等 Task::addSplit() 兑现 promise 后被唤醒。

    这两个就是第六章讲的两类反压:B 是上游缺数据,A 是下游消费慢,都通过同一套
    kBlock + ContinueFuture 机制把整个 Driver 挂起。

    7.5 阶段五:收尾——noMoreInput 链式传播 → close

    假设 noMoreSplits 已到,TableScan 读完最后一批。某轮 i=0:

    1
    2
    3
    4
    5
    6
    7
    i=0: nextOp=FilterProject, needsInput? true
    getOutput(TableScan) → null // 没数据了
    else 分支:
    isBlocked(TableScan)? → 否 // 不是阻塞,是真的结束
    isFinished(TableScan)? → true
    nextOp->noMoreInput() → FilterProject.noMoreInput() // 通知下游:上游收工
    break → 跳出内层 for,回到 for(;;),重新 i=2

    重新 i=2(sink)

    1
    getOutput(sink) → null; isFinished? false → --i → i=1

    i=1(FilterProject)

    1
    2
    3
    4
    5
    nextOp=sink, needsInput? true
    getOutput(FilterProject) → null // 收到 noMoreInput,已无缓存可 flush
    → else:isFinished(FilterProject)? → true // 它也结束了
    nextOp->noMoreInput() → PartitionedOutput.noMoreInput() // 继续向下游传播
    break → 回到 for(;;),i=2

    i=2(sink)

    1
    2
    3
    4
    getOutput(sink) → null     // PartitionedOutput flush 剩余、标记 buffer 结束
    isFinished(PartitionedOutput)? → true
    close(); // 关闭所有算子、上报统计、Task::removeDriver()
    return StopReason::kAtEnd; // driver 正常终结

    kAtEndDriver::run 直接 return(Driver.cpp:877),driver 彻底退出。close() 里的
    Task::removeDriver 会触发 checkIfFinishedLocked,若是最后一个 driver 且 output 已被消费,则
    Task::terminate(kFinished)

    结束信号的传播链很清晰:noMoreInput 沿 pipeline 自上游向下游逐级传递
    (TableScan→FilterProject→PartitionedOutput),每个算子先把自己的剩余缓存 flush 完(isFinished 返回 true
    前的 getOutput),再把”收工”信号交给下一级。每传一级就 break 重启外层循环,确保从 sink 重新评估全局状态。

    7.6 时间线汇总

    轮次 i 序列 发生的事
    冷启动 2→1→0 自下而上找到 TableScan,读出 batch1,喂 FilterProject
    推进 1→2 FilterProject 产出 batch1’,喂 sink,sink 写 buffer
    稳态 1→0→1→2… 反复拉取-过滤-写出
    阻塞 A 1 sink buffer 满 → kBlock 下线,等消费
    阻塞 B 0 TableScan 无 split → kBlock 下线,等 addSplit
    收尾 0(break)→2→1(break)→2 noMoreInput 逐级传播 → isFinished → close → kAtEnd

    一句话概括这套调度:指针从 sink 往 source 扫描以”定位能产出数据的最上游算子”,一旦取到数据就靠 i+=2
    把它连续推向 sink;每个算子边界都轮询 stop/block,需要等待时整条 Driver 作为无栈协程挂起,被唤醒后从 sink 重新扫描。


    八、isBlocked 与三种让出机制:结合典型算子解读

    8.1 isBlocked 的角色:声明阻塞意愿 + 交出唤醒句柄

    isBlocked 是 Velox 把”异步等待”嵌进协作式调度的核心接口。看签名(Operator.h:284):

    1
    2
    3
    /// 返回 kNotBlocked 表示可以继续;否则返回原因,并把 'future' 设为一个在
    /// 阻塞原因消失时兑现的 future。调用者必须等 future 完成才能再次调用。
    virtual BlockingReason isBlocked(ContinueFuture* future) = 0;

    要点:**isBlocked 不是 Driver 探测算子状态的工具,而是算子主动声明”我要等外部事件、请让出线程”并交出唤醒句柄的接口。**
    它干两件事:

    1. **返回 BlockingReason**(BlockingReason.h):语义标签,如 kWaitForSplitkWaitForConsumer
      kWaitForProducerkWaitForJoinBuildkWaitForArbitrationkWaitForRPC 等;
    2. **通过 future 出参交出”唤醒句柄”**:这个 ContinueFuture 是算子与外部世界之间的”门铃”,外部事件就绪时谁兑现它谁就负责唤醒 Driver。

    没有这个 future,isBlocked 就只是个布尔探测,Driver 让出后将永远不知道何时回来。正是 future 把”让出”和”恢复”两端接上。

    让出 CPU 的动作链(isBlocked 是起点和唤醒源,但”让出”由 Driver 的 return kBlock 完成):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    op->isBlocked(&future) → reason != kNotBlocked


    blockDriver(self, opId, future, ...) // Driver.cpp:669
    构造 BlockingState(driver, future, op, reason) // 持有 future + driver 的 shared_ptr
    return kBlock


    Driver::run 收到 kBlock → BlockingState::setResume // Driver.cpp:227,861
    future.via(executor).thenValue([state]{
    driver->state().hasBlockingFuture = false;
    Driver::enqueue(driver); // future 兑现 → 重新入队唤醒
    })
    ← runInternal 已 return,线程归还线程池去跑别的 Driver

    8.2 三种让出线程的方式

    让出方式 触发 是否需要唤醒句柄 恢复方式 语义
    BlockisBlocked 算子等外部事件 需要 future future 兑现 → enqueue 异步等待的核心
    YieldshouldYield CPU 时间片耗尽 不需要 立即重新 enqueue 到队尾 公平性,不是等待
    SuspendenterSuspended 等 IO / 内存仲裁,保留调用栈 不需要(同步等) leaveSuspended 原地恢复 让出线程占用但栈不丢

    只有 Block 是真正的”挂起-异步唤醒”——它把 Driver 退出栈、压缩成几个标量、靠 future 回调复活,这才是异步编程的关键。
    Yield 不是等待(马上回队列重排,防霸占线程);Suspend 保留 C++ 调用栈(不退出 runInternal),用于”必须原地等完”的同步阻塞。

    8.3 典型算子的 isBlocked 解读

    不同算子等待的”外部事件”不同,isBlocked 的写法因此分化出几种鲜明的模式。

    (1) CallbackSink —— sink 的下游反压(延迟报告模式)

    CallbackSinkCallbackSink.cpp:48)是 sink 算子,把数据交给 consumer 回调(下游 pipeline 的
    LocalExchangeQueueOutputBuffer):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void CallbackSink::addInput(RowVectorPtr input) {
    // 推给 consumer;若 consumer 满了,consumeCb_ 返回 future 并记录到成员
    blockingReason_ = consumeCb_(std::move(input), false, &future_);
    }

    BlockingReason CallbackSink::isBlocked(ContinueFuture* future) {
    ...
    if (blockingReason_ != BlockingReason::kNotBlocked) {
    *future = std::move(future_); // 把上次 addInput 记录的 future 报出来
    blockingReason_ = BlockingReason::kNotBlocked;
    return BlockingReason::kWaitForConsumer;
    }
    return BlockingReason::kNotBlocked;
    }

    模式:延迟报告。 真正发现”下游满了”是在 addInput 里(拿到 future_blockingReason_ 存进成员),但要等
    下一轮 Driver 循环调用 isBlocked 时才报告出来。原因见第七章:Driver 循环里 isBlockedgetOutput/addInput
    之前调用,所以这一轮产生的阻塞只能下一轮才暴露。kWaitForConsumer 对应第七章「场景 A:OutputBuffer 满」。

    (2) LocalExchange —— 本地交换 source(getOutput 探测 + isBlocked 报告)

    LocalExchangeLocalPartition.cpp:275)从本地 LocalExchangeQueue 拉数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    RowVectorPtr LocalExchange::getOutput() {
    ...
    blockingReason_ = queue_->next(&future_, pool(), &data, drained); // queue 空则记录 future
    if (blockingReason_ != BlockingReason::kNotBlocked) {
    return nullptr; // 没数据,先返回 null
    }
    ...
    }

    BlockingReason LocalExchange::isBlocked(ContinueFuture* future) {
    if (blockingReason_ != BlockingReason::kNotBlocked) {
    *future = std::move(future_); // 报告上次记录的 future
    auto reason = blockingReason_;
    blockingReason_ = BlockingReason::kNotBlocked;
    return reason; // 通常是 kWaitForProducer
    }
    return BlockingReason::kNotBlocked;
    }

    模式:getOutput 探测、isBlocked 报告——同样是延迟报告。 与 CallbackSink 对称:sink 在 addInput 探测下游,
    source 在 getOutput 探测上游 queue。等待的事件是”本地上游 producer 还没产出数据”(kWaitForProducer)。

    (3) Exchange —— 远程交换 source(isBlocked 里干实活 + collectAny)

    ExchangeExchange.cpp:142)从远程 worker 拉数据,是少数在 isBlocked 里直接干活的算子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    BlockingReason Exchange::isBlocked(ContinueFuture* future) {
    if (!currentPages_.empty() || atEnd_) return BlockingReason::kNotBlocked;

    if (!splitFuture_.valid()) getSplits(&splitFuture_); // 主动拉取 split

    ContinueFuture dataFuture;
    currentPages_ = exchangeClient_->next(driverId_, ..., &dataFuture); // 主动拉一批 page
    if (!currentPages_.empty() || atEnd_) return BlockingReason::kNotBlocked;

    if (splitFuture_.valid()) {
    // 同时在等数据 OR 等更多 split —— 任一就绪即唤醒
    std::vector<ContinueFuture> futures;
    futures.push_back(std::move(splitFuture_));
    futures.push_back(std::move(dataFuture));
    *future = folly::collectAny(futures).unit(); // ← collectAny
    return BlockingReason::kWaitForSplit;
    }
    *future = std::move(dataFuture);
    return BlockingReason::kWaitForProducer;
    }

    模式:isBlocked 即工作点 + 多 future 用 collectAny 合并。 它不只是检查,还触发异步预取。这里同时等两件事
    (数据到达、新 split 到达),用 folly::collectAny——任一就绪即唤醒,因为有任何一边推进都值得 Driver 重新尝试。

    (4) Merge / LocalMerge —— 多源归并(kWaitForProducer)

    MergeMerge.cpp:75)要从多个有序 source 做归并排序,必须每个 source 都有数据才能比较:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    BlockingReason Merge::isBlocked(ContinueFuture* future) {
    const auto reason = addMergeSources(future); // 先确保所有 source 就位
    if (reason != BlockingReason::kNotBlocked) return reason;
    ...
    if (sourceMerger_ != nullptr) {
    sourceMerger_->isBlocked(sourceBlockingFutures_); // 收集所有 source 的阻塞 future
    }
    if (sourceBlockingFutures_.empty()) return BlockingReason::kNotBlocked;

    *future = std::move(sourceBlockingFutures_.back()); // 任一 source 没数据就整体阻塞
    sourceBlockingFutures_.pop_back();
    return BlockingReason::kWaitForProducer;
    }

    模式:归并算子受限于最慢的 source。 任何一个参与归并的 source 没数据,整个归并都无法推进(否则破坏全局有序性),
    所以报 kWaitForProducer 等待该 source。

    (5) LocalPartition —— 多下游 sink(collectAll)

    LocalPartitionLocalPartition.cpp:630)把数据按分区扇出到 N 个下游 queue,与 Exchange 形成鲜明对比:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    BlockingReason LocalPartition::isBlocked(ContinueFuture* future) {
    if (!futures_.empty()) {
    auto blockingReason = blockingReasons_.front();
    *future = folly::collectAll(futures_.begin(), futures_.end()).unit(); // ← collectAll
    futures_.clear();
    blockingReasons_.clear();
    return blockingReason;
    }
    return BlockingReason::kNotBlocked;
    }

    模式:扇出 sink 用 collectAll 合并。 当多个下游 queue 满时,必须等所有满的下游都腾出空间才能继续推
    (否则又会立刻在某个满的 queue 上卡住),所以用 collectAll——全部就绪才唤醒

    (6) HashProbe —— join probe(状态机驱动 + kWaitForJoinBuild)

    HashProbeHashProbe.cpp:651)的 isBlocked 是一个内部状态机的推进器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
    switch (state_) {
    case ProbeOperatorState::kWaitForBuild: // 等 build 侧建好哈希表
    if (!future_.valid()) { setRunning(); asyncWaitForHashTable(); }
    break;
    case ProbeOperatorState::kWaitForPeers: // 等其他 probe 完成(spill 场景)
    if (!future_.valid()) setRunning();
    break;
    ...
    }
    if (future_.valid()) { *future = std::move(future_); return ...; } // kWaitForJoinBuild 等
    }

    模式:isBlocked 驱动算子状态机。 probe 在 build 完成前处于 kWaitForBuild,它通过
    asyncWaitForHashTable() 拿到一个挂在 HashJoinBridge 上的 future(与第一章 allPeersFinished 的屏障机制呼应),
    建好后才转入 kRunning

    8.4 两种 future 组合语义:collectAny vs collectAll

    上面 Exchange 与 LocalPartition 的对比揭示了一个值得记住的设计准则:

    组合 唤醒条件 适用场景 例子
    folly::collectAny 任一 future 就绪 等多个独立来源中任何一个推进就值得重试 Exchange:等数据 OR 等 split
    folly::collectAll 全部 future 就绪 必须所有阻塞条件都解除才能继续 LocalPartition:等所有满的下游腾空间

    选错会有真实后果:扇出 sink 若用 collectAny,会在某个仍满的下游上反复”唤醒→立刻又阻塞”空转;source 若用
    collectAll,会在数据已到但 split 未到时白白多等。

    8.5 设计模式总结

    算子 等待事件 BlockingReason future 来源 模式
    CallbackSink 下游 consumer 满 kWaitForConsumer consumeCb_ 回调 延迟报告(addInput 探测)
    LocalExchange 本地上游无数据 kWaitForProducer queue_->next 延迟报告(getOutput 探测)
    Exchange 远程数据/split kWaitForSplit/Producer exchangeClient_ isBlocked 干活 + collectAny
    Merge/LocalMerge 任一归并源无数据 kWaitForProducer 各 MergeSource 受限于最慢 source
    LocalPartition 所有满的下游 kWaitForConsumer 类 各下游 queue 扇出 + collectAll
    HashProbe build 完成/peer 完成 kWaitForJoinBuild 等 HashJoinBridge 状态机驱动

    贯穿这些算子的共同点:

    • 算子自报,Driver 不探测——isBlocked 由算子根据自身与外部资源的状态决定返回值;
    • future 是唯一的复活路径——谁持有”事件就绪”的知情权(queue、exchangeClient、JoinBridge、consumer),谁负责兑现 future;
    • 延迟报告是常态——多数算子在 addInput/getOutput 里就拿到了 future,存入成员,下一轮 isBlocked 才报告,这与第七章的 Driver 循环顺序(isBlocked 先于数据操作)严丝合缝;
    • future 组合语义要匹配语义——collectAny(任一推进即重试)vs collectAll(全部解除才继续)。

    这就是 isBlocked 作为”协作式异步关键接口”的全貌:它让每个算子用统一的 (BlockingReason, ContinueFuture) 二元组,把
    五花八门的外部等待(split、远程数据、本地 queue、join build、内存仲裁、RPC)翻译成 Driver 能统一处理的”挂起-唤醒”信号。


    九、为什么产出统一从 getOutput 发起

    观察 runInternalDriver.cpp)会发现:所有”产出新数据”的动作都从 getOutput() 发起,addInput() 只负责投喂、从不产出。

    1
    2
    3
    4
    5
    // 中间算子(Driver.cpp:668 附近)
    getOutput(op, intermediateResult); // 上游产出
    addInput(nextOp, intermediateResult); // 推给下游(下游只是接收)
    // sink(Driver.cpp:772 附近)
    getOutput(op, result);

    addInput 的契约是”收下、缓冲、更新内部状态”,计算与产出一律在 getOutput 里发生。这是刻意的设计决定。

    9.1 核心原因:解耦”消费输入速率”与”产生输出速率”

    若把计算放进 addInput(纯 push),addInput 就必须能”立即算出结果并推给下游”。但很多算子的输入批次与输出批次不是 1:1 的

    算子 输入 → 输出映射 计算若在 addInput 的问题
    FilterProject 1 输入 → 0 或 1 输出(可能全被过滤) 尚可
    Aggregation N 输入 → 攒到 noMoreInput 后才吐 M 个输出 addInput 时根本无输出可推
    Unnest 1 输入行 → 爆炸成很多输出行(可能多批吐出) addInput 得”产出多批并逐个 push”,控制流失控
    HashProbe 1 probe 批 → 0..N 个输出批(一行匹配多行) 一次输入要触发多次产出
    Limit N 输入 → 截断后提前结束 需在投喂中途叫停

    getOutput 把”产出”变成一个可被反复询问的动作:”你现在能给一批吗?能就给,给不出返回 null。” 于是:

    • **一次 addInput 可以对应 0..N 次 getOutput**——算子按自己的节奏产出。Aggregation 在 addInput 里只更新哈希表、
      getOutput 才吐结果;Unnest 可以连续多次 getOutput 把一行的多行逐批吐完;
    • 输入/输出的批大小、批数量彻底解耦,算子内部可自由缓冲、攒批、spill。

    纯 push 做不到这点——它强制输入输出对齐,遇到 N:M 映射就得在 addInput 里递归往下游推,无法在批次边界停下来。

    9.2 getOutput 返回 null 是干净的、可让出的信号

    getOutput 返回值有明确的双重语义(Operator.h:271):非空 = “这是一批输出”;**null** = “我现在产不出更多了
    (需要更多输入,或被外部阻塞,用 isBlocked 区分)”。这个 null 正是 Driver 的调度节拍点:拿到 null 就往上游走喂更多
    输入(第七章里 i 往 source 递减)。每次 getOutput 产出一批,就是一个天然的让出/背压/检查取消边界——Driver 在每批
    之间跑 shouldStop()/shouldYield()/isBlocked()。计算若埋在 addInput 内部循环里,这些边界就消失了。

    9.3 统一性:source / 中间 / 攒批算子,Driver 一视同仁

    getOutput 让 Driver 无需区分算子类型

    • TableScan(source)无上游,addInput 永不被调用——数据在 getOutput 里”凭空”从 split 读出;
    • FilterProject(中间)在 getOutput 里消费缓冲输入产出;
    • Aggregation(攒批)在 getOutput 里吐累积结果。

    对 Driver 而言它们完全一样:**”调 getOutput 看有没有货”**。循环里唯一的区分只是 source(i=0)没有 i-1、无需先
    addInput。这与 5.1.5 的 isBlocked 统一抽象是同一种品味——用统一窄接口(getOutput 产出 / addInput 投喂 /
    isBlocked 等待 / isFinished 结束)让一份调度代码驱动所有算子。

    9.4 与 demand-gated push 的关系:pull 触发,push 传递

    1
    2
    3
    4
    5
    6
    7
    下游 needsInput()=true       (需求信号,pull 味道)


    Driver 调上游 getOutput() (按需触发一次产出)


    Driver 调下游 addInput() (把产出 push 下去)

    “从 getOutput 发起” = 由下游需求触发的、按批产出。它既不是纯 pull(不是下游递归调上游 next()),也不是纯 push
    (不是上游来数据就无脑灌),而是 Driver 居中、以 getOutput 为产出节拍器的 demand-gated push(呼应第六章)。

    9.5 小结

    把”产出”统一收敛到 getOutput、让 addInput 只投喂,本质是为了:(1) 解耦输入/输出速率(支持 1:N、N:M、延迟产出、
    攒批),这是 push 模型做不到的;(2) 让 getOutput 返回 null 成为干净的让出/背压边界;(3) 用统一入口抹平
    source / 中间 / 攒批算子差异,让一份 Driver 代码驱动所有算子;(4) 与 demand-gated push 自洽:需求触发 getOutput
    getOutput 产出后 addInput 传递


    十、Task 的资源管理:terminate 流程与资源回收

    Task 是所有执行期资源的根持有者:memory pool 树、Driver、ExchangeClient、JoinBridge、OutputBuffer、spill 目录、
    各类 promise。本章梳理这些资源在 happy path 与异常 path 下如何被有序回收。

    10.1 Memory Pool 树:层级、所有权与”刻意保活”

    Task 构造时建立内存池树(Task.cpp:710 起):

    1
    2
    3
    4
    queryCtx_->pool()                                    (query 级,aggregate)
    └── pool_ = "task.<taskId>" (task 级,aggregate) initTaskPool()
    └── childPools_[k] = "node.<planNodeId>" (node 级,aggregate) getOrAddNodePool()
    └── childPools_[m] = "op.<id>.<pipe>.<driver>.<type>" (operator 级,leaf) addOperatorPool()

    所有权要点:

    • pool_Task.h:1205)是 task root,aggregate 类型(只聚合不直接分配);
    • childPools_Task.h:1209vector<shared_ptr<MemoryPool>>持有所有 node / operator / connector / exchange 池的所有权
    • nodePools_Task.h:1215map<id, MemoryPool*>)只存裸指针,用于按 plan node 复用 node 池;
    • Operator / connector 池都是 leaf 或 aggregate child,由 addOperatorPoolTask.cpp:781)等创建后 push_backchildPools_

    关键设计——刻意保活Task.h:1207 注释):

    “Keep plan node and operator memory pools alive for the duration of the task to allow for sharing vectors across drivers without copy.”

    即:算子内存池的生命周期不随 Driver 结束而结束,而是一直存活到 Task 析构。原因是 Velox 大量使用零拷贝向量共享
    (一个 Driver 产出的 RowVector 可能被另一个 Driver 引用),若算子池随 Driver 立即销毁,这些跨 Driver 共享的 buffer
    就会悬空。所以 Driver close 只释放算子的逻辑状态Operator::close),其底层 buffer 所在的内存池对象由
    childPools_ 兜住,直到 ~Task()

    10.2 Happy Path:自然完成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    某 Driver 的 runInternal() → sink isFinished() → kAtEnd


    Driver::close() // Driver.cpp:1063(仅允许在 driver 线程)
    closeOperators() // 逐个 Operator::close(),释放算子逻辑状态、上报 stats
    updateStats()
    closed_ = true
    Task::removeDriver(task, this) // Task.cpp:1440


    Task::removeDriver
    drivers_[i] = nullptr // ← 切断 Task→Driver 的 shared_ptr(解循环引用)
    driverClosedLocked() → ++numFinishedDrivers_
    splitGroupState.numRunningDrivers-- / numFinishedOutputDrivers++
    allFinished = checkIfFinishedLocked() // 所有 driver 完成 + output 已消费?

    ▼ (若 allFinished)
    Task::terminate(TaskState::kFinished) // 见 10.4 的统一清理

    checkIfFinishedLockedTask.cpp:2237)的判定:numFinishedDrivers_ == numTotalDrivers_(或 ungrouped 下输出
    pipeline 的 driver 全部完成), !hasPartitionedOutput() || partitionedOutputConsumed_(结果已被下游取走)。
    注意输出是否被消费由 OutputBuffersetAllOutputConsumed() 异步通知——所以最后一个 driver 完成时若 output 还没被
    取走,会先返回 false,待 setAllOutputConsumed 再触发 finish。

    10.3 异常 Path:错误 / 取消 / 中止

    三种非正常终结都走同一入口,最终汇聚到 terminate

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    算子抛异常 → runInternal catch → task()->setError(eptr)   // Driver.cpp:810
    用户取消 → Task::requestCancel() → terminate(kCanceled)
    外部错误 → Task::requestAbort() → terminate(kAborted)

    setError(Task.cpp:3304):
    加锁;若已非 running 或已有 exception_ 则直接返回(保证只第一个错误生效)
    exception_ = exception
    解锁
    terminate(TaskState::kFailed)
    onError_(exception_) // 回调通知应用层

    与 happy path 的关键差异:Driver 此刻可能正在线程上跑terminate 因此要分两类处理 Driver(见下)。

    10.4 terminate:统一清理漏斗(happy/error 共用)

    terminate(terminalState)Task.cpp:2502)是所有终结的汇聚点,分”锁内”与”锁外”两段:

    锁内(持 mutex_):

    1. 若已非 running,直接返回 makeFinishFutureLocked(幂等);
    2. state_ = terminalState;记录 terminationTimeMs;取消/中止时构造 exception_
    3. terminateRequested_ = true(on-thread 的 Driver 下次 shouldStop() 会看到);numRunningDrivers_ = 0
    4. 遍历 drivers_,对每个非空 driver 调 enterForTerminateLockedTask.cpp:3367):
      • 不在线程上 → 返回 kTerminate,driver 被移入 offThreadDriversdriverClosedLocked()
      • 在线程上 / 已 pause → 返回 kAlreadyOnThread / kPause不在这里关,留给 driver 自己的 leave 路径关;
    5. exchangeClients_barrierFinishPromises_ swap 到局部变量;barrierRequested_ = false

    锁外(避免持锁触发回调死锁,见 5.2.3):

    1. taskCompletionNotifier.notify() / stateChangeNotifier.notify()——兑现完成/状态变更 promise;
    2. offThreadDrivers 逐个 driver->closeByTask()Driver.cpp:1077,在”已 terminate + 仿在线程”语境下关闭算子);
    3. maybeRemoveFromOutputBufferManager()——删除 OutputBuffer(释放它对 Task 的反向引用);
    4. 关闭并清空 exchangeClients——停止远程取数、避免重发请求;
    5. 收集并清空所有 SplitsStore 的待发 split promise、处理剩余 remote split;
    6. splitGroupState.clear()——清 JoinBridge、LocalExchange、barriers;
    7. 兑现 splitPromises(唤醒等 split 的 driver,让它们感知终止)、bridge->cancel()、关闭 preloadingSplits_、兑现
      barrierPromises
    8. 返回 makeFinishFuture——调用方 await 该 future 即”所有线程已退出”。

    on-thread 的 Driver 怎么收尾? 它们不在 terminate 里关闭,而是:下次循环顶部 shouldStop() 看到
    terminateRequested_ → 返回 kTerminaterunInternal 退出 → CancelGuard 析构 → Task::leave 检测到
    kTerminate 且有 driverCb → 在 driver 线程上调 close()Task.cpp:3415)。这保证 Driver 的 close 永远在 driver
    自己的线程上执行
    ,避免与算子的 abort 竞态(见 3.3)。

    10.5 最终回收:~Task()

    当最后一个引用 Task 的 shared_ptr 释放(drivers_ 已清空解了循环引用、外部持有者也释放后),~Task()
    Task.cpp:452)执行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    removeSpillDirectoryIfExists();        // 删除 spill 目录(若曾创建)  Task.cpp:691
    // 按固定顺序逐项 clear,便于定位析构期崩溃:
    threadFinishPromises_.clear(); splitGroupStates_.clear(); taskStats_ = {};
    stateChangePromises_.clear(); taskCompletionPromises_.clear(); splitsStates_.clear();
    drivers_.clear(); // 此时通常已空
    driverFactories_.clear(); exchangeClientByPlanNode_.clear(); exchangeClients_.clear();
    nodePools_.clear(); // 先清裸指针 map
    childPools_.clear(); // ← 释放所有 node/operator 内存池(10.1 的刻意保活在此终结)
    pool_.reset(); // ← 最后释放 task root 池
    queryCtx_.reset();
    // 最后兑现 taskDeletionPromises_(taskDeletionFuture 的等待方在此被唤醒)

    内存池的销毁顺序很关键:必须先 childPools_.clear()(leaf/node 池)再 pool_.reset()(root),因为 aggregate
    父池要求子池先于自己销毁。removeFromTaskList()(从全局运行任务表摘除)通过 SCOPE_EXIT 保证最先注册、最后执行。

    10.6 资源回收时机汇总

    资源 释放时机 代码位置
    算子逻辑状态(哈希表、缓冲行等) Driver close 时 Operator::close() Driver::closeOperators Driver.cpp:898
    Task→Driver 的 shared_ptr(解循环引用) removeDriver(happy)/ terminate(error) Task.cpp:1464,2559
    OutputBuffer terminatemaybeRemoveFromOutputBufferManager Task.cpp:2590
    ExchangeClient terminate 关闭并清空 Task.cpp:2592
    JoinBridge / LocalExchange / barriers terminateSplitGroupState::clear() TaskStructs.h:248
    待发 split promise / preloading split terminate 兑现 / 关闭 Task.cpp:2675,2683
    算子 / node 内存池(buffer 所在) **~Task()childPools_.clear()**(刻意保活至此) Task.cpp:490
    task root 内存池 pool_ **~Task()pool_.reset()**(最后) Task.cpp:491
    spill 目录 ~Task()removeSpillDirectoryIfExists Task.cpp:470,691

    10.7 设计要点

    • terminate 是 happy / error 共用的单一漏斗(呼应 5.1.2)——清理逻辑只写一遍,状态码不同而已;
    • 内存池与 Driver 生命周期解耦——Driver close 只释放算子逻辑状态,内存池对象保活到 ~Task(),支撑跨 Driver 零拷贝向量共享;
    • on-thread Driver 自己在自己的线程上 close——terminate 只关 off-thread 的,避免 close/abort 竞态;
    • 回收顺序受所有权约束——子池先于父池、childPools_ 先于 pool_drivers_ 清空解循环引用后 Task 才可能析构;
    • 析构期带调试探针——~Task()clearStage 字符串逐步标记,便于定位 jemalloc 析构崩溃(源码 TODO 注释)。

    十一、Future/Promise 管理:避免 Driver 泄漏的设计与实现

    Task/Driver 里散布着十来组 promise/future。它们是协作式调度的”唤醒总线”,但也是最危险的地方——一个永不兑现的 future
    就能让一个 Driver(及它通过 DriverCtx 拽住的整个 Task)永久泄漏
    。本章解读这套机制的设计与背后的防泄漏思考。

    11.1 为什么 future/promise 在这里特别危险

    回顾第三章的引用关系:Task ↔ Driver 是循环引用,靠 drivers_[i] = nullptr 手动切断。但 Driver 的 shared_ptr
    **不只存在于 drivers_**,还藏在 future 的回调闭包里:

    • BlockingStateDriver.cpp:184,并行模式):成员 std::shared_ptr<Driver> driver_,并把整个 state
      捕获进 setResumethenValue 闭包(Driver.cpp:232);
    • DriverBlockingStateTask.cpp:3772,串行模式):闭包捕获 driverHolder = driver_->shared_from_this()
    • Executor 队列[driver]() { Driver::run(driver); }Driver.cpp:302)。

    也就是说,一个阻塞中的 Driver,其生命被它正在等待的那个 future 兜住。推出核心不变量:

    每个挂起 Driver 的 future 都必须有一条”终将兑现或出错”的路径。只要有一个 future 可能永不了结,对应的 Driver 和
    Task 就会泄漏。
    文档 TaskDriverOperatorLifecycle.md 末尾那句警告正是此意(见第四章)。

    这套设计的全部精力,就是保证这个不变量在所有路径(正常、阻塞、暂停、取消、错误、提前终止)下都成立。

    11.2 全景:Task/Driver 中的 promise/future

    promise 组 等待者 兑现者 兑现时机
    BlockingState / DriverBlockingState 阻塞的 Driver 外部事件(split/数据/buildbridge…) future 兑现 → 重新 enqueue
    SplitsStore::promises_ 等 split 的 Driver addSplit / noMoreSplits / terminate 来 split / 收工 / 终止
    BarrierState::allPeersFinishedPromises join build 非末位 Driver 末位 Driver(或 terminate) 屏障完成
    threadFinishPromises_ requestPause / terminate 调用方 allThreadsFinishedLocked numThreads_ 归零
    resumePromises_ pauseRequested 调用方 Task::resume 任务恢复
    taskCompletionPromises_ taskCompletionFuture 等待方 terminate 任务终结
    stateChangePromises_ stateChangeFuture 等待方 removeDriver / terminate 状态变更
    taskDeletionPromises_ taskDeletionFuture 等待方 ~Task() 任务析构
    barrierFinishPromises_ requestBarrier 调用方 末位 barrier driver / terminate barrier 完成

    注意每一行的”兑现者”列都包含一条 terminate / 析构兜底路径——这不是巧合,是 11.4 要讲的设计。

    11.3 陷阱一:持锁兑现 promise → 死锁

    promise.setValue()同步触发挂在 future 上的 continuation,而 continuation 往往要回头抢 Task::mutex_
    (例如 BlockingState::setResume 的回调里 std::lock_guard l(task->mutex())Driver.cpp:236)。若在持锁时
    setValue,就是”持锁 → 触发回调 → 回调抢同一把锁”的自死锁。

    对策是贯穿全代码的**”锁内收集、锁外兑现”**模式。两种写法:

    (a) swap 到局部变量,出锁再兑现allThreadsFinishedLockedTask.cpp:3532):

    1
    2
    3
    4
    5
    std::vector<ContinuePromise> Task::allThreadsFinishedLocked() {
    std::vector<ContinuePromise> threadFinishPromises;
    threadFinishPromises.swap(threadFinishPromises_); // 锁内只搬走
    return threadFinishPromises; // 调用方出锁后才 setValue
    }

    (b) EventCompletionNotifier——两段式 + 析构兜底Task.cpp:51):

    1
    2
    3
    4
    5
    6
    7
    8
    class EventCompletionNotifier {
    ~EventCompletionNotifier() { notify(); } // 析构兜底,幂等
    void activate(std::vector<ContinuePromise> promises, // 锁内:收集 promise + callback
    std::function<void()> callback = nullptr);
    void notify() { // 锁外:兑现 + 回调,只生效一次
    if (active_) { for (auto& p : promises_) p.setValue(); ... active_ = false; }
    }
    };

    用法(terminateTask.cpp:2545):锁内 taskCompletionNotifier.activate(std::move(taskCompletionPromises_), ...)
    出锁后 taskCompletionNotifier.notify()。即便中途异常提前 return,析构函数也会 notify(),且 active_ 保证不重复兑现。

    11.4 陷阱二:future 永不兑现 → Driver 泄漏 → terminate 作”总清算”

    这是最致命的陷阱。一个 Driver 阻塞在某个 future 上(等 split、等 consumer、等 join build),如果任务因错误/取消而中止,
    那些事件可能永远不会自然发生——split 不会再来、consumer 已死、build 永远完不成。若放任不管,这些 Driver 的
    BlockingState 闭包会永久持有 Driver→Task,泄漏。

    Task::terminate(第十章 10.4)因此承担”promise 总清算”职责,把所有可能让 Driver 永远 block 的 promise 全部兑现
    Task.cpp:2502 起):

    1
    2
    3
    4
    5
    // terminate 锁外段,逐一兑现/取消,确保没有 Driver 留在 block 态:
    for (auto& promise : splitPromises) promise.setValue(); // 唤醒等 split 的 driver
    for (auto& bridge : oldBridges) bridge->cancel(); // 取消 join,唤醒等 build 的 driver
    for (auto& barrierPromise : barrierPromises) barrierPromise.setValue();
    // SplitsStore::noMoreSplits() 把每个 store 的 promises_ 全部收集后兑现

    被唤醒的 Driver 重新上线后,循环顶部 shouldStop() 看到 terminateRequested_ → 返回 kTerminate → 干净退出 →
    close()drivers_[i]=nullptr,闭包随 future 兑现而析构,Driver 引用计数归零。**”唤醒”不是为了让它继续干活,而是为了
    让它有机会发现该退出、从而释放自己。** 这就是为什么 11.2 表格里每组 promise 都有 terminate 兜底。

    对应地,makeFinishFutureLockedTask.cpp:2710)在 numThreads_ == 0立即兑现而非挂起——没有线程在跑就
    没什么可等的,避免凭空制造一个永不兑现的 future。

    11.5 陷阱三:double-resume / 两个线程进同一个 Driver

    future 兑现是异步的,可能与 Task 的 pause/terminate 竞争。若处理不当,同一个 Driver 可能被两个线程同时 run
    BlockingState::setResumeDriver.cpp:227)有两道防线:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
    std::move(state->future_).via(&exec).thenValue([state](auto&&) {
    ...
    std::lock_guard<std::timed_mutex> l(task->mutex()); // ① 在锁内决策
    driver->state().hasBlockingFuture = false;
    if (task->pauseRequested()) {
    return; // ② pause 中则不 enqueue,留给 resume
    }
    Driver::enqueue(state->driver_);
    })
    .thenError(folly::tag_t<std::exception>{}, [state](std::exception const& e) {
    ... task->setError(...); // ③ future 出错也要兜底转成 task error
    });
    }

    三个要点:

    1. 在 Task 锁外注册 resumeDriver::run 里调 setResume,此时已不持锁)——注释(Driver.cpp:863)说明这样
      “若 future 已经兑现,也不会有第二个线程进入同一个 Driver”;
    2. pause 检查:若任务正在 pause,不重新 enqueue,把恢复权交给 Task::resumeTask.cpp:1204)——避免 pause
      期间 Driver 偷偷回到线程;
    3. thenError 兜底:folly future 若以异常兑现(包括 promise 被析构导致的 BrokenPromise),thenError 把它转成
      task->setError绝不让一个 future 静默地烂尾DriverBlockingState::setDriverFutureTask.cpp:3808)有完全
      对称的 thenError 分支。

    Driver::run 里还有一处微妙竞态处理(Driver.cpp:856):拿到 kBlock 后若发现 task->shouldStop()==kTerminate
    直接 return 而不进 resume 模式——否则会在已终止的任务上挂一个永不兑现的 future。

    11.6 可观测性:让泄漏”看得见”

    设计者清楚这类泄漏极难调试,于是埋了两个探针:

    • **numBlockedDrivers_**(Driver.cpp:205BlockingState 构造 ++、析构 --):进程级”当前有多少 Driver 处于
      block 态”的计数,通过 BlockingState::numBlockedDrivers() 暴露。泄漏的 Driver 会让这个数永不归零;
    • **driversClosedByTask_**(Task.h:1307vector<weak_ptr<Driver>>):记录被 Task 强行关闭的 Driver。注释
      Task.h:1303)直言其目的——“当 race/bug 导致这些 Driver 被永久持有、进而把 Task 变成僵尸时,用这个 vector 辅助
      调试僵尸 Task”。用 weak_ptr 是为了观测而不延长生命。

    11.7 设计哲学总结

    原则 实现
    每个 future 必有兑现路径 正常兑现 + terminate 总清算 + thenError 兜底,三重保证
    promise 兑现必在锁外 swap-to-local / EventCompletionNotifier 两段式
    兑现幂等、提前退出也兜底 EventCompletionNotifier 析构调 notify() + active_ 标志
    唤醒是为了让 Driver 发现该退出 terminate 唤醒所有等待者,它们上线即见 kTerminate 自行 close
    无线程可等就别造 future makeFinishFutureLockednumThreads_==0 时立即兑现
    防双线程进同一 Driver setResume 锁外注册 + pause 检查 + run 里的 terminate 竞态短路
    泄漏可观测 numBlockedDrivers_ 计数 + driversClosedByTask_ 僵尸探针

    一句话:Velox 不试图”小心翼翼地不泄漏”,而是建立一个强不变量——“任何挂起的 Driver 都被某个 future 兜住,而每个
    future 都有正常兑现、terminate 兜底、thenError 转错三条出路之一”——再让 terminate 这个单一漏斗在所有异常路径上强制兑现
    一切,从结构上消灭”future 永不兑现”的可能。
    这与第五章的”把并发正确性从靠程序员小心转化为靠结构保证”一脉相承。