1. 1. Velox Query In the Presto Worker
    1. 1.1. 零、Query 如何并行执行
      1. 1.1.1. 1. Stage 级:query 切成多个 plan fragment
      2. 1.1.2. 2. Task 级:每个 stage 分发到多个 worker
      3. 1.1.3. 3. Driver 级:每个 task 内的并行靠 driver
      4. 1.1.4. Splits 与 plan fragment 分开传输
      5. 1.1.5. 并行度小结
    2. 1.2. 零之二、Split 分发与消费机制
      1. 1.2.1. 全链路概览
      2. 1.2.2. 第 1 步:Worker 接收与转换
      3. 1.2.3. 第 2 步:addSplitWithSequence —— 为什么需要 sequence
      4. 1.2.4. 第 3 步:Velox 侧 split 队列结构
        1. 1.2.4.1. 为什么拆成 SplitsState / SplitsStore 两层
        2. 1.2.4.2. 非 grouped 执行(ungrouped)长什么样
      5. 1.2.5. 第 4 步:Source operator 领取 split
      6. 1.2.6. 第 5 步:noMoreSplits 的两个层级 + 延迟处理
      7. 1.2.7. 第 6 步:Split group(bucketed/grouped 执行)
      8. 1.2.8. Split 分发关键代码位置
    3. 1.3. 层级结构总览
    4. 1.4. 一、Worker 接收查询:HTTP → Velox Task
      1. 1.4.1. HTTP 端点(TaskResource)
      2. 1.4.2. Plan Fragment 转换(PrestoToVeloxQueryPlan)
      3. 1.4.3. Task 创建流程(TaskManager)
    5. 1.5. 二、Task 内部:Pipeline × Driver × Operator
      1. 1.5.1. Task 内结构
      2. 1.5.2. Driver 执行循环(Driver::runInternal)
      3. 1.5.3. 阻塞与调度
      4. 1.5.4. Pipeline 间同步:HashJoinBridge
    6. 1.6. 三、Task 内数据交换:LocalPartition / LocalExchange
      1. 1.6.1. 整体结构
      2. 1.6.2. LocalExchangeMemoryManager
      3. 1.6.3. LocalExchangeQueue
      4. 1.6.4. LocalPartition 算子(Producer,sink)
      5. 1.6.5. LocalExchange 算子(Consumer,source)
    7. 1.7. 四、Task 间数据交换:Remote Exchange
      1. 1.7.1. 整体数据流
      2. 1.7.2. 上游:PartitionedOutput
      3. 1.7.3. 上游:OutputBuffer / OutputBufferManager
      4. 1.7.4. 下游:ExchangeClient
      5. 1.7.5. 下游:ExchangeSource 抽象接口
      6. 1.7.6. 下游:PrestoExchangeSource(HTTP 实现)
      7. 1.7.7. 下游:Exchange 算子
      8. 1.7.8. 序列号机制(可靠传输)
      9. 1.7.9. 流控总结
    8. 1.8. 五、内存管理
    9. 1.9. 六、完整端到端数据流
    10. 1.10. 七、关键代码位置

Velox Query In the Presto Worker

VELOX · QUERYEXECUTION2024-07-30

Velox Query In the Presto Worker

2024-07-30#Velox #QueryExecution

零、Query 如何并行执行

一个 query 的并行度由三个层级叠加而成,从粗到细:

Stage 并行(query 切成多个 plan fragment)
  └── Task 并行(每个 stage 分发到多个 worker)
        └── Driver 并行(每个 task 内多个 driver)

1. Stage 级:query 切成多个 plan fragment

Coordinator 把 query 的执行计划按 shuffle 边界(exchange)切成多个 plan fragment,每个对应一个 stage:

Query
├── Fragment 0 (Stage 0): TableScan → partial agg → PartitionedOutput
├── Fragment 1 (Stage 1): RemoteSource → HashJoin → PartitionedOutput
└── Fragment 2 (Stage 2): RemoteSource → final agg → output
  • **Plan fragment 描述"怎么算"**:算子树结构,静态,task 创建时发一次
  • 上下游 stage 通过 PartitionedOutput(上游 sink)↔ RemoteSource/Exchange(下游 source)衔接

2. Task 级:每个 stage 分发到多个 worker

Coordinator 把每个 stage 的 plan fragment 分发到参与该 stage 的所有 worker,每个 worker 为该 fragment 创建一个 task:

Stage 0 (Fragment 0)            Stage 1 (Fragment 1)
├── Worker A → Task-0-A          ├── Worker A → Task-1-A
├── Worker B → Task-0-B          ├── Worker B → Task-1-B
└── Worker C → Task-0-C          └── Worker C → Task-1-C

唯一性是按 (worker, stage) 维度,不是按 (worker, query) 维度:

说法 对错
一个 query 每个 worker 只收到一个 plan fragment
一个 (worker, stage) 只对应一个 plan fragment → 一个 task
同一 stage 的同一 fragment 不会重复发给同一 worker
(worker, stage)  →  唯一一个 plan fragment  →  唯一一个 task
(worker, query)  →  可能多个 plan fragment  →  多个 task(每参与一个 stage 一个)

所以同一个 worker 上,同一个 query 通常同时运行多个 task(每个 stage 一个),它们彼此独立,用不同的内存池和 driver 线程。

3. Driver 级:每个 task 内的并行靠 driver

Task 内的并行度不靠 task 数量,靠 driver 数量

