1. 1. Spark 监听器架构与原理
    1. 1.1. Spark 监听器是什么,为什么需要它
    2. 1.2. 自顶向下:运行时架构与类图
      1. 1.2.1. 1.1 运行时架构图
      2. 1.2.2. 1.2 三层职责
      3. 1.2.3. 1.3 类图
    3. 1.3. 逐个拆开:协调、派发、队列与流转
      1. 1.3.1. 2.1 LiveListenerBus —— 协调层
      2. 1.3.2. 2.2 AsyncEventQueue —— 队列 + 单线程
      3. 1.3.3. 2.3 SparkListenerBus —— 派发 trait(名字辨析)
      4. 1.3.4. 2.4 SparkListener 与事件体系 —— 回调契约
      5. 1.3.5. 2.5 端到端流转与线程模型
        1. 1.3.5.1. 线程模型 · 串行的范围
      6. 1.3.6. 2.6 装配与喂数据:在 SparkContext 中如何接线
        1. 1.3.6.1. 如何获取事件 · push 模型
        2. 1.3.6.2. 内部架构 · 队列与线程
        3. 1.3.6.3. 在 SparkContext 中的装配顺序
        4. 1.3.6.4. extra listener 注册流程
        5. 1.3.6.5. start / stop 生命周期
    4. 1.4. SQL 监听:架在 Core 总线之上的一层
      1. 1.4.1. 3.1 QueryExecutionListener 接口
      2. 1.4.2. 3.2 与 Core 的关系:同一条总线上的分层
      3. 1.4.3. 3.3 Core 监听器能监听 SQL 事件吗?
    5. 1.5. 如何扩展 Spark 监听器
      1. 1.5.1. 4.1 自定义 SparkListener(Core)
      2. 1.5.2. 4.2 自定义 QueryExecutionListener(SQL)
      3. 1.5.3. 4.3 自定义事件(onOtherEvent 扩展点)
      4. 1.5.4. 4.4 实践注意事项
    6. 1.6. 用 Core Listener 提取血缘:源/汇表与读写路径
      1. 1.6.1. 5.1 思路 · on 哪个 event
      2. 1.6.2. 5.2 提取架构图
      3. 1.6.3. 5.3 方式 A:解析 sparkPlanInfo(自包含,物理计划)
      4. 1.6.4. 5.4 方式 B(推荐):三阶段 union + End 缓存
      5. 1.6.5. 5.5 计划节点 → source / sink 映射
      6. 1.6.6. 5.6 注意事项

Spark listener详解

Apache Spark 监听器架构与原理 · Listener Architecture
Apache Spark · 内核技术档案 监听器专题 · Vol. Listener 自顶向下 / 架构精读

Spark 监听器架构与原理

自顶向下读懂 Spark 的可观测性机制:先看整体架构与类图,再逐个 drill-down 到 LiveListenerBusSparkListenerBusAsyncEventQueue 三个模块;随后是 SQL 层的 QueryExecutionListener 体系、 它与 Core 监听器的关系,以及一个常被问到的问题——Core 监听器能不能监听 SQL 事件;最后讲如何扩展,并实战:用 Core 监听器提取血缘(源/汇表与读写路径)。

00 是什么 / 为什么 01 总体架构 + 类图 02 核心模块 drill-down 03 SQL 体系与关系 04 扩展 05 血缘提取实战
01
总体架构 · Architecture

自顶向下:运行时架构与类图

先建立两张全局视图:一张运行时架构图(事件如何从生产者流到下游消费),一张类图(各类型如何组织)。 把这两张图看懂,后面逐个模块的细节就都有了坐标。

1.1 运行时架构图

事件自上而下流经四层:生产者(调度组件 + SQL 执行)把事件 post协调者 LiveListenerBus,后者广播到每一条异步队列, 每条队列用自己的线程把事件派发给挂在其下的监听器,监听器再驱动下游(UI、事件日志、动态分配、用户审计等)。

① 生产者 · DRIVER DAGScheduler TaskScheduler BlockManager Master SparkContext SQLExecution (SQL 事件) post(event) ② 协调层 LiveListenerBus 广播 post 到每一条队列 · 不按事件类型路由 · 管 start / stop ③ 队列层 · AsyncEventQueue × N(各持线程+缓冲) shared appStatus executorManagement eventLog 线程 + 缓冲 线程 + 缓冲 线程 + 缓冲 线程 + 缓冲 ④ 监听器层 → 下游消费 自定义 Listener ExecutionListenerBus → 用户审计 / SQL 回调 AppStatusListener → 实时 Spark UI ExecutorAllocation HeartbeatReceiver → 动态资源分配 EventLoggingListener → 事件日志 / History 同一个事件会同时进入全部四条队列;队列按“监听器分组”做隔离,而非按“事件类型”分流。 SQL 事件(SQLExecutionStart/End) 也走这条总线 → 因此 shared 队列里的 SparkListener 也能收到(见第 03 节)。
图 1 — 运行时架构:生产者 post → LiveListenerBus 广播 → 各 AsyncEventQueue 用自有线程派发 → 监听器驱动下游

