Velox Memory Pool and Arbitrator
0. 架构总览
0.1 组件关系(简洁版)
1 | MemoryManager (process singleton) |
0.2 组件关系(详细版:关键字段与职责)
1 | MemoryManager (process singleton) |
0.3 Global Arbitration 线程交互时序
sequenceDiagram
participant RD as Requestor Driver
participant Arb as Arb 线程 (×1)
participant Rcl as Rcl 线程 (×N)
participant VD as Victim Driver
participant SE as spillExecutor (×M)
RD->>RD: allocate(bytes)
RD->>RD: reserve 失败 → growCapacity
RD->>Arb: enqueue(ArbitrationOperation)
RD->>RD: SUSPENDED(numThreads_-=1)
BLOCKED on ContinueFuture
Arb->>Arb: dequeue · pickVictim
Arb->>Arb: 尝试 shrink free capacity
Arb->>Rcl: dispatch(participant)
Rcl->>Rcl: lock reclaimMutex_
Rcl->>VD: requestPause()
VD->>VD: enterSuspended
numThreads_ -= 1
PAUSED (future 挂起)
VD-->>Rcl: numThreads_ == 0
pause 窗口就绪
loop per driver op
Rcl->>SE: fork op->reclaim(targetBytes)
SE->>SE: spill RowVector → SpillFile
(disk I/O)
SE-->>Rcl: spill 完成
end
Rcl->>VD: resume()
VD->>VD: leaveSuspended
numThreads_ += 1
恢复执行
Rcl->>Rcl: shrink(reclaimedBytes)
归还 capacity 给 Arbitrator
Rcl-->>Arb: reclaim 完成
Arb->>RD: grant capacity
wake ContinueFuture
RD->>RD: leaveArbitration
SUSPENDED(numThreads_+=1)
RD->>RD: retry allocate ✓
一、整体层次结构
Velox 的内存管理采用严格的四层树状结构,每层对应查询执行的一个抽象级别:
1 | MemoryManager(全局单例) |
核心文件
| 文件 | 职责 |
|---|---|
velox/common/memory/MemoryPool.h |
抽象接口与 MemoryPoolImpl |
velox/common/memory/Memory.h |
MemoryManager 生命周期管理 |
velox/common/memory/MemoryPool.cpp |
reservation / release 核心逻辑 |
velox/common/memory/MemoryArbitrator.h/cpp |
MemoryReclaimer 接口与仲裁基类 |
velox/common/memory/SharedArbitrator.h/cpp |
共享内存仲裁器实现 |
velox/common/memory/ArbitrationParticipant.h/cpp |
每个 root pool 的仲裁代理 |
velox/common/memory/ArbitrationOperation.h/cpp |
单次仲裁请求状态机 |
velox/exec/Task.cpp |
各层 pool 的创建入口 |
velox/exec/Driver.cpp |
DriverCtx::addOperatorPool() |
velox/exec/Operator.cpp |
OperatorCtx 持有 operator pool |
presto_cpp/main/QueryContextManager.cpp |
Presto 侧 root pool 创建 |
presto_cpp/main/TaskManager.cpp |
Presto 侧 task 创建编排 |
2. Pool 类型
| 类型 | 能否分配内存 | 能否创建子 Pool | 用途 |
|---|---|---|---|
kAggregate |
否 | 是 | 层次管理、聚合统计 |
kLeaf |
是 | 否 | 实际分配,追踪用量 |
Leaf pool 可选线程安全模式(threadSafe=true),同一 operator 被多线程访问时开启。
3. 各层 Pool 详解
3.1 Root Pool(Query 级)
1 | // velox/common/memory/Memory.h:216 |
- 唯一能设置 capacity 上限的节点,子孙节点不独立设 capacity。
- 同一 queryId 的所有 task(stage)共享同一个 Root Pool,内存限额在 query 粒度统一管控。
- 在 Presto Native 中由
QueryContextManager::findOrCreateQueryCtx()创建,有缓存,同一 queryId 只创建一次。
3.2 Task Pool
1 | // velox/exec/Task.cpp:710 |
- 名字格式:
task.<taskId>(如task.20240515_abc123_001.0) - 在
Task::init()中同步创建,Task 构造完成时即存在。 - 挂载
TaskReclaimer,支持 task 级别的内存回收(spill)。
3.3 Node Pool
1 | // velox/exec/Task.cpp:716 |
- 名字格式:
node.<planNodeId> - 懒创建:第一个使用该 PlanNodeId 的 Operator 实例化时才创建。
- HashJoin 特殊处理:build/probe 侧共享同一个 join node pool(名字带
[<splitGroupId>]后缀),以便跨 pipeline 聚合内存。 - 同一 plan node 的多个并发 Driver 共享同一个 Node Pool,Node Pool 聚合所有并发的内存用量。
3.4 Operator Pool
1 | // velox/exec/Task.cpp:781 |
- 名字格式:
op.<planNodeId>.<pipelineId>.<driverId>.<operatorType> - 每个 Driver 实例的每个 Operator 各自独立一个 Leaf Pool,粒度最细。
- Connector 场景:operator pool 本身是
kAggregate,其下再挂 connector 子 pool(名字再加.<connectorId>)。
触发点在 OperatorCtx 构造:
1 | // velox/exec/Operator.cpp:31 |
4. 内存分配与 Reservation 机制
Velox 采用预留(reservation)而非直接计量的模型:先向 pool 树预占配额,再委托 MemoryAllocator 实际申请物理内存。
4.1 分配流程
1 | operator->allocate(size) |
4.2 释放流程
1 | operator->free(ptr) |
4.3 Reservation 量化策略
为减少跨层加锁频率,reservation 按以下粒度向上取整:
1 | // velox/common/memory/MemoryPool.h:518 |
4.4 关键统计字段(MemoryPoolImpl)
| 字段 | 含义 | 有效层级 |
|---|---|---|
reservationBytes_ |
当前持有的预留字节(含量化膨胀) | 所有 |
usedReservationBytes_ |
实际分配出去的字节 | Leaf |
minReservationBytes_ |
强制保持的最小预留量 | Leaf |
peakBytes_ |
历史峰值 | 所有 |
cumulativeBytes_ |
累计分配总量 | 所有 |
不变量:reservationBytes_ >= usedReservationBytes_ >= 0
Aggregate pool 的 usedBytes() 通过遍历所有子孙的 usedReservationBytes_ 求和得到。
4.5 两条正交路径:capacity 配额 vs 物理内存
单一检查点设计
整条 pool 树上只有 Root Pool 一处做 capacity 检查,中间层(Operator/Node/Task)只负责”记账+传播”:
1 | // MemoryPool.cpp |
| 字段 | Operator Pool | Node Pool | Task Pool | Root Pool |
|---|---|---|---|---|
capacity_ |
— | — | — | ✓(动态可变) |
maxCapacity_ |
— | — | — | ✓(创建时定,不变) |
reservationBytes_ |
✓(记账) | ✓(记账) | ✓(记账) | ✓(参与检查) |
中间层 reservationBytes_ 仅用于:① release 时不能超过自己 reserve 量;② 诊断统计(看哪个 node/operator 占多少)。不参与决策。
配额与物理内存的正交性
Velox 内存管理中,capacity 配额管理与物理内存分配完全正交,互不感知:
1 | pool->allocate(size) |
两步均成功才算分配完成,失败原因相互独立:
reserve失败 → capacity 配额不足,触发仲裁MemoryAllocator::allocate失败 → 物理内存真正耗尽(极少见)
完整正常分配路径(capacity 充足时,不触发仲裁):
1 | pool->allocate(size) |
MemoryAllocator 完全不知道 capacity 的存在;capacity 是仲裁器维护的纯逻辑配额层。pool 的 capacity_ 不足时,MemoryAllocator 甚至还没被调用就已经失败并触发仲裁了。
5. Capacity 检查与内存仲裁
5.1 Capacity 检查
只有 Root Pool 做硬性 capacity 检查:
1 | // velox/common/memory/MemoryPool.cpp |
5.2 仲裁流程
1 | void MemoryPoolImpl::growCapacity(MemoryPool* requestor, uint64_t size) { |
仲裁器可以:
- 触发其他 query/task 的 spill 释放内存
- 收缩其他 pool 的 capacity
- 找不到足够内存时抛出 OOM,终止 query
6. Pool 的父子关系与生命周期
6.1 父子存储方式
1 | // MemoryPool.h |
父 pool 用 weak_ptr 持有子 pool——子 pool 的实际生命周期由各自的强引用持有者控制,父 pool 不延长其生命周期。
子 pool 析构时在 MemoryPoolImpl::~MemoryPoolImpl() 中自动调用 parent->dropChild(this),将自己从父 pool 的 children_ map 中移除:
1 | // velox/common/memory/MemoryPool.cpp:469 |
MemoryPool::~MemoryPool() 也断言 children_ 必须为空,确保析构顺序正确。
6.2 Root Pool 的创建与生命周期(Presto Native)
创建:QueryContextManager::findOrCreateQueryCtx() 中,同一 queryId 只创建一次:
1 | // presto_cpp/main/QueryContextManager.cpp:107 |
**缓存结构——QueryContextCache 只存 weak_ptr**:
1 | // presto_cpp/main/QueryContextManager.h:28,71 |
cache 不计入 QueryCtx 的引用计数,QueryCtx(连同 Root Pool)的实际生命周期由持有 shared_ptr<QueryCtx> 的 Task 决定。
evict() 不强杀存活的 QueryCtx:
1 | // presto_cpp/main/QueryContextManager.h:91 |
pool 名字追加单调 poolId 的原因(代码注释原文):
“In some edge case, we found some background activities such as the long-running memory arbitration process will still hold the query root memory pool even though the query ctx has been evicted out of the cache.”
内存仲裁过程在仲裁期间会持有 Root Pool 引用,导致 Root Pool 析构延迟。若此时同一 queryId 重新进来,不加 poolId 后缀会与 MemoryManager 中的旧 pool 名冲突。
Root Pool 引用计数来源:
1 | TaskResource(瞬时,调用完即丢) |
6.3 Task 管理的 Pool 成员
Task 用三个成员统一管理所有 pool:
1 | // velox/exec/Task.h:1205 |
Operator 只持有 pool_ 的裸指针(通过 operatorCtx_->pool()),所有权始终在 Task::childPools_ 中。
6.4 各层 Pool 的释放时机
| 层级 | close/release 时机 | 真正销毁时机 |
|---|---|---|
| Operator Pool | Driver::closeOperators() → op->close() → pool()->release()(仅归还多余 reservation,pool 对象存活) |
Task::~Task() 中 childPools_.clear() |
| Node Pool | 无独立 close | Task::~Task() 中 childPools_.clear() |
| Task Pool | 无独立 close | Task::~Task() 中 pool_.reset() |
| Root Pool | 无独立 close | 最后一个持有 QueryCtx 的 Task 析构后,QueryCtx 引用计数归零 |
Task::~Task() 中的析构顺序(顺序至关重要):
1 | // velox/exec/Task.cpp:483-493 |
必须先 childPools_.clear() 再 pool_.reset():子 pool 析构时调用 parent->dropChild(),此时 parent(Task Pool)必须还存活。
6.5 完整生命周期时序
1 | [Task 收到请求] |
7. Presto Native 中的完整调用链
7.1 创建时序
1 | POST /v1/task/{taskId} [TaskResource.cpp:345] |
7.2 Pool 树示例
以 TableScan → Aggregation (2并发) → PartitionedOutput 为例:
1 | Root Pool: "20240515_abc123_0" kAggregate capacity=10GB |
多个 task(stage)共享同一个 Root Pool:
1 | Root Pool: "20240515_abc123_0" capacity=10GB |
8. 内存仲裁机制(SharedArbitrator)
8.1 整体架构
1 | MemoryPool (leaf) 申请内存 |
触发入口(MemoryPool.cpp:~1128):
1 | bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) { |
!underMemoryArbitration() 是防重入关键:仲裁线程自身申请内存时允许临时超限,不会触发嵌套仲裁。
8.2 ArbitrationParticipant 与 ArbitrationOperation
ArbitrationParticipant 是 root pool 在仲裁器侧的代理,每个 root pool 注册时创建一个,拥有单调递增的 id(用于 abort 时选”最年轻”的牺牲者)。
同一 participant 的 operation 串行执行(ArbitrationParticipant.cpp:217):
1 | void ArbitrationParticipant::startArbitration(ArbitrationOperation* op) { |
8.3 Capacity 体系:从进程到 pool 的三层 quota
三层 quota 概览
整个 Velox 内存配额体系是一个三层嵌套约束:
1 | ┌─────────────────────────────────────────────────────────────┐ |
| 量 | 字段 | 含义 | Presto 配置 | 典型值 |
|---|---|---|---|---|
| 进程总上限 | SharedArbitrator::capacity_ |
所有 query 持有的 capacity 之和不能超过 | system-memory-gb |
80GB |
| 保留池 | SharedArbitrator::freeReservedCapacity_ |
给低优先级或饥饿 query 兜底 | query-reserved-memory-gb |
4GB |
| 单 query 上限 | MemoryPool::maxCapacity_ |
query 永远不能超过的 capacity(创建时定,不变) | query.max-memory-per-node |
30GB |
| 单 query 当前 | MemoryPool::capacity_ |
该 query 目前持有的动态 capacity | — | 运行时变化 |
| 初始 capacity | initMemoryPoolCapacity_ |
query 创建时同步划拨的起步配额 | memory-pool-initial-capacity |
128MB |
三个限额的角色分工:
| 限额 | 目的 | 生效层 |
|---|---|---|
arbitrator.capacity_ |
进程稳定性:所有 query 总和不能撑爆 worker | 最外层 hard line |
maxCapacity_ |
公平隔离:单个 query 不能独吞整机 | 每个 root pool 自己的上限 |
initCapacity |
性能优化:新 query 不必冷启动就触发仲裁 | 仅创建时一次性使用 |
Arbitrator.capacity_ 的来源:MemoryManager 初始化
Arbitrator 的总配额只在 MemoryManager 构造时设定一次,运行期不变:
1 | // velox/common/memory/Memory.cpp |
在 Presto Native 中,这些值在 worker 启动时从配置文件读入:
1 | // presto_cpp/main/PrestoServer.cpp |
关键性质:arbitrator.capacity_ 只是一个”逻辑配额数字”,不预占任何物理内存。物理内存仍由 jemalloc/mmap 按需分配,arbitrator 只负责账面上不超量。
8.4 Pool capacity 的赋值:addPool 初始分配 vs 运行时扩容
Pool 的 capacity_ 有两个赋值时机,均来自 SharedArbitrator::freeNonReservedCapacity_。
创建时:addPool 划拨初始 capacity
Root Pool 通过 SharedArbitrator::addPool() 注册时,arbitrator 同步从 freeNonReservedCapacity_ 划拨初始容量:
1 | // SharedArbitrator.cpp: addPool |
只要系统空闲容量足够,这一步同步完成,无需排队。若系统空闲容量不足 initMemoryPoolCapacity_,则按实际可用量给。
运行时:capacity 不足时通过仲裁快速扩容
当 maybeIncrementReservation() 返回 false,触发 growCapacity,进入本地仲裁。Step 1(maybeGrowFromSelf)是非 spill 快速路径:
1 | root->growCapacity(requestor, size) |
即使是这种”不需要 spill”的快速扩容,仍然要经过:
MemoryPoolArbitrationSection(driver 短暂进入 suspended 状态)- thread-local kLocal 仲裁上下文标记
SharedArbitrator::freeNonReservedCapacity_扣减
与”完整仲裁”的区别仅在于:Step 1 就成功返回,不需要 Step 2-5(不涉及 shrink/spill/abort 其他 pool)。
capacity 增长策略(非线性)
allocateCapacityLocked 并不总是按请求量分配,而是按增长策略计算目标量,以减少未来频繁扩容:
1 | // SharedArbitrator.cpp: maxGrowBytes |
三种路径对比汇总:
| 路径 | 触发条件 | capacity 操作 | driver 状态 | 耗时级别 |
|---|---|---|---|---|
| 正常路径 | reservationBytes_ + size ≤ capacity_ | 无(仅记账) | RUNNING | 纳秒级 |
| 快速扩容(Step 1) | capacity_ 不足,arbFree 足够 | 从 freeNonReservedCapacity_ 划拨 | 短暂 SUSPENDED | 微秒级 |
| 完整仲裁(Step 2-5) | arbFree 不足,需回收其他 pool | spill/shrink/abort 其他 pool | SUSPENDED / WAITING_ARB | 毫秒~秒级 |
addPool 不够分怎么办
freeNonReservedCapacity_ 不足时,addPool 按现有量给而非失败:
1 | // SharedArbitrator.cpp: addPool(简化) |
初始 capacity = 0 不是错误,仅意味着第一次 allocate 就会触发 growCapacity,走完整的 5 步本地仲裁流程。初始划拨只是性能优化,不是正确性必需。
全部 capacity 耗尽时的兜底链
当 freeNonReservedCapacity_ 与 freeReservedCapacity_ 都耗尽:
1 | growCapacity(op) |
物理内存是硬约束,arbitrator 宁可 kill 少数 query 也不能让进程 OOM。
8.5 本地仲裁流程
**SharedArbitrator.cpp:891**,按序尝试,成功即返回:
| 步骤 | 操作 | 说明 |
|---|---|---|
| 1 | maybeGrowFromSelf() |
仲裁器空闲容量够用?直接分配 |
| 2 | ensureCapacity() |
是否超出 pool maxCapacity?shrink 自身→reclaim 自身→再 shrink |
| 3 | growWithFreeCapacity() |
allocateCapacityLocked() 从仲裁器空闲池取容量,checkedGrow() 转给 pool |
| 4 | reclaimUnusedCapacity() |
shrink 所有其他 participants 的空闲容量(不涉及 spill),再重试步骤 3 |
| 5 | 全局仲裁 | 加入 globalArbitrationWaiters_,唤醒后台线程,阻塞等待 |
仲裁器维护两个空闲池:
freeNonReservedCapacity_:普通空闲池freeReservedCapacity_:保留池(per-query 保护容量)
优先从普通池分配,不足时补充保留池。
pool capacity 增长策略:
capacity < fastExponentialGrowthCapacityLimit(默认 512MB)→ 翻倍增长- 超过后 → 按
slowCapacityGrowPct(默认 25%)增长
Step 2 ensureCapacity 详解:处理 maxCapacity 边界
maxCapacity_ 是 query 创建时定的硬上限(= query.max-memory-per-node),即使系统有大量空闲内存也不能跨过。Step 2 专门处理”扩容会撞 maxCapacity 上限”的情形:
1 | // SharedArbitrator.cpp (简化) |
关键性质:
| 性质 | 解释 |
|---|---|
| maxCapacity 永远是硬约束 | 即使全局仲裁 spill 了 1TB 也救不了——别人 spill 出的容量不能给已到上限的 query |
| Self-reclaim 优先于 global-reclaim | 撞自己上限时,先逼自己 spill;不行才波及其他 query |
| Self-reclaim 仍是本地仲裁 | 全程在 driver 自己的线程同步执行,不需要后台仲裁线程介入 |
| 失败语义不同 | 系统级 OOM = 整机配额耗尽;单 query OOM = 自己撞 maxCap 但系统可能还很空 |
典型场景:query.max-memory-per-node = 10GB,当前 Q1.capacity_ = 9.5GB,Q1.reservationBytes_ = 9.4GB,系统 arbFree = 50GB:
1 | Q1 某 operator 再 reserve 700MB |
尽管系统有 50GB 空闲,Q1 仍被迫先自己 spill 了 300MB——这是 query.max-memory-per-node 提供的硬隔离保证。
8.6 全局仲裁(后台线程)
本地仲裁步骤 4 仍不够时,进入全局仲裁。
后台线程主循环(SharedArbitrator.cpp:1045):
1 | void SharedArbitrator::globalArbitrationMain() { |
runGlobalArbitration() 每轮逻辑:
1 | 计算 targetBytes = max(所有 waiter maxGrowBytes 之和, capacity × reclaimPct) |
Spill → Abort 切换条件(SharedArbitrator.cpp:1064):
1 | bool shouldReclaimByAbort = |
Spill 牺牲者选择(优先级依次):
- 容量分桶(
spillCapacityLimits,默认起始 4GB):优先动大用量的 - 优先级:低优先级(高数值)先 spill
- 可回收量:同优先级内可回收量大的先动
- 跳过可回收量
< minReclaimBytes(默认 128MB)的参与者
Abort 牺牲者选择(优先级依次):
- 优先级:低优先级先 abort
- 容量分桶(
abortCapacityLimits,默认起始 1GB) - participant id:越大(越年轻)越先 abort,保护老查询
对多个 victim 同时提交到 memoryReclaimExecutor_ 线程池并行执行。
Arb 线程与 Rcl 线程池的分工
全局仲裁涉及两类性质完全不同的线程,职责严格分离:
| 维度 | Arb 线程 | Reclaim 线程池 |
|---|---|---|
| 数量 | 1(常驻 globalArbitrationMain) |
N(动态 worker,由 memoryReclaimExecutor_ 管理) |
| 角色 | 决策中心 | 执行单元 |
| 主要职责 | 计算 target、选 victim、分配 cap、唤醒 waiter | requestPause + reclaim + shrink + resume |
| 并发度 | 必须串行(全局视图) | 多 victim 必须并行 |
| 操作性质 | 决策、状态机驱动 | I/O 密集(磁盘 spill) |
| 持有锁时长 | 微秒级(轻量) | 秒级(受磁盘速度限制) |
| 阻塞类型 | cv.wait(轻量) |
pauseFuture.wait + 磁盘 IO(重量) |
为什么必须分离:
| 原因 | 解释 |
|---|---|
| 决策必须串行 | 多 Arb 线程读不一致的全局状态会做出冲突决策(超分、重复 reclaim、公平性破坏) |
| 执行必须并行 | spill 是秒级慢操作;若 Arb 自己 spill,新到 waiter 全部堆积无人处理 |
| Backpressure 隔离 | 磁盘慢不阻塞决策;某 victim 卡死不影响其他 victim |
| 可观测性 | 决策点集中(1 线程)便于调试;执行点分散(N worker)便于追踪 spill |
三层并行结构
1 | Layer 1 — Arb 线程 × 1 全局决策(串行) |
各层互不阻塞:
- Layer 1 不被 spill IO 拖累 — Arb 提交完任务立即继续处理其他 waiter
- Layer 2 多 query spill 互不阻塞 — N 个 worker 独立处理 N 个 victim
- Layer 3 单 query 内多 driver 并发 spill — 同 join 的 build/probe 多 driver 同时写盘
Victim 粒度:query(= root pool = participant)
sortAndGroup(participants_) 显式按 participant 排序——victim 单位就是 root pool,对应一个 QueryCtx,即一个 query:
1 | SharedArbitrator |
排序依据全部是 query 级指标:
reclaimableUsedCapacity— 该 query 当前可 spill 总量capacity— 该 query 当前持有 cap(用于容量桶)priority— query 优先级(低优先级先被 victim)id— query 单调 id(abort 时选最年轻保护老查询)
为什么以 query 为粒度:
capacity_只在 root pool 有意义(task / node / op 无独立 cap)- 公平性/隔离边界 =
query.max-memory-per-node - abort 的失败语义 = query 整体失败(不是单 task 死)
Rcl worker 完整职责:pause→reclaim→resume→shrink
一个 rcl worker 负责一个 participant 的整个 reclaim 周期,期间持有 reclaimMutex_ 不让出:
1 | rcl_worker (从 executor 接到任务): |
关键性质:
- rcl worker 在整个 reclaim 周期内持续持锁,避免并发 reclaim/abort
- pause 是 rcl worker 发起;BLOCK 等的也是 rcl worker;resume 也是 rcl worker 触发
- victim drivers 只是被动响应
pauseRequested_,不参与决策也不执行 spill
详细的 spill 执行机制和算子 spill 状态机参见 §8.9。
同 victim query 内多 task 的串行 reclaim 顺序
如果一个 query 在 worker 上有多个 task(如 grouped execution),rcl worker 在该 participant 内对多 task 串行处理,按 reclaimable 量贪心排序:
1 | rcl_worker 持有 participant.reclaimMutex_ 全程: |
排序与早停规则:
| 规则 | 含义 |
|---|---|
| 贪心最大 | 每轮选 reclaimable 最大的 task,最小化 pause 次数 |
| 早停 | 累计 ≥ target 立即停,不处理剩余 task |
| 不并行 pause 多 task | pauseRequested_ 是 task 级;并行 pause 会超量、加大 query 整体延迟 |
| task 内可并行 | 单 task 内多 driver 通过 spillExecutor 并发 spill(Layer 3) |
示例:Q1 在该 worker 上有 3 个 task,target=1024MB,reclaimable=[T2:800, T1:500, T3:200]MB
1 | 轮 1: pause T2 → spill 800MB → resume T2 (累计 800, 差 224) |
少 pause、早收工、影响面最小化是核心目标。
Presto Native 中的常见简化:Presto 把同 query 的不同 stage 分发到不同 worker,所以一个 worker 上通常一个 query 只有 1 个 task。这种情况下多 task 串行机制不会触发,”1 rcl worker = 1 task” 是大多数实际场景的近似。但底层数据结构和锁仍以 participant 为粒度,grouped execution 场景下多 task 串行机制才会生效。
8.7 Local vs Global:作用域与线程模型
作用域辨析:local 不等于”只动自己”
| Step | 阶段 | 触及对象 | 是否动用别人 |
|---|---|---|---|
1 maybeGrowFromSelf |
Local | arbitrator 空闲池 | ✗ |
2 ensureCapacity |
Local | 自己(撞 maxCap 时 self-spill) | ✗ |
3 growWithFreeCapacity |
Local | arbitrator 空闲池(同 Step 1,重试) | ✗ |
4 reclaimUnusedCapacity |
Local | **所有 query 的”未用 capacity”**(账面 shrink) | ⚠️ 账面,不 spill |
| 5 进入全局等待 | Global | **victim task 的”已用内存”**(真 spill) | ✓ |
关键澄清:Step 4 会扫描所有 participant,把它们持有但没 reserve 的 cap(capacity_ - reservationBytes_)账面收回。这对其他 query 完全无感知——没有 driver 被暂停,没有内存被释放——所以仍归类为 local。
Local 与 global 的本质区别不是”动不动其他 query”,而是是否需要让其他 query 的 driver 真正暂停下来 spill 已用内存。
线程角色对比
仲裁涉及四种线程角色:
| 角色 | 来源 | 在 Local 中做什么 | 在 Global 中做什么 |
|---|---|---|---|
| Requestor Driver | 触发分配的 driver 线程 | 同步执行 Step 1-4 | 提交 Step 5 后 BLOCK 在 future.wait() |
| Background Arb Thread | globalArbitrationMain() 常驻线程 |
不参与 | 选 victim、分配 cap、唤醒 waiter |
| Reclaim Thread Pool | memoryReclaimExecutor_ 工作线程池 |
不参与 | requestPause + reclaim + shrink |
| Victim’s Drivers | victim task 的 N 个 driver 线程 | 不存在 victim 概念 | 自己检测 pauseRequested_ → 协作式进入 PAUSED |
| 维度 | Local | Global |
|---|---|---|
| 决策线程 | Requestor driver 自己 | 后台 arb 线程 |
| 决策时机 | 同步、即刻 | 异步、可能等待 |
| 执行 spill 线程 | Requestor driver 自己(仅 Step 2 self-spill) | Reclaim 线程池 |
| 受影响”别人”的范围 | 别人的空闲 cap(账面) | 别人的已用内存(真 spill) |
| Pause 谁 | 不 pause 任何 task | 整个 victim task(全部 driver 一起 pause) |
| Requestor driver 状态 | 同步运行仲裁逻辑 | BLOCK 在 future,让出 CPU |
Pause 粒度:Task,不是 Driver
全局仲裁的 pause 是 整个 victim task 粒度,不能是单 driver:
- HashJoin build table 是同一 join node 下多个 driver 共享的
- 如果只 pause 1 个 driver,其余还在跑,它们可能在写共享数据结构 → spill 时数据不一致
task->requestPause()是 task 级,victim 的 N 个 driver 在各自runInternal()循环里协作式自暂停,等numThreads_ == 0时 reclaim 线程才动手
三种让出机制:SUSPENDED / PAUSED / BLOCKED on future
仲裁系统其实是三种语义完全不同的”让出 CPU”机制协作完成的,每种各管一个场景:
| 机制 | 主语 | 范围 | 触发方 | 唤醒方 | 物理 CPU |
|---|---|---|---|---|---|
| SUSPENDED | 单个 driver | 自己一个 | driver 自己 (enterSuspended) |
driver 自己 (leaveSuspended) |
仍占用(账面让出) |
| PAUSED | 单个 task 的所有 driver | task 内 N 个 driver | 外部 (task->requestPause()) |
外部 (task->resume()) |
已让出(BLOCK 在 future) |
| BLOCKED on future | 单个 driver | 自己一个 | driver 自己 (waitFuture.wait()) |
外部 (promise.setValue()) |
已让出 |
三者实现机制:
1 | // ① SUSPENDED:atomic 计数变化,物理线程仍在跑 |
关键不变量:Requestor driver 在整个仲裁期间始终 SUSPENDED(numThreads_ 不计自己)。这样如果 requestor 和 victim 是同一个 task(自己被 global 选为 victim),requestPause 等 numThreads_ == 0 时不会因为 requestor 还在”运行”而死锁。
enterArbitration / leaveArbitration / enterSuspended / leaveSuspended 的分层设计
这四个 API 体现了一套”RAII + 桥接 + 多态分发 + 状态复用“的优雅设计,把 pool / reclaimer / driver / task 四层语义桥接起来。
1 | ┌──────────────────────────────────────────────────────────────────┐ |
各层设计动机:
| 层 | 设计动机 |
|---|---|
| L1 RAII | 自动配对 enter/leave,避免遗漏导致 numThreads_ 不一致;throw-safe |
| L2 Pool 委托 | Pool 不直接知道 task;通过 reclaimer 间接桥接 |
| L3 多态 virtual | 不同 pool 层级(Root/Task/Node/Operator)对”进入仲裁”语义不同 |
| L4 Task 计数 | 状态机的真正落地:原子计数变化反映 driver 在/不在跑 |
Layer 1:MemoryPoolArbitrationSection(RAII)
1 | // MemoryArbitrator.h (简化) |
RAII 价值:仲裁中途 throw(如 abort)触发栈展开,dtor 自动跑 leaveArbitration,状态不残留;任何 return 路径都不会漏。
Layer 2:MemoryPoolImpl 委托
1 | // MemoryPool.cpp:1201 |
只做”指针检查 + 转发”。Pool 不应硬编码”task 怎么 suspend”这种特定语义——挂哪个 Reclaimer 决定 enter/leave 实际干什么。
Layer 3:MemoryReclaimer 多态分发
1 | class MemoryReclaimer { |
默认 no-op 的意义:Root / Task / Node Pool 的 Reclaimer 都不需要响应仲裁进入事件,直接继承 no-op。只有 Operator::MemoryReclaimer 重写,因为只有算子层有”对应到 driver”的语义:
1 | // Operator::MemoryReclaimer (在 §8.8 已详述) |
driver->state() 不是抽象的状态枚举,而是 Driver 的内部状态对象(包含 isOnThread、阻塞原因等),让 Task 精确知道是哪个 driver 进入 suspended。
Layer 4:Task::enter/leaveSuspended
1 | // Task.cpp:3424 |
这一层的关键设计:与 pause 协议原生双向集成——enterSuspended 内部检查”是不是 numThreads_=0 后该唤醒 pauseFuture”;leaveSuspended 内部检查”task 是否仍 paused 需要继续等”。不需要外部”check + notify”包装。
完整调用链回放
1 | D0 driver 线程: |
enterSuspended 同时支撑两种语义
enterSuspended/leaveSuspended 是 Layer 4 底层原语,被两种上层场景复用:
| 上层场景 | 调用路径 | 是否物理 BLOCK |
|---|---|---|
| Requestor 进入仲裁(SUSPENDED 语义) | enterArbitration → Operator::MemoryReclaimer |
否,继续跑仲裁代码 |
| Victim driver 响应 pause(PAUSED 语义) | Driver::runInternal 直接调用 |
是,pauseFuture.wait() BLOCK |
1 | // Driver::runInternal (简化) — 也调用 enterSuspended |
复用的智慧:两个场景对 numThreads_ 的语义需求一致(让 task 知道有多少 driver 在跑),所以共享同一对原语;不同点只在”是否额外 BLOCK 在 future”。
设计精髓汇总
| 设计要点 | 价值 |
|---|---|
| RAII guard 在最外层 | 编译期保证 enter/leave 配对,throw-safe |
leaveArbitration 声明 noexcept |
析构安全;leave 不应失败 |
| Pool 委托给 Reclaimer 多态分发 | Pool 不绑定具体执行语义 |
| 基类默认 no-op | Root/Task/Node 层零开销,遵循最小知识原则 |
| Operator::MemoryReclaimer 是唯一具体实现 | 只有算子层有真正的 driver 关联 |
enterSuspended 与 pause 协议双向集成 |
一个原语支撑 SUSPENDED 和 PAUSED 两种场景 |
state 对象传引用 |
Task 能精确知道是哪个 driver,避免多对一查找 |
| Layer 4 内部处理唤醒 | 不需要外部 check+notify,原子操作内联完成 |
整体优雅性:通过严格分层让每层只关心自己的事——Pool 不知道 Task 存在、Reclaimer 不知道 Driver 内部、Task 不知道仲裁逻辑——但通过 enter/leave 这条桥连起来形成完整的协调系统。
Local 阶段:谁 suspend/resume
1 | D0 driver 线程: |
Local 中谁受影响:
| 谁 | 状态 | 仅在哪些 Step | 解除时机 |
|---|---|---|---|
| D0(requestor) | SUSPENDED 全程 | 进入仲裁到析构 | leaveArbitration |
| D1/D2/D3(同 task 其他 driver) | 仅 Step 2 self-reclaim 时 PAUSED | Step 2 | task->resume()(reclaim 完立即) |
| 其他 query 的 driver | 完全不受影响 | — | — |
Step 2 self-reclaim 是 local 阶段唯一可能 pause 其他 driver 的情形,且 pause 范围仅限自己的 task。95% 情况下 local 完全不 pause 任何 driver。
Global 阶段:完整线程交互时序
1 | [T0] D0 driver thread: |
完整协调矩阵
按时间阶段汇总各类线程的状态:
| 阶段 | D0 (requestor) | D0 同 task 其他 driver | Victim task drivers | Arb 线程 | Reclaim worker |
|---|---|---|---|---|---|
| Local Step 1/3 成功 | SUSPENDED | RUNNING | — | IDLE | IDLE |
| Local Step 2 self-reclaim | SUSPENDED | PAUSED 短暂 | — | IDLE | — |
| Local Step 4 | SUSPENDED | RUNNING | — | IDLE | IDLE |
| Local → Global 入队 | SUSPENDED | RUNNING | RUNNING | 即将唤醒 | IDLE |
| Global D0 wait | SUSPENDED + BLOCKED | RUNNING | RUNNING | RUNNING (决策) | IDLE |
| Global reclaim 期间 | SUSPENDED + BLOCKED | RUNNING | PAUSED | RUNNING | RUNNING (spill) |
| Global resume victim | SUSPENDED + BLOCKED | RUNNING | RUNNING | RUNNING | 即将完成 |
| Global notify D0 | SUSPENDED + 即将 unblock | RUNNING | RUNNING | RUNNING | IDLE |
| 退出仲裁 | RUNNING | RUNNING | RUNNING | IDLE | IDLE |
两个对比维度:
- Local vs Global: Local 期间不动其他 query;Global 才会真正 pause victim 的 drivers
- 同 task vs 跨 task: 自己 task 的 driver 在 self-reclaim 时受影响;非自己 task / 非 victim task 的 driver 全程不动
一句话总结
- Local:仲裁工作全在 requestor driver 自己的线程内同步完成,不动其他 driver;可能 self-spill;可能 shrink 别人的空闲 cap,但仅账面操作。
- Global:requestor driver 提交请求后 BLOCK 不参与;由”后台 arb 线程 + reclaim 线程池 + victim task 全体 driver 协作自暂停”三方协作完成;pause 粒度是 task,不是单 driver。
三方握手协议的设计(why)
前面的时序和协调矩阵讲的是”发生了什么“。这一节退一步,把全局仲裁看成一个四方握手协议(requestor / arb / rcl / victim),讲”为什么这样设计“。理解了这四点,就理解了整套协调的骨架。
(1) 三个交接点,三种不同的同步原语
全局仲裁本质是一条控制权接力链:requestor 把活交给 arb,arb 交给 rcl,rcl 让 victim 停下来。链上三个交接点用了三种完全不同的同步原语,不是偶然,而是每个交接点的耦合需求不同:
1 | requestor ──①waiter队列+future──▶ arb ──②executor.add()──▶ rcl ──③requestPause+future──▶ victim |
| 交接点 | 原语 | 等待语义 | 为什么是这个原语 |
|---|---|---|---|
| ① requestor → arb | globalArbitrationWaiters_ 队列 + ContinueFuture |
排队等结果 | requestor 必须拿到 capacity 才能继续,是强依赖;入队后睡眠让出 CPU,不轮询;future 能带值唤醒(capacity 数量随唤醒一起返回) |
| ② arb → rcl | memoryReclaimExecutor_.add(task) |
异步派发,不等 | 决策线程绝不能阻塞在 spill I/O 上;扔进线程池立刻回去处理下一个 waiter;多 victim 可在池里并行执行 |
| ③ rcl → victim | task->requestPause() + numThreads_==0 future |
协作式,等对方自己停 | 不能抢占——driver 可能正在改写 RowContainer;只能”请求”暂停,等每个 driver 跑完当前算子 step 后自愿进入 suspended |
一句话:①是”我等你给我结果”,②是”我把活扔给你就走”,③是”我等你自己停稳”。三种耦合强度(强同步 / 完全解耦 / 协作同步)对应三种原语,用错任何一个都会出问题——比如②若改成同步调用,arb 线程就被 spill 拖死;③若改成抢占,就会读到中间状态数据损坏。
(2) “谁阻塞在谁身上”——依赖图无环是不死锁的根
把四方的阻塞依赖画成有向图(A→B 表示 A 阻塞等待 B):
1 | requestor ──等──▶ arb ──(不阻塞,异步派发) arb 不等任何人 |
关键性质:
- arb 不阻塞在任何人身上——决策完即异步派发,立刻服务下一个请求。这是整张图无环的枢纽:如果 arb 同步等 rcl、rcl 又等 victim、victim 又可能等 arb 分配的 capacity,就成环了。异步派发(②)把这条潜在的环剪断了。
- 唯一的”等”是单向链:requestor→arb→(委托)rcl→victim,没有任何反向边构成环。
- victim 的 driver pause 后确实在”等 resume”,但 resume 是 rcl 单向推给它的信号,不是 victim 主动去”获取”什么资源——所以不构成 victim→rcl 的依赖环。
文档 §8.11 的 Race2(requestor 与 victim 同 task 自死锁)是这张图上一条特殊的潜在环:requestor 在 numThreads_ 里 → rcl 等 numThreads_=0 → 永远等不到。解法 enterSuspended() 把 requestor 从 numThreads_ 摘掉,等于在图上删掉了那条会成环的边。所以 Race2 不是孤立的 bug fix,而是”维持依赖图无环”这条总不变量的一个具体落点。
(3) 为什么是 future/promise,而不是条件变量 / 同步调用
协调里所有”等”都用 ContinueFuture/ContinuePromise,不用 std::condition_variable,也不用同步阻塞调用。三个理由:
- 一次性 + 带值传递:仲裁结果是”分到了多少 capacity”,future 可以让这个值随唤醒一起传回(
op.notifyWaitFuture()带着 allocated 量)。condvar 只能唤醒、不能带值,还得唤醒后再加锁读共享变量。 - 不需要重新检查谓词:condvar 有虚假唤醒,醒来必须
while(!pred) wait()重查。future 是”被 set 才醒”,醒来即代表事件确定发生,逻辑更简单、更难写错。 - 解耦决策与执行(呼应 §10.1 第7条):异步派发(②)必须配 future——arb 把活扔给 rcl 后,requestor 挂在 future 上,rcl 干完由它(或 arb)来 set future。整个链条上没有任何一个线程需要同步等另一个线程的函数返回,这是”决策线程永不被 I/O 阻塞”能成立的前提。
对比一下:如果用同步调用(arb 直接 call rcl.reclaim()),代码看着简单,但 arb 线程就被一次 spill(可能几百 ms 磁盘 I/O)独占,后面排队的 waiter 全饿死——决策的串行化退化成了执行的串行化,整个仲裁吞吐崩塌。future + executor 是把这两件事拆开的唯一方式。
(4) 握手交接的不只是控制权——所有权与上下文要一起交接
最容易被忽略的一点:当控制权在线程间交接时,两样东西必须跟着一起交接,否则接力链断裂。文档把它们当成 Race6/Race7 分开讲了,这里点明它们其实是”握手协议”不可分割的一部分:
| 跟着交接的东西 | 机制 | 不交接会怎样 |
|---|---|---|
| 对象所有权(pool 保活) | ScopedArbitrationParticipant 把 victim 的 weak_ptr<pool> 升级为 shared_ptr,全程持有(Race6) |
rcl 正在 spill victim 的 pool,query 端却把 pool 析构了 → 悬空指针 |
| 仲裁上下文(thread-local) | createAsyncMemoryReclaimTask 把 kGlobal context 显式打包进 lambda,新线程 ScopedMemoryArbitrationContext 恢复(Race7) |
rcl 线程内 spill 又申请内存 → 不知道”自己在仲裁中” → 嵌套触发仲裁 → 死锁 |
为什么这是”握手”的一部分而非附加的 race 补丁?因为控制权(②的 executor.add)是跨线程的——一旦跨线程,thread-local 的上下文不会自动跟过去,栈上持有的 shared_ptr 也不会自动延寿。所以交接控制权的那一刻,必须手动把”执行这段活所需的所有隐式状态”一起打包过去。这是所有”把任务扔进线程池”的异步设计都要面对的根本问题:线程池任务必须是自包含的(self-contained),不能依赖提交线程的任何隐式上下文。Velox 这里给了两个教科书式的处理——一个保活对象,一个传播上下文。
小结:四个 why 串起来
1 | ①②③ 三种原语 → 耦合强度决定原语:强同步 / 解耦 / 协作 |
四点指向同一个设计内核:让决策(arb)保持轻量串行、让执行(rcl)异步并行、让交接(future + 打包)自包含无环。这正是 §9.2「决策/执行分离」在线程协调层面的具体兑现。
8.8 MemoryReclaimer 层次体系
所有权与引用关系
Reclaimer 严格绑定到 pool(pool 拥有 reclaimer),但内部持有对应资源(task / operator)的指针来执行真正的回收逻辑:
1 | MemoryPool |
- 正向:
pool->reclaimer()返回它持有的 reclaimer - 反向:reclaimer 内部存指针指向真正”能 spill”的对象
为什么这样设计:
- pool 是结构单元(树形组织),但不知道算子语义
- operator 是行为单元(HashJoin 怎么 spill),但不参与树形遍历
- reclaimer 是桥梁:挂在 pool 上让”按 pool 树遍历”的逻辑能联通;内部持算子指针让”真正干活”的逻辑能调到算子
基类接口
1 | // MemoryArbitrator.h |
默认基类行为:贪心向下递归
1 | uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t target, ...) { |
各层 Reclaimer 子类 overload 这个行为,加入特定动作(pause、并发分发等)。
按 pool 层级的 Reclaimer 类型
| Pool 层 | Reclaimer 类型 | 谁创建 | 引用资源 | 主要职责 |
|---|---|---|---|---|
| Root Pool | 基类 MemoryReclaimer(MemoryReclaimer::create()) |
QueryCtx::createPool() 调 addRootPool 时强制传入 |
— | 贪心路由:按 reclaimableBytes 降序遍历 Task children,逐个调 reclaim;自己不持数据,不 pause |
| Task Pool | Task::MemoryReclaimer (TaskReclaimer) |
Task::initTaskPool() |
Task* |
**触发 task->requestPause()**,等 numThreads_=0,递归向下 |
| Node Pool | ParallelMemoryReclaimer |
Task::getOrAddNodePool() |
spillExecutor_ |
跨多 driver 的 op pool fork-join 并发 spill |
| Operator Pool | Operator::MemoryReclaimer 实例 |
Operator 构造时 pool()->setReclaimer() |
Operator* |
调算子虚函数 op->reclaim(),转接到具体 spill |
TaskReclaimer:触发 pause + 委托基类向下递归
Task::MemoryReclaimer 是 Task 的嵌套类,挂在 Task Pool 上。其核心职责是在 reclaim 子 pool 之前 pause 整个 task,spill 完成后 resume。
1 | // Task.h |
**为什么用 weak_ptr
1 | Task 持有 shared_ptr<MemoryPool> pool_ ← Task 拥有 Task Pool |
reclaim() 实现
1 | // Task.cpp (简化) |
关键点:
| 设计要素 | 用意 |
|---|---|
weak_ptr<Task> + lock() 检查 |
task 可能在 reclaim 前已被销毁,安全降级返回 0 |
| try/catch 包住 reclaim 中段 | 任何异常都确保 task->resume() 被调用——否则 task 永远卡在 paused |
future.wait(maxWaitMs) 超时 |
如果 drivers 都卡在 nonReclaimableSection_ 内永远 pause 不下来,超时退出避免死等 |
复用基类 reclaim() 实现 |
TaskReclaimer 只加 pause/resume 包装,向下找 children 用基类默认贪心逻辑 |
priority_ 字段 |
给 SharedArbitrator 排序 victim 用(低优先级先 spill) |
reclaim 流程的 “三明治结构”
1 | TaskReclaimer::reclaim |
这就是为什么 TaskReclaimer 不重写 enter/leaveArbitration:它的职责是为下层 reclaim 提供一个安全的时间窗(task 全暂停),而不是响应 driver 的进出仲裁。enterArbitration 是 driver 视角的回调,与 TaskReclaimer 的职责完全无关——所以它继承基类的 no-op。
abort() 实现
1 | void Task::MemoryReclaimer::abort(MemoryPool* pool, const std::exception_ptr& error) { |
abort 不需要 pause(直接强杀);只需要标记错误 + 触发 task 退出 + 让子 pool 自己清理。
reclaimableBytes() 实现
1 | uint64_t Task::MemoryReclaimer::reclaimableBytes( |
canReclaim() 检查 task 是否正在运行(非 terminating / terminated 状态)。已经在退出过程中的 task 不应被选为 victim。
ParallelMemoryReclaimer:fork-join 调度器
ParallelMemoryReclaimer 挂在 Node Pool 上,实现的是 “把 children 的 reclaim 任务 fork 到 spillExecutor,自己 BLOCK 等结果“:
1 | // 简化伪码 |
详细 fork-join 安全前提与示意见 §8.9 “并行 spill 安全的两个必要前提”,此处不重复。
Operator::MemoryReclaimer:连接 pool 树和算子的桥梁
这是最关键的一层,把”内存树遍历”对接到”算子 spill 逻辑”:
1 | // Operator.h (简化) |
注意:Operator::MemoryReclaimer 本身不实现具体 spill,它调用 op_->reclaim(target, stats) 这个 Operator 基类虚函数,由具体算子重写。
算子级 spill 实现(重写 Operator::reclaim())
不同算子各自实现 spill 逻辑,统一通过 Operator::reclaim() 虚函数分派,不需要为每个算子写独立的 MemoryReclaimer 子类:
1 | // HashBuild.cpp |
整条 reclaim 调用链(带 reclaimer 类型)
1 | rcl 线程 |
每层 Reclaimer 的职责一目了然:TaskReclaimer 做 pause/resume,ParallelReclaimer 做并发,Operator::MemoryReclaimer 转接到算子,具体 spill 由算子虚函数实现。
生命周期
| 阶段 | 行为 |
|---|---|
| 创建 | pool 创建时通过 addAggregateChild(name, reclaimer) 传入;或后通过 setReclaimer() 单独设置(Operator 走这条路) |
| 运行 | pool->reclaimer() 返回 raw 指针;reclaim 调用都从这里发起 |
| 销毁 | pool 析构时 reclaimer_ 自动析构(unique_ptr) |
| 保活 | reclaimer 持的 Task* / Operator* 是裸指针;安全靠 Task::childPools_ 持有所有 operator pool 的 shared_ptr 保证(参见 §6.4 析构顺序) |
8.9 Reclaim 两步设计:reclaim + shrink
这是容易误解的核心设计:
1 | reclaim() → 释放内存(usedBytes 减少),capacity 不变 |
ArbitrationParticipant::reclaim() 完整流程(ArbitrationParticipant.cpp:277):
1 | uint64_t ArbitrationParticipant::reclaim( |
capacity shrink 委托到 root pool(MemoryPool.cpp:1213):
1 | uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes) { |
shrink 保留最小空闲量:max(minFreeCapacityBytes, capacity × minFreeCapacityPct),不会把所有空闲都还走。
Spill 执行模型:reclaim 线程操刀,driver 旁观
Spill 不是由 driver 自己做的,而是由 reclaim 线程在 driver 全部 PAUSED 后独占执行:
1 | reclaim 线程主流程: |
为什么不让 driver 自己 spill? Driver 可能正在算子的中间状态(写一半的 RowContainer / HashTable),如果 driver 自己 spill 会读到不一致的结构。Pause 所有 driver 后由 reclaim 线程独占访问,保证数据结构处于稳定状态。
“挑选谁 spill” 的真实粒度:算子,不是 driver
走的是 MemoryReclaimer 调用链(参见 §8.8),按 op_pool.reclaimableBytes() 估算量优先选可回收量最大的算子:
1 | TaskReclaimer::reclaim(task_pool, target) |
多 driver 同 plan node 的算子可通过 spillExecutor 并行 spill——这正是 ParallelMemoryReclaimer 的用途。
并行 spill 安全的两个必要前提
ParallelMemoryReclaimer 把多 driver 的 spill fork 到 spillExecutor 并发执行,两个前提缺一不可:
| 前提 | 缺失会怎样 |
|---|---|
| ① Task 全部 driver PAUSED | driver 正在写数据结构,spillExecutor 线程读到中间状态 → 数据损坏 / segfault |
| ② Per-driver op pool 数据独立 | 即使 pause 了,多线程访问同一份共享数据仍需加锁,并行收益被锁开销吃掉 |
前提 ① 由 TaskReclaimer 保证:在调用 ParallelMemoryReclaimer 之前 task->requestPause() 并等到 numThreads_=0。
前提 ② 由 plan node 设计保证:同一 plan node 下,每个 driver 有自己独立的 op pool 和数据结构(RowContainer / HashTable),互不共享。
1 | // ParallelMemoryReclaimer::reclaim (简化) |
经典 fork-join:rcl worker 自己不 spill,只负责调度 + BLOCK 等。
Driver 内多 op 是串行 spill 的
由于 reclaim chain 在 task 层 “每轮挑 reclaimable 最大的一个 node 下钻”,同一 driver 内属于不同 node 的 op 自然落到不同轮次执行,永远不会同时 spill:
1 | Driver 0 Driver 1 Driver 2 Driver 3 |
为什么 driver 内多 op 不并行:
- reclaim chain 设计简单:每层只挑 max-reclaimable 一个子递归向下
- target 通常一个 node 就满足(没必要展开成全树扫描)
- 同 driver 的多 op 并行会争抢同 driver 的 CPU 缓存、磁盘带宽
- 大多数 pipeline 一个 driver 只有 1 个 spillable op,优化罕见 case 不值得
完整并行结构总览
1 | Layer 1 (Arb) : 1 个 victim 选择 —— 串行决策 |
Resume 机制:task->resume() 唤醒所有 paused drivers
Reclaim 完成后通过 ContinuePromise 唤醒所有 paused driver:
1 | // Task.cpp: resume |
Driver 侧的恢复逻辑(在 runInternal 主循环中):
1 | while (...) { |
算子的 spill 状态机:对 driver 透明
Driver 完全不感知刚才发生了 spill。算子内部维护 spiller_ 成员记录 spill 状态,getOutput() 自动切换”纯内存”和”merge-spill”模式:
| 算子 | spill 前状态 | spill 后状态 | resume 后输出阶段 |
|---|---|---|---|
| OrderBy | RowContainer 内存累积行 | RowContainer 清空 + 磁盘上 sorted run | getOutput() merge-sort:内存剩余 + 磁盘 run 合并 |
| HashJoin (build) | HashTable 内存构建 | HashTable 释放 + 磁盘 partition 文件 | Probe 阶段按 hash 分区,逐 partition 读回 build 再 join |
| HashAggregation | 内存 hash table | hash table 释放 + 磁盘 partitioned spill | getOutput() merge:磁盘 partition 读回再 agg |
| Window | RowContainer 累积分区 | 磁盘上排好序的 partition runs | getOutput() merge-read partition runs |
Driver 只是按平常方式调 getOutput(),算子内部自己处理 spill 数据的读回——这是 spill 状态机透明设计的核心。
8.10 Abort 流程
**ArbitrationParticipant.cpp:343**:
1 | uint64_t ArbitrationParticipant::abortLocked(const std::exception_ptr& error) { |
pool abort 向根传播,在 root 调用 reclaimer()->abort(),触发 TaskReclaimer 停止 Task 执行、释放所有内存。此后对该 root pool 任何 reservation 操作都抛异常。
8.11 线程安全与 Race 防范
整个仲裁/回收流程涉及多线程协作(driver / arb / reclaim / spill executor),由 5 层同步原语 + 3 类状态机不变量 + 上下文跨线程传递 三方面共同保证无 race。
五层同步原语
| 层 | 原语 | 类型 | 保护内容 |
|---|---|---|---|
| 进程级 | SharedArbitrator::stateMutex_ |
std::mutex |
freeNonReservedCapacity_、freeReservedCapacity_、globalArbitrationWaiters_、shutdown_ |
| Participant 注册表 | SharedArbitrator::participantLock_ |
folly::SharedMutex |
participants_ map 的读写 |
| Participant 仲裁 | ArbitrationParticipant::reclaimMutex_ |
std::timed_mutex (via ArbitrationTimedLock) |
单个 root pool 上 reclaim/shrink/abort 串行化;local 阶段有超时,global 无超时 |
| Participant 队列 | ArbitrationParticipant::runningOpLock_ |
std::mutex |
runningOp_、waitOps_ 队列;同 participant 仲裁严格串行 |
| Pool 层 | MemoryPoolImpl::mutex_ |
folly::SharedMutex |
reservationBytes_、capacity_、children_;按 leaf→root 顺序加锁 |
| Task 级 | Task::mutex_ + numThreads_ |
mutex + atomic<int32_t> |
pauseRequested_、pausePromises_、drivers_、driver 调度 |
关键 Race 场景与保护
Race 1:Driver 跑到一半被 spill → 读到不一致的数据结构
1 | D8 正在 SortBuffer::addInput 中改写 RowContainer |
保护:双闸门
- Pause 闸门:reclaim 线程在
pool->reclaim()前必须task->requestPause()并等numThreads_==0 - ReclaimableSection 闸门:算子用
nonReclaimableSection_标记临界区
1 | // SortBuffer.cpp:204 |
nonReclaimableSection_ 是 tsan_atomic<bool>,保证 driver 主线程的开/关窗对 reclaim 线程立即可见。
Race 2:Requestor 与 victim 是同一 task → 自死锁
1 | D0 (Q1/T1) growCapacity → global 选中 Q1/T1 为 victim |
保护:requestor 进入仲裁时 enterSuspended() 把自己从 numThreads_ 中减掉:
1 | // Task.cpp:3424 |
Race 3:同一 participant 并发仲裁 → 重复回收/记账错乱
保护:runningOpLock_ 队列严格串行:
1 | // ArbitrationParticipant.cpp:217 |
Race 4:Pool 树 reservation 跨层并发更新
1 | D0 在 op_pool.reserve(1024MB) → 向 root 传播 |
保护:incrementReservationThreadSafe 自下而上递归加锁,每层一把锁,顺序固定:
1 | // MemoryPool.cpp:952 |
顺序固定为 leaf → root,永远不会出现循环依赖。
Race 5:Reclaim 期间 pool 被并发 abort
保护:reclaim 与 abort 共享 reclaimMutex_,互斥:
1 | // ArbitrationParticipant.cpp:277 / 343 |
Race 6:Pool 在 reclaim 期间被 query 端析构 → 悬空指针
保护:ScopedArbitrationParticipant 升级 weak_ptr 为 shared_ptr,保活全程:
1 | // ArbitrationParticipant.cpp:118 / 371 |
这也呼应 §6.2 提到的 poolId 单调递增——后台仲裁可能延迟 pool 析构,重名时序问题靠 poolId 解决。
Race 7:Thread-local 仲裁上下文跨线程丢失 → 嵌套仲裁死锁
1 | 后台 arb 线程持有 kGlobal context |
保护:createAsyncMemoryReclaimTask 显式打包传递:
1 | // MemoryArbitrator.cpp:528 |
工作线程内 underMemoryArbitration() 返回 true,maybeIncrementReservation 跳过 capacity 检查,避免嵌套。
Race 8:Free capacity 计数错乱
1 | A 线程:freeNonReservedCapacity_ -= 100MB (划拨给 Q1) |
保护:所有读写 free pool 计数的代码路径必须先持 stateMutex_,无例外。
状态机不变量
三个核心状态机消除”不可能状态”:
① Task::numThreads_ 与 driver 状态
1 | 不变量: (RUNNING drivers 数) = numThreads_ |
enterSuspended / leaveSuspended 在 Task::mutex_ 内同步更新两计数。
② ArbitrationOperation 状态机
1 | kInit ──startArbitration──> kRunning ──finishArbitration──> kFinished |
所有 transition 在 runningOpLock_ 内完成,一个 op 任意时刻只在一个状态。
③ ArbitrationParticipant::aborted_ 单调性
1 | aborted_ : false → true,不可逆 |
单调性消除了”abort 完一半又取消”的复杂状态。
死锁预防
| 死锁来源 | 防范手段 |
|---|---|
| 锁循环依赖 | 锁顺序固定:pool leaf→root;reclaimMutex_ 内不再获取上层锁;ABBA 避免 |
| Self-pause 死锁 | requestor 进入仲裁前 enterSuspended 把自己从 numThreads_ 减掉 |
| Local 等不到 reclaim | ArbitrationTimedLock 在 local 阶段带超时(默认 ~100ms),失败回退到下一步 |
| 异步 spill 嵌套仲裁 | thread-local context 跨线程传递,underMemoryArbitration() 跳过嵌套触发 |
| Reclaim 与析构竞争 | ScopedArbitrationParticipant shared_ptr 保活 |
future.wait() 永久阻塞 |
global 仲裁总超时 max-memory-arbitration-time(默认 5min),超时强制 abort |
可见性与原子性
- **
nonReclaimableSection_**:tsan_atomic<bool>,配合 release-acquire 内存序,driver 开/关窗对 reclaim 线程立即可见 - **
numThreads_/numSuspensions_**:std::atomic<int32_t>+ Task mutex 双重保护(atomic 用于 fast-path 读,mutex 用于复合修改) - **
pool->capacity_/reservationBytes_**:pool mutex 保护,checkedGrow/shrink在锁内作为原子复合操作 - **
aborted_/pauseRequested_**:bool+ 持锁修改 + 持锁读取(避开复杂内存序)
整体收敛性
| 保证 | 机制 |
|---|---|
| 数据一致性(spill 读到完整状态) | task pause + nonReclaimableSection_ 双闸门 |
| 互斥性(一个 pool 同时只一个 reclaim) | reclaimMutex_ 单调持锁 |
| 串行性(同 participant 仲裁队列) | runningOp_ + waitOps_ 队列 |
| 死锁自由 | 锁层次 + timeout + 自暂停 + shared_ptr 保活 |
| 跨线程上下文 | thread-local + ScopedMemoryArbitrationContext 打包传递 |
| 状态单调性 | aborted_、ArbitrationOperation 状态机不可逆 |
| 可见性 | atomic + mutex 组合,保证读到最新值 |
核心思路:用 task pause 提供大粒度时间窗(所有 driver 都不在跑),用 reclaimMutex_ 提供 participant 级互斥,用 stateMutex_ 提供 arbitrator 自身一致性,用 thread-local context 解决跨线程语义。每一层各管一摊,互不重叠,避免任何一处需要同时持有多把锁的复杂场景。
8.12 关键配置参数
| 参数 | 默认值 | 含义 |
|---|---|---|
memory-pool-initial-capacity |
256MB | 新 pool 初始容量 |
memory-pool-reserved-capacity |
0B | 每 pool 保护容量下限 |
max-memory-arbitration-time |
5min | 仲裁总超时 |
memory-pool-min-free-capacity |
128MB | shrink 后保留的最小空闲字节 |
memory-pool-min-reclaim-bytes |
128MB | 低于此量跳过(不值得 spill) |
fast-exponential-growth-capacity-limit |
512MB | 翻倍增长的上限 |
slow-capacity-grow-pct |
0.25 | 超过上限后的增长比例 |
global-arbitration-enabled |
true | 是否启用后台仲裁线程 |
global-arbitration-memory-reclaim-pct |
10% | 每轮至少回收的容量比例 |
global-arbitration-abort-time-ratio |
0.5 | 超过此比例超时后切换到 abort |
8.13 完整仲裁调用链
1 | Operator allocate(size) |
9. 设计哲学:Pool / Arbitrator / Reclaimer 三位一体
前面八章讲清楚了”怎么做”,这一章退一步看”为什么这样设计”。Velox 内存系统的优雅之处不在某个具体机制,而在于三个抽象的职责切分与协同方式:Pool 提供结构、Arbitrator 提供决策、Reclaimer 提供行为,三者正交又紧密咬合。
9.1 Pool 树 ↔ Query 树:同构但不对称
结构同构
Pool 的四层结构和 Query 执行模型严格 1:1 对应:
1 | Query 执行模型 Memory Pool 树 |
这种同构带来一个直接好处:任何执行层面的对象都能找到对应的内存归属。Driver 调度时拿到 Operator,就能立刻定位到 Operator Pool;Task 析构时,整棵子树跟着一起释放。不需要额外的 ID 映射,不需要 side-table。
但功能严重不对称
结构对称不代表功能对称——这正是设计的精彩处。沿着树的不同位置,职责其实分得很开:
| 层级 | 唯一职责 | 不做什么 |
|---|---|---|
| Root Pool | 持有 capacity 配额,单点 capacity 检查 | 不做分配 |
| Task Pool | 持有 Reclaimer,提供 pause 时间窗 | 不做检查、不做分配 |
| Node Pool | 跨 Driver 的逻辑聚合点,挂 ParallelMemoryReclaimer | 不做检查、不做分配、可能不存在(懒建) |
| Operator Pool | 唯一发起 allocate/free 的层;唯一桥接 Reclaimer.enterArbitration 的层 | 不持有 capacity、不做检查 |
**职责”两端化”**:分配请求从最底层(Operator)发起,配额检查在最顶层(Root)做,中间两层只做记账(reservationBytes_ 透传)。这是一个标准的”末端发起、顶端裁决、中间透传”模式。
为什么这样切?因为这三件事的频次和粒度完全不同:
- 分配 → 高频、细粒度 → 必须在算子内(贴近物理代码路径)
- 检查 → 低频(粗化到 1MB reservation 后)、全局视角 → 必须在 Root(一个 query 一处)
- 聚合 → 完全可被动 → 中间层只需透传
如果把检查也放在每一层做,就要每层加锁、每层维护配额;如果把分配也放在中间层做,算子代码就要绕路。把这两件事都推到极端、中间层完全惰性,整条路径反而最短。
懒构造与共享:树是”逻辑形状”而非物理副本
Node Pool 不是 Task 启动时统一创建,而是 Driver 跑到对应算子时才 getOrAddNodePool(planNodeId)。这意味着:
- 从未执行的 plan node 没有 pool:plan 树常常比执行树大(有些 node 被裁剪),懒建避免无效开销
- 同 planNodeId 多 Driver 共享同一个 Node Pool:跨 Driver 的内存压力天然在 node 粒度聚合可见
- HashJoin Build 和 Probe 共享 join node pool:跨 pipeline 的资源关联也能表达
这让 Pool 树不是 plan 树的死板复刻,而是根据实际执行形状裁剪出的最小骨架。结构同构是为了归属清晰,懒建和共享是为了不浪费。
9.2 Arbitrator:三层 Quota 与”决策/执行分离”
三层 quota 的层次美学
1 | 进程级 arbitrator.capacity_ ← 整个进程内存预算(系统稳定性) |
三层各自的语义目标不同:
- 进程级是物理性的:防止整个进程 OOM 把节点拖垮;启动时一次设定,运行期不动。
- 查询级是契约性的:用户/调度器在提交查询时和系统约定的上限,是公平调度的依据。
- 当前级是动态性的:随负载在 [初值, maxCapacity_] 区间游走,仲裁器在此层做所有动态决策。
三者形成嵌套:当前级 ≤ 查询级 ≤ 进程级,永远满足这个不变量。任意一层都不会单独跨越,所以单 query 不可能拖垮系统,任意 query 不可能超额。
这种分层的优雅在于:把”硬约束”(系统不能挂)、”软约束”(query 不能超)、”调度变量”(capacity 流动)分到三个抽象里,每一层只解决一个问题,决策时只看相邻一层。Arbitrator 几乎从不直接看进程级,只在 query 级判公平、在当前级做流动。
Local vs Global 的分层兜底
仲裁路径不是单一的——它是三条正交路径在效率递减、代价递增的轴上排开:
1 | 路径 1:normal(不仲裁) capacity 足够 → 直接 reserve,无任何协调 |
绝大多数分配走路径 1(无开销);负载偏紧时走路径 2(单 driver 短暂 SUSPENDED);只有真的需要回收别人的物理内存时才走路径 3(跨查询协调)。
设计上的优雅:最坏情况存在并且能 work,但绝大多数情况不会触发。这是典型的”快路径优化、慢路径兜底”。开发者可以放心地把 capacity 设得激进——撞墙了系统能优雅降级,而不是直接 OOM。
Arb 和 Rcl 两个线程的彻底分离
全局仲裁拆出两个 background 线程,是这套设计里”职责切分”最教科书式的范例:
| 维度 | Arb 线程(1 个) | Rcl 线程池(N 个) |
|---|---|---|
| 任务 | 决策 | 执行 |
| 并发模型 | 串行(避免决策竞争) | 并行(per-participant) |
| 视野 | 全局(看所有 participant) | 局部(只看自己的 participant) |
| 代价 | 轻量(排序 + 选择) | 重量(pause + spill + I/O) |
| 失败影响 | 重排队即可 | 单 participant 失败不影响别人 |
为什么必须分开?因为如果 Arb 自己执行 spill:
- 决策被 I/O 拖慢,新进队的 requestor 全部堵塞
- 多个 victim 必须串行 spill(同一线程跑不动)
- 决策日志和 I/O 日志混杂,可观测性极差
而拆开之后:决策永远轻量(毫秒级)、执行可以无限拉宽(多 victim 并行)、两者通过队列解耦,背压机制自然出现(Rcl 慢 → Arb 队列堆积 → 新 requestor 排队)。
Victim 粒度 = Query
仲裁选择 victim 时按 participant(= query = root pool)排序,而不是 task、不是 operator、不是 driver。这是因为:
- capacity 只在 root pool 一级:选 task 没意义,task 没自己的额度
- 公平性是 query 间的:用户付费、调度配额都在 query 级
- abort 失败兜底也在 query 级:一个 query spill 失败可以 abort 整个 query
如果选 task:同 query 多 task 怎么算?task 内部多 op 同时溢出怎么协调?容易陷入碎片化。选 query 一刀切,反正一个 query 的所有 task 用的都是同一份 capacity,spill 谁都行——后续策略层(同 query 内多 task 按 reclaimable 量贪心排序)才决定具体顺序。
9.3 Reclaimer:与 Pool 同构、与算子解耦
结构同构、功能差异化
Reclaimer 的类层次几乎是 Pool 树的镜像:
1 | Pool 层次 ←→ Reclaimer 层次 独有职责 |
注意 Root Pool 挂的是基类实例(MemoryReclaimer::create()),不是 nullptr。基类的 reclaim() 实现了”贪心下钻 children”的默认逻辑——这正好就是 Root 需要的行为(自己不持数据、把请求路由到 Task),所以无需特化。
和 Pool 一样,结构同构 ≠ 功能同构:
- Root 层 Reclaimer:用基类。基类
reclaim()默认实现就是按 reclaimableBytes 降序遍历 children 贪心调用——刚好等于 Root 需要的”扩散到 task”语义。不需要特化。 - Task 层 Reclaimer:只做一件事——
pause和resume。它向上提供”暂停 task 让数据稳定”的时间窗,向下调用基类的贪心 reclaim 找 children 中 reclaimable 最大的下钻。pause 是它独有的能力。 - **Node 层 Reclaimer (Parallel)**:只做并行 fork-join 调度。把同 node 下多个 driver 的 spill 任务 fork 到
spillExecutor,BLOCK 等所有完成。自己不写一行 spill 代码。 - Operator 层 Reclaimer:只做转接(trampoline)。
reclaim()→op->reclaim()→ 落到 HashBuild/OrderBy/HashAggregation 各自的 spill 实现。
每一层都是”一件事原则”。没有一个 Reclaimer 同时做两件事。
基类 no-op + 子类按需重写:最小知识原则
MemoryReclaimer 基类的所有虚函数都是 no-op 默认实现:
1 | class MemoryReclaimer { |
只有需要做事的层才重写:
enterArbitration只在Operator::MemoryReclaimer重写(桥接到 Task::enterSuspended)pause/resume只在Task::MemoryReclaimer重写- 并行调度只在
ParallelMemoryReclaimer重写
Root/Task/Node 层 Pool 默认挂基类,零开销地融入仲裁框架。每个子类只知道自己的那一件事——Operator 不知道 pause、Task 不知道 spill 语法、Node 不知道算子类型。
这是 OO 里”虚函数 + 默认实现”用得最干净的一处:继承层次和职责分布完全对齐。
Pool 拥有 Reclaimer,Reclaimer 反指算子
最巧妙的关系是:
1 | Pool ───持有──→ Reclaimer |
- Pool 是结构:提供层次、提供生命周期。
- Reclaimer 是行为:被仲裁器调用,但行为不在自己身上。
- Task/Operator 是逻辑:知道怎么 pause 自己、怎么 spill 自己的数据。
为什么不让 Pool 自己实现 reclaim?因为 Pool 是内存抽象,不应该知道”算子怎么 spill”。HashBuild 的 spill 涉及 hash table、Operator 的 spill 涉及 sort 缓冲,这些是算子领域知识,不属于 memory 模块。
为什么不让 Operator 直接被仲裁器调用?因为仲裁器需要统一的接口(reclaim(targetBytes)),而 Operator 接口五花八门。Reclaimer 作为适配层把”统一接口”翻译成”具体算子调用”。
这是经典的”有结构无行为、有行为无结构、有逻辑无接口“三体分离,每一方都不耦合到其他两方的细节。
weak_ptr + try/catch RAII:生命周期与异常的双闸门
Reclaimer 设计里两个细节体现了对 corner case 的细致考虑:
weak_ptr 打破循环:
1 | Task ──shared_ptr──→ Pool ──unique_ptr──→ Reclaimer ──weak_ptr──→ Task |
reclaim 时 weak_ptr.lock() 升级为 shared_ptr,临时保活;如果 Task 已经析构,lock 返回 null,直接安全返回 0。reclaim 调用本身永远不会触发 task 析构链。
try/catch 三明治:
1 | task->requestPause(); // 开窗 |
不管 spill 成功、失败、抛异常,pause 状态一定会被 resume。这避免了”spill 失败导致 task 永久卡死”的灾难性后果。
9.4 三者协同:消息序列的优雅
把三者放在一次 global arbitration 的时间线上看:
1 | Operator Pool Reclaimer Arbitrator |
每个箭头都是单向单职责:
- Pool 向 Arbitrator 说”我要 X 字节”
- Arbitrator 向 Reclaimer 说”你提供 X 字节”
- Reclaimer 向 Task/Operator 说”暂停并 spill”
- 反向回包同样清晰
没有一个对象需要知道全链路,每个对象只和相邻层对话。这让任何一层都可以独立替换或扩展:换一个 arbitrator 策略不影响 reclaimer、加一个新算子的 spill 不影响 pool 树。
9.5 设计的”轴心”
如果非要用一句话总结这套设计的精神:
结构对齐执行模型,职责沿生命周期分布,行为通过适配层挂接。
- “结构对齐执行模型” → Pool 树映射 Query 树
- “职责沿生命周期分布” → Capacity 在 Root、Reservation 在 Op、Pause 在 Task、Allocate 在 Operator
- “行为通过适配层挂接” → Reclaimer 适配仲裁器接口与算子 spill 逻辑
这套抽象的代价是初次理解需要爬几层;但一旦理解,新增功能(一种新的 reclaim 策略、一种新的 capacity 分配规则、一种新的算子 spill 方式)几乎都是正交插入、零穿越——这是好抽象的最终标准。
10. 值得学习的工程实践:多线程与代码品味
前面九章解释了”做什么”和”为什么这样做”,这一章总结这套代码里值得借鉴到自己工程中的具体手法。分两类:多线程编程的具体技巧,和代码品味层面的取舍。
10.1 多线程编程:教科书级的范式集合
(1) 锁分层而非锁总线:五种锁各管一摊
整个内存仲裁子系统有五把主要的锁,每一把都有非常明确的辖区:
| 锁 | 保护对象 | 持锁时长 | 谁来持 |
|---|---|---|---|
stateMutex_ (Arbitrator) |
全局 freeCapacity / arbitration 队列 | 极短(毫秒级) | Arb 线程 |
participantLock_ |
单 participant 的 op 队列与状态 | 短 | Arb / Rcl 线程 |
reclaimMutex_ |
单 participant 上的 reclaim 互斥 | 长(整个 spill 周期) | Rcl 线程 |
runningOpLock_ |
单 op 状态切换 | 极短 | requestor driver |
Pool mutex_ |
单 pool 的 reservation 字段 | 极短 | 所有 driver |
关键纪律:从不同时持两把锁。需要跨锁协调时,用 future / atomic / 单向状态机替代锁组合。这避免了死锁分析需求,也让每把锁的争用都极低。
实务启示:当一个模块出现”需要同时持两把锁”时,先不要急着定义”锁顺序”,而是先问能不能让这两把锁的辖区互不相交。
(2) 量化记账:用空间换并发
reservation 不是按实际分配的字节走(那样每次 allocate 都得加锁汇报到根),而是以 1MB 为单位预留:
1 | allocate 1KB → 用本地 freeReservationBytes_(无锁) |
代价是单 driver 多用最多 1MB(永远收敛),收益是 1024 次分配只加 1 次跨层锁。
这是一个普遍可用的模式:高频细粒度操作 → 批量化成低频粗粒度操作 → 锁争用线性下降。文件系统的 dirty page 批量 flush、网络栈的 nagle、内存分配器的 size class 都是同一思路。
(3) 单调状态机消除中间态 race
aborted_ 一旦置 true 永不复位、ArbitrationOperation 的 state 只能朝固定方向迁移、numThreads_ 只能自增/自减,**任何状态机都不允许”撤回”**。
为什么这很重要?因为有”回退”就有”半途的中间态”,多线程下读到中间态就是经典 race。Velox 的方法是:不允许回退。失败了不是 rollback,而是创建一个新的 Operation 重试。
实务启示:在多线程类里加新字段时,先想清楚”这个字段会不会双向变?”如果是单向的,多线程语义会简单 10 倍。
(4) RAII Guard 替代显式 enter/leave
整个仲裁过程不靠裸 enter()/leave() 调用,而是 RAII guard 自动配对:
1 | { |
哪怕中间抛异常、提早 return、深嵌套 if/else,leave 永远会被调用。整个仲裁链路上的 EnterArbitration、ScopedAllocation、ScopedReservation 都是这个模式。
对应的代码品味:永远不要写裸 enter/leave、acquire/release 对——总能用 RAII 包一层。
(5) weak_ptr + lock() 打破生命周期循环
TaskReclaimer 持 weak_ptr<Task> 而非 shared_ptr<Task>:
1 | auto task = task_.lock(); |
避免了 Task → Pool → Reclaimer → Task 的循环引用,又给了 reclaim 线程安全的失败模式:lock 失败就放弃,不会拖延 Task 析构、也不会触发悬挂指针。
这是 weak_ptr 用得最干净的一处:所有权完全归 Task,Reclaimer 只是观察者,观察失败立即让出。
(6) shared_ptr 临时升级保活
ScopedArbitrationParticipant 进入临界区时把 weak_ptr<ArbitrationParticipant> lock 成 shared_ptr 拿在手上:
1 | ScopedArbitrationParticipant::ScopedArbitrationParticipant(weak_ptr<...> weak) { |
reclaim 过程中即使外部把 Participant 注销,这个 reference 保证它活到 reclaim 完成。生命周期的”借出”和”归还”完全自动化。
(7) 决策线程与执行线程分离:天然背压
Arb 一个线程做决策、Rcl 多个 worker 做执行,两者用 queue 解耦:
1 | requestor → [Arb queue] → Arb 线程:从队头取 → 决策 → push 到 [Rcl queue] |
如果 Rcl 堆积(spill 太慢),Arb 自然检测到 queue 长度,新 requestor 就要等更久——背压无需任何特殊代码就出现了。
实务启示:异步任务流水线天然带背压,比”显式监控 + 主动降级”简单且可靠。
(8) atomic 替代 mutex 的边界判断
numThreads_ 是 std::atomic<int>,requestor 进入仲裁前 --numThreads_、离开后 ++numThreads_。没有锁,因为这个字段不需要和其他字段一起 atomic 更新。
但 state + reclaimableBytes 必须一起更新时,仍然用 mutex——不强行 lock-free。判断标准很清晰:
单个字段独立读写 → atomic
多字段必须事务性更新 → mutex
不少代码会过度追求 lock-free 把所有字段都拆成 atomic,结果 ABA 和不一致状态频出。Velox 在这点上很克制。
(9) thread_local 防重入
Arb 线程自己跑 reclaim 时如果 Reclaimer 内部也调 allocate(),会不会再次触发仲裁形成无限嵌套?
1 | thread_local bool kInArbitration = false; |
一行 thread_local 标记就解决了重入。不是用锁、不是用计数器,是用线程身份。
(10) “宽 future + 窄 mutex”协调多线程
requestor 在等仲裁结果时挂在 future 上,没有自旋、没有持锁:
1 | ContinueFuture future; |
mutex 只用来保护 future 列表的 push,等待动作发生在锁外。这避免了”持锁等待”这个最常见的死锁源。
(11) “读后验证”模式
仲裁完成 capacity grow 后,requestor 回到原始 allocate 路径**重新调一次 maybeIncrementReservation()**,而不是相信仲裁器已经准备好。原因:并发 driver 可能在仲裁期间又消耗了一些。重新验证总是廉价且安全。
实务启示:跨线程的”对方答应给我”绝不直接相信,任何 promise 都要 verify。
10.2 代码品味:可借鉴的设计取舍
(1) 命名精确传递语义
reclaimvsshrink:前者是释放内存、后者是归还 capacity。两步分开、动词不同,读到shrink就知道是账面操作。enterArbitrationvsenterSuspended:前者是 driver 主动进入仲裁、后者是泛用的”暂停计数”。命名暗示了语义层级。numThreads_而非activeCount_:明确说”线程数”而不是某种抽象计数。freeNonReservedCapacity_长但极其精确:哪种 free、是否被 reserved、是 capacity 还是 bytes 一目了然。
坏命名会让多线程问题加倍难调试,因为读者要先猜语义;好命名让代码自解释。
(2) 错误信息把运行时数据放尾部
1 | // ❌ |
前者把可变值嵌在句子中间,日志聚合时无法对相同错误 grep;后者错误描述固定、数据在尾部,便于 alerting 系统按”静态前缀”分组统计。这是被 CLAUDE.md 显式要求的纪律。
(3) 不取巧的两阶段提交
reservation 是”先记账后真分配”,但记账阶段就可能失败(capacity 不够)。失败时整个 try block 抛异常、RAII 自动回滚账。没有 manual cleanup、没有 if-else 树。
1 | auto rollback = makeScopedRollback(); |
这是 C++ RAII 用于事务性操作的标准范式。
(4) 接口最小化、默认实现 no-op
MemoryReclaimer 基类的所有虚函数都有 no-op 默认实现,子类按需重写。比起强制每个子类实现所有接口,这种方式:
- 减少了 boilerplate(Root/Task/Node 都不用写空函数)
- 失败模式安全(没重写也不会崩,只是不做事)
- 易于扩展(新增虚函数不破坏现有子类)
代价是”基类接口语义可能不够清晰”——但配合好的文档和命名可以化解。
(5) 数据/行为/逻辑三体分离
1 | Pool ─ 数据:层次结构与生命周期 |
Pool 不知道怎么 spill、Reclaimer 不写 spill 代码、Operator 不知道仲裁器存在。每方只暴露其他方需要的最小接口。这是”单一职责”在大型系统的标准落地形态。
(6) 路径正交而非分支嵌套
仲裁有三条路径(normal / local / global),但代码不是 if (case1) ... else if (case2) ... else ... 的大分支,而是沿同一条主路径自然降级:
1 | if (maybeReserveQuick()) return; // path 1 |
每段路径独立完整、提前 return,主线代码不嵌套。比起一个 switch + 多个 helper 容易读得多。
(7) 量化数字避免魔法常量
1 | constexpr uint64_t kInitialMemoryPoolCapacity = 128 * MB; |
没有裸 128 * 1024 * 1024 散落代码里。每个魔数提到常量上有名字,改起来一改全改、读起来名字即文档。
(8) 测试钩子嵌在生产代码但零开销
1 | TestValue::adjust("facebook::velox::Pool::reserve", &this); |
在 release 编译时是空宏,debug + 测试桩注入时变成 hook 点。生产代码无需为测试做特殊设计、测试也不需要 friend class 破坏封装——这正符合 CLAUDE.md “禁止 friend / FRIEND_TEST” 的要求。
(9) 没有 util/helper/common 类
整个内存子系统找不到 MemoryUtil、PoolHelper、ArbitrationCommon 这种垃圾桶类。功能都收敛在有明确语义的类里:ArbitrationParticipant、MemoryReclaimer、ScopedArbitrationParticipant。
这是命名上的纪律,但更深层是对模块边界的尊重——一旦容忍 util,所有不知放哪的代码都会堆进来。
(10) 注释解释”为什么”而非”是什么”
1 | // Reserve in 1MB chunks to amortize lock acquisition cost. |
不是 // 8 MB(同义反复),而是讲清楚为什么是 8MB——量化思维与权衡。读者读完不仅知道值,还知道怎么改。
10.3 一句话总结
好的多线程代码不是少错,而是让错没法发生。
Velox 在 race 上的纪律性体现在:消除可能性(单调状态机)、限制传染面(锁分层)、自动配对(RAII guard)、安全降级(weak_ptr lock 失败)、自然背压(队列解耦)。这些不是”加注意”能做到的,而是设计上就把出错的可能性 designed away。
代码品味上则是反复体现的”克制”——克制使用 lock-free、克制公开接口、克制中间层职责、克制魔法常量、克制 friend 和 util。抽象力来自删减而非添加。
把这两条内化到自己的工程里,往往比记住任何具体技巧都有用。
十一、核心设计总结
| 关注点 | 设计决策 |
|---|---|
| 内存限额粒度 | 以 query 为单位(Root Pool),单 operator 不设独立上限 |
| 分配计量粒度 | 以 operator 为单位(Leaf Pool),可精确定位内存热点 |
| 中间层(Task/Node) | 纯聚合统计 + Reclaimer 挂载点,不参与实际分配 |
| Node Pool 懒建 | 避免 plan node 多而实际不执行时的无效开销 |
| 同 PlanNodeId 多 Driver | 共享 Node Pool,跨 Driver 的内存压力在节点粒度可见 |
| HashJoin 特殊处理 | Build/Probe 侧共享 join node pool,跨 pipeline 聚合 |
| Reservation 量化 | 减少跨层加锁频次,以少量内存膨胀换取更低竞争 |
| Capacity 检查位置 | 仅在 Root Pool 检查,子孙节点无需持锁向上询问 |
| 仲裁触发点 | Root Pool maybeIncrementReservation() 返回 false 时 |
| QueryCtx cache 存 weak_ptr | cache 不延长 QueryCtx/Root Pool 生命周期,生命周期由 Task 持有的 shared_ptr 决定 |
| Operator Pool 晚于 Task 析构 | childPools_ 统一持有所有子 pool,允许跨 Driver 共享 vector 而不拷贝 |
| pool 名追加单调 poolId | 仲裁过程可能延迟 Root Pool 析构,防止同 queryId 重建时名字冲突 |
| 三层 quota 嵌套约束 | 进程级 arbitrator.capacity_ → query 级 maxCapacity_ → 当前 capacity_,三者目标分别为稳定性 / 公平性 / 性能 |
| arbitrator capacity 启动时固定 | MemoryManager 初始化时从 system-memory-gb 读入,运行期不变;纯逻辑配额,不预占物理内存 |
| 单一 capacity 检查点 | 整条 pool 树只有 Root Pool 一处做 capacity 判断,中间层 Operator/Node/Task 只做记账与传播 |
| capacity 与物理内存正交 | capacity 是仲裁器维护的纯逻辑配额,MemoryAllocator 不感知;capacity 不足时 allocator 还未被调用就已触发仲裁 |
| addPool 预分配初始 capacity | Root Pool 注册时 arbitrator 同步划拨 initMemoryPoolCapacity(默认 128MB),避免第一次分配就触发仲裁 |
| addPool 不足时按现有量给 | freeNonReservedCapacity_ 不够 initCapacity 时按实际可用量给(甚至 0),不阻塞 query 创建 |
| maxCapacity 强制 self-reclaim | Step 2 ensureCapacity 在 query 撞自己上限时优先逼其自身 spill,spill 不动才报 query OOM;与系统是否有空闲无关 |
| 快速扩容无需 spill | arbFree 足够时,Step 1(maybeGrowFromSelf)直接从 freeNonReservedCapacity_ 划拨,driver 仅短暂 suspended,无跨查询影响 |
| 非线性增长策略 | 512MB 内翻倍增长,之后按 25% 增长,减少频繁扩容的仲裁开销 |
| reclaim 与 shrink 分离 | reclaim 释放内存,shrink 归还 capacity,两步解耦使仲裁器精确控制容量分配 |
| 本地优先,全局兜底 | 本地仲裁 4 步无需跨查询协调;全局仲裁后台线程串行化跨查询回收决策 |
| Local 边界:是否 spill 别人 | Local 可 shrink 其他 query 的”空闲 cap”(账面操作),但不让别人 spill;只有 global 才会暂停别的 task 真正 spill |
| 三线程协作执行 global | Requestor driver BLOCK + 后台 arb 线程决策 + reclaim 线程池执行 spill,三类线程角色清晰分离 |
| Arb 与 Rcl 严格分离 | 决策(1 线程、串行、轻量、看全局)与执行(N worker、并行、重量、I/O 密集)放不同线程;磁盘慢不阻塞决策,多 victim 并行执行 |
| 三层并行结构 | Arb 决策 × 1 → memoryReclaimExecutor_ per-participant × N → spillExecutor per-driver × M,三层各管一摊互不阻塞 |
| Victim 粒度 = query (root pool) | sortAndGroup 按 participant 排序,对应 QueryCtx;capacity、公平性、abort 失败语义都在 query 级一致 |
| 同 query 多 task 贪心串行 | 按 reclaimable 量从大到小逐 task pause→spill→resume;早停退出,少 pause 减小影响面 |
| Rcl worker 全程持锁 | 一个 rcl worker 持 reclaimMutex_ 走完整个 pause→reclaim→resume→shrink 周期,确保 participant 上 reclaim 操作互斥 |
| Reclaimer 绑定 pool 但持算子指针 | reclaimer 是 pool 的成员(pool 拥有),内部反向持 Task*/Operator* 让 reclaim 调用能转到真正的 spill 逻辑——pool 提供结构,算子提供行为 |
| Operator::MemoryReclaimer 不写 spill 代码 | 它只做”转接”:调 op->reclaim() 虚函数;具体 spill 由 HashBuild / OrderBy / HashAggregation 等子类各自实现,避免每算子一个 Reclaimer 子类 |
| ParallelMemoryReclaimer 是调度器 | 自己不 spill,只把多 driver 的 spill fork 到 spillExecutor,BLOCK 等 fork-join 完成;真正 IO 跑在 spillExecutor 线程上 |
| 并行 spill 双前提 | ① task 已 pause(数据稳定)② per-driver op pool 独立(无需加锁),两者缺一不可 |
| Driver 内多 op 串行 spill | reclaim chain 每轮挑 max-reclaimable 一个 node 下钻,同 driver 不同 node 的 op 自然落到不同轮次,永不并发 |
| 三种让出机制各管一摊 | SUSPENDED(atomic 计数,物理在跑)— requestor 自己进入仲裁;PAUSED(task 级 future 让出 CPU)— victim 提供 spill 时间窗;BLOCKED on future(单 driver 同步原语)— requestor 等全局结果 |
| Local 不动其他 query | Local 5 步中仅 Step 2 self-reclaim 时 pause 自己的 task;Step 4 仅 shrink 别人的”未用 cap”(账面操作);其他 query 的 driver 全程不受影响 |
| enter/leaveArbitration 四层桥接 | RAII guard → Pool 委托 → Reclaimer 多态 → Task 计数;每层只管自己关注的事,组合起来实现完整协调 |
| Operator::MemoryReclaimer 是唯一具体实现 | 基类 enterArbitration no-op,Root/Task/Node 层无负担;只有算子层重写桥接到 Task::enterSuspended,最小知识原则 |
| enterSuspended 一个原语两种用 | 同时支撑”主动进入仲裁”(requestor)和”响应 pause”(victim driver)两个语义场景;上层语义不同(是否 BLOCK on future)但底层计数需求一致 |
| TaskReclaimer 是 pause 时间窗的提供者 | 自身只负责”开窗 + 关窗”(requestPause + resume),向下找 children 复用基类贪心逻辑;try/catch 三明治结构保证 resume 永远被调用 |
| TaskReclaimer 用 weak_ptr |
避免 Task→Pool→Reclaimer→Task 循环引用;reclaim 入口 lock() 失败即安全降级返回 0 |
| Pause 粒度是 task 而非 driver | 共享数据结构(如 HashJoin build table)跨 driver 共享,必须整个 task 暂停后 spill 才安全 |
| SUSPENDED ≠ PAUSED | Requestor 自己 SUSPENDED 进入仲裁(不计 numThreads_);victim 的 drivers PAUSED 协作式让出;两者机制独立 |
| Spill 由 reclaim 线程执行 | Driver pause 后由 reclaim 线程独占调用 op->reclaim(),避免 driver 在算子中间状态时被读到不一致数据 |
| Spill 状态机对 driver 透明 | 算子内部 spiller_ 自动切换”纯内存”和”merge-spill”模式,driver 不感知 spill 历史 |
| 双闸门保证 spill 数据一致 | task pause(大粒度时间窗)+ nonReclaimableSection_(算子级临界区标记),缺一不可 |
| 同步原语五层分工 | stateMutex / participantLock / reclaimMutex / runningOpLock / pool mutex,各管一摊不重叠,避免持多把锁的复杂场景 |
| 状态机单调性 | aborted_ 不可逆、ArbitrationOperation transition 固定,消除”半途回滚”的复杂状态 |
| shared_ptr 保活 reclaim 期间的 pool | ScopedArbitrationParticipant 升级 weak_ptr 为 shared_ptr,防止 pool 在 reclaim 中被并发析构 |
| Spill 优先,Abort 兜底 | 超时 50% 后才切 abort,尽量保全查询;最年轻 participant 优先被 abort |
| 同 participant 串行仲裁 | ArbitrationParticipant 内部排队,避免同一查询并发仲裁导致过度回收 |
| 防重入 thread-local 标记 | 仲裁线程自身申请内存时跳过 capacity 检查,防止嵌套仲裁死锁 |