1. 1. 零、Query 如何并行执行
    1. 1.1. 1. Stage 级:query 切成多个 plan fragment
    2. 1.2. 2. Task 级:每个 stage 分发到多个 worker
    3. 1.3. 3. Driver 级:每个 task 内的并行靠 driver
    4. 1.4. Splits 与 plan fragment 分开传输
    5. 1.5. 并行度小结
  2. 2. 零之二、Split 分发与消费机制
    1. 2.1. 全链路概览
    2. 2.2. 第 1 步:Worker 接收与转换
    3. 2.3. 第 2 步:addSplitWithSequence —— 为什么需要 sequence
    4. 2.4. 第 3 步:Velox 侧 split 队列结构
      1. 2.4.1. 为什么拆成 SplitsState / SplitsStore 两层
      2. 2.4.2. 非 grouped 执行(ungrouped)长什么样
    5. 2.5. 第 4 步:Source operator 领取 split
    6. 2.6. 第 5 步:noMoreSplits 的两个层级 + 延迟处理
    7. 2.7. 第 6 步:Split group(bucketed/grouped 执行)
    8. 2.8. Split 分发关键代码位置
  3. 3. 层级结构总览
  4. 4. 一、Worker 接收查询:HTTP → Velox Task
    1. 4.1. HTTP 端点(TaskResource)
    2. 4.2. Plan Fragment 转换(PrestoToVeloxQueryPlan)
    3. 4.3. Task 创建流程(TaskManager)
  5. 5. 二、Task 内部:Pipeline × Driver × Operator
    1. 5.1. Task 内结构
    2. 5.2. Driver 执行循环(Driver::runInternal)
    3. 5.3. 阻塞与调度
    4. 5.4. Pipeline 间同步:HashJoinBridge
  6. 6. 三、Task 内数据交换:LocalPartition / LocalExchange
    1. 6.1. 整体结构
    2. 6.2. LocalExchangeMemoryManager
    3. 6.3. LocalExchangeQueue
    4. 6.4. LocalPartition 算子(Producer,sink)
    5. 6.5. LocalExchange 算子(Consumer,source)
  7. 7. 四、Task 间数据交换:Remote Exchange
    1. 7.1. 整体数据流
    2. 7.2. 上游:PartitionedOutput
    3. 7.3. 上游:OutputBuffer / OutputBufferManager
    4. 7.4. 下游:ExchangeClient
    5. 7.5. 下游:ExchangeSource 抽象接口
    6. 7.6. 下游:PrestoExchangeSource(HTTP 实现)
    7. 7.7. 下游:Exchange 算子
    8. 7.8. 序列号机制(可靠传输)
    9. 7.9. 流控总结
  8. 8. 五、内存管理
  9. 9. 六、完整端到端数据流
  10. 10. 七、关键代码位置

Velox Query In the Presto Worker

零、Query 如何并行执行

一个 query 的并行度由三个层级叠加而成,从粗到细:

1
2
3
Stage 并行(query 切成多个 plan fragment)
└── Task 并行(每个 stage 分发到多个 worker)
└── Driver 并行(每个 task 内多个 driver)

1. Stage 级:query 切成多个 plan fragment

Coordinator 把 query 的执行计划按 shuffle 边界(exchange)切成多个 plan fragment,每个对应一个 stage:

1
2
3
4
Query
├── Fragment 0 (Stage 0): TableScan → partial agg → PartitionedOutput
├── Fragment 1 (Stage 1): RemoteSource → HashJoin → PartitionedOutput
└── Fragment 2 (Stage 2): RemoteSource → final agg → output
  • **Plan fragment 描述”怎么算”**:算子树结构,静态,task 创建时发一次
  • 上下游 stage 通过 PartitionedOutput(上游 sink)↔ RemoteSource/Exchange(下游 source)衔接

2. Task 级:每个 stage 分发到多个 worker

Coordinator 把每个 stage 的 plan fragment 分发到参与该 stage 的所有 worker,每个 worker 为该 fragment 创建一个 task:

1
2
3
4
Stage 0 (Fragment 0)            Stage 1 (Fragment 1)
├── Worker A → Task-0-A ├── Worker A → Task-1-A
├── Worker B → Task-0-B ├── Worker B → Task-1-B
└── Worker C → Task-0-C └── Worker C → Task-1-C

唯一性是按 (worker, stage) 维度,不是按 (worker, query) 维度:

说法 对错
一个 query 每个 worker 只收到一个 plan fragment
一个 (worker, stage) 只对应一个 plan fragment → 一个 task
同一 stage 的同一 fragment 不会重复发给同一 worker
1
2
(worker, stage)  →  唯一一个 plan fragment  →  唯一一个 task
(worker, query) → 可能多个 plan fragment → 多个 task(每参与一个 stage 一个)

所以同一个 worker 上,同一个 query 通常同时运行多个 task(每个 stage 一个),它们彼此独立,用不同的内存池和 driver 线程。

3. Driver 级:每个 task 内的并行靠 driver

Task 内的并行度不靠 task 数量,靠 driver 数量

1
2
3
4
5
6
Worker A, Task-0-A
└── Pipeline 0
├── Driver 0 → 处理 split[0], split[3], split[6]...
├── Driver 1 → 处理 split[1], split[4], split[7]...
└── Driver 2 → 处理 split[2], split[5], split[8]...
(N = maxDrivers,通常 ≈ CPU 核数)

Coordinator 把属于 Worker A 的所有 splits 都增量塞进同一个 Task-0-A,task 内的多个 driver 从共享的 split queue 里抢着消费。