1.2 三层职责

把上图的角色按职责归类,正好是三层——后面的模块详解就沿着这三层展开:

协调层 · Coordinator
LiveListenerBus

持有队列列表,post 广播事件、管 start/stop。实现任何回调,也直接持有监听器。

派发层 · Dispatch
ListenerBus → SparkListenerBus → AsyncEventQueue

把事件按类型变成对应回调调用;AsyncEventQueue 在此之上加了缓冲与线程。

回调契约层 · Callbacks
SparkListenerInterface / SparkListener / 具体监听器

声明并实现 onJobStart 等回调。真正的业务逻辑写在具体监听器里。

1.3 类图

类图把上面三层落到具体类型。左侧是派发/总线这条线,右侧是监听器这条线,二者通过一条「持有监听器列表」的关联连起来。 注意几个容易混的点:SparkListenerBus 不是总线门面,而是 AsyncEventQueue 的父 trait; LiveListenerBus 不在继承链上,它只是组合持有队列。

派发 / 总线 «trait» ListenerBus[L, E] listeners 列表 · postToAll · doPostEvent(抽象) «trait» SparkListenerBus doPostEvent:事件类型 → onXxx «class» AsyncEventQueue + 有界缓冲 + dispatch 线程 × 1 extends extends 按 name 实例化出 shared / appStatus / executorManagement / eventLog 四个对象 «class» LiveListenerBus queues: List[AsyncEventQueue] 持有 * 监听器(回调契约) «trait» SparkListenerInterface 声明 onJobStart / onTaskEnd / onOtherEvent … «abstract» SparkListener 全部回调的空实现 · 供继承 具体监听器 AppStatusListener · EventLoggingListener ExecutionListenerBus · 你的 Listener implements extends SparkListenerInterface --> listeners 0..* (派发对象) 继承 实现 组合 / 关联 桥接点:ExecutionListenerBus 既是一个 SparkListener(右侧),又被注册进 shared 队列(左侧)—— 它在两条线之间架桥,把 SQL 事件转成 QueryExecutionListener 回调(见第 03 节)。
图 2 — 类图:左侧 LiveListenerBus 组合持有 AsyncEventQueue(继承自 SparkListenerBus / ListenerBus);右侧监听器继承链;二者由“listeners 列表”关联
02
核心模块 · Drill-down

逐个拆开:协调、派发、队列与流转

沿着类图从上往下钻:先看协调者 LiveListenerBus,再看真正干活的队列 AsyncEventQueue, 接着辨析容易混淆的 SparkListenerBus 与回调契约,最后把它们串成一条端到端链路并讲清线程模型。

2.1 LiveListenerBus —— 协调层

LiveListenerBus 是个协调者 / 门面,本身是监听器、也实现任何回调。它做三件事: 持有一个 CopyOnWriteArrayList[AsyncEventQueue];把 post(event) 广播到列表里的每一条队列;管理队列的 start / stop。

注册监听器时(addToSharedQueue / addToStatusQueue / …)它按 name 查找队列,找不到就 new 一个—— 所以队列是按需懒创建的,实际存在哪几条取决于注册了什么。

// 队列名只是常量, 不是类型
val SHARED_QUEUE = "shared"; val APP_STATUS_QUEUE = "appStatus"
val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"; val EVENT_LOG_QUEUE = "eventLog"

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

// post:广播到每一条队列(不按类型路由)
private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext) it.next().post(event)
}

// 注册:按 name 找队列, 没有才新建(懒创建)
def addToQueue(listener: SparkListenerInterface, name: String) = synchronized {
  queues.asScala.find(_.name == name) match {
    case Some(q) => q.addListener(listener)
    case None    => val q = new AsyncEventQueue(name, conf, metrics, this); q.addListener(listener); queues.add(q)
  }
}

2.2 AsyncEventQueue —— 队列 + 单线程

这是真正干活的类,本质就是「一个有界事件队列 + 一个处理线程」的封装,再叠加从父 trait 继承来的「事件→回调」派发能力。 它持有一个有界 LinkedBlockingQueue(容量默认 10000,由 spark.scheduler.listenerbus.eventqueue.capacity 控制), 和一个专属守护线程 spark-listener-group-{name}