Worker A, Task-0-A
└── Pipeline 0
    ├── Driver 0 → 处理 split[0], split[3], split[6]...
    ├── Driver 1 → 处理 split[1], split[4], split[7]...
    └── Driver 2 → 处理 split[2], split[5], split[8]...
        (N = maxDrivers,通常 ≈ CPU 核数)

Coordinator 把属于 Worker A 的所有 splits 都增量塞进同一个 Task-0-A,task 内的多个 driver 从共享的 split queue 里抢着消费。

Splits 与 plan fragment 分开传输

Plan fragment 不包含 splits。两者通过同一个 POST /v1/task/{taskId} 的不同字段传输:

TaskUpdateRequest {
    fragment:  <Base64 PlanFragment>     ← 算子树,只在首次创建 task 时发
    sources: [                           ← splits,运行时可多次增量追加
        { planNodeId: "0",               ← 关联到 fragment 中的某个 source 节点
          splits: [split1, split2, ...],
          noMoreSplits: false }          ← 全部下发完后置 true
    ]
}
  • **Plan fragment 描述"怎么算"**:静态,发一次
  • **Splits 描述"算什么数据"**:动态,分批追加,可能 task 启动后才陆续到达
  • 在 Velox 侧:Task::create(planFragment) 一次性建树;addSplitWithSequence(planNodeId, split) 增量喂数据;noMoreSplits(planNodeId) 标记结束
  • Source operator(TableScan/Exchange)无 split 时返回 kWaitForSplit 阻塞,等待 coordinator 下发

并行度小结

维度 由谁决定 数量
Stage 数 query plan 的 shuffle 边界 通常 2~5
每 stage 的 task 数 coordinator 选中的 worker 数 = 参与该 stage 的 worker 数
每 worker 的 task 数 该 worker 参与的 stage 数 每参与一个 stage 一个
每 task 的 pipeline 数 plan 中 join 数 = HashJoin/CrossJoin 数 + 1
每 pipeline 的 driver 数 maxDrivers 配置 ≈ CPU 核数
每 driver 的线程数 固定 1(串行执行算子链)

零之二、Split 分发与消费机制

上面讲了 split 和 plan fragment 分开传输,这里展开完整链路:从 coordinator 下发,到 worker 入队,到 source operator 领取消费。

全链路概览

Coordinator (Java,不在本仓库) — 决定 split → worker 分配(节点亲和 / 负载均衡)POST /v1/task/{id} body.sources[]Worker (Presto C++)TaskResource::createOrUpdateTask()解析 TaskUpdateRequest.sources[]TaskManager::createOrUpdateTaskImpl()1. 按 planNodeId 合并同请求内的多个 source2. toVeloxSplit() 转换 ScheduledSplit → exec::Split3. execTask->addSplitWithSequence(nodeId, split, seqId)4. execTask->setMaxSplitSequenceId(nodeId, maxSeqId)5. noMoreSplits / delayedNoMoreSplitsPlanNodes_Velox Task — 按 (planNodeId, splitGroupId) 存入 SplitsStoreTableScan(driver).getSplitOrFuture(planNodeId)有 split → 读 · 无 → kWaitForSplit · 结束 → finished
Fig. Worker split 处理管线:从 Coordinator 的 POST 到 TableScan 取 split

边界说明:哪个 split 分给哪个 worker,是 coordinator(Java) 的调度决策,不在 C++ worker 代码里。Worker 侧只负责接收分配给自己的 split 并消费。

第 1 步:Worker 接收与转换

TaskResource::createOrUpdateTask()TaskUpdateRequest 解析出 sources[],每个元素(protocol::TaskSource)含:

struct TaskSource {
    PlanNodeId planNodeId;                 // 关联到 fragment 中的 source 节点
    vector<ScheduledSplit> splits;         // 本批 split
    bool noMoreSplits;                     // 该节点是否不再有 split
    vector<Lifespan> noMoreSplitsForLifespan;  // grouped 执行的 per-group 结束标记
};

TaskManager::createOrUpdateTaskImpl() 处理两阶段:

  1. 合并:同一请求里可能有多个相同 planNodeId 的 source,先按 planNodeId 合并 splitsnoMoreSplitsForLifespan,noMoreSplits 做 OR
  2. 转换toVeloxSplit(ScheduledSplit) 把 Presto 协议 split 转成 velox::exec::Split
    • RemoteSplitRemoteConnectorSplit(下游拉上游用)
    • EmptySplit → 空 split(占位,什么都不读)
    • 其他 → 委托给 connector 特定的 toVeloxSplit()(如 Hive split)
    • lifespan.groupidSplit.groupId(-1 表示 ungrouped)

第 2 步:addSplitWithSequence —— 为什么需要 sequence

Coordinator 可能重发同一批 split(网络重试、task update 重叠),所以用 sequence id 去重:

// Velox Task.cpp
bool Task::addSplitWithSequence(planNodeId, Split&& split, long sequenceId) {
    // 仅当 sequenceId > splitsState.maxSequenceId 才真正入队
    if (sequenceId > splitsState.maxSequenceId) {
        addSplitLocked(splitsState, std::move(split));
        return true;
    }
    return false;  // 重复 split,丢弃
}
  • maxSequenceId水位线:seqId ≤ 水位的 split 视为重复直接丢弃
  • 注意:addSplitWithSequence 本身不更新水位,而是由 setMaxSplitSequenceId(nodeId, maxSeqId) 在一批 split 加完后统一设置水位
  • 对比 addSplit():无 sequence 检查,无条件入队(用于不需要幂等投递的场景)