Splits 与 plan fragment 分开传输

Plan fragment 不包含 splits。两者通过同一个 POST /v1/task/{taskId} 的不同字段传输:

1
2
3
4
5
6
7
8
TaskUpdateRequest {
fragment: <Base64 PlanFragment> ← 算子树,只在首次创建 task 时发
sources: [ ← splits,运行时可多次增量追加
{ planNodeId: "0", ← 关联到 fragment 中的某个 source 节点
splits: [split1, split2, ...],
noMoreSplits: false } ← 全部下发完后置 true
]
}
  • **Plan fragment 描述”怎么算”**:静态,发一次
  • **Splits 描述”算什么数据”**:动态,分批追加,可能 task 启动后才陆续到达
  • 在 Velox 侧:Task::create(planFragment) 一次性建树;addSplitWithSequence(planNodeId, split) 增量喂数据;noMoreSplits(planNodeId) 标记结束
  • Source operator(TableScan/Exchange)无 split 时返回 kWaitForSplit 阻塞,等待 coordinator 下发

并行度小结

维度 由谁决定 数量
Stage 数 query plan 的 shuffle 边界 通常 2~5
每 stage 的 task 数 coordinator 选中的 worker 数 = 参与该 stage 的 worker 数
每 worker 的 task 数 该 worker 参与的 stage 数 每参与一个 stage 一个
每 task 的 pipeline 数 plan 中 join 数 = HashJoin/CrossJoin 数 + 1
每 pipeline 的 driver 数 maxDrivers 配置 ≈ CPU 核数
每 driver 的线程数 固定 1(串行执行算子链)

零之二、Split 分发与消费机制

上面讲了 split 和 plan fragment 分开传输,这里展开完整链路:从 coordinator 下发,到 worker 入队,到 source operator 领取消费。

全链路概览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Coordinator (Java, 不在本仓库)
决定 split → worker 分配(节点亲和、负载均衡)
↓ POST /v1/task/{id} body.sources[]
┌─────────────────────────────────────────────────────────────┐
│ Worker (Presto C++) │
│ │
│ TaskResource::createOrUpdateTask() │
│ 解析 TaskUpdateRequest.sources[] │
│ ↓ │
│ TaskManager::createOrUpdateTaskImpl() │
│ 1. 按 planNodeId 合并同请求内的多个 source │
│ 2. toVeloxSplit() 转换 ScheduledSplit → exec::Split │
│ 3. execTask->addSplitWithSequence(nodeId, split, seqId) │
│ 4. execTask->setMaxSplitSequenceId(nodeId, maxSeqId) │
│ 5. noMoreSplits / delayedNoMoreSplitsPlanNodes_ │
│ ↓ │
│ Velox Task: 按 (planNodeId, splitGroupId) 存入 SplitsStore │
│ ↓ │
│ TableScan(driver).getSplitOrFuture(planNodeId) │
│ 有 split → 读; 无 → kWaitForSplit; 结束 → finished │
└─────────────────────────────────────────────────────────────┘

边界说明:哪个 split 分给哪个 worker,是 coordinator(Java) 的调度决策,不在 C++ worker 代码里。Worker 侧只负责接收分配给自己的 split 并消费。

第 1 步:Worker 接收与转换

TaskResource::createOrUpdateTask()TaskUpdateRequest 解析出 sources[],每个元素(protocol::TaskSource)含:

1
2
3
4
5
6
struct TaskSource {
PlanNodeId planNodeId; // 关联到 fragment 中的 source 节点
vector<ScheduledSplit> splits; // 本批 split
bool noMoreSplits; // 该节点是否不再有 split
vector<Lifespan> noMoreSplitsForLifespan; // grouped 执行的 per-group 结束标记
};

TaskManager::createOrUpdateTaskImpl() 处理两阶段:

  1. 合并:同一请求里可能有多个相同 planNodeId 的 source,先按 planNodeId 合并 splitsnoMoreSplitsForLifespan,noMoreSplits 做 OR
  2. 转换toVeloxSplit(ScheduledSplit) 把 Presto 协议 split 转成 velox::exec::Split
    • RemoteSplitRemoteConnectorSplit(下游拉上游用)
    • EmptySplit → 空 split(占位,什么都不读)
    • 其他 → 委托给 connector 特定的 toVeloxSplit()(如 Hive split)
    • lifespan.groupidSplit.groupId(-1 表示 ungrouped)

第 2 步:addSplitWithSequence —— 为什么需要 sequence

Coordinator 可能重发同一批 split(网络重试、task update 重叠),所以用 sequence id 去重:

1
2
3
4
5
6
7
8
9
// Velox Task.cpp
bool Task::addSplitWithSequence(planNodeId, Split&& split, long sequenceId) {
// 仅当 sequenceId > splitsState.maxSequenceId 才真正入队
if (sequenceId > splitsState.maxSequenceId) {
addSplitLocked(splitsState, std::move(split));
return true;
}
return false; // 重复 split,丢弃
}
  • maxSequenceId水位线:seqId ≤ 水位的 split 视为重复直接丢弃
  • 注意:addSplitWithSequence 本身不更新水位,而是由 setMaxSplitSequenceId(nodeId, maxSeqId) 在一批 split 加完后统一设置水位
  • 对比 addSplit():无 sequence 检查,无条件入队(用于不需要幂等投递的场景)