postoffer 入队,非阻塞;队列满则丢弃事件、累加计数并打印限速告警(宁可丢事件也不阻塞调度)。 停止时向队列投递一枚哨兵(poison pill),等队列排空后再 join 线程。shared / appStatus / executorManagement / eventLog 就是这个类的四个实例,仅靠 name 区分。

private class AsyncEventQueue(val name: String, /* … */) extends SparkListenerBus {
  private val eventQueue     = new LinkedBlockingQueue[SparkListenerEvent](capacity)
  private val dispatchThread = new Thread(s"spark-listener-group-$name") { /* dispatch() */ }

  def post(event: SparkListenerEvent): Unit =
    if (!eventQueue.offer(event)) onDropEvent(event)   // 满则丢弃, 不阻塞

  private def dispatch(): Unit = {
    var next = eventQueue.take()                       // 阻塞取事件
    while (next != POISON_PILL) { super.postToAll(next); next = eventQueue.take() }
  }
}

2.3 SparkListenerBus —— 派发 trait(名字辨析)

命名陷阱 SparkListenerBus 这名字容易让人以为它就是那个「总线门面」。其实不是——它是 AsyncEventQueue父 trait, 封装的是「把事件按类型分发到具体回调」这件事。真正的门面/协调者是 LiveListenerBus

它的父 ListenerBus 持有监听器列表、提供 postToAll(遍历本实例的 listeners,逐个 doPostEvent,并 try/catch 隔离、单独计时)。 SparkListenerBus 实现的 doPostEvent 就是那段按事件类型的模式匹配——这是「事件→具体回调」的转换核心。 另外,History Server 回放日志用的 ReplayListenerBus 也继承它,只是同步重放、没有线程。

// SparkListenerBus:事件类型 → 调具体回调
protected def doPostEvent(listener: SparkListenerInterface,
                          event: SparkListenerEvent): Unit = event match {
  case e: SparkListenerTaskEnd => listener.onTaskEnd(e)
  case e: SparkListenerJobEnd  => listener.onJobEnd(e)
  // … 其余 core 事件类型 …
  case _                       => listener.onOtherEvent(e)   // 扩展/SQL 事件走这里
}

2.4 SparkListener 与事件体系 —— 回调契约

回调契约由 SparkListenerInterface 声明(onJobStart / onTaskEnd / … / onOtherEvent), 抽象类 SparkListener 给它们空实现供继承,具体监听器只重写自己关心的。其中 onOtherEvent扩展点: 自定义事件和 SQL 模块事件都从这里进来。所有事件都实现 SparkListenerEvent,并携带各自的上下文数据。

事件类别代表事件触发时机
应用ApplicationStart / ApplicationEndSparkContext 启动 / 应用终止
作业JobStart / JobEnd一次 Action 触发的 Job 提交 / 完成
阶段StageSubmitted / StageCompletedStage 提交执行 / 全部 Task 结束
任务TaskStart / TaskGettingResult / TaskEnd单个 Task 的生命周期节点
执行器ExecutorAdded / ExecutorRemovedExecutor 注册 / 退出或被移除
存储BlockManagerAdded / BlockUpdated / UnpersistRDD块管理器注册、缓存块变更
环境EnvironmentUpdate配置、类路径、环境信息就绪
扩展自定义 / SQL 事件经 onOtherEvent 分发

2.5 端到端流转与线程模型

把前面拼起来,追踪一个 SparkListenerTaskEnd 的完整旅程。它分两段:投递段(同步,在生产者线程上)与 派发段(异步,在队列分发线程上)。

投递段 bus.post(event)postToQueues同一事件投递到每一条队列(不按类型路由),各队列 offer 进缓冲。 派发段 每条队列的线程 take() 出事件 → postToAll 遍历本队列 listeners → 对每个 doPostEvent 按类型匹配 → 命中 TaskEnd 就调 onTaskEnd