第 3 步:Velox 侧 split 队列结构

// 每个 plan node 一份状态
struct SplitsState {
    bool sourceIsTableScan{false};
    bool noMoreSplits{false};
    long maxSequenceId{LONG_MIN};                  // 去重水位
    // splitGroupId → 该 group 的 split 队列
    unordered_map<uint32_t, unique_ptr<SplitsStore>> groupSplitsStores;
};

// 单个 (planNodeId, splitGroupId) 的 split 队列
class SplitsStore {  // 实现 QueueSplitsStore
    deque<Split> splits_;                          // 待消费 split
    bool noMoreSplits_{false};
    vector<ContinuePromise> promises_;             // 等待 split 的 driver
};
  • splitsStates_map<PlanNodeId, SplitsState>,每个 source 节点一份
  • ungrouped 执行:每个节点只有一个 store(key = kUngroupedGroupId)
  • grouped 执行:每个 split group 一个 store

为什么拆成 SplitsState / SplitsStore 两层

这个两层嵌套不是随意设计,而是两个正交维度被拆开:

  • SplitsState = per plan node(每个 source 节点一份)
  • SplitsStore = per **(plan node, split group)**(每个 split group 一个队列)

有四个独立的设计力推动这个拆分:

1. 去重水位是 node 级,消费队列是 group 级——作用域不同。 maxSequenceId 去重水位必须是 node 级:coordinator 给每个 plan node 的 split 分配连续 sequence id,与 group 无关。而"排队 + 阻塞 + 消费"是 group 级的。两者作用域天然不同。

2. Bucketed 执行要求"一个节点 N 个独立队列"(最硬的需求)。 同一个 TableScan 节点产出的 split 带不同 groupId,每个 group 的 driver 组只能拉自己 group 的 split,且要独立阻塞——group 5 队列空了阻塞,不能影响 group 1 继续跑。所以每个 group 必须有自己的队列和自己的阻塞 promises,强制了 map<groupId, SplitsStore>

3. 阻塞/唤醒机制必须跟队列绑定。 SplitsStore::promises_(等待 split 的 driver)是 per-group 的——driver 阻塞的是"某个 group 队列空了",不是"整个节点没数据"。所以 promises 放在 store 里而非 state 里。这也是 SplitsState 显式 delete 拷贝构造的原因:它间接持有 promise,地址必须稳定(driver 持有指向它的引用)。

4. SplitsStore 是抽象类,还抽象了"split 从哪来"。 store 带虚函数(nextSplit / requestBarrier),注释原文:either can accumulate splits through addSplit() or generating splits from its own source。即 split 来源可替换:QueueSplitsStore 被动接收 coordinator push 的 split,或自生成 source。用类继承而非结构体字段来解耦"消费接口"与"来源实现"。

SplitsState SplitsStore
作用域 每个 plan node 每个 (node, split group)
装什么 去重水位、node 级 noMoreSplits、是否 TableScan split 队列、barrier split、阻塞 promises
生命周期 整个 task 随 split group 动态创建/清理
形态 不可拷贝 struct 带虚函数的抽象类

一句话SplitsState 管"节点整体元数据",SplitsStore 管"某 group 具体怎么排队和阻塞"。根本原因是 grouped 执行要求同一节点下多个 group 各自独立排队与阻塞。

非 grouped 执行(ungrouped)长什么样

绝大多数普通查询(非 bucketed)走的是 ungrouped 执行,此时这套结构退化成最简单的形态:

ungrouped 执行(所有 split.groupId == -1)

SplitsState (planNode "0")
├── maxSequenceId
├── noMoreSplits
└── groupSplitsStores
        └── [kUngroupedGroupId] → SplitsStore   ← 整个节点只有这一个 store
                                    ├── splits_ (一条队列)
                                    └── promises_

