Spark listener详解
Spark 监听器架构与原理
自顶向下读懂 Spark 的可观测性机制:先看整体架构与类图,再逐个 drill-down 到 LiveListenerBus、
SparkListenerBus、AsyncEventQueue 三个模块;随后是 SQL 层的 QueryExecutionListener 体系、
它与 Core 监听器的关系,以及一个常被问到的问题——Core 监听器能不能监听 SQL 事件;最后讲如何扩展,并实战:用 Core 监听器提取血缘(源/汇表与读写路径)。
Spark 监听器是什么,为什么需要它
Spark 的调度器(Driver 端)在运行过程中持续产生大量内部事件:一个 Job 被提交、一个 Stage 完成、一个 Task 失败重试、一个 Executor 上线下线…… 这些事件是调度逻辑的副产物,却对监控、审计、UI 展示、动态资源分配至关重要。
若让每个需要这些信息的组件都去侵入调度器代码,耦合会失控。Spark 的解法是经典的观察者模式(发布—订阅): 调度核心只负责把事件投递到一条总线上,谁来消费、消费后做什么,由订阅者——监听器(Listener)——各自决定。 生产者与消费者彻底解耦,这就是整套机制的设计基石。
先用一个动图直观感受一个事件的旅程——它如何从调度器流入总线、被广播分发进队列、再由队列线程交给注册的监听器处理(下面各节会把每一步拆开细讲):
两套监听体系
回调契约
命名队列
LiveListenerBus
整套体系分两层:Core 层的 SparkListener 观测调度全生命周期(作业 / 阶段 / 任务 / 执行器 / 存储),
SQL 层的 QueryExecutionListener 观测查询级别(每一次 Dataset 动作的计划与耗时)。
关键是:两层共享同一条事件总线——后文会看到,SQL 监听只是架设在 Core 总线之上的一层。下面从全局架构开始。
自顶向下:运行时架构与类图
先建立两张全局视图:一张运行时架构图(事件如何从生产者流到下游消费),一张类图(各类型如何组织)。 把这两张图看懂,后面逐个模块的细节就都有了坐标。
1.1 运行时架构图
事件自上而下流经四层:生产者(调度组件 + SQL 执行)把事件 post 给
协调者 LiveListenerBus,后者广播到每一条异步队列,
每条队列用自己的线程把事件派发给挂在其下的监听器,监听器再驱动下游(UI、事件日志、动态分配、用户审计等)。
1.2 三层职责
把上图的角色按职责归类,正好是三层——后面的模块详解就沿着这三层展开:
持有队列列表,post 广播事件、管 start/stop。不实现任何回调,也不直接持有监听器。
把事件按类型变成对应回调调用;AsyncEventQueue 在此之上加了缓冲与线程。
声明并实现 onJobStart 等回调。真正的业务逻辑写在具体监听器里。
1.3 类图
类图把上面三层落到具体类型。左侧是派发/总线这条线,右侧是监听器这条线,二者通过一条「持有监听器列表」的关联连起来。
注意几个容易混的点:SparkListenerBus 不是总线门面,而是 AsyncEventQueue 的父 trait;
LiveListenerBus 不在继承链上,它只是组合持有队列。
逐个拆开:协调、派发、队列与流转
沿着类图从上往下钻:先看协调者 LiveListenerBus,再看真正干活的队列 AsyncEventQueue,
接着辨析容易混淆的 SparkListenerBus 与回调契约,最后把它们串成一条端到端链路并讲清线程模型。
2.1 LiveListenerBus —— 协调层
LiveListenerBus 是个协调者 / 门面,本身不是监听器、也不实现任何回调。它做三件事:
持有一个 CopyOnWriteArrayList[AsyncEventQueue];把 post(event) 广播到列表里的每一条队列;管理队列的 start / stop。
注册监听器时(addToSharedQueue / addToStatusQueue / …)它按 name 查找队列,找不到就 new 一个——
所以队列是按需懒创建的,实际存在哪几条取决于注册了什么。
// 队列名只是常量, 不是类型 val SHARED_QUEUE = "shared"; val APP_STATUS_QUEUE = "appStatus" val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"; val EVENT_LOG_QUEUE = "eventLog" private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() // post:广播到每一条队列(不按类型路由) private def postToQueues(event: SparkListenerEvent): Unit = { val it = queues.iterator() while (it.hasNext) it.next().post(event) } // 注册:按 name 找队列, 没有才新建(懒创建) def addToQueue(listener: SparkListenerInterface, name: String) = synchronized { queues.asScala.find(_.name == name) match { case Some(q) => q.addListener(listener) case None => val q = new AsyncEventQueue(name, conf, metrics, this); q.addListener(listener); queues.add(q) } }
2.2 AsyncEventQueue —— 队列 + 单线程
这是真正干活的类,本质就是「一个有界事件队列 + 一个处理线程」的封装,再叠加从父 trait 继承来的「事件→回调」派发能力。
它持有一个有界 LinkedBlockingQueue(容量默认 10000,由 spark.scheduler.listenerbus.eventqueue.capacity 控制),
和一个专属守护线程 spark-listener-group-{name}。
post 用 offer 入队,非阻塞;队列满则丢弃事件、累加计数并打印限速告警(宁可丢事件也不阻塞调度)。
停止时向队列投递一枚哨兵(poison pill),等队列排空后再 join 线程。shared / appStatus / executorManagement / eventLog
就是这个类的四个实例,仅靠 name 区分。
private class AsyncEventQueue(val name: String, /* … */) extends SparkListenerBus { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) private val dispatchThread = new Thread(s"spark-listener-group-$name") { /* dispatch() */ } def post(event: SparkListenerEvent): Unit = if (!eventQueue.offer(event)) onDropEvent(event) // 满则丢弃, 不阻塞 private def dispatch(): Unit = { var next = eventQueue.take() // 阻塞取事件 while (next != POISON_PILL) { super.postToAll(next); next = eventQueue.take() } } }
2.3 SparkListenerBus —— 派发 trait(名字辨析)
SparkListenerBus 这名字容易让人以为它就是那个「总线门面」。其实不是——它是 AsyncEventQueue 的父 trait,
封装的是「把事件按类型分发到具体回调」这件事。真正的门面/协调者是 LiveListenerBus。
它的父 ListenerBus 持有监听器列表、提供 postToAll(遍历本实例的 listeners,逐个 doPostEvent,并 try/catch 隔离、单独计时)。
SparkListenerBus 实现的 doPostEvent 就是那段按事件类型的模式匹配——这是「事件→具体回调」的转换核心。
另外,History Server 回放日志用的 ReplayListenerBus 也继承它,只是同步重放、没有线程。
// SparkListenerBus:事件类型 → 调具体回调 protected def doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit = event match { case e: SparkListenerTaskEnd => listener.onTaskEnd(e) case e: SparkListenerJobEnd => listener.onJobEnd(e) // … 其余 core 事件类型 … case _ => listener.onOtherEvent(e) // 扩展/SQL 事件走这里 }
2.4 SparkListener 与事件体系 —— 回调契约
回调契约由 SparkListenerInterface 声明(onJobStart / onTaskEnd / … / onOtherEvent),
抽象类 SparkListener 给它们空实现供继承,具体监听器只重写自己关心的。其中 onOtherEvent 是扩展点:
自定义事件和 SQL 模块事件都从这里进来。所有事件都实现 SparkListenerEvent,并携带各自的上下文数据。
| 事件类别 | 代表事件 | 触发时机 |
|---|---|---|
| 应用 | ApplicationStart / ApplicationEnd | SparkContext 启动 / 应用终止 |
| 作业 | JobStart / JobEnd | 一次 Action 触发的 Job 提交 / 完成 |
| 阶段 | StageSubmitted / StageCompleted | Stage 提交执行 / 全部 Task 结束 |
| 任务 | TaskStart / TaskGettingResult / TaskEnd | 单个 Task 的生命周期节点 |
| 执行器 | ExecutorAdded / ExecutorRemoved | Executor 注册 / 退出或被移除 |
| 存储 | BlockManagerAdded / BlockUpdated / UnpersistRDD | 块管理器注册、缓存块变更 |
| 环境 | EnvironmentUpdate | 配置、类路径、环境信息就绪 |
| 扩展 | 自定义 / SQL 事件 | 经 onOtherEvent 分发 |
2.5 端到端流转与线程模型
把前面拼起来,追踪一个 SparkListenerTaskEnd 的完整旅程。它分两段:投递段(同步,在生产者线程上)与
派发段(异步,在队列分发线程上)。
投递段 bus.post(event) → postToQueues 把同一事件投递到每一条队列(不按类型路由),各队列 offer 进缓冲。
派发段 每条队列的线程 take() 出事件 → postToAll 遍历本队列 listeners → 对每个 doPostEvent 按类型匹配 → 命中 TaskEnd 就调 onTaskEnd。
线程模型 · 串行的范围
串行,但范围是「单条队列内」:队列里那个线程一次只处理一个事件(FIFO),且对一个事件会按注册顺序逐个同步调用本队列的 listeners—— 上一个回调返回了才调下一个。所以「事件之间」和「同一事件的多个 listener 之间」在一条队列里都是串行。 但队列之间是并行的:各跑各的线程,互不等待。副作用是:一个慢 listener 会同时拖住本队列其他 listener 与后续事件(head-of-line 阻塞)。
| 关注点 | 线程行为 |
|---|---|
| 生产者线程 | 多个(调度 / 心跳 / 主线程),并发 post,线程安全 |
| post() 是否阻塞 | 否 —— 仅入队;队列满则丢弃,不阻塞生产者 |
| 每队列分发线程 | 1 个专属守护线程(spark-listener-group-*) |
| 队列内执行顺序 | FIFO,监听器串行调用,不并发 |
| 队列之间关系 | 并行,各持线程,互相隔离 |
| 监听器内部加锁 | 单队列注册可免;跨队列 / 共享状态需自理 |
| 背压方式 | 丢弃 + 计数 + 限速告警(非阻塞) |
2.6 装配与喂数据:在 SparkContext 中如何接线
到这里运行时机制清楚了,还剩两个工程问题:这套东西怎么在 SparkContext 里被组装起来,事件又是怎么进到总线的。
如何获取事件 · push 模型
LiveListenerBus 是个被动的 sink,只暴露一个 post()——它不订阅、不轮询任何东西。
各生产者组件在创建时被注入同一个 bus 引用,事件发生时各自构造对应的 SparkListenerXxx 并调用 post()。
也就是说:是组件主动 push,不是总线主动 pull。下表是「谁 post 什么」。
| 生产者 | 投递的事件 |
|---|---|
| DAGScheduler | JobStart / JobEnd · StageSubmitted / StageCompleted · TaskStart / TaskEnd … |
| SchedulerBackend | ExecutorAdded / ExecutorRemoved(Driver 端点) |
| BlockManagerMasterEndpoint | BlockManagerAdded / Removed · BlockUpdated |
| SparkContext | ApplicationStart / ApplicationEnd · EnvironmentUpdate |
| HeartbeatReceiver | ExecutorMetricsUpdate(心跳携带指标) |
| SQLExecution(SQL 模块) | SQLExecutionStart / SQLExecutionEnd |
post() 内部还有个时序细节:总线未 start 时,事件先进一个 queuedEvents 缓冲;
start() 之后才走 postToQueues 广播到各队列。这保证了早注册的监听器不会漏掉启动早期的事件。
内部架构 · 队列与线程
把 LiveListenerBus 剖开看:它持有一个 queues 列表,列表里每个元素是一个 AsyncEventQueue 实例;
而每个实例内部 = 一个有界缓冲(LinkedBlockingQueue)+ 一个专属线程 + 一个 listeners 列表。
post 把事件 offer 进每条队列的缓冲,每条队列的线程各自 take 出来跑 postToAll。
在 SparkContext 中的装配顺序
总线在 SparkContext 构造早期就被 new 出来,然后按固定顺序接线。关键是
AppStatusListener 要在 SparkEnv 之前注册好,以便捕获最早的事件:
| 步骤 | 动作 |
|---|---|
| ① | new LiveListenerBus(conf) —— 创建总线(未启动) |
| ② | 注册 AppStatusListener 到 appStatus 队列(早于 env,确保拿到全部事件) |
| ③ | createSparkEnv(conf, isLocal, listenerBus) —— bus 引用注入 env,内部组件从此可 post |
| ④ | 创建 TaskScheduler、DAGScheduler(持有 bus 引用,后续 post 调度事件) |
| ⑤ | 按需注册 EventLoggingListener(eventLog) / HeartbeatReceiver · ExecutorAllocation(executorManagement) |
| ⑥ | setupAndStartListenerBus() —— 注册 extra listeners 到 shared,然后 start() |
| ⑦ | start() 之后:postEnvironmentUpdate()、postApplicationStart() 投出最早的事件 |
extra listener 注册流程
第 ⑥ 步的 setupAndStartListenerBus() 读取 spark.extraListeners(逗号分隔类名),用反射实例化每个类
(优先找接收 SparkConf 的构造器,否则用无参构造器),注册到 shared 队列,最后 start()。
所以你的 extra listener 类必须有一个无参构造器或一个接收 SparkConf 的构造器。
// SparkContext.setupAndStartListenerBus()(简化) private def setupAndStartListenerBus(): Unit = { conf.get(EXTRA_LISTENERS).foreach { classNames => // spark.extraListeners val listeners = Utils.loadExtensions( classOf[SparkListenerInterface], classNames, conf) // 反射实例化 listeners.foreach { l => listenerBus.addToSharedQueue(l) // 注册到 shared 队列 logInfo(s"Registered listener ${l.getClass.getName}") } } listenerBus.start(this, _env.metricsSystem) // 启动:拉起各队列线程 + flush 缓冲 } // Utils.loadExtensions:优先 (SparkConf) 构造器, 否则无参构造器 val ext = Try(klass.getConstructor(classOf[SparkConf])) match { case Success(ctor) => ctor.newInstance(conf) case Failure(_) => klass.getConstructor().newInstance() }
start / stop 生命周期
start() 拉起每条队列的 dispatchThread,并把 queuedEvents 里启动前缓冲的事件 flush 进各队列——
早注册的监听器因此不漏早期事件。stop() 向每条队列投递哨兵(poison pill),等队列排空后 join 线程,确保 UI 状态与事件日志完整落盘。
start() 之前注册,而 ApplicationStart 在 start() 之后才 post——
所以 extra listener 一定能收到 ApplicationStart。但若你在应用运行中途用 addSparkListener 注册,则只能收到注册之后的事件。
SQL 监听:架在 Core 总线之上的一层
SQL 层提供 QueryExecutionListener,观测查询级别——每一次 Dataset 动作完成时拿到执行计划与耗时。
它不是独立系统,而是分层架设在第 02 节那条 Core 总线之上。
3.1 QueryExecutionListener 接口
接口极简,只有成功 / 失败两个回调,观测单位是一次完整的 Dataset 动作:
动作名(如 collect)、整个 QueryExecution、纳秒级耗时。
动作名、查询对象、抛出的异常,用于捕获执行期失败。
回调里的 QueryExecution 封装了一条查询从逻辑到物理的全部计划阶段
(解析 → 分析 → 优化 → 物理计划 executedPlan),所以监听器不仅知道耗时,还能拿到完整计划树做血缘、计划审查或慢查询诊断。
3.2 与 Core 的关系:同一条总线上的分层
触发链路如下:Dataset 动作经 SQLExecution.withNewExecutionId 包裹,向 LiveListenerBus 投递
SparkListenerSQLExecutionStart 与 SparkListenerSQLExecutionEnd 两个 SQL 事件。
关键在于:ExecutionListenerBus 本身就是一个 SparkListener,注册在 shared 队列上,订阅 SQLExecutionEnd,
收到后再按成功 / 失败把调用派发给注册的 QueryExecutionListener。同一批 SQL 事件还被 SQLAppStatusListener 消费,渲染成 UI 的 SQL 标签页。
3.3 Core 监听器能监听 SQL 事件吗?
能。这正是「分层在同一条总线上」带来的直接结论。SparkListenerSQLExecutionStart / End 也是
SparkListenerEvent,经同一个 LiveListenerBus.post 广播到所有队列(含 shared)。
任何注册到 shared 队列的 SparkListener(比如你用 addSparkListener 注册的)都会收到它们。
只是——这些 SQL 事件不在 doPostEvent 的 core 类型匹配里(它们是 SQL 模块的类型),所以会落到
case _ => onOtherEvent(event)。因此:在你的 SparkListener 的 onOtherEvent 里对这些事件做模式匹配,就能监听 SQL 执行。
框架自己的 SQLAppStatusListener、ExecutionListenerBus 正是这么干的——它们都是 SparkListener,靠 onOtherEvent 处理 SQL 事件。
import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart, SparkListenerSQLExecutionEnd} class SqlAwareListener extends SparkListener { // SQL 事件不在 core 的类型匹配里, 统一从 onOtherEvent 进来 override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionStart => println(s"SQL 开始 execId=${e.executionId}: ${e.description}") case e: SparkListenerSQLExecutionEnd => println(s"SQL 结束 execId=${e.executionId}") case _ => // 忽略其他扩展事件 } } spark.sparkContext.addSparkListener(new SqlAwareListener)
QueryExecutionListener 只能看到查询级的 onSuccess / onFailure,看不到 Job / Stage / Task 等 core 事件。
所以——要细粒度调度信息 + 顺带 SQL 事件,用 Core SparkListener;只要查询级计划与耗时,用 QueryExecutionListener。
| 维度 | SparkListener (Core) | QueryExecutionListener (SQL) |
|---|---|---|
| 观测粒度 | Job / Stage / Task / Executor / 存储 | 一次 Dataset 动作(查询级) |
| 能否看 SQL 事件 | 能(经 onOtherEvent) | 本身即 SQL 维度 |
| 能否看 Job/Task | 能 | 不能 |
| 回调入口 | onJobStart / onTaskEnd / onOtherEvent … | onSuccess / onFailure |
| 携带信息 | TaskMetrics、StageInfo、JobResult | QueryExecution(四阶段计划)+ 耗时 |
| 事件来源 | 调度器直接 post | 构建在总线之上(订阅 SQLExecutionEnd) |
如何扩展 Spark 监听器
三种扩展方式:写一个 Core SparkListener、写一个 SQL QueryExecutionListener、或自定义事件经 onOtherEvent 传播。每种都有「启动配置」与「运行时」两条注册途径。
4.1 自定义 SparkListener(Core)
继承 SparkListener,重写关心的回调即可。注册:启动配置用 spark.extraListeners(随 SparkContext 实例化),运行时用 addSparkListener(挂 shared 队列)。
class JobMetricsListener extends SparkListener { override def onJobStart(e: SparkListenerJobStart): Unit = println(s"Job ${e.jobId} 启动, 含 ${e.stageIds.size} 个 Stage") override def onTaskEnd(e: SparkListenerTaskEnd): Unit = println(s"Task 结束: 运行 ${e.taskMetrics.executorRunTime}ms") } spark.sparkContext.addSparkListener(new JobMetricsListener) // 运行时 // 或 --conf spark.extraListeners=com.example.JobMetricsListener // 启动配置
4.2 自定义 QueryExecutionListener(SQL)
实现 onSuccess / onFailure。注册:启动配置用 spark.sql.queryExecutionListeners,运行时用 spark.listenerManager.register(...)。
import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.sql.execution.QueryExecution class AuditListener extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = println(s"[$funcName] 成功 ${durationNs/1e9}s\n${qe.executedPlan}") override def onFailure(funcName: String, qe: QueryExecution, ex: Exception): Unit = println(s"[$funcName] 失败: ${ex.getMessage}") } spark.listenerManager.register(new AuditListener) // 运行时 // 或 --conf spark.sql.queryExecutionListeners=com.example.AuditListener
4.3 自定义事件(onOtherEvent 扩展点)
需要在框架内传播自有信号时,定义一个实现 SparkListenerEvent 的事件并 post 到总线,由各监听器经 onOtherEvent 接收——
这与 SQL 事件走的是完全相同的机制。
case class MyMetricEvent(name: String, value: Long) extends SparkListenerEvent // 投递 spark.sparkContext.listenerBus.post(MyMetricEvent("rows", 1000)) // 接收 override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: MyMetricEvent => println(s"${e.name} = ${e.value}") case _ => }
4.4 实践注意事项
回调在队列分发线程上同步执行,重 IO 会拖慢整条队列、引发积压与丢弃。重活异步化或批量化。
队列满即丢事件。逻辑不能假设事件“一定收到、顺序完整”;关键场景需容忍缺失或调大队列容量。
回调抛出的异常会被总线捕获记录,但反复抛错会刷屏、干扰别的监听器。务必在回调内部处理。
只在单条队列注册时内部状态无需加锁;跨队列注册或与外部线程共享状态时需自行保证。
LiveListenerBus 管「事件进哪条队列」(广播给每条),doPostEvent 管「调监听器的哪个方法」(按类型分发),
监听器自己管「方法里干什么」。SQL 监听只是同一条总线上、借 onOtherEvent 与 ExecutionListenerBus 架出的一层。
用 Core Listener 提取血缘:源/汇表与读写路径
目标:用一个通过 spark.extraListeners 注册的 SparkListener,在每条 SQL/Dataset 执行时提取它读了哪些
source 表 / 输入路径、写了哪些 sink 表 / 输出路径。这正是 OpenLineage、Spline、Atlas Spark connector 的核心做法。
5.1 思路 · on 哪个 event
血缘信息藏在 SQL 的执行计划里。production 做法是在 onOtherEvent 里 hook SparkListenerSQLExecutionEnd——它带 errorMessage,能据此只上报成功的执行;再用 executionId 回查本次的 QueryExecution。
但有个坑:SQLExecution.getQueryExecution(id) 在执行结束后会被移除,End 上直接取常拿到 null(见 §5.6)。稳妥姿势是——Start 时缓存 QE,End 时按 id 回查。拿到 QE 后,对它的 logical / analyzed / optimizedPlan 三个逻辑计划阶段做 best-effort 遍历:只看单一阶段语义简单但容易漏(写端目标与读端 relation 暴露在不同阶段),三阶段 union 覆盖面更广。两种取数路线:
Start 事件自带的物理计划信息树,无需回查、始终可用;但元数据字符串化,提取表名较糙,适合只要粗粒度路径。
End 缓存回查 QueryExecution,遍历三个逻辑计划阶段并 union 去重,拿干净的表名/路径、覆盖面广(OpenLineage / Spline 即此类)。
一个前提:这条路只看得到 SQL / DataFrame / Dataset 执行(事件来自 SQLExecution);纯 RDD 作业不产生 SQL 事件,拿不到血缘。
5.2 提取架构图
5.3 方式 A:解析 sparkPlanInfo(自包含,物理计划)
只用事件自带的 sparkPlanInfo 递归遍历,无需别的依赖。读节点的输入路径在 metadata("Location"),写节点信息在 simpleString 里——胜在简单稳定,弱在表名要靠字符串解析。
import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart import scala.collection.mutable.ListBuffer class LineageListener extends SparkListener { override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case s: SparkListenerSQLExecutionStart => // 计划信息在 Start 事件上 val in, out = ListBuffer.empty[String] walk(s.sparkPlanInfo, in, out) println(s"execId=${s.executionId} in=${in.distinct} out=${out.distinct}") case _ => } // 递归遍历物理计划信息树 private def walk(p: SparkPlanInfo, in: ListBuffer[String], out: ListBuffer[String]): Unit = { val n = p.nodeName if (n.contains("Scan")) // 读:FileScan / Scan parquet / HiveTableScan p.metadata.get("Location").foreach(in += _) // metadata 里有输入路径 else if (n.contains("Insert") || n.contains("Write")) // 写命令 out += p.simpleString // simpleString 含输出路径/表 p.children.foreach(walk(_, in, out)) } }
5.4 方式 B(推荐):三阶段 union + End 缓存
End 触发提取;Start 时把 QE 缓存进自己的 map、End 时按 executionId 回查(规避注册表移除竞态),并用 errorMessage 门控只上报成功的执行。
拿到 QE 后,对 logical / analyzed / optimizedPlan 三个阶段各遍历一次——写端目标与读端 relation 分布在不同阶段,并扫覆盖更全;同一节点跨阶段重复,靠 LineageDataset 的集合天然去重。
import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.{SQLExecution, QueryExecution} import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart, SparkListenerSQLExecutionEnd} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable class LineageListener extends SparkListener { // Start 缓存 QE:规避 End 上 getQueryExecution 已被移除的竞态 private val cache = new ConcurrentHashMap[Long, QueryExecution]() override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case s: SparkListenerSQLExecutionStart => Option(SQLExecution.getQueryExecution(s.executionId)).foreach(cache.put(s.executionId, _)) case e: SparkListenerSQLExecutionEnd => // End 触发 val qe = Option(cache.remove(e.executionId)) .orElse(Option(SQLExecution.getQueryExecution(e.executionId))) if (e.errorMessage.isEmpty) qe.foreach(emit) // 仅成功才上报 case _ => } private def emit(qe: QueryExecution): Unit = { val sources, dests = mutable.LinkedHashSet.empty[LineageDataset] // best-effort:三个逻辑计划阶段都扫, 结果合并(集合天然去重) Seq(qe.logical, qe.analyzed, qe.optimizedPlan).foreach(scan(_, sources, dests)) report(Lineage(sources.toSeq.sorted, dests.toSeq.sorted)) // 去重 + 排序 } // 单阶段提取:读端 relation → sources,写端 command/target → dests private def scan(plan: LogicalPlan, sources: mutable.Set[LineageDataset], dests: mutable.Set[LineageDataset]): Unit = plan.foreach { // —— 读端 relation → source —— case LogicalRelation(h: HadoopFsRelation, _, cat, _) => sources ++= asDataset(h, cat) case r: HiveTableRelation => sources += asTable(r.tableMeta) case r: DataSourceV2Relation => sources += asV2(r.table) // —— 写端 command / target → destination —— case c: InsertIntoHadoopFsRelationCommand => dests ++= asDataset(c) case c: InsertIntoHiveTable => dests += asTable(c.table) case i: InsertIntoStatement => dests ++= asTarget(i.table) // 未 resolve 的写 case w: V2WriteCommand => dests += asV2(w.table) // Append/Overwrite… case c: CreateTableAsSelect => dests ++= asCreate(c) // CTAS case r: ReplaceTableAsSelect => dests ++= asCreate(r) // RTAS case _ => } // asDataset/asTable/asV2/asTarget/asCreate:把各 connector 的 path·identifier·name 归一成 // LineageDataset(字段访问随 Spark 版本而异, 这里只示意分流逻辑) }
5.5 计划节点 → source / sink 映射
| 计划节点 | 角色 | 提取什么 |
|---|---|---|
| LogicalRelation(HadoopFsRelation) | source | location.rootPaths(路径)/ catalogTable(表) |
| HiveTableRelation | source | tableMeta.identifier(表)/ storage.locationUri(路径) |
| DataSourceV2Relation | source | table.name() / identifier(V2 表) |
| InsertIntoHadoopFsRelationCommand | sink | outputPath(路径)/ catalogTable(表) |
| InsertIntoHiveTable | sink | table.identifier(表)/ storage(路径) |
| InsertIntoStatement(未 resolve) | sink | target relation → 名字(多见于 logical 阶段) |
| AppendData / OverwriteByExpression(V2 写) | sink | table(V2 表 / 路径) |
| CreateTableAsSelect / ReplaceTableAsSelect | sink | 目标 identifier(CTAS / RTAS) |
5.6 注意事项
withNewExecutionId 在 post End 后立即移除 QE,监听线程异步处理 End 时往往已被移除。稳妥:Start 缓存 QE、End 按 id 回查(Start 时 QE 在整段执行期内有效,几乎总能拿到)。
FileSource / Hive 是 V1 节点;Delta / Iceberg / Kafka / JDBC 多用 DataSourceV2(DataSourceV2Relation / AppendData …),需补对应 case。
LogicalRelation / InsertIntoHadoopFsRelationCommand / SQLExecution 等是 Spark 内部 API;建议封装并按版本做兼容。
三阶段 union 后,LineageDataset 的归一化是关键:表↔底层 path 是否同源、fully-qualified vs 相对路径、结尾斜杠、hdfs://nn vs hdfs://nn:8020、库表名大小写——键不归一化会“看着像两个其实是一个”。
rootPaths 是数据源根路径(分区裁剪后实际读取的不一定都在此),写出常先落 staging/临时目录再 rename——按需归一。
要稳,优先用成熟方案(OpenLineage spark-listener · Spline · Atlas Spark connector),它们已处理好各版本与各数据源的差异。
另外:若不强求走 Core 监听器这条路,QueryExecutionListener 会被直接递交 QueryExecution(无注册表竞态),提取血缘更省心——本节坚持用 SQLExecutionEnd + 缓存回查 QueryExecution,是因为命题就是「用 Core Listener 扩展」。