① 生产者:DAGScheduler / TaskScheduler … val event = SparkListenerTaskEnd(…) post(event) ② LiveListenerBus.post(event) postToQueues:遍历 queues 列表逐条投递 ★ 关键:同一事件被复制投递到每一条队列 —— 队列按 listener 分组隔离,不按事件类型路由。 TaskEnd 会同时进入 shared / appStatus / executorManagement / eventLog 四条队列的缓冲。 shared appStatus executorManagement eventLog offer(buffer) offer(buffer) offer(buffer) offer(buffer) AsyncEventQueue.post:offer 进有界缓冲,非阻塞,满则丢弃 异步边界 · 以下在各队列 dispatchThread 上执行 appStatus / executorManagement / eventLog:各自 dispatchThread 并行,流程完全相同 ③ shared 队列 · dispatchThread event = take(); super.postToAll(event) ④ ListenerBus.postToAll(event) 遍历本队列 listeners → 逐个 doPostEvent(listener, event) 每个回调 try/catch 隔离 + 单独计时 ⑤ SparkListenerBus.doPostEvent(listener, event) 按事件类型分发 event match { case e: SparkListenerTaskEnd => listener.onTaskEnd(e) ◀ 命中 case e: SparkListenerJobEnd => listener.onJobEnd(e) case _ => listener.onOtherEvent(e) } ⑥ 你的 Listener.onTaskEnd(taskEnd) 执行用户逻辑:读取 TaskMetrics、累加指标 …
图 3 — 端到端流转:post 把同一事件广播到每条队列(第一级) → 各队列线程 postToAll 遍历本队列 listeners(第二级) → doPostEvent 按类型选中具体回调

线程模型 · 串行的范围

串行,但范围是「单条队列内」:队列里那个线程一次只处理一个事件(FIFO),且对一个事件会按注册顺序逐个同步调用本队列的 listeners—— 上一个回调返回了才调下一个。所以「事件之间」和「同一事件的多个 listener 之间」在一条队列里都是串行。 但队列之间是并行的:各跑各的线程,互不等待。副作用是:一个慢 listener 会同时拖住本队列其他 listener 与后续事件(head-of-line 阻塞)。

生产者线程 · 并发 POST dag-scheduler-event-loop heartbeat-receiver SparkContext 主线程 … AsyncEventQueue · shared LinkedBlockingQueue · 容量 10000 e₅ e₄ e₃ e₂ e₁ take() 分发线程 × 1 spark-listener-group-shared · 循环派发 dispatch 监听器 · 同线程依次调用 Listener A Listener B Listener C 1 2 3 其余队列 appStatus · executorManagement · eventLog 各持独立分发线程,与上图并行运行、互不阻塞。 一条队列被慢监听器拖住时,其他队列不受影响 —— 这就是多队列隔离的价值。
图 4 — 单队列线程模型:队列内单线程串行 FIFO(事件之间、监听器之间都串行);队列之间各持线程,并行隔离
关注点线程行为
生产者线程多个(调度 / 心跳 / 主线程),并发 post,线程安全
post() 是否阻塞否 —— 仅入队;队列满则丢弃,不阻塞生产者
每队列分发线程1 个专属守护线程(spark-listener-group-*)
队列内执行顺序FIFO,监听器串行调用,不并发
队列之间关系并行,各持线程,互相隔离
监听器内部加锁单队列注册可免;跨队列 / 共享状态需自理
背压方式丢弃 + 计数 + 限速告警(非阻塞)

2.6 装配与喂数据:在 SparkContext 中如何接线

到这里运行时机制清楚了,还剩两个工程问题:这套东西怎么在 SparkContext 里被组装起来,事件又是怎么进到总线的。

如何获取事件 · push 模型

LiveListenerBus 是个被动的 sink,只暴露一个 post()——它不订阅、不轮询任何东西。 各生产者组件在创建时被注入同一个 bus 引用,事件发生时各自构造对应的 SparkListenerXxx 并调用 post()。 也就是说:是组件主动 push,不是总线主动 pull。下表是「谁 post 什么」。

生产者投递的事件
DAGSchedulerJobStart / JobEnd · StageSubmitted / StageCompleted · TaskStart / TaskEnd …
SchedulerBackendExecutorAdded / ExecutorRemoved(Driver 端点)
BlockManagerMasterEndpointBlockManagerAdded / Removed · BlockUpdated
SparkContextApplicationStart / ApplicationEnd · EnvironmentUpdate
HeartbeatReceiverExecutorMetricsUpdate(心跳携带指标)
SQLExecution(SQL 模块)SQLExecutionStart / SQLExecutionEnd

post() 内部还有个时序细节:总线未 start 时,事件先进一个 queuedEvents 缓冲; start() 之后才走 postToQueues 广播到各队列。这保证了早注册的监听器不会漏掉启动早期的事件。

内部架构 · 队列与线程

LiveListenerBus 剖开看:它持有一个 queues 列表,列表里每个元素是一个 AsyncEventQueue 实例; 而每个实例内部 = 一个有界缓冲LinkedBlockingQueue)+ 一个专属线程 + 一个 listeners 列表post 把事件 offer 进每条队列的缓冲,每条队列的线程各自 take 出来跑 postToAll

