Spark Application Lifecycle

After a user's Spark job jar is submitted, how does it actually run? Why, when troubleshooting, do you end up reading the AM log rather than a driver log? Why does a job that clearly ran to completion sometimes end up marked as failed? And why does a job sometimes fail before it has even started running? With these questions in mind, this post walks through the lifecycle of a Spark Application.

Today most companies run their Spark jobs on a YARN cluster in Cluster deploy mode. In this setup, after the client submits the application, the first thing to start is the AM (Application Master), which then starts the Driver (the Driver is a thread inside the AM, which is why you read the AM log). The AM then requests resources and launches Containers to run the Executor processes, which execute the tasks dispatched to them.
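For reference, a minimal sketch of submitting in this mode programmatically through Spark's SparkLauncher API, equivalent to spark-submit --master yarn --deploy-mode cluster; the jar path and class name below are hypothetical placeholders:

import org.apache.spark.launcher.SparkLauncher

object SubmitInClusterMode {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")               // Driver runs as a thread of the AM on the cluster
      .setAppResource("/path/to/my-app.jar")  // hypothetical application jar
      .setMainClass("com.example.MyApp")      // hypothetical user main class
      .startApplication()                     // returns a SparkAppHandle for state tracking

    // Poll the application state until it reaches a terminal state.
    while (!handle.getState.isFinal) Thread.sleep(1000)
    println(s"final state: ${handle.getState}")
  }
}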

Below are a few of the key functions in ApplicationMaster.scala.

def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  ...
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}

final def run(): Int = {
  doAsUser {
    runImpl()
  }
  exitCode
}

private def runImpl(): Unit = {
  try {
    val appAttemptId = client.getAttemptId()

    ...

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir()
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }
}

In ApplicationMaster's entry point main, an ApplicationMaster object is constructed, and runImpl then calls runDriver. Note that before runDriver is invoked, a hook is registered with ShutdownHookManager, and it carries two pieces of logic. The first: if the user code calls System.exit, ending the AM process early, the hook records an error code and a debug message. When you run into the log below, check whether your code calls System.exit.

INFO ApplicationMaster: Final app status: FAILED, exitCode: 16, (reason: Shutdown hook called before final status was reported.)
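A minimal sketch of how this happens, with a hypothetical user application (not code from the sources above): the Driver thread only calls finish(SUCCEEDED, ...) after the user's main returns normally, and System.exit short-circuits that by firing the shutdown hook first.

import org.apache.spark.sql.SparkSession

// Hypothetical user application that triggers EXIT_EARLY (exitCode 16).
object ExitsTooEarly {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("exits-too-early").getOrCreate()
    spark.range(100).count()
    spark.stop()
    // BAD: this kills the JVM before the AM's finish(SUCCEEDED, ...) runs,
    // so the shutdown hook sees finished == false and reports EXIT_EARLY.
    // GOOD: simply return from main and let the Driver thread call finish().
    System.exit(0)
  }
}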

The second piece of logic checks whether the Application succeeded. If it did, the hook sends an unregister request to the RM, and only once that unregister completes does the job count as truly successful. We once hit a production issue where the ShutdownHookManager timeout was set too low: the process exited before the unregister finished, and the job was ultimately marked as failed.
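The isLastAttempt guard matters because unregistering tells the RM to stop retrying: on a failed attempt that is not the final one, the AM skips unregister so YARN can launch another attempt. The attempt count is configurable; a minimal sketch, assuming you want to cap it per application (spark.yarn.maxAppAttempts cannot exceed the cluster-wide yarn.resourcemanager.am.max-attempts):

import org.apache.spark.SparkConf

// Allow at most 2 AM attempts for this application (the value is illustrative).
val conf = new SparkConf()
  .set("spark.yarn.maxAppAttempts", "2")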

Moving on to runDriver, which also lives in ApplicationMaster.scala:

private def runDriver(): Unit = {
  addAmIpFilter(None)
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      ...
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
        "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}

Here you can see that startUserApplication returns a thread, which is exactly the point made earlier: the driver is a thread of the AM. The code then awaits sparkContextPromise.future, with a timeout configured by spark.yarn.am.waitTime (default 100s). So what is it waiting for? To answer that, look at what startUserApplication does.

private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")
  ......
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run() {
      try {
        mainMethod.invoke(null, userArgs.toArray)
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
      } catch {
        ......
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}

As you can see, startUserApplication looks up the main method of the user class, that is, the submitted Spark job, then creates a thread whose run method invokes that main. When main returns, the Application status is set to SUCCEEDED (on failure the status is set in the exception handling, which is elided here). Whether the user's Spark Application creates a SparkContext directly (the old style) or goes through the newer SparkSession API, a SparkContext gets created either way.
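For example, a minimal user application (names are illustrative); building the SparkSession is what ends up constructing the SparkContext:

import org.apache.spark.sql.SparkSession

object MyApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("my-app").getOrCreate()
    val sc = spark.sparkContext   // the SparkContext created under the hood
    println(sc.applicationId)
    spark.stop()
  }
}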

During SparkContext initialization, the following logic runs:
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
......
_taskScheduler.start()
.......
// Post init
_taskScheduler.postStartHook()

_taskScheduler here is an instance of YarnClusterScheduler (covered in detail in a later post). At the end of initialization, _taskScheduler invokes its post-start callback: YarnClusterScheduler's postStartHook calls ApplicationMaster's sparkContextInitialized method.


private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    ......
  }
}

And in ApplicationMaster.scala:

private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Notify runDriver function that SparkContext is available
    sparkContextPromise.success(sc)
    // Pause the user class thread in order to make proper initialization in runDriver function.
    sparkContextPromise.wait()
  }
}

sparkContextInitialized completes sparkContextPromise (and then parks the user thread in wait() until runDriver's resumeDriver notifies it). The await in runDriver shown above is waiting for exactly this completion; if it does not arrive within spark.yarn.am.waitTime, the job fails and you will see a log like the following:

ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.
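Stripped of Spark specifics, the handshake between runDriver and the Driver thread looks roughly like the following self-contained sketch (plain scala.concurrent, Scala 2.12+ SAM syntax; all names here are made up):

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

object HandshakeSketch {
  private val promise = Promise[String]()

  def main(args: Array[String]): Unit = {
    val userThread = new Thread(() => {
      promise.synchronized {
        promise.success("context-ready")   // like sparkContextPromise.success(sc)
        promise.wait()                     // pause until "resumeDriver" notifies
      }
      println("user thread resumed")
    })
    userThread.start()

    // Like runDriver: block until the promise is completed, or time out.
    val value = Await.result(promise.future, 10.seconds)
    println(s"got: $value")

    promise.synchronized { promise.notify() }  // like resumeDriver()
    userThread.join()
  }
}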

When you hit this log, the most likely cause is that the user did something time-consuming before initializing the SparkContext, such as querying MySQL. From the code walk-through above, you can see that in YARN cluster mode, a Spark Application's lifecycle is driven by YARN's ApplicationMaster. The details of SparkContext initialization, and what exactly YarnClusterScheduler is, will be covered in later posts.
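To make that failure mode concrete, here is a hypothetical sketch; loadDimensionTable stands in for something slow like a MySQL query:

import org.apache.spark.sql.SparkSession

object SlowInitApp {
  def main(args: Array[String]): Unit = {
    // BAD: long-running work (e.g. a MySQL query) before getOrCreate().
    // If it exceeds spark.yarn.am.waitTime, the AM fails with EXIT_SC_NOT_INITED.
    // val lookup = loadDimensionTable()

    // GOOD: create the SparkSession first so postStartHook fires promptly,
    // then run the slow preparation work.
    val spark = SparkSession.builder().appName("slow-init").getOrCreate()
    val lookup = loadDimensionTable()   // hypothetical slow call
    spark.stop()
  }

  // Stub standing in for a slow external lookup.
  private def loadDimensionTable(): Map[String, String] = Map.empty
}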