/**
 * JVM entry point for the YARN ApplicationMaster process.
 *
 * Installs the signal-handler logger, parses the AM command-line arguments,
 * constructs the [[ApplicationMaster]], and exits the JVM with the AM's
 * final exit code so YARN sees the correct process status.
 *
 * @param args raw command-line arguments passed by the YARN launcher
 */
def main(args: Array[String]): Unit = {
  // Log on SIGTERM/SIGINT etc. so shutdowns initiated by YARN are visible in the AM log.
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  // ... (code elided in this excerpt) ...
  master = new ApplicationMaster(amArgs)
  // System.exit propagates the AM's exit code to YARN's container status.
  System.exit(master.run())
}
/**
 * Runs the ApplicationMaster under the proper user identity and reports the result.
 *
 * All real work happens in `runImpl()`; this wrapper only arranges the UGI/doAs
 * context and surfaces the exit code recorded by `finish(...)`.
 *
 * @return the process exit code accumulated in `exitCode` by `runImpl`/`finish`
 */
final def run(): Int = {
  doAsUser {
    runImpl()
  }
  // exitCode is set as a side effect of runImpl (via finish); returned for System.exit.
  exitCode
}
/**
 * Main body of the ApplicationMaster.
 *
 * Registers a shutdown hook that finalizes/unregisters the application, then runs
 * either the cluster-mode driver or the client-mode executor launcher. Any uncaught
 * exception marks the application FAILED; the metrics system is flushed and stopped
 * in all cases.
 */
private def runImpl(): Unit = {
  try {
    val appAttemptId = client.getAttemptId()

    // ... (code elided in this excerpt) ...

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // We only want to unregister if we don't want the RM to retry: either the app
        // succeeded, or there are no attempts left for YARN to restart the AM.
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir()
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // Catch everything else if not specifically handled.
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      // Flush any pending metrics before shutting the reporting system down.
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      // Metrics teardown failures must not mask the application's real outcome.
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }
}
/**
 * Cluster-mode path: starts the user class on its own thread, waits (bounded by
 * `spark.yarn.am.waitTime`) for it to publish its SparkContext via
 * `sparkContextPromise`, then resumes the user thread and joins it.
 *
 * Failure modes:
 *  - promise completes with null → IllegalStateException (user app never created a context)
 *  - wait times out → application finished as FAILED with EXIT_SC_NOT_INITED
 */
private def runDriver(): Unit = {
  addAmIpFilter(None)
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      // ... (code elided in this excerpt) ...
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    // ThreadUtils.awaitResult wraps the underlying TimeoutException in a SparkException.
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
          "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    // Always unblock the user class thread (it parks inside sparkContextInitialized),
    // even on the failure paths above. resumeDriver is assumed idempotent — see its impl.
    resumeDriver()
  }
}
// startUserApplication(): loads the user's main class via the user classloader and
// resolves its `main(Array[String])` method, to be invoked on a separate thread that
// this method returns.
// NOTE(review): this definition is truncated in the excerpt (elided with "......" and
// cut off after getMethod) — the thread creation/start and the returned Thread are not
// visible here; confirm against the full file before relying on details beyond this point.
privatedefstartUserApplication(): Thread = { logInfo("Starting the user application in a separate Thread") ...... val mainMethod = userClassLoader.loadClass(args.userClass) .getMethod("main", classOf[Array[String]])
/**
 * Called from the user class thread once it has created its SparkContext.
 *
 * Completes `sparkContextPromise` so `runDriver` can proceed, then parks the calling
 * (user class) thread on the promise's monitor until `resumeDriver` notifies it,
 * giving `runDriver` a window to finish AM-side initialization first.
 *
 * @param sc the SparkContext created by the user application
 */
private def sparkContextInitialized(sc: SparkContext): Unit = {
  sparkContextPromise.synchronized {
    // Notify runDriver function that SparkContext is available
    sparkContextPromise.success(sc)
    // Pause the user class thread in order to make proper initialization in runDriver function.
    // NOTE(review): a bare wait() can also return on spurious wakeup (per Object.wait);
    // confirm the resumer's contract makes an early return here harmless.
    sparkContextPromise.wait()
  }
}
ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.