post(event) 来自 DAGScheduler · SQLExecution · BlockManagerMaster … queuedEvents 启动前缓冲 · start() 时 flush start 前 post → 广播 (offer) 到每条队列 LiveListenerBus queues: List[AsyncEventQueue](按需创建) AsyncEventQueue · name = "shared" LinkedBlockingQueue · 有界 10000 FIFO dispatchThread × 1 take() 循环 listeners 列表 Listener A Listener B take postToAll AsyncEventQueue · name = "appStatus" LinkedBlockingQueue · 有界 10000 FIFO dispatchThread × 1 take() 循环 listeners 列表 AppStatusListener take postToAll … executorManagement / eventLog 两条队列结构完全相同(各自缓冲 + 各自线程 + 各自 listeners),此处省略。
图 5 — LiveListenerBus / AsyncEventQueue 内部:总线持有队列列表,每条队列 = 有界缓冲 + 单线程 + listeners;post 广播 offer,各线程 take 后 postToAll

在 SparkContext 中的装配顺序

总线在 SparkContext 构造早期就被 new 出来,然后按固定顺序接线。关键是 AppStatusListener 要在 SparkEnv 之前注册好,以便捕获最早的事件:

步骤动作
new LiveListenerBus(conf) —— 创建总线(未启动)
注册 AppStatusListener 到 appStatus 队列(早于 env,确保拿到全部事件)
createSparkEnv(conf, isLocal, listenerBus) —— bus 引用注入 env,内部组件从此可 post
创建 TaskScheduler、DAGScheduler(持有 bus 引用,后续 post 调度事件)
按需注册 EventLoggingListener(eventLog) / HeartbeatReceiver · ExecutorAllocation(executorManagement)
setupAndStartListenerBus() —— 注册 extra listeners 到 shared,然后 start()
start() 之后:postEnvironmentUpdate()、postApplicationStart() 投出最早的事件

extra listener 注册流程

第 ⑥ 步的 setupAndStartListenerBus() 读取 spark.extraListeners(逗号分隔类名),用反射实例化每个类 (优先找接收 SparkConf 的构造器,否则用无参构造器),注册到 shared 队列,最后 start()。 所以你的 extra listener 类必须有一个无参构造器或一个接收 SparkConf 的构造器

// SparkContext.setupAndStartListenerBus()(简化)
private def setupAndStartListenerBus(): Unit = {
  conf.get(EXTRA_LISTENERS).foreach { classNames =>        // spark.extraListeners
    val listeners = Utils.loadExtensions(
      classOf[SparkListenerInterface], classNames, conf)       // 反射实例化
    listeners.foreach { l =>
      listenerBus.addToSharedQueue(l)                        // 注册到 shared 队列
      logInfo(s"Registered listener ${l.getClass.getName}")
    }
  }
  listenerBus.start(this, _env.metricsSystem)                  // 启动:拉起各队列线程 + flush 缓冲
}

// Utils.loadExtensions:优先 (SparkConf) 构造器, 否则无参构造器
val ext = Try(klass.getConstructor(classOf[SparkConf])) match {
  case Success(ctor) => ctor.newInstance(conf)
  case Failure(_)    => klass.getConstructor().newInstance()
}

start / stop 生命周期

start() 拉起每条队列的 dispatchThread,并把 queuedEvents 里启动前缓冲的事件 flush 进各队列—— 早注册的监听器因此不漏早期事件。stop() 向每条队列投递哨兵(poison pill),等队列排空后 join 线程,确保 UI 状态与事件日志完整落盘。

时序要点 extra listener 在 start() 之前注册,而 ApplicationStartstart() 之后才 post—— 所以 extra listener 一定能收到 ApplicationStart。但若你在应用运行中途addSparkListener 注册,则只能收到注册之后的事件。
03
SQL 体系 · QueryExecutionListener

SQL 监听:架在 Core 总线之上的一层

SQL 层提供 QueryExecutionListener,观测查询级别——每一次 Dataset 动作完成时拿到执行计划与耗时。 它不是独立系统,而是分层架设在第 02 节那条 Core 总线之上。

3.1 QueryExecutionListener 接口

接口极简,只有成功 / 失败两个回调,观测单位是一次完整的 Dataset 动作

回调 · onSuccess
funcName, qe, durationNs

动作名(如 collect)、整个 QueryExecution、纳秒级耗时。

回调 · onFailure
funcName, qe, exception

动作名、查询对象、抛出的异常,用于捕获执行期失败。

回调里的 QueryExecution 封装了一条查询从逻辑到物理的全部计划阶段 (解析 → 分析 → 优化 → 物理计划 executedPlan),所以监听器不仅知道耗时,还能拿到完整计划树做血缘、计划审查或慢查询诊断。