第 3 步:Velox 侧 split 队列结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 每个 plan node 一份状态
struct SplitsState {
bool sourceIsTableScan{false};
bool noMoreSplits{false};
long maxSequenceId{LONG_MIN}; // 去重水位
// splitGroupId → 该 group 的 split 队列
unordered_map<uint32_t, unique_ptr<SplitsStore>> groupSplitsStores;
};

// 单个 (planNodeId, splitGroupId) 的 split 队列
class SplitsStore { // 实现 QueueSplitsStore
deque<Split> splits_; // 待消费 split
bool noMoreSplits_{false};
vector<ContinuePromise> promises_; // 等待 split 的 driver
};
  • splitsStates_map<PlanNodeId, SplitsState>,每个 source 节点一份
  • ungrouped 执行:每个节点只有一个 store(key = kUngroupedGroupId)
  • grouped 执行:每个 split group 一个 store

为什么拆成 SplitsState / SplitsStore 两层

这个两层嵌套不是随意设计,而是两个正交维度被拆开:

  • SplitsState = per plan node(每个 source 节点一份)
  • SplitsStore = per **(plan node, split group)**(每个 split group 一个队列)

有四个独立的设计力推动这个拆分:

1. 去重水位是 node 级,消费队列是 group 级——作用域不同。
maxSequenceId 去重水位必须是 node 级:coordinator 给每个 plan node 的 split 分配连续 sequence id,与 group 无关。而”排队 + 阻塞 + 消费”是 group 级的。两者作用域天然不同。

2. Bucketed 执行要求”一个节点 N 个独立队列”(最硬的需求)。
同一个 TableScan 节点产出的 split 带不同 groupId,每个 group 的 driver 组只能拉自己 group 的 split,且要独立阻塞——group 5 队列空了阻塞,不能影响 group 1 继续跑。所以每个 group 必须有自己的队列和自己的阻塞 promises,强制了 map<groupId, SplitsStore>

3. 阻塞/唤醒机制必须跟队列绑定。
SplitsStore::promises_(等待 split 的 driver)是 per-group 的——driver 阻塞的是”某个 group 队列空了”,不是”整个节点没数据”。所以 promises 放在 store 里而非 state 里。这也是 SplitsState 显式 delete 拷贝构造的原因:它间接持有 promise,地址必须稳定(driver 持有指向它的引用)。

4. SplitsStore 是抽象类,还抽象了”split 从哪来”。
store 带虚函数(nextSplit / requestBarrier),注释原文:either can accumulate splits through addSplit() or generating splits from its own source。即 split 来源可替换:QueueSplitsStore 被动接收 coordinator push 的 split,或自生成 source。用类继承而非结构体字段来解耦”消费接口”与”来源实现”。

SplitsState SplitsStore
作用域 每个 plan node 每个 (node, split group)
装什么 去重水位、node 级 noMoreSplits、是否 TableScan split 队列、barrier split、阻塞 promises
生命周期 整个 task 随 split group 动态创建/清理
形态 不可拷贝 struct 带虚函数的抽象类

一句话SplitsState 管”节点整体元数据”,SplitsStore 管”某 group 具体怎么排队和阻塞”。根本原因是 grouped 执行要求同一节点下多个 group 各自独立排队与阻塞。

非 grouped 执行(ungrouped)长什么样

绝大多数普通查询(非 bucketed)走的是 ungrouped 执行,此时这套结构退化成最简单的形态:

1
2
3
4
5
6
7
8
9
10
11
ungrouped 执行(所有 split.groupId == -1)

SplitsState (planNode "0")
├── maxSequenceId
├── noMoreSplits
└── groupSplitsStores
└── [kUngroupedGroupId] → SplitsStore ← 整个节点只有这一个 store
├── splits_ (一条队列)
└── promises_

