Velox Query In the Presto Worker
零、Query 如何并行执行
一个 query 的并行度由三个层级叠加而成,从粗到细:
1 | Stage 并行(query 切成多个 plan fragment) |
1. Stage 级:query 切成多个 plan fragment
Coordinator 把 query 的执行计划按 shuffle 边界(exchange)切成多个 plan fragment,每个对应一个 stage:
1 | Query |
- **Plan fragment 描述”怎么算”**:算子树结构,静态,task 创建时发一次
- 上下游 stage 通过
PartitionedOutput(上游 sink)↔RemoteSource/Exchange(下游 source)衔接
2. Task 级:每个 stage 分发到多个 worker
Coordinator 把每个 stage 的 plan fragment 分发到参与该 stage 的所有 worker,每个 worker 为该 fragment 创建一个 task:
1 | Stage 0 (Fragment 0) Stage 1 (Fragment 1) |
唯一性是按 (worker, stage) 维度,不是按 (worker, query) 维度:
| 说法 | 对错 |
|---|---|
| 一个 query 每个 worker 只收到一个 plan fragment | ❌ |
一个 (worker, stage) 只对应一个 plan fragment → 一个 task |
✅ |
| 同一 stage 的同一 fragment 不会重复发给同一 worker | ✅ |
1 | (worker, stage) → 唯一一个 plan fragment → 唯一一个 task |
所以同一个 worker 上,同一个 query 通常同时运行多个 task(每个 stage 一个),它们彼此独立,用不同的内存池和 driver 线程。
3. Driver 级:每个 task 内的并行靠 driver
Task 内的并行度不靠 task 数量,靠 driver 数量:
1 | Worker A, Task-0-A |
Coordinator 把属于 Worker A 的所有 splits 都增量塞进同一个 Task-0-A,task 内的多个 driver 从共享的 split queue 里抢着消费。
Splits 与 plan fragment 分开传输
Plan fragment 不包含 splits。两者通过同一个 POST /v1/task/{taskId} 的不同字段传输:
1 | TaskUpdateRequest { |
- **Plan fragment 描述”怎么算”**:静态,发一次
- **Splits 描述”算什么数据”**:动态,分批追加,可能 task 启动后才陆续到达
- 在 Velox 侧:
Task::create(planFragment)一次性建树;addSplitWithSequence(planNodeId, split)增量喂数据;noMoreSplits(planNodeId)标记结束 - Source operator(
TableScan/Exchange)无 split 时返回kWaitForSplit阻塞,等待 coordinator 下发
并行度小结
| 维度 | 由谁决定 | 数量 |
|---|---|---|
| Stage 数 | query plan 的 shuffle 边界 | 通常 2~5 |
| 每 stage 的 task 数 | coordinator 选中的 worker 数 | = 参与该 stage 的 worker 数 |
| 每 worker 的 task 数 | 该 worker 参与的 stage 数 | 每参与一个 stage 一个 |
| 每 task 的 pipeline 数 | plan 中 join 数 | = HashJoin/CrossJoin 数 + 1 |
| 每 pipeline 的 driver 数 | maxDrivers 配置 |
≈ CPU 核数 |
| 每 driver 的线程数 | 固定 | 1(串行执行算子链) |
零之二、Split 分发与消费机制
上面讲了 split 和 plan fragment 分开传输,这里展开完整链路:从 coordinator 下发,到 worker 入队,到 source operator 领取消费。
全链路概览
1 | Coordinator (Java, 不在本仓库) |
边界说明:哪个 split 分给哪个 worker,是 coordinator(Java) 的调度决策,不在 C++ worker 代码里。Worker 侧只负责接收分配给自己的 split 并消费。
第 1 步:Worker 接收与转换
TaskResource::createOrUpdateTask() 从 TaskUpdateRequest 解析出 sources[],每个元素(protocol::TaskSource)含:
1 | struct TaskSource { |
TaskManager::createOrUpdateTaskImpl() 处理两阶段:
- 合并:同一请求里可能有多个相同
planNodeId的 source,先按 planNodeId 合并splits、noMoreSplitsForLifespan,noMoreSplits做 OR - 转换:
toVeloxSplit(ScheduledSplit)把 Presto 协议 split 转成velox::exec::SplitRemoteSplit→RemoteConnectorSplit(下游拉上游用)EmptySplit→ 空 split(占位,什么都不读)- 其他 → 委托给 connector 特定的
toVeloxSplit()(如 Hive split) lifespan.groupid→Split.groupId(-1 表示 ungrouped)
第 2 步:addSplitWithSequence —— 为什么需要 sequence
Coordinator 可能重发同一批 split(网络重试、task update 重叠),所以用 sequence id 去重:
1 | // Velox Task.cpp |
maxSequenceId是水位线:seqId ≤ 水位的 split 视为重复直接丢弃- 注意:
addSplitWithSequence本身不更新水位,而是由setMaxSplitSequenceId(nodeId, maxSeqId)在一批 split 加完后统一设置水位 - 对比
addSplit():无 sequence 检查,无条件入队(用于不需要幂等投递的场景)
第 3 步:Velox 侧 split 队列结构
1 | // 每个 plan node 一份状态 |
splitsStates_:map<PlanNodeId, SplitsState>,每个 source 节点一份- ungrouped 执行:每个节点只有一个 store(key =
kUngroupedGroupId) - grouped 执行:每个 split group 一个 store
为什么拆成 SplitsState / SplitsStore 两层
这个两层嵌套不是随意设计,而是两个正交维度被拆开:
- SplitsState = per plan node(每个 source 节点一份)
- SplitsStore = per **(plan node, split group)**(每个 split group 一个队列)
有四个独立的设计力推动这个拆分:
1. 去重水位是 node 级,消费队列是 group 级——作用域不同。maxSequenceId 去重水位必须是 node 级:coordinator 给每个 plan node 的 split 分配连续 sequence id,与 group 无关。而”排队 + 阻塞 + 消费”是 group 级的。两者作用域天然不同。
2. Bucketed 执行要求”一个节点 N 个独立队列”(最硬的需求)。
同一个 TableScan 节点产出的 split 带不同 groupId,每个 group 的 driver 组只能拉自己 group 的 split,且要独立阻塞——group 5 队列空了阻塞,不能影响 group 1 继续跑。所以每个 group 必须有自己的队列和自己的阻塞 promises,强制了 map<groupId, SplitsStore>。
3. 阻塞/唤醒机制必须跟队列绑定。SplitsStore::promises_(等待 split 的 driver)是 per-group 的——driver 阻塞的是”某个 group 队列空了”,不是”整个节点没数据”。所以 promises 放在 store 里而非 state 里。这也是 SplitsState 显式 delete 拷贝构造的原因:它间接持有 promise,地址必须稳定(driver 持有指向它的引用)。
4. SplitsStore 是抽象类,还抽象了”split 从哪来”。
store 带虚函数(nextSplit / requestBarrier),注释原文:either can accumulate splits through addSplit() or generating splits from its own source。即 split 来源可替换:QueueSplitsStore 被动接收 coordinator push 的 split,或自生成 source。用类继承而非结构体字段来解耦”消费接口”与”来源实现”。
| SplitsState | SplitsStore | |
|---|---|---|
| 作用域 | 每个 plan node | 每个 (node, split group) |
| 装什么 | 去重水位、node 级 noMoreSplits、是否 TableScan | split 队列、barrier split、阻塞 promises |
| 生命周期 | 整个 task | 随 split group 动态创建/清理 |
| 形态 | 不可拷贝 struct | 带虚函数的抽象类 |
一句话:SplitsState 管”节点整体元数据”,SplitsStore 管”某 group 具体怎么排队和阻塞”。根本原因是 grouped 执行要求同一节点下多个 group 各自独立排队与阻塞。
非 grouped 执行(ungrouped)长什么样
绝大多数普通查询(非 bucketed)走的是 ungrouped 执行,此时这套结构退化成最简单的形态:
1 | ungrouped 执行(所有 split.groupId == -1) |
| ungrouped 执行 | grouped 执行 | |
|---|---|---|
split.groupId |
-1 | bucket id(0..N-1) |
| 每节点 store 数 | 1(kUngroupedGroupId) |
每 bucket 一个 |
| driver 创建时机 | task start 时一次性建好 | 每个 group 到达时按需创建 |
| split 消费 | 所有 driver 抢同一队列,谁空闲谁拿 | 每 group 的 driver 只拿本 group split |
| 阻塞粒度 | 整个节点一个 promises 集合 | 每 group 独立阻塞 |
noMoreSplits |
通知唯一的 store | 通知该节点所有 group 的 store |
| 适用场景 | 普通表扫描、shuffle 后的 join | bucketed 表的 grouped/colocated join |
关键点:ungrouped 不是”另一套代码路径”,而是 grouped 设计的特例——map<groupId, store> 里只有一个固定 key kUngroupedGroupId = UINT32_MAX 的 entry。这样一套数据结构同时覆盖两种执行模式,无需为常规查询单独写一条路径。多个 driver 并行消费同一队列,靠的就是 SplitsStore 内部对队列和 promises 的加锁访问(实际锁在 Task::mutex_)。
第 4 步:Source operator 领取 split
TableScan 在 getOutput() 中调用 Task::getSplitOrFuture() 领取 split:
1 | BlockingReason Task::getSplitOrFuture( |
SplitsStore::nextSplit() 三种结果:
| 情况 | 返回 |
|---|---|
splits_ 非空 |
弹出 split,kNotBlocked |
队列空但已 noMoreSplits_ |
无 split,kNotBlocked(operator 据此 finish) |
| 队列空且未结束 | 创建 ContinuePromise 存入 promises_,返回 kWaitForSplit + future |
阻塞时 driver 退出线程池;当新 split 入队或 noMoreSplits 触发时,fulfill promises_ 唤醒 driver 重新调度。
第 5 步:noMoreSplits 的两个层级 + 延迟处理
两个层级:
1 | // 单个 group 结束(grouped 执行) |
延迟处理(delayedNoMoreSplitsPlanNodes_):
noMoreSplits 信号可能在 task->start() 之前就到达。若此时直接调 Velox 的 noMoreSplits 会出问题(task 还没起来)。所以:
1 | // TaskManager.cpp |
否则 driver 会永久阻塞在 kWaitForSplit,等待一个永远不会来的 split。
第 6 步:Split group(bucketed/grouped 执行)
针对 bucketed 表的分组执行,split 带 groupId:
1 | // Task.cpp,split 带 groupId != -1 时 |
- 每个 split group 独立创建一组 driver,
concurrentSplitGroups控制同时处理的 group 数 kUngroupedGroupId = UINT32_MAX表示非分组执行- group 间通过 barrier 同步(如 grouped HashJoin 的 build/probe)
Split 分发关键代码位置
| 环节 | 文件 |
|---|---|
| 解析 sources[] | presto_cpp/main/TaskResource.cpp createOrUpdateTask() |
| 入队 + 去重 + noMoreSplits | presto_cpp/main/TaskManager.cpp createOrUpdateTaskImpl() |
| 协议 split 转换 | presto_cpp/main/PrestoToVeloxSplit.cpp toVeloxSplit() |
| 延迟标记 | presto_cpp/main/PrestoTask.h delayedNoMoreSplitsPlanNodes_ |
| addSplitWithSequence / 去重 | velox/exec/Task.cpp |
| SplitsState / SplitsStore | velox/exec/TaskStructs.h |
| getSplitOrFuture | velox/exec/Task.cpp |
| noMoreSplits / ForGroup | velox/exec/Task.cpp |
层级结构总览
1 | Query |
关键约束:
(worker, stage)→ 唯一一个 task,同一 stage 在同一 worker 上不会有两个 task- 同一 query 的不同 stage 在同一 worker 上各有一个 task(可同时运行)
- Task 内的并行度靠 Driver 数量,不靠 Task 数量
- Coordinator 把属于某 worker 的所有 splits 都增量发给同一个 task,Driver 并行消费
一、Worker 接收查询:HTTP → Velox Task
HTTP 端点(TaskResource)
Coordinator 通过 REST API 驱动每个 Worker:
1 | POST /v1/task/{taskId} 创建/更新 task(含 plan fragment + splits) |
Plan Fragment 转换(PrestoToVeloxQueryPlan)
TaskUpdateRequest 中的 fragment 字段是 Base64 编码的 JSON Plan:
1 | Presto JSON PlanFragment |
关键 PlanNode 映射:
| Presto PlanNode | Velox PlanNode | 说明 |
|---|---|---|
TableScanNode |
TableScanNode |
读存储 |
RemoteSourceNode |
ExchangeNode |
跨 worker 拉数据 |
PartitionedOutputNode |
PartitionedOutputNode |
推数据给下游 |
AggregationNode |
AggregationNode |
聚合 |
ExchangeNode |
LocalExchangeNode |
task 内重分区 |
Task 创建流程(TaskManager)
1 | POST /v1/task/{taskId} |
PrestoTask 是 Velox exec::Task 的包装,额外持有:
- 待处理的
ResultRequest(下游长轮询的 HTTP 请求) - 状态变更 Promise(用于 long-polling)
二、Task 内部:Pipeline × Driver × Operator
Task 内结构
1 | Task |
- Pipeline 数量由 plan 决定(每个 HashJoin/CrossJoin 会拆出 build/probe 两条 pipeline)
- Driver 数量(每条 pipeline 内的并行度)由
maxDrivers配置决定 - 每个 Driver 独立运行,处理不同的 split 或不同分区的数据
- 每个 Driver 内部单线程串行执行算子链
Driver 执行循环(Driver::runInternal)
核心是从 sink 向 source 反向推进的 pull 模型:
1 | loop: |
Operator 核心接口(velox/exec/Operator.h):
1 | RowVectorPtr getOutput(); // 产生一批行(RowVector) |
数据流示例(Pipeline 0):
1 | TableScan.getOutput() → RowVector(一批行) |
阻塞与调度
Driver 运行于共享线程池(folly::CPUThreadPoolExecutor)。算子阻塞时(如等待 HashTable 建好、等待 Exchange 数据到来),Driver 持有 ContinueFuture 后退出线程;Future 完成后重新入队:
1 | Driver → isBlocked() → 得到 future → 退出线程池 |
阻塞原因(BlockingReason):
| 原因 | 触发场景 |
|---|---|
kWaitForSplit |
TableScan 等待 coordinator 下发 split |
kWaitForProducer |
Exchange/LocalExchange 等待上游数据 |
kWaitForConsumer |
PartitionedOutput OutputBuffer 已满 |
kWaitForJoinBuild |
HashProbe 等待 HashBuild 完成 |
kWaitForMemory |
内存仲裁,等待 spill 释放内存 |
Pipeline 间同步:HashJoinBridge
1 | Pipeline 1 (build): TableScan → HashBuild ──写入──► HashJoinBridge |
HashBuild 完成后通过 HashJoinBridge 通知 HashProbe 的所有 Driver 解除阻塞。
三、Task 内数据交换:LocalPartition / LocalExchange
适用场景:同一 task 内,不同 pipeline 之间需要重分区(如两阶段聚合、repartitioned join probe 侧重分区)。纯共享内存,无序列化,无网络。
“local” 的含义是 local to the task(同进程同 task),不是 local to the machine。
整体结构
1 | Task 内部 |
- 每个
LocalExchangeQueue对应一个分区,有且只有一个 consumer driver 消费 - 所有
LocalPartitionproducer 共享同一个LocalExchangeMemoryManager,统一控制总缓冲上限
LocalExchangeMemoryManager
职责:跨所有 LocalExchangeQueue 追踪总内存用量,在超限时阻塞 producer。
1 | // velox/exec/LocalPartition.h |
- **
increaseMemoryUsage(added)**:producer 入队后调用。若bufferedBytes_ >= maxBufferSize_,创建ContinuePromise存入promises_,返回 true(producer 阻塞) - **
decreaseMemoryUsage(removed)**:consumer 出队后调用。若降到maxBufferSize_以下,取出所有promises_并 fulfill,唤醒所有阻塞的 producer
LocalExchangeQueue
职责:单个分区的 FIFO 队列,协调多 producer 写入 / 单 consumer 读取。
1 | folly::Synchronized<Queue> queue_; // std::queue<pair<RowVectorPtr, int64_t>> |
入队(producer 调用):
1 | BlockingReason enqueue(RowVectorPtr input, int64_t inputBytes, ContinueFuture* future) { |
出队(consumer 调用):
1 | BlockingReason next(ContinueFuture* future, memory::MemoryPool*, RowVectorPtr* data, bool& drained) { |
Producer 生命周期管理:
| 方法 | 时机 | 作用 |
|---|---|---|
addProducer() |
producer 构造时 | pendingProducers_++ |
noMoreData() |
producer 处理完所有输入后 | pendingProducers_--;若归零且 noMoreProducers_ 则唤醒 consumer |
noMoreProducers() |
task 确认不再添加新 pipeline 后 | 设标志;若 pendingProducers_==0 则立即唤醒 consumer |
drain() |
barrier 处理(grouped execution) | drainedProducers_++;全部 drain 后唤醒 consumer |
LocalPartition 算子(Producer,sink)
构造:从 task 获取所有分区队列,并对每个队列调用 addProducer()。
addInput() 分区逻辑:
1 | 1. partitionFunction_->partition(*input, partitions_) |
wrapChildren():用 dictionary 向量包装输入列,指向原始数据,零拷贝。适合分区数少、批次大的场景。
**copy() / populatePartitionBuffer()**:把行物理复制进 partitionBuffers_[p],适合分区数多(字典开销大)的场景。
阻塞:若任意队列 enqueue() 返回 kWaitForConsumer,收集对应 future,在 isBlocked() 时返回,driver 退出线程池。
**noMoreInput()**:flush 剩余 buffer,对所有队列调用 noMoreData(),通知 consumer 不再有数据。
LocalExchange 算子(Consumer,source)
构造:从 task 获取自己负责的分区队列(partition_ = driverId mod numPartitions)。
1 | // getOutput() |
isBlocked() 直接返回 blockingReason_ 和 future_,无额外逻辑。
四、Task 间数据交换:Remote Exchange
适用场景:不同 task 之间(跨 stage),始终走 HTTP,即使两个 task 在同一台物理机上也不例外。
整体数据流
1 | Worker A(上游 task) Worker B(下游 task) |
上游:PartitionedOutput
addInput() 分区流程:
1 | 1. 估算每行序列化大小(用于 flush 时机判断) |
getOutput() 序列化 + flush 流程:
1 | for each Destination: |
随机 flush 阈值(targetSizePct_ = 70~120%):各 Driver 的 flush 时机错开,避免同时写入造成突发压力。
上游:OutputBuffer / OutputBufferManager
1 | // velox/exec/OutputBuffer.h |
背压:bufferedBytes_ > maxSize_ 时,enqueue() 返回 ContinueFuture,producer(PartitionedOutput)阻塞。当下游 ACK 消费数据后,bufferedBytes_ 降到 continueSize_ 以下时 fulfill promises,解除阻塞。
getData() 回调机制:下游 HTTP GET 到来时若无数据,注册 notify_ 回调;有数据入队时回调触发,立即响应 HTTP 请求。
下游:ExchangeClient
同一 pipeline 的所有 Driver 共享一个 ExchangeClient,但各自独立从 ExchangeQueue 消费(多消费者)。
1 | ExchangeClient { |
**addRemoteTaskId()**:为每个上游 task 创建一个 ExchangeSource,加入 emptySources_ 队列,启动首次数据量探测请求。
**next(consumerId, maxBytes)**(由 ExchangeNode.isBlocked() 调用):
1 | 1. queue_->dequeueLocked(maxBytes) → 若有足够数据立即返回 |
**ExchangeQueue**:
1 | ExchangeQueue { |
**enqueueLocked()**(ExchangeSource 收到数据后调用):
nullptr入队表示某个 source 数据结束- 计算 unassigned bytes,若
>= minOutputBatchBytes_则 fulfill 等待中的消费者 promise minOutputBatchBytes_自适应:min(配置值, 已接收总量 / 100),防止小流量场景下消费者频繁空唤醒
**dequeueLocked(maxBytes)**:取出页面直到达到 maxBytes 或队列耗尽;数据不足时返回 ContinueFuture(消费者 driver 阻塞)。
下游:ExchangeSource 抽象接口
1 | // velox/exec/ExchangeSource.h |
Response 含:
bytes:本次响应数据量atEnd:source 是否已全部传完remainingBytes:上游 DestinationBuffer 中各 page 的剩余大小列表(用于 ExchangeClient 调度决策)
下游:PrestoExchangeSource(HTTP 实现)
1 | shouldRequestLocked() → true(无 pending 请求且未 atEnd) |
重试:网络失败时指数退避(100ms → 10s,带 jitter),超过 exchangeMaxErrorDuration 后 queue_->setError(),task 报错。
下游:Exchange 算子
1 | // isBlocked(): |
**getOutputFromColumnarPages()**:用 VectorSerde::deserialize() 增量反序列化,每次读满 preferredOutputBatchBytes_ 后返回,跨 currentPages_ 中多个 page 累积。
**getOutputFromRowPages()**:先 mergePages() 把多个 page 合并成连续 IOBuf,再用 row iterator 批量反序列化。
序列号机制(可靠传输)
1 | ExchangeSource.sequence_ → HTTP GET .../results/{partition}/{sequence} |
ACK:GET .../results/{partition}/{sequence}/acknowledge → OutputBuffer::acknowledge() → 删除 sequence 之前的所有 pages,释放内存。
流控总结
| 层级 | 机制 | 上限 |
|---|---|---|
| OutputBuffer(上游) | bufferedBytes_ > maxSize_ 时阻塞 producer |
task.maxOutputBufferSize(默认 32MB) |
| ExchangeQueue(下游本地缓存) | totalBytes_ 控制发起的请求量 |
maxQueuedBytes_(默认 32MB) |
| ExchangeClient 请求调度 | maxQueuedBytes_ - totalBytes_ - pendingBytes = 可发请求量 |
动态计算 |
| 消费者唤醒阈值 | minOutputBatchBytes_ 防止小批次频繁唤醒 |
自适应(已收量 / 100) |
五、内存管理
内存池形成层级树,每层独立追踪和回收:
1 | QueryCtx::MemoryPool(query 级,所有 task 共享上限) |
Task::MemoryReclaimer在内存压力下触发 spill(HashAggregation、HashJoin 等支持 spill 的算子)- Operator 通过
addOperatorPool(planNodeId, splitGroupId, pipelineId, driverId)在构造时申请子池
六、完整端到端数据流
1 | Coordinator |
七、关键代码位置
| 组件 | 文件 |
|---|---|
| HTTP 端点 | presto_cpp/main/TaskResource.cpp |
| Task 生命周期管理 | presto_cpp/main/TaskManager.cpp |
| Plan 转换 | presto_cpp/main/PrestoToVeloxQueryPlan.cpp |
| Velox Task | velox/exec/Task.h, velox/exec/Task.cpp |
| Driver 执行循环 | velox/exec/Driver.cpp → runInternal() |
| Operator 接口 | velox/exec/Operator.h |
| LocalPartition/LocalExchange | velox/exec/LocalPartition.h, velox/exec/LocalPartition.cpp |
| Remote Exchange 算子 | velox/exec/Exchange.h, velox/exec/Exchange.cpp |
| ExchangeClient + ExchangeQueue | velox/exec/ExchangeClient.h/cpp, velox/exec/ExchangeQueue.h/cpp |
| ExchangeSource 抽象接口 | velox/exec/ExchangeSource.h |
| HTTP 数据拉取实现 | presto_cpp/main/PrestoExchangeSource.h/cpp |
| PartitionedOutput | velox/exec/PartitionedOutput.h, velox/exec/PartitionedOutput.cpp |
| OutputBuffer / Manager | velox/exec/OutputBuffer.h/cpp, velox/exec/OutputBufferManager.h/cpp |