3.2 与 Core 的关系:同一条总线上的分层

触发链路如下:Dataset 动作经 SQLExecution.withNewExecutionId 包裹,向 LiveListenerBus 投递 SparkListenerSQLExecutionStartSparkListenerSQLExecutionEnd 两个 SQL 事件。 关键在于:ExecutionListenerBus 本身就是一个 SparkListener,注册在 shared 队列上,订阅 SQLExecutionEnd, 收到后再按成功 / 失败把调用派发给注册的 QueryExecutionListener。同一批 SQL 事件还被 SQLAppStatusListener 消费,渲染成 UI 的 SQL 标签页。

Dataset 动作 collect() · save() · count() · show() … SQLExecution.withNewExecutionId(qe) ① post Start ② 执行计划 (产生 Job/Stage/Task) ③ post End 投递 SQL 事件 LiveListenerBus · shared 队列 SparkListenerSQLExecutionStart / End 在此异步分发 ExecutionListenerBus 一个特殊的 SparkListener · 订阅 SQLExecutionEnd SQLAppStatusListener 消费同一批事件 · 构建 SQL 状态 QueryExecutionListener onSuccess(...) / onFailure(...) Spark UI · SQL 标签页 物理计划图 + SQL Metrics 用户自定义审计 / 血缘 / 慢查询 可视化诊断
图 6 — SQL 监听器分层:Dataset 动作经 withNewExecutionId 投递 SQL 事件到内核总线,ExecutionListenerBus 与 SQLAppStatusListener 作为消费者各取所需

3.3 Core 监听器能监听 SQL 事件吗?

能。这正是「分层在同一条总线上」带来的直接结论。SparkListenerSQLExecutionStart / End 也是 SparkListenerEvent,经同一个 LiveListenerBus.post 广播到所有队列(含 shared)。 任何注册到 shared 队列的 SparkListener(比如你用 addSparkListener 注册的)都会收到它们。

只是——这些 SQL 事件不在 doPostEvent 的 core 类型匹配里(它们是 SQL 模块的类型),所以会落到 case _ => onOtherEvent(event)。因此:在你的 SparkListeneronOtherEvent 里对这些事件做模式匹配,就能监听 SQL 执行。 框架自己的 SQLAppStatusListenerExecutionListenerBus 正是这么干的——它们都是 SparkListener,靠 onOtherEvent 处理 SQL 事件。

import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart, SparkListenerSQLExecutionEnd}

class SqlAwareListener extends SparkListener {
  // SQL 事件不在 core 的类型匹配里, 统一从 onOtherEvent 进来
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL 开始 execId=${e.executionId}: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL 结束 execId=${e.executionId}")
    case _ => // 忽略其他扩展事件
  }
}
spark.sparkContext.addSparkListener(new SqlAwareListener)
不对称性 反过来不成立:QueryExecutionListener 只能看到查询级的 onSuccess / onFailure,看不到 Job / Stage / Task 等 core 事件。 所以——要细粒度调度信息 + 顺带 SQL 事件,用 Core SparkListener;只要查询级计划与耗时,用 QueryExecutionListener。
维度SparkListener (Core)QueryExecutionListener (SQL)
观测粒度Job / Stage / Task / Executor / 存储一次 Dataset 动作(查询级)
能否看 SQL 事件(经 onOtherEvent)本身即 SQL 维度
能否看 Job/Task不能
回调入口onJobStart / onTaskEnd / onOtherEvent …onSuccess / onFailure
携带信息TaskMetrics、StageInfo、JobResultQueryExecution(四阶段计划)+ 耗时
事件来源调度器直接 post构建在总线之上(订阅 SQLExecutionEnd)
05
实战 · Lineage

用 Core Listener 提取血缘:源/汇表与读写路径

目标:用一个通过 spark.extraListeners 注册的 SparkListener,在每条 SQL/Dataset 执行时提取它读了哪些 source 表 / 输入路径、写了哪些 sink 表 / 输出路径。这正是 OpenLineage、Spline、Atlas Spark connector 的核心做法。

5.1 思路 · on 哪个 event

血缘信息藏在 SQL 的执行计划里。production 做法是在 onOtherEvent 里 hook SparkListenerSQLExecutionEnd——它带 errorMessage,能据此只上报成功的执行;再用 executionId 回查本次的 QueryExecution