所有 driver(Driver 0..N)都从这唯一一个 store 抢 split
ungrouped 执行 grouped 执行
split.groupId -1 bucket id(0..N-1)
每节点 store 数 1(kUngroupedGroupId 每 bucket 一个
driver 创建时机 task start 时一次性建好 每个 group 到达时按需创建
split 消费 所有 driver 抢同一队列,谁空闲谁拿 每 group 的 driver 只拿本 group split
阻塞粒度 整个节点一个 promises 集合 每 group 独立阻塞
noMoreSplits 通知唯一的 store 通知该节点所有 group 的 store
适用场景 普通表扫描、shuffle 后的 join bucketed 表的 grouped/colocated join

关键点:ungrouped 不是"另一套代码路径",而是 grouped 设计的特例——map<groupId, store> 里只有一个固定 key kUngroupedGroupId = UINT32_MAX 的 entry。这样一套数据结构同时覆盖两种执行模式,无需为常规查询单独写一条路径。多个 driver 并行消费同一队列,靠的就是 SplitsStore 内部对队列和 promises 的加锁访问(实际锁在 Task::mutex_)。

第 4 步:Source operator 领取 split

TableScangetOutput() 中调用 Task::getSplitOrFuture() 领取 split:

BlockingReason Task::getSplitOrFuture(
    driverId, splitGroupId, planNodeId, maxPreloadSplits, preload,
    Split& split, ContinueFuture& future) {
    // 取 (planNodeId, splitGroupId) 对应的 SplitsStore
    // 调 store->nextSplit(...)
}

SplitsStore::nextSplit() 三种结果:

情况 返回
splits_ 非空 弹出 split,kNotBlocked
队列空但已 noMoreSplits_ 无 split,kNotBlocked(operator 据此 finish)
队列空且未结束 创建 ContinuePromise 存入 promises_,返回 kWaitForSplit + future

阻塞时 driver 退出线程池;当新 split 入队或 noMoreSplits 触发时,fulfill promises_ 唤醒 driver 重新调度。

第 5 步:noMoreSplits 的两个层级 + 延迟处理

两个层级

// 单个 group 结束(grouped 执行)
void Task::noMoreSplitsForGroup(planNodeId, splitGroupId);

// 整个 plan node 结束
void Task::noMoreSplits(planNodeId) {
    splitsState.noMoreSplits = true;
    // ungrouped: 通知唯一的 store
    // grouped:   通知该节点所有 group 的 store
    // 若所有 split group 都处理完 → terminate(kFinished)
}

延迟处理(delayedNoMoreSplitsPlanNodes_

noMoreSplits 信号可能在 task->start() 之前就到达。若此时直接调 Velox 的 noMoreSplits 会出问题(task 还没起来)。所以:

// TaskManager.cpp
if (prestoTask->taskStarted) {
    execTask->noMoreSplits(planNodeId);          // 已启动,直接调
} else {
    prestoTask->delayedNoMoreSplitsPlanNodes_.emplace(planNodeId);  // 暂存
}

// 之后 startTaskLocked() 中补调:
for (auto& nodeId : delayedNoMoreSplitsPlanNodes_) {
    execTask->noMoreSplits(nodeId);
}
delayedNoMoreSplitsPlanNodes_.clear();

否则 driver 会永久阻塞在 kWaitForSplit,等待一个永远不会来的 split。

第 6 步:Split group(bucketed/grouped 执行)

针对 bucketed 表的分组执行,split 带 groupId

// Task.cpp,split 带 groupId != -1 时
if (!seenSplitGroups_.contains(groupId)) {
    seenSplitGroups_.insert(groupId);
    queuedSplitGroups_.push(groupId);
    ensureSplitGroupsAreBeingProcessedLocked();   // 为新 group 创建 driver
}
addSplitToStore(planNodeId, groupId, split);
  • 每个 split group 独立创建一组 driver,concurrentSplitGroups 控制同时处理的 group 数
  • kUngroupedGroupId = UINT32_MAX 表示非分组执行
  • group 间通过 barrier 同步(如 grouped HashJoin 的 build/probe)

Split 分发关键代码位置

环节 文件
解析 sources[] presto_cpp/main/TaskResource.cpp createOrUpdateTask()
入队 + 去重 + noMoreSplits presto_cpp/main/TaskManager.cpp createOrUpdateTaskImpl()
协议 split 转换 presto_cpp/main/PrestoToVeloxSplit.cpp toVeloxSplit()
延迟标记 presto_cpp/main/PrestoTask.h delayedNoMoreSplitsPlanNodes_
addSplitWithSequence / 去重 velox/exec/Task.cpp
SplitsState / SplitsStore velox/exec/TaskStructs.h
getSplitOrFuture velox/exec/Task.cpp
noMoreSplits / ForGroup velox/exec/Task.cpp

层级结构总览

Query
└── Stage 0  (plan fragment 0: TableScan + partial agg)
│   ├── Worker A → Task-0-A
│   │   ├── Pipeline 0: Driver0, Driver1, ... DriverN
│   │   └── Pipeline 1: Driver0  (HashBuild side)
│   ├── Worker B → Task-0-B
│   └── Worker C → Task-0-C
│
└── Stage 1  (plan fragment 1: Exchange + HashJoin + PartitionedOutput)
    ├── Worker A → Task-1-A
    ├── Worker B → Task-1-B
    └── Worker C → Task-1-C

关键约束:

  • (worker, stage) → 唯一一个 task,同一 stage 在同一 worker 上不会有两个 task
  • 同一 query 的不同 stage 在同一 worker 上各有一个 task(可同时运行)
  • Task 内的并行度靠 Driver 数量,不靠 Task 数量
  • Coordinator 把属于某 worker 的所有 splits 都增量发给同一个 task,Driver 并行消费

一、Worker 接收查询:HTTP → Velox Task

HTTP 端点(TaskResource)

Coordinator 通过 REST API 驱动每个 Worker:

POST   /v1/task/{taskId}                              创建/更新 task(含 plan fragment + splits)
GET    /v1/task/{taskId}/results/{bufferId}/{token}   下游拉取输出数据
GET    /v1/task/{taskId}/status                       轮询 task 状态(支持 long-polling)
DELETE /v1/task/{taskId}                              终止 task
GET    /v1/task/{taskId}/results/.../acknowledge      ACK 已收数据
DELETE /v1/task/{taskId}/results/{bufferId}           释放 output buffer

Plan Fragment 转换(PrestoToVeloxQueryPlan)

TaskUpdateRequest 中的 fragment 字段是 Base64 编码的 JSON Plan:

Presto JSON PlanFragment
        ↓  VeloxInteractiveQueryPlanConverter
Velox PlanFragment(PlanNode 树)

关键 PlanNode 映射:

Presto PlanNode Velox PlanNode 说明
TableScanNode TableScanNode 读存储
RemoteSourceNode ExchangeNode 跨 worker 拉数据
PartitionedOutputNode PartitionedOutputNode 推数据给下游
AggregationNode AggregationNode 聚合
ExchangeNode LocalExchangeNode task 内重分区

Task 创建流程(TaskManager)

POST /v1/task/{taskId}
    ↓
findOrCreateTask()         → 创建 PrestoTask(状态 kPlanned,暂无 Velox Task)
    ↓
createOrUpdateTaskImpl()
    ├── exec::Task::create(planFragment, queryCtx, ExecutionMode::kParallel)
    ├── execTask->addSplitWithSequence()   ← 注册 splits
    └── startTaskLocked()
            └── execTask->start(maxDrivers, concurrentLifespans)

PrestoTask 是 Velox exec::Task 的包装,额外持有:

  • 待处理的 ResultRequest(下游长轮询的 HTTP 请求)
  • 状态变更 Promise(用于 long-polling)

二、Task 内部:Pipeline × Driver × Operator

Task 内结构

Task
├── Pipeline 0  (probe side: Exchange → HashProbe → Agg → PartitionedOutput)
│   ├── Driver 0  ── 一个线程,处理若干 splits
│   ├── Driver 1
│   └── Driver N   (N = maxDrivers,通常 ≈ CPU 核数)
│
└── Pipeline 1  (build side: Exchange → HashBuild)
    └── Driver 0   (build side 通常只需 1 个 driver)
  • Pipeline 数量由 plan 决定(每个 HashJoin/CrossJoin 会拆出 build/probe 两条 pipeline)
  • Driver 数量(每条 pipeline 内的并行度)由 maxDrivers 配置决定
  • 每个 Driver 独立运行,处理不同的 split 或不同分区的数据
  • 每个 Driver 内部单线程串行执行算子链

Driver 执行循环(Driver::runInternal

核心是从 sink 向 source 反向推进的 pull 模型:

loop:
    从 sink 向上游遍历每个 operator:
        if 下游.needsInput():
            output = 上游.getOutput()
            if output: 下游.addInput(output)
        if operator.isBlocked(&future):
            // 注册 future,退出线程池;future 完成后重新入队
            break

Operator 核心接口velox/exec/Operator.h):

RowVectorPtr getOutput();                            // 产生一批行(RowVector)
void         addInput(RowVectorPtr input);           // 消费上游输出
bool         needsInput();                           // 是否能接收更多输入
BlockingReason isBlocked(ContinueFuture* future);   // 是否阻塞,设置 future
bool         isFinished();                           // 是否全部完成

数据流示例(Pipeline 0)

TableScan.getOutput()        → RowVector(一批行)
    ↓ addInput
Filter.addInput/getOutput    → 过滤后的行
    ↓ addInput
HashProbe.addInput           → 关联 HashTable
HashProbe.getOutput()        → join 结果行
    ↓ addInput
PartitionedOutput.addInput   → 序列化,写入 OutputBuffer

阻塞与调度

Driver 运行于共享线程池(folly::CPUThreadPoolExecutor)。算子阻塞时(如等待 HashTable 建好、等待 Exchange 数据到来),Driver 持有 ContinueFuture 后退出线程;Future 完成后重新入队:

Driver → isBlocked() → 得到 future → 退出线程池
                                     ↓  future 完成
                            Driver::enqueue() → 重新调度

阻塞原因(BlockingReason)

原因 触发场景
kWaitForSplit TableScan 等待 coordinator 下发 split
kWaitForProducer Exchange/LocalExchange 等待上游数据
kWaitForConsumer PartitionedOutput OutputBuffer 已满
kWaitForJoinBuild HashProbe 等待 HashBuild 完成
kWaitForMemory 内存仲裁,等待 spill 释放内存

Pipeline 间同步:HashJoinBridge

Pipeline 1 (build): TableScan → HashBuild ──写入──► HashJoinBridge
                                                          │ build done
Pipeline 0 (probe): Exchange → HashProbe ◄──读取──────────┘

HashBuild 完成后通过 HashJoinBridge 通知 HashProbe 的所有 Driver 解除阻塞。


三、Task 内数据交换:LocalPartition / LocalExchange

适用场景:同一 task 内,不同 pipeline 之间需要重分区(如两阶段聚合、repartitioned join probe 侧重分区)。纯共享内存,无序列化,无网络。

"local" 的含义是 local to the task(同进程同 task),不是 local to the machine。

整体结构

Task 内部

Pipeline 0(上游,N 个 producer driver)
  Driver 0 → ... → LocalPartition ┐
  Driver 1 → ... → LocalPartition ┼─→ LocalExchangeQueue[0] ─→ LocalExchange → Driver 0
  Driver 2 → ... → LocalPartition ┘─→ LocalExchangeQueue[1] ─→ LocalExchange → Driver 1
                   (按 key 哈希分桶)    (按 partition 分桶)    (source 算子)

Pipeline 1(下游,M 个 consumer driver,M = numPartitions)
  • 每个 LocalExchangeQueue 对应一个分区,有且只有一个 consumer driver 消费
  • 所有 LocalPartition producer 共享同一个 LocalExchangeMemoryManager,统一控制总缓冲上限

LocalExchangeMemoryManager

职责:跨所有 LocalExchangeQueue 追踪总内存用量,在超限时阻塞 producer。

// velox/exec/LocalPartition.h
int64_t         maxBufferSize_;     // 总缓冲上限
int64_t         bufferedBytes_;     // 当前已用字节(原子)
std::vector<ContinuePromise> promises_;  // 被阻塞的 producer
  • **increaseMemoryUsage(added)**:producer 入队后调用。若 bufferedBytes_ >= maxBufferSize_,创建 ContinuePromise 存入 promises_,返回 true(producer 阻塞)
  • **decreaseMemoryUsage(removed)**:consumer 出队后调用。若降到 maxBufferSize_ 以下,取出所有 promises_ 并 fulfill,唤醒所有阻塞的 producer

LocalExchangeQueue

职责:单个分区的 FIFO 队列,协调多 producer 写入 / 单 consumer 读取。

folly::Synchronized<Queue> queue_;        // std::queue<pair<RowVectorPtr, int64_t>>
std::vector<ContinuePromise> consumerPromises_;  // consumer 等待数据
int pendingProducers_{0};                 // 尚未完成的 producer 数量
bool noMoreProducers_{false};             // 不再有新 producer
bool closed_{false};

入队(producer 调用)

BlockingReason enqueue(RowVectorPtr input, int64_t inputBytes, ContinueFuture* future) {
    // 1. 追加到 queue_
    // 2. 唤醒等待的 consumer(fulfil consumerPromises_)
    // 3. 调用 memoryManager_->increaseMemoryUsage()
    //    → 若超限,返回 kWaitForConsumer(producer 阻塞)
    //    → 否则返回 kNotBlocked
}

出队(consumer 调用)

BlockingReason next(ContinueFuture* future, memory::MemoryPool*, RowVectorPtr* data, bool& drained) {
    // 1. 队列为空:
    //    - 若所有 producer 已完成 → 返回 finished
    //    - 否则创建 consumerPromise → 返回 kWaitForProducer
    // 2. 有数据:
    //    - 弹出队头
    //    - 调用 memoryManager_->decreaseMemoryUsage() → 可能唤醒 producer
    //    - 返回 kNotBlocked
}

Producer 生命周期管理

方法 时机 作用
addProducer() producer 构造时 pendingProducers_++
noMoreData() producer 处理完所有输入后 pendingProducers_--;若归零且 noMoreProducers_ 则唤醒 consumer
noMoreProducers() task 确认不再添加新 pipeline 后 设标志;若 pendingProducers_==0 则立即唤醒 consumer
drain() barrier 处理(grouped execution) drainedProducers_++;全部 drain 后唤醒 consumer

LocalPartition 算子(Producer,sink)

构造:从 task 获取所有分区队列,并对每个队列调用 addProducer()

addInput() 分区逻辑

1. partitionFunction_->partition(*input, partitions_)
      → partitions_[i] = 第 i 行应去的分区号

2. 若所有行去同一个分区:直接调用 queue[p]->enqueue(input)

3. 若行分散到多个分区:
   a. 统计每分区行数,构建索引数组 rawIndices_[partition]
   b. 根据 singlePartitionBufferSize_ 判断用哪种策略:
      - Buffer 模式(numPartitions 较多):
          populatePartitionBuffer() 把行 copy 进 partitionBuffers_[p]
          累积到 singlePartitionBufferSize_ * numPartitions 时统一 flush
      - 即时模式(numPartitions 少或 eagerFlush):
          wrapChildren() 用 dictionary 索引包装列,零拷贝创建 RowVector
          立即 enqueue

wrapChildren():用 dictionary 向量包装输入列,指向原始数据,零拷贝。适合分区数少、批次大的场景。

**copy() / populatePartitionBuffer()**:把行物理复制进 partitionBuffers_[p],适合分区数多(字典开销大)的场景。

阻塞:若任意队列 enqueue() 返回 kWaitForConsumer,收集对应 future,在 isBlocked() 时返回,driver 退出线程池。

**noMoreInput()**:flush 剩余 buffer,对所有队列调用 noMoreData(),通知 consumer 不再有数据。

LocalExchange 算子(Consumer,source)

构造:从 task 获取自己负责的分区队列(partition_ = driverId mod numPartitions)。

// getOutput()
BlockingReason reason = queue_->next(&future_, pool(), &data, drained);
if (reason != kNotBlocked) {
    blockingReason_ = reason;
    return nullptr;    // driver 将在 isBlocked() 中返回 future_
}
return data;

isBlocked() 直接返回 blockingReason_future_,无额外逻辑。


四、Task 间数据交换:Remote Exchange

适用场景:不同 task 之间(跨 stage),始终走 HTTP,即使两个 task 在同一台物理机上也不例外。

整体数据流

Worker A — 上游 taskPartitionedOutput ├─ partitionFunction_ ├─ Destination[0..N] │ └─ VectorStreamGroup └─ OutputBufferManager └─ OutputBuffer └─ DestinationBuffer [partition 0..N]Worker B — 下游 taskExchangeNode (SourceOperator) └─ ExchangeClient ├─ ExchangeQueue │ └─ SerializedPage deque └─ ExchangeSource[0..M] └─ PrestoExchangeSource (HTTP client)HTTP GET(B 拉 A)
Fig. 上下游 Exchange:下游 ExchangeSource 通过 HTTP GET 拉上游 PartitionedOutput 的分区数据

上游:PartitionedOutput

addInput() 分区流程

1. 估算每行序列化大小(用于 flush 时机判断)
2. partitionFunction_->partition(*input, partitions_)
3. 将每行的索引分配到 Destination[p].rows_

getOutput() 序列化 + flush 流程

for each Destination:
    advance():
        收集 rows_ 中的行索引
        用 VectorStreamGroup 序列化(Presto / CompactRow / UnsafeRow 格式)
        累积到 maxPageSize 或行数阈值时 flush()

flush():
    VectorStreamGroup → IOBufOutputStream → IOBuf chain(Presto Page 格式)
    bufferManager_.enqueue(taskId, partition, page)
        → OutputBuffer::enqueue()
        → DestinationBuffer[p].data_.push_back(page)
        → 若 bufferedBytes_ > maxSize_ → 返回 blocking future(producer 阻塞)

随机 flush 阈值targetSizePct_ = 70~120%):各 Driver 的 flush 时机错开,避免同时写入造成突发压力。

上游:OutputBuffer / OutputBufferManager

// velox/exec/OutputBuffer.h
OutputBuffer {
    std::vector<DestinationBuffer> buffers_;  // 每个下游分区一个
    int64_t  maxSize_;                        // 总缓冲上限
    int64_t  continueSize_;                   // 恢复阈值(= maxSize_ * 0.9)
    int64_t  bufferedBytes_;                  // 当前已缓冲字节
    std::vector<ContinuePromise> promises_;   // 被阻塞的 producer
}

DestinationBuffer {
    std::deque<SerializedPageBase> data_;     // 待下游拉取的 pages
    int64_t sequence_;                        // data_[0] 对应的全局序列号
    std::function<...> notify_;              // 有数据时唤醒等待的 HTTP 请求
}

背压bufferedBytes_ > maxSize_ 时,enqueue() 返回 ContinueFuture,producer(PartitionedOutput)阻塞。当下游 ACK 消费数据后,bufferedBytes_ 降到 continueSize_ 以下时 fulfill promises,解除阻塞。

getData() 回调机制:下游 HTTP GET 到来时若无数据,注册 notify_ 回调;有数据入队时回调触发,立即响应 HTTP 请求。

下游:ExchangeClient

同一 pipeline 的所有 Driver 共享一个 ExchangeClient,但各自独立从 ExchangeQueue 消费(多消费者)。

ExchangeClient {
    ExchangeQueue queue_;                    // 多 source 入队,多 driver 消费
    std::vector<ExchangeSource> sources_;   // 每个上游 task 一个
    int64_t maxQueuedBytes_;               // 本地缓冲上限(默认 32MB)
    int64_t minOutputBatchBytes_;          // 最小返回批次(防止消费者空跑)
    std::deque<ProducingSource> producingSources_;  // 有剩余数据的 source
    std::deque<ExchangeSource*>  emptySources_;     // 尚未探测数据量的 source
}

**addRemoteTaskId()**:为每个上游 task 创建一个 ExchangeSource,加入 emptySources_ 队列,启动首次数据量探测请求。

**next(consumerId, maxBytes)**(由 ExchangeNode.isBlocked() 调用):

1. queue_->dequeueLocked(maxBytes) → 若有足够数据立即返回
2. 若数据不足:pickSourcesToRequestLocked()
   a. 对 emptySources_ 中的 source 发 requestDataSizes()(HEAD 请求,探测剩余量)
   b. 按剩余容量(maxQueuedBytes_ - queue_.totalBytes() - totalPendingBytes_)
      分配请求量给 producingSources_
   c. 特殊处理:若单个 page 超过容量上限,强制发一次请求防止死锁

**ExchangeQueue**:

ExchangeQueue {
    std::deque<SerializedPageBase> queue_;
    int64_t totalBytes_;
    int64_t minOutputBatchBytes_;        // 消费者唤醒阈值
    std::map<int, ContinuePromise> promises_;  // consumerId → promise
}

**enqueueLocked()**(ExchangeSource 收到数据后调用):

  • nullptr 入队表示某个 source 数据结束
  • 计算 unassigned bytes,若 >= minOutputBatchBytes_ 则 fulfill 等待中的消费者 promise
  • minOutputBatchBytes_ 自适应:min(配置值, 已接收总量 / 100),防止小流量场景下消费者频繁空唤醒

**dequeueLocked(maxBytes)**:取出页面直到达到 maxBytes 或队列耗尽;数据不足时返回 ContinueFuture(消费者 driver 阻塞)。

下游:ExchangeSource 抽象接口

// velox/exec/ExchangeSource.h
ExchangeSource {
    std::string remoteTaskId_;
    int         destination_;       // 拉取上游哪个分区
    ExchangeQueue* queue_;
    int64_t     sequence_;          // 下次请求的序列号(可靠传输)
    bool        requestPending_;    // 防止并发发起重复请求
    bool        atEnd_;
}

virtual Response request(uint32_t maxBytes, uint32_t maxWaitMs);
virtual void requestDataSizes(uint32_t maxWaitMs);
virtual void pause();    // 提示暂停(队列满时)
virtual void close();

Response 含:

  • bytes:本次响应数据量
  • atEnd:source 是否已全部传完
  • remainingBytes:上游 DestinationBuffer 中各 page 的剩余大小列表(用于 ExchangeClient 调度决策)

下游:PrestoExchangeSource(HTTP 实现)

shouldRequestLocked() → true(无 pending 请求且未 atEnd)
    ↓
request(maxBytes, maxWaitMs)
    ↓
doRequest(sequence_, maxBytes, maxWaitMs)
    HTTP GET /v1/task/{remoteTaskId}/results/{destination}/{sequence_}
    Headers: X-Presto-Max-Size: maxBytes
             X-Presto-Max-Wait: maxWaitMs
    ↓
processDataResponse(response)
    读 response headers:
        X-Presto-Page-Next-Token        → 更新 sequence_(下次请求用)
        X-Presto-Buffer-Complete        → 若 true,入队 nullptr(end marker)
        X-Presto-Buffer-Remaining-Bytes → remainingBytes(返回给 ExchangeClient)
    IOBuf chain → SerializedPage → queue_->enqueueLocked()
    fulfill 请求 promise → ExchangeClient 调度下一批请求

重试:网络失败时指数退避(100ms → 10s,带 jitter),超过 exchangeMaxErrorDurationqueue_->setError(),task 报错。

下游:Exchange 算子

// isBlocked():
// 1. 若 currentPages_ 有数据 → kNotBlocked
// 2. getSplits() 获取上游 task ID → 调用 client_->addRemoteTaskId()
// 3. client_->next() → 若无数据返回 kWaitForProducer + future

// getOutput():
// 根据 serdeKind_ 分发:
//   columnar(Presto): getOutputFromColumnarPages()
//   row(CompactRow/UnsafeRow): getOutputFromRowPages()

**getOutputFromColumnarPages()**:用 VectorSerde::deserialize() 增量反序列化,每次读满 preferredOutputBatchBytes_ 后返回,跨 currentPages_ 中多个 page 累积。

**getOutputFromRowPages()**:先 mergePages() 把多个 page 合并成连续 IOBuf,再用 row iterator 批量反序列化。

序列号机制(可靠传输)

ExchangeSource.sequence_  →  HTTP GET .../results/{partition}/{sequence}
                              ↓
上游 DestinationBuffer[partition].sequence_  →  data_[0] 的全局编号
(验证请求序列号是否与当前可用数据对齐)
                              ↓
响应 header X-Presto-Page-Next-Token  →  下次请求的 sequence
(ExchangeSource 更新 sequence_ 用于下次请求)

ACKGET .../results/{partition}/{sequence}/acknowledgeOutputBuffer::acknowledge() → 删除 sequence 之前的所有 pages,释放内存。

流控总结

层级 机制 上限
OutputBuffer(上游) bufferedBytes_ > maxSize_ 时阻塞 producer task.maxOutputBufferSize(默认 32MB)
ExchangeQueue(下游本地缓存) totalBytes_ 控制发起的请求量 maxQueuedBytes_(默认 32MB)
ExchangeClient 请求调度 maxQueuedBytes_ - totalBytes_ - pendingBytes = 可发请求量 动态计算
消费者唤醒阈值 minOutputBatchBytes_ 防止小批次频繁唤醒 自适应(已收量 / 100)

五、内存管理

内存池形成层级树,每层独立追踪和回收:

QueryCtx::MemoryPool(query 级,所有 task 共享上限)
└── Task::MemoryPool(task 级)
    ├── Operator pools(每个 operator 实例独立池)
    ├── ExchangeClient pool(每条 pipeline 一个)
    └── LocalExchange pool
  • Task::MemoryReclaimer 在内存压力下触发 spill(HashAggregation、HashJoin 等支持 spill 的算子)
  • Operator 通过 addOperatorPool(planNodeId, splitGroupId, pipelineId, driverId) 在构造时申请子池

六、完整端到端数据流

Coordinator
    │
    ├── POST /v1/task/A0  →  Worker A  (Stage 0: TableScan → HashBuild → PartitionedOutput)
    │   splits: [file0, file1, file2, ...]
    │
    ├── POST /v1/task/B1  →  Worker B  (Stage 1: Exchange → HashProbe → Agg → PartitionedOutput)
    │
    └── POST /v1/task/C2  →  Worker C  (Stage 2: Exchange → FinalAgg → PartitionedOutput)

执行时序:
Worker A: TableScan(splits) → HashBuild → PartitionedOutput → OutputBuffer[0..N]
                                                                      ↑ HTTP GET
Worker B: PrestoExchangeSource ────────────────────────────────────────┘
          ExchangeNode → HashProbe → Agg → PartitionedOutput → OutputBuffer
                                                                      ↑ HTTP GET
Worker C: PrestoExchangeSource ────────────────────────────────────────┘
          ExchangeNode → FinalAgg → PartitionedOutput → OutputBuffer
                                                              ↑ HTTP GET (coordinator 拉结果)
Coordinator: GET /v1/task/C2/results/0/0

七、关键代码位置

组件 文件
HTTP 端点 presto_cpp/main/TaskResource.cpp
Task 生命周期管理 presto_cpp/main/TaskManager.cpp
Plan 转换 presto_cpp/main/PrestoToVeloxQueryPlan.cpp
Velox Task velox/exec/Task.h, velox/exec/Task.cpp
Driver 执行循环 velox/exec/Driver.cpprunInternal()
Operator 接口 velox/exec/Operator.h
LocalPartition/LocalExchange velox/exec/LocalPartition.h, velox/exec/LocalPartition.cpp
Remote Exchange 算子 velox/exec/Exchange.h, velox/exec/Exchange.cpp
ExchangeClient + ExchangeQueue velox/exec/ExchangeClient.h/cpp, velox/exec/ExchangeQueue.h/cpp
ExchangeSource 抽象接口 velox/exec/ExchangeSource.h
HTTP 数据拉取实现 presto_cpp/main/PrestoExchangeSource.h/cpp
PartitionedOutput velox/exec/PartitionedOutput.h, velox/exec/PartitionedOutput.cpp
OutputBuffer / Manager velox/exec/OutputBuffer.h/cpp, velox/exec/OutputBufferManager.h/cpp
Velox Query In the Presto Worker
#Velox #QueryExecution · 2024-07-30