所有 driver(Driver 0..N)都从这唯一一个 store 抢 split
ungrouped 执行 grouped 执行
split.groupId -1 bucket id(0..N-1)
每节点 store 数 1(kUngroupedGroupId 每 bucket 一个
driver 创建时机 task start 时一次性建好 每个 group 到达时按需创建
split 消费 所有 driver 抢同一队列,谁空闲谁拿 每 group 的 driver 只拿本 group split
阻塞粒度 整个节点一个 promises 集合 每 group 独立阻塞
noMoreSplits 通知唯一的 store 通知该节点所有 group 的 store
适用场景 普通表扫描、shuffle 后的 join bucketed 表的 grouped/colocated join

关键点:ungrouped 不是”另一套代码路径”,而是 grouped 设计的特例——map<groupId, store> 里只有一个固定 key kUngroupedGroupId = UINT32_MAX 的 entry。这样一套数据结构同时覆盖两种执行模式,无需为常规查询单独写一条路径。多个 driver 并行消费同一队列,靠的就是 SplitsStore 内部对队列和 promises 的加锁访问(实际锁在 Task::mutex_)。

第 4 步:Source operator 领取 split

TableScangetOutput() 中调用 Task::getSplitOrFuture() 领取 split:

1
2
3
4
5
6
BlockingReason Task::getSplitOrFuture(
driverId, splitGroupId, planNodeId, maxPreloadSplits, preload,
Split& split, ContinueFuture& future) {
// 取 (planNodeId, splitGroupId) 对应的 SplitsStore
// 调 store->nextSplit(...)
}

SplitsStore::nextSplit() 三种结果:

情况 返回
splits_ 非空 弹出 split,kNotBlocked
队列空但已 noMoreSplits_ 无 split,kNotBlocked(operator 据此 finish)
队列空且未结束 创建 ContinuePromise 存入 promises_,返回 kWaitForSplit + future

阻塞时 driver 退出线程池;当新 split 入队或 noMoreSplits 触发时,fulfill promises_ 唤醒 driver 重新调度。

第 5 步:noMoreSplits 的两个层级 + 延迟处理

两个层级

1
2
3
4
5
6
7
8
9
10
// 单个 group 结束(grouped 执行)
void Task::noMoreSplitsForGroup(planNodeId, splitGroupId);

// 整个 plan node 结束
void Task::noMoreSplits(planNodeId) {
splitsState.noMoreSplits = true;
// ungrouped: 通知唯一的 store
// grouped: 通知该节点所有 group 的 store
// 若所有 split group 都处理完 → terminate(kFinished)
}

延迟处理(delayedNoMoreSplitsPlanNodes_

noMoreSplits 信号可能在 task->start() 之前就到达。若此时直接调 Velox 的 noMoreSplits 会出问题(task 还没起来)。所以:

1
2
3
4
5
6
7
8
9
10
11
12
// TaskManager.cpp
if (prestoTask->taskStarted) {
execTask->noMoreSplits(planNodeId); // 已启动,直接调
} else {
prestoTask->delayedNoMoreSplitsPlanNodes_.emplace(planNodeId); // 暂存
}

// 之后 startTaskLocked() 中补调:
for (auto& nodeId : delayedNoMoreSplitsPlanNodes_) {
execTask->noMoreSplits(nodeId);
}
delayedNoMoreSplitsPlanNodes_.clear();

否则 driver 会永久阻塞在 kWaitForSplit,等待一个永远不会来的 split。

第 6 步:Split group(bucketed/grouped 执行)

针对 bucketed 表的分组执行,split 带 groupId

1
2
3
4
5
6
7
// Task.cpp,split 带 groupId != -1 时
if (!seenSplitGroups_.contains(groupId)) {
seenSplitGroups_.insert(groupId);
queuedSplitGroups_.push(groupId);
ensureSplitGroupsAreBeingProcessedLocked(); // 为新 group 创建 driver
}
addSplitToStore(planNodeId, groupId, split);
  • 每个 split group 独立创建一组 driver,concurrentSplitGroups 控制同时处理的 group 数
  • kUngroupedGroupId = UINT32_MAX 表示非分组执行
  • group 间通过 barrier 同步(如 grouped HashJoin 的 build/probe)

Split 分发关键代码位置

环节 文件
解析 sources[] presto_cpp/main/TaskResource.cpp createOrUpdateTask()
入队 + 去重 + noMoreSplits presto_cpp/main/TaskManager.cpp createOrUpdateTaskImpl()
协议 split 转换 presto_cpp/main/PrestoToVeloxSplit.cpp toVeloxSplit()
延迟标记 presto_cpp/main/PrestoTask.h delayedNoMoreSplitsPlanNodes_
addSplitWithSequence / 去重 velox/exec/Task.cpp
SplitsState / SplitsStore velox/exec/TaskStructs.h
getSplitOrFuture velox/exec/Task.cpp
noMoreSplits / ForGroup velox/exec/Task.cpp

层级结构总览

1
2
3
4
5
6
7
8
9
10
11
12
Query
└── Stage 0 (plan fragment 0: TableScan + partial agg)
│ ├── Worker A → Task-0-A
│ │ ├── Pipeline 0: Driver0, Driver1, ... DriverN
│ │ └── Pipeline 1: Driver0 (HashBuild side)
│ ├── Worker B → Task-0-B
│ └── Worker C → Task-0-C

└── Stage 1 (plan fragment 1: Exchange + HashJoin + PartitionedOutput)
├── Worker A → Task-1-A
├── Worker B → Task-1-B
└── Worker C → Task-1-C

关键约束:

  • (worker, stage) → 唯一一个 task,同一 stage 在同一 worker 上不会有两个 task
  • 同一 query 的不同 stage 在同一 worker 上各有一个 task(可同时运行)
  • Task 内的并行度靠 Driver 数量,不靠 Task 数量
  • Coordinator 把属于某 worker 的所有 splits 都增量发给同一个 task,Driver 并行消费

一、Worker 接收查询:HTTP → Velox Task

HTTP 端点(TaskResource)

Coordinator 通过 REST API 驱动每个 Worker:

1
2
3
4
5
6
POST   /v1/task/{taskId}                              创建/更新 task(含 plan fragment + splits)
GET /v1/task/{taskId}/results/{bufferId}/{token} 下游拉取输出数据
GET /v1/task/{taskId}/status 轮询 task 状态(支持 long-polling)
DELETE /v1/task/{taskId} 终止 task
GET /v1/task/{taskId}/results/.../acknowledge ACK 已收数据
DELETE /v1/task/{taskId}/results/{bufferId} 释放 output buffer

Plan Fragment 转换(PrestoToVeloxQueryPlan)

TaskUpdateRequest 中的 fragment 字段是 Base64 编码的 JSON Plan:

1
2
3
Presto JSON PlanFragment
↓ VeloxInteractiveQueryPlanConverter
Velox PlanFragment(PlanNode 树)

关键 PlanNode 映射:

Presto PlanNode Velox PlanNode 说明
TableScanNode TableScanNode 读存储
RemoteSourceNode ExchangeNode 跨 worker 拉数据
PartitionedOutputNode PartitionedOutputNode 推数据给下游
AggregationNode AggregationNode 聚合
ExchangeNode LocalExchangeNode task 内重分区

Task 创建流程(TaskManager)

1
2
3
4
5
6
7
8
9
POST /v1/task/{taskId}

findOrCreateTask() → 创建 PrestoTask(状态 kPlanned,暂无 Velox Task)

createOrUpdateTaskImpl()
├── exec::Task::create(planFragment, queryCtx, ExecutionMode::kParallel)
├── execTask->addSplitWithSequence() ← 注册 splits
└── startTaskLocked()
└── execTask->start(maxDrivers, concurrentLifespans)

PrestoTask 是 Velox exec::Task 的包装,额外持有:

  • 待处理的 ResultRequest(下游长轮询的 HTTP 请求)
  • 状态变更 Promise(用于 long-polling)

二、Task 内部:Pipeline × Driver × Operator

Task 内结构

1
2
3
4
5
6
7
8
Task
├── Pipeline 0 (probe side: Exchange → HashProbe → Agg → PartitionedOutput)
│ ├── Driver 0 ── 一个线程,处理若干 splits
│ ├── Driver 1
│ └── Driver N (N = maxDrivers,通常 ≈ CPU 核数)

└── Pipeline 1 (build side: Exchange → HashBuild)
└── Driver 0 (build side 通常只需 1 个 driver)
  • Pipeline 数量由 plan 决定(每个 HashJoin/CrossJoin 会拆出 build/probe 两条 pipeline)
  • Driver 数量(每条 pipeline 内的并行度)由 maxDrivers 配置决定
  • 每个 Driver 独立运行,处理不同的 split 或不同分区的数据
  • 每个 Driver 内部单线程串行执行算子链

Driver 执行循环(Driver::runInternal

核心是从 sink 向 source 反向推进的 pull 模型:

1
2
3
4
5
6
7
8
loop:
从 sink 向上游遍历每个 operator:
if 下游.needsInput():
output = 上游.getOutput()
if output: 下游.addInput(output)
if operator.isBlocked(&future):
// 注册 future,退出线程池;future 完成后重新入队
break

Operator 核心接口velox/exec/Operator.h):

1
2
3
4
5
RowVectorPtr getOutput();                            // 产生一批行(RowVector)
void addInput(RowVectorPtr input); // 消费上游输出
bool needsInput(); // 是否能接收更多输入
BlockingReason isBlocked(ContinueFuture* future); // 是否阻塞,设置 future
bool isFinished(); // 是否全部完成

数据流示例(Pipeline 0)

1
2
3
4
5
6
7
8
TableScan.getOutput()        → RowVector(一批行)
↓ addInput
Filter.addInput/getOutput → 过滤后的行
↓ addInput
HashProbe.addInput → 关联 HashTable
HashProbe.getOutput() → join 结果行
↓ addInput
PartitionedOutput.addInput → 序列化,写入 OutputBuffer

阻塞与调度

Driver 运行于共享线程池(folly::CPUThreadPoolExecutor)。算子阻塞时(如等待 HashTable 建好、等待 Exchange 数据到来),Driver 持有 ContinueFuture 后退出线程;Future 完成后重新入队:

1
2
3
Driver → isBlocked() → 得到 future → 退出线程池
↓ future 完成
Driver::enqueue() → 重新调度

阻塞原因(BlockingReason)

原因 触发场景
kWaitForSplit TableScan 等待 coordinator 下发 split
kWaitForProducer Exchange/LocalExchange 等待上游数据
kWaitForConsumer PartitionedOutput OutputBuffer 已满
kWaitForJoinBuild HashProbe 等待 HashBuild 完成
kWaitForMemory 内存仲裁,等待 spill 释放内存

Pipeline 间同步:HashJoinBridge

1
2
3
Pipeline 1 (build): TableScan → HashBuild ──写入──► HashJoinBridge
│ build done
Pipeline 0 (probe): Exchange → HashProbe ◄──读取──────────┘

HashBuild 完成后通过 HashJoinBridge 通知 HashProbe 的所有 Driver 解除阻塞。


三、Task 内数据交换:LocalPartition / LocalExchange

适用场景:同一 task 内,不同 pipeline 之间需要重分区(如两阶段聚合、repartitioned join probe 侧重分区)。纯共享内存,无序列化,无网络。

“local” 的含义是 local to the task(同进程同 task),不是 local to the machine。

整体结构

1
2
3
4
5
6
7
8
9
Task 内部

Pipeline 0(上游,N 个 producer driver)
Driver 0 → ... → LocalPartition ┐
Driver 1 → ... → LocalPartition ┼─→ LocalExchangeQueue[0] ─→ LocalExchange → Driver 0
Driver 2 → ... → LocalPartition ┘─→ LocalExchangeQueue[1] ─→ LocalExchange → Driver 1
(按 key 哈希分桶) (按 partition 分桶) (source 算子)

Pipeline 1(下游,M 个 consumer driver,M = numPartitions)
  • 每个 LocalExchangeQueue 对应一个分区,有且只有一个 consumer driver 消费
  • 所有 LocalPartition producer 共享同一个 LocalExchangeMemoryManager,统一控制总缓冲上限

LocalExchangeMemoryManager

职责:跨所有 LocalExchangeQueue 追踪总内存用量,在超限时阻塞 producer。

1
2
3
4
// velox/exec/LocalPartition.h
int64_t maxBufferSize_; // 总缓冲上限
int64_t bufferedBytes_; // 当前已用字节(原子)
std::vector<ContinuePromise> promises_; // 被阻塞的 producer
  • **increaseMemoryUsage(added)**:producer 入队后调用。若 bufferedBytes_ >= maxBufferSize_,创建 ContinuePromise 存入 promises_,返回 true(producer 阻塞)
  • **decreaseMemoryUsage(removed)**:consumer 出队后调用。若降到 maxBufferSize_ 以下,取出所有 promises_ 并 fulfill,唤醒所有阻塞的 producer

LocalExchangeQueue

职责:单个分区的 FIFO 队列,协调多 producer 写入 / 单 consumer 读取。

1
2
3
4
5
folly::Synchronized<Queue> queue_;        // std::queue<pair<RowVectorPtr, int64_t>>
std::vector<ContinuePromise> consumerPromises_; // consumer 等待数据
int pendingProducers_{0}; // 尚未完成的 producer 数量
bool noMoreProducers_{false}; // 不再有新 producer
bool closed_{false};

入队(producer 调用)

1
2
3
4
5
6
7
BlockingReason enqueue(RowVectorPtr input, int64_t inputBytes, ContinueFuture* future) {
// 1. 追加到 queue_
// 2. 唤醒等待的 consumer(fulfil consumerPromises_)
// 3. 调用 memoryManager_->increaseMemoryUsage()
// → 若超限,返回 kWaitForConsumer(producer 阻塞)
// → 否则返回 kNotBlocked
}

出队(consumer 调用)

1
2
3
4
5
6
7
8
9
BlockingReason next(ContinueFuture* future, memory::MemoryPool*, RowVectorPtr* data, bool& drained) {
// 1. 队列为空:
// - 若所有 producer 已完成 → 返回 finished
// - 否则创建 consumerPromise → 返回 kWaitForProducer
// 2. 有数据:
// - 弹出队头
// - 调用 memoryManager_->decreaseMemoryUsage() → 可能唤醒 producer
// - 返回 kNotBlocked
}

Producer 生命周期管理

方法 时机 作用
addProducer() producer 构造时 pendingProducers_++
noMoreData() producer 处理完所有输入后 pendingProducers_--;若归零且 noMoreProducers_ 则唤醒 consumer
noMoreProducers() task 确认不再添加新 pipeline 后 设标志;若 pendingProducers_==0 则立即唤醒 consumer
drain() barrier 处理(grouped execution) drainedProducers_++;全部 drain 后唤醒 consumer

LocalPartition 算子(Producer,sink)

构造:从 task 获取所有分区队列,并对每个队列调用 addProducer()

addInput() 分区逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1. partitionFunction_->partition(*input, partitions_)
→ partitions_[i] = 第 i 行应去的分区号

2. 若所有行去同一个分区:直接调用 queue[p]->enqueue(input)

3. 若行分散到多个分区:
a. 统计每分区行数,构建索引数组 rawIndices_[partition]
b. 根据 singlePartitionBufferSize_ 判断用哪种策略:
- Buffer 模式(numPartitions 较多):
populatePartitionBuffer() 把行 copy 进 partitionBuffers_[p]
累积到 singlePartitionBufferSize_ * numPartitions 时统一 flush
- 即时模式(numPartitions 少或 eagerFlush):
wrapChildren() 用 dictionary 索引包装列,零拷贝创建 RowVector
立即 enqueue

wrapChildren():用 dictionary 向量包装输入列,指向原始数据,零拷贝。适合分区数少、批次大的场景。

**copy() / populatePartitionBuffer()**:把行物理复制进 partitionBuffers_[p],适合分区数多(字典开销大)的场景。

阻塞:若任意队列 enqueue() 返回 kWaitForConsumer,收集对应 future,在 isBlocked() 时返回,driver 退出线程池。

**noMoreInput()**:flush 剩余 buffer,对所有队列调用 noMoreData(),通知 consumer 不再有数据。

LocalExchange 算子(Consumer,source)

构造:从 task 获取自己负责的分区队列(partition_ = driverId mod numPartitions)。

1
2
3
4
5
6
7
// getOutput()
BlockingReason reason = queue_->next(&future_, pool(), &data, drained);
if (reason != kNotBlocked) {
blockingReason_ = reason;
return nullptr; // driver 将在 isBlocked() 中返回 future_
}
return data;

isBlocked() 直接返回 blockingReason_future_,无额外逻辑。


四、Task 间数据交换:Remote Exchange

适用场景:不同 task 之间(跨 stage),始终走 HTTP,即使两个 task 在同一台物理机上也不例外。

整体数据流

1
2
3
4
5
6
7
8
9
10
11
Worker A(上游 task)                              Worker B(下游 task)
┌────────────────────────────────┐ ┌──────────────────────────────────────┐
│ PartitionedOutput │ │ ExchangeNode(SourceOperator) │
│ ├─ partitionFunction_ │ │ └─ ExchangeClient │
│ ├─ Destination[0..N] │ │ ├─ ExchangeQueue │
│ │ └─ VectorStreamGroup │ │ │ └─ SerializedPage deque │
│ └─ OutputBufferManager │ HTTP GET │ └─ ExchangeSource[0..M] │
│ └─ OutputBuffer │ ◄──────────── │ └─ PrestoExchangeSource │
│ └─ DestinationBuffer │ │ (HTTP client) │
│ [partition 0..N] │ └──────────────────────────────────────┘
└────────────────────────────────┘

上游:PartitionedOutput

addInput() 分区流程

1
2
3
1. 估算每行序列化大小(用于 flush 时机判断)
2. partitionFunction_->partition(*input, partitions_)
3. 将每行的索引分配到 Destination[p].rows_

getOutput() 序列化 + flush 流程

1
2
3
4
5
6
7
8
9
10
11
12
for each Destination:
advance():
收集 rows_ 中的行索引
用 VectorStreamGroup 序列化(Presto / CompactRow / UnsafeRow 格式)
累积到 maxPageSize 或行数阈值时 flush()

flush():
VectorStreamGroup → IOBufOutputStream → IOBuf chain(Presto Page 格式)
bufferManager_.enqueue(taskId, partition, page)
→ OutputBuffer::enqueue()
→ DestinationBuffer[p].data_.push_back(page)
→ 若 bufferedBytes_ > maxSize_ → 返回 blocking future(producer 阻塞)

随机 flush 阈值targetSizePct_ = 70~120%):各 Driver 的 flush 时机错开,避免同时写入造成突发压力。

上游:OutputBuffer / OutputBufferManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// velox/exec/OutputBuffer.h
OutputBuffer {
std::vector<DestinationBuffer> buffers_; // 每个下游分区一个
int64_t maxSize_; // 总缓冲上限
int64_t continueSize_; // 恢复阈值(= maxSize_ * 0.9)
int64_t bufferedBytes_; // 当前已缓冲字节
std::vector<ContinuePromise> promises_; // 被阻塞的 producer
}

DestinationBuffer {
std::deque<SerializedPageBase> data_; // 待下游拉取的 pages
int64_t sequence_; // data_[0] 对应的全局序列号
std::function<...> notify_; // 有数据时唤醒等待的 HTTP 请求
}

背压bufferedBytes_ > maxSize_ 时,enqueue() 返回 ContinueFuture,producer(PartitionedOutput)阻塞。当下游 ACK 消费数据后,bufferedBytes_ 降到 continueSize_ 以下时 fulfill promises,解除阻塞。

getData() 回调机制:下游 HTTP GET 到来时若无数据,注册 notify_ 回调;有数据入队时回调触发,立即响应 HTTP 请求。

下游:ExchangeClient

同一 pipeline 的所有 Driver 共享一个 ExchangeClient,但各自独立从 ExchangeQueue 消费(多消费者)。

1
2
3
4
5
6
7
8
ExchangeClient {
ExchangeQueue queue_; // 多 source 入队,多 driver 消费
std::vector<ExchangeSource> sources_; // 每个上游 task 一个
int64_t maxQueuedBytes_; // 本地缓冲上限(默认 32MB)
int64_t minOutputBatchBytes_; // 最小返回批次(防止消费者空跑)
std::deque<ProducingSource> producingSources_; // 有剩余数据的 source
std::deque<ExchangeSource*> emptySources_; // 尚未探测数据量的 source
}

**addRemoteTaskId()**:为每个上游 task 创建一个 ExchangeSource,加入 emptySources_ 队列,启动首次数据量探测请求。

**next(consumerId, maxBytes)**(由 ExchangeNode.isBlocked() 调用):

1
2
3
4
5
6
1. queue_->dequeueLocked(maxBytes) → 若有足够数据立即返回
2. 若数据不足:pickSourcesToRequestLocked()
a. 对 emptySources_ 中的 source 发 requestDataSizes()(HEAD 请求,探测剩余量)
b. 按剩余容量(maxQueuedBytes_ - queue_.totalBytes() - totalPendingBytes_)
分配请求量给 producingSources_
c. 特殊处理:若单个 page 超过容量上限,强制发一次请求防止死锁

**ExchangeQueue**:

1
2
3
4
5
6
ExchangeQueue {
std::deque<SerializedPageBase> queue_;
int64_t totalBytes_;
int64_t minOutputBatchBytes_; // 消费者唤醒阈值
std::map<int, ContinuePromise> promises_; // consumerId → promise
}

**enqueueLocked()**(ExchangeSource 收到数据后调用):

  • nullptr 入队表示某个 source 数据结束
  • 计算 unassigned bytes,若 >= minOutputBatchBytes_ 则 fulfill 等待中的消费者 promise
  • minOutputBatchBytes_ 自适应:min(配置值, 已接收总量 / 100),防止小流量场景下消费者频繁空唤醒

**dequeueLocked(maxBytes)**:取出页面直到达到 maxBytes 或队列耗尽;数据不足时返回 ContinueFuture(消费者 driver 阻塞)。

下游:ExchangeSource 抽象接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// velox/exec/ExchangeSource.h
ExchangeSource {
std::string remoteTaskId_;
int destination_; // 拉取上游哪个分区
ExchangeQueue* queue_;
int64_t sequence_; // 下次请求的序列号(可靠传输)
bool requestPending_; // 防止并发发起重复请求
bool atEnd_;
}

virtual Response request(uint32_t maxBytes, uint32_t maxWaitMs);
virtual void requestDataSizes(uint32_t maxWaitMs);
virtual void pause(); // 提示暂停(队列满时)
virtual void close();

Response 含:

  • bytes:本次响应数据量
  • atEnd:source 是否已全部传完
  • remainingBytes:上游 DestinationBuffer 中各 page 的剩余大小列表(用于 ExchangeClient 调度决策)

下游:PrestoExchangeSource(HTTP 实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shouldRequestLocked() → true(无 pending 请求且未 atEnd)

request(maxBytes, maxWaitMs)

doRequest(sequence_, maxBytes, maxWaitMs)
HTTP GET /v1/task/{remoteTaskId}/results/{destination}/{sequence_}
Headers: X-Presto-Max-Size: maxBytes
X-Presto-Max-Wait: maxWaitMs

processDataResponse(response)
读 response headers:
X-Presto-Page-Next-Token → 更新 sequence_(下次请求用)
X-Presto-Buffer-Complete → 若 true,入队 nullptr(end marker)
X-Presto-Buffer-Remaining-Bytes → remainingBytes(返回给 ExchangeClient)
IOBuf chain → SerializedPage → queue_->enqueueLocked()
fulfill 请求 promise → ExchangeClient 调度下一批请求

重试:网络失败时指数退避(100ms → 10s,带 jitter),超过 exchangeMaxErrorDurationqueue_->setError(),task 报错。

下游:Exchange 算子

1
2
3
4
5
6
7
8
9
// isBlocked():
// 1. 若 currentPages_ 有数据 → kNotBlocked
// 2. getSplits() 获取上游 task ID → 调用 client_->addRemoteTaskId()
// 3. client_->next() → 若无数据返回 kWaitForProducer + future

// getOutput():
// 根据 serdeKind_ 分发:
// columnar(Presto): getOutputFromColumnarPages()
// row(CompactRow/UnsafeRow): getOutputFromRowPages()

**getOutputFromColumnarPages()**:用 VectorSerde::deserialize() 增量反序列化,每次读满 preferredOutputBatchBytes_ 后返回,跨 currentPages_ 中多个 page 累积。

**getOutputFromRowPages()**:先 mergePages() 把多个 page 合并成连续 IOBuf,再用 row iterator 批量反序列化。

序列号机制(可靠传输)

1
2
3
4
5
6
7
ExchangeSource.sequence_  →  HTTP GET .../results/{partition}/{sequence}

上游 DestinationBuffer[partition].sequence_ → data_[0] 的全局编号
(验证请求序列号是否与当前可用数据对齐)

响应 header X-Presto-Page-Next-Token → 下次请求的 sequence
(ExchangeSource 更新 sequence_ 用于下次请求)

ACKGET .../results/{partition}/{sequence}/acknowledgeOutputBuffer::acknowledge() → 删除 sequence 之前的所有 pages,释放内存。

流控总结

层级 机制 上限
OutputBuffer(上游) bufferedBytes_ > maxSize_ 时阻塞 producer task.maxOutputBufferSize(默认 32MB)
ExchangeQueue(下游本地缓存) totalBytes_ 控制发起的请求量 maxQueuedBytes_(默认 32MB)
ExchangeClient 请求调度 maxQueuedBytes_ - totalBytes_ - pendingBytes = 可发请求量 动态计算
消费者唤醒阈值 minOutputBatchBytes_ 防止小批次频繁唤醒 自适应(已收量 / 100)

五、内存管理

内存池形成层级树,每层独立追踪和回收:

1
2
3
4
5
QueryCtx::MemoryPool(query 级,所有 task 共享上限)
└── Task::MemoryPool(task 级)
├── Operator pools(每个 operator 实例独立池)
├── ExchangeClient pool(每条 pipeline 一个)
└── LocalExchange pool
  • Task::MemoryReclaimer 在内存压力下触发 spill(HashAggregation、HashJoin 等支持 spill 的算子)
  • Operator 通过 addOperatorPool(planNodeId, splitGroupId, pipelineId, driverId) 在构造时申请子池

六、完整端到端数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Coordinator

├── POST /v1/task/A0 → Worker A (Stage 0: TableScan → HashBuild → PartitionedOutput)
│ splits: [file0, file1, file2, ...]

├── POST /v1/task/B1 → Worker B (Stage 1: Exchange → HashProbe → Agg → PartitionedOutput)

└── POST /v1/task/C2 → Worker C (Stage 2: Exchange → FinalAgg → PartitionedOutput)

执行时序:
Worker A: TableScan(splits) → HashBuild → PartitionedOutput → OutputBuffer[0..N]
↑ HTTP GET
Worker B: PrestoExchangeSource ────────────────────────────────────────┘
ExchangeNode → HashProbe → Agg → PartitionedOutput → OutputBuffer
↑ HTTP GET
Worker C: PrestoExchangeSource ────────────────────────────────────────┘
ExchangeNode → FinalAgg → PartitionedOutput → OutputBuffer
↑ HTTP GET (coordinator 拉结果)
Coordinator: GET /v1/task/C2/results/0/0

七、关键代码位置

组件 文件
HTTP 端点 presto_cpp/main/TaskResource.cpp
Task 生命周期管理 presto_cpp/main/TaskManager.cpp
Plan 转换 presto_cpp/main/PrestoToVeloxQueryPlan.cpp
Velox Task velox/exec/Task.h, velox/exec/Task.cpp
Driver 执行循环 velox/exec/Driver.cpprunInternal()
Operator 接口 velox/exec/Operator.h
LocalPartition/LocalExchange velox/exec/LocalPartition.h, velox/exec/LocalPartition.cpp
Remote Exchange 算子 velox/exec/Exchange.h, velox/exec/Exchange.cpp
ExchangeClient + ExchangeQueue velox/exec/ExchangeClient.h/cpp, velox/exec/ExchangeQueue.h/cpp
ExchangeSource 抽象接口 velox/exec/ExchangeSource.h
HTTP 数据拉取实现 presto_cpp/main/PrestoExchangeSource.h/cpp
PartitionedOutput velox/exec/PartitionedOutput.h, velox/exec/PartitionedOutput.cpp
OutputBuffer / Manager velox/exec/OutputBuffer.h/cpp, velox/exec/OutputBufferManager.h/cpp