但有个坑:SQLExecution.getQueryExecution(id) 在执行结束后会被移除,End 上直接取常拿到 null(见 §5.6)。稳妥姿势是——Start 时缓存 QE,End 时按 id 回查。拿到 QE 后,对它的 logical / analyzed / optimizedPlan 三个逻辑计划阶段做 best-effort 遍历:只看单一阶段语义简单但容易漏(写端目标与读端 relation 暴露在不同阶段),三阶段 union 覆盖面更广。两种取数路线:

方式 A · 轻量自包含
event.sparkPlanInfo

Start 事件自带的物理计划信息树,无需回查、始终可用;但元数据字符串化,提取表名较糙,适合只要粗粒度路径。

方式 B · production 推荐
QueryExecution · 三阶段 union

End 缓存回查 QueryExecution,遍历三个逻辑计划阶段并 union 去重,拿干净的表名/路径、覆盖面广(OpenLineage / Spline 即此类)。

一个前提:这条路只看得到 SQL / DataFrame / Dataset 执行(事件来自 SQLExecution);纯 RDD 作业不产生 SQL 事件,拿不到血缘。

5.2 提取架构图

SparkListenerSQLExecutionStart 缓存 QE → map(execId → QueryExecution) SparkListenerSQLExecutionEnd(触发提取) 执行结束时投递 ① 按 execId 回查 QE ② 读 errorMessage 判成败 execId 回查缓存 同一个 QueryExecution · best-effort 遍历三阶段逻辑计划 logical 写端目标(InsertIntoStatement…) 可能未 resolve(仅名字) analyzed 读写都最全 resolved relations + commands optimizedPlan pushed location / filter 但可能裁掉 source 每阶段都识别:读端 relation → sources | 写端 command·target → destinations 三阶段 union 候选 去重 + 归一化 · LineageDataset 表↔path 同源 · 路径归一 · 库表名 … 输出:sources[] · destinations[] 去重排序后 · 仅成功执行
图 7 — 多阶段血缘提取设计:Start 缓存 QE → End 触发并回查、按 errorMessage 判成败 → 对 logical/analyzed/optimizedPlan 三阶段 best-effort 遍历 → union → 按 LineageDataset 去重归一 → 输出

5.3 方式 A:解析 sparkPlanInfo(自包含,物理计划)

只用事件自带的 sparkPlanInfo 递归遍历,无需别的依赖。读节点的输入路径在 metadata("Location"),写节点信息在 simpleString 里——胜在简单稳定,弱在表名要靠字符串解析。

import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import scala.collection.mutable.ListBuffer

class LineageListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case s: SparkListenerSQLExecutionStart =>            // 计划信息在 Start 事件上
      val in, out = ListBuffer.empty[String]
      walk(s.sparkPlanInfo, in, out)
      println(s"execId=${s.executionId} in=${in.distinct} out=${out.distinct}")
    case _ =>
  }

  // 递归遍历物理计划信息树
  private def walk(p: SparkPlanInfo, in: ListBuffer[String], out: ListBuffer[String]): Unit = {
    val n = p.nodeName
    if (n.contains("Scan"))                              // 读:FileScan / Scan parquet / HiveTableScan
      p.metadata.get("Location").foreach(in += _)        //   metadata 里有输入路径
    else if (n.contains("Insert") || n.contains("Write"))   // 写命令
      out += p.simpleString                            //   simpleString 含输出路径/表
    p.children.foreach(walk(_, in, out))
  }
}

5.4 方式 B(推荐):三阶段 union + End 缓存

End 触发提取;Start 时把 QE 缓存进自己的 map、End 时按 executionId 回查(规避注册表移除竞态),并用 errorMessage 门控只上报成功的执行。 拿到 QE 后,对 logical / analyzed / optimizedPlan 三个阶段各遍历一次——写端目标与读端 relation 分布在不同阶段,并扫覆盖更全;同一节点跨阶段重复,靠 LineageDataset 的集合天然去重。

import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SQLExecution, QueryExecution}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart, SparkListenerSQLExecutionEnd}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

class LineageListener extends SparkListener {
  // Start 缓存 QE:规避 End 上 getQueryExecution 已被移除的竞态
  private val cache = new ConcurrentHashMap[Long, QueryExecution]()

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case s: SparkListenerSQLExecutionStart =>
      Option(SQLExecution.getQueryExecution(s.executionId)).foreach(cache.put(s.executionId, _))

    case e: SparkListenerSQLExecutionEnd =>                    // End 触发
      val qe = Option(cache.remove(e.executionId))
        .orElse(Option(SQLExecution.getQueryExecution(e.executionId)))
      if (e.errorMessage.isEmpty) qe.foreach(emit)             // 仅成功才上报

    case _ =>
  }

  private def emit(qe: QueryExecution): Unit = {
    val sources, dests = mutable.LinkedHashSet.empty[LineageDataset]
    // best-effort:三个逻辑计划阶段都扫, 结果合并(集合天然去重)
    Seq(qe.logical, qe.analyzed, qe.optimizedPlan).foreach(scan(_, sources, dests))
    report(Lineage(sources.toSeq.sorted, dests.toSeq.sorted))   // 去重 + 排序
  }

  // 单阶段提取:读端 relation → sources,写端 command/target → dests
  private def scan(plan: LogicalPlan, sources: mutable.Set[LineageDataset],
                   dests: mutable.Set[LineageDataset]): Unit = plan.foreach {
    // —— 读端 relation → source ——
    case LogicalRelation(h: HadoopFsRelation, _, cat, _) => sources ++= asDataset(h, cat)
    case r: HiveTableRelation    => sources += asTable(r.tableMeta)
    case r: DataSourceV2Relation => sources += asV2(r.table)
    // —— 写端 command / target → destination ——
    case c: InsertIntoHadoopFsRelationCommand => dests ++= asDataset(c)
    case c: InsertIntoHiveTable               => dests += asTable(c.table)
    case i: InsertIntoStatement               => dests ++= asTarget(i.table)   // 未 resolve 的写
    case w: V2WriteCommand                    => dests += asV2(w.table)        // Append/Overwrite…
    case c: CreateTableAsSelect               => dests ++= asCreate(c)         // CTAS
    case r: ReplaceTableAsSelect              => dests ++= asCreate(r)         // RTAS
    case _ =>
  }
  // asDataset/asTable/asV2/asTarget/asCreate:把各 connector 的 path·identifier·name 归一成
  // LineageDataset(字段访问随 Spark 版本而异, 这里只示意分流逻辑)
}

5.5 计划节点 → source / sink 映射

计划节点角色提取什么
LogicalRelation(HadoopFsRelation)sourcelocation.rootPaths(路径)/ catalogTable(表)
HiveTableRelationsourcetableMeta.identifier(表)/ storage.locationUri(路径)
DataSourceV2Relationsourcetable.name() / identifier(V2 表)
InsertIntoHadoopFsRelationCommandsinkoutputPath(路径)/ catalogTable(表)
InsertIntoHiveTablesinktable.identifier(表)/ storage(路径)
InsertIntoStatement(未 resolve)sinktarget relation → 名字(多见于 logical 阶段)
AppendData / OverwriteByExpression(V2 写)sinktable(V2 表 / 路径)
CreateTableAsSelect / ReplaceTableAsSelectsink目标 identifier(CTAS / RTAS)

5.6 注意事项

End 竞态
End 上直接取 QE 常为 null

withNewExecutionId 在 post End 后立即移除 QE,监听线程异步处理 End 时往往已被移除。稳妥:Start 缓存 QE、End 按 id 回查(Start 时 QE 在整段执行期内有效,几乎总能拿到)。

V1 vs V2
节点类型不同

FileSource / Hive 是 V1 节点;Delta / Iceberg / Kafka / JDBC 多用 DataSourceV2(DataSourceV2Relation / AppendData …),需补对应 case。

内部 API
跨版本可能变动

LogicalRelation / InsertIntoHadoopFsRelationCommand / SQLExecution 等是 Spark 内部 API;建议封装并按版本做兼容。

去重归一化
最容易出脏数据的地方

三阶段 union 后,LineageDataset 的归一化是关键:表↔底层 path 是否同源、fully-qualified vs 相对路径、结尾斜杠、hdfs://nn vs hdfs://nn:8020、库表名大小写——键不归一化会“看着像两个其实是一个”。

生产建议 路径还有两个细节:rootPaths 是数据源根路径(分区裁剪后实际读取的不一定都在此),写出常先落 staging/临时目录再 rename——按需归一。 要稳,优先用成熟方案(OpenLineage spark-listener · Spline · Atlas Spark connector),它们已处理好各版本与各数据源的差异。 另外:若不强求走 Core 监听器这条路,QueryExecutionListener 会被直接递交 QueryExecution(无注册表竞态),提取血缘更省心——本节坚持用 SQLExecutionEnd + 缓存回查 QueryExecution,是因为命题就是「用 Core Listener 扩展」。
Apache Spark · 监听器架构与原理
协调层 LiveListenerBus · 派发层 ListenerBus / SparkListenerBus / AsyncEventQueue · 回调契约 SparkListenerInterface / SparkListener
SQL 层 QueryExecutionListener / ExecutionListenerBus / SQLAppStatusListener — 类名与配置项基于 Spark 2.3+(多队列总线)与 3.x(SQL 监听分层)架构整理。