
Following up on the previous article, "Mastering Spark 2.2.0: A Deep Dive into the Master-side schedule() Function", this article starts from the client side and walks through, in detail, how the Spark cluster processes a job after the user submits it with spark-submit. The previous article only covered how the Master launches Executors and did not explain how the Driver is launched, so here we cover Driver submission and startup together with the spark-submit flow.

1. A job is submitted with the bin/spark-submit script. The script itself does very little: it simply delegates to the org.apache.spark.deploy.SparkSubmit class via spark-class:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

2. Go straight to SparkSubmit's main() function; the code is as follows (a small sketch of how these actions are selected from the command line follows the code):

override def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // Call submit() with the arguments the user specified
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
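As a rough illustration only (plain Scala, not the SparkSubmitArguments source, and written under the assumption that --kill and --status are the flags that select the non-SUBMIT actions, as they are for standalone cluster submissions), the action matched above is derived from the command line roughly like this:

// Illustrative sketch: --kill <submissionId> selects KILL, --status <submissionId>
// selects REQUEST_STATUS, everything else defaults to SUBMIT.
object ActionSelectionSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  def actionFor(args: Array[String]): Action =
    if (args.contains("--kill")) Kill
    else if (args.contains("--status")) RequestStatus
    else Submit
}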

3. The submit() function mainly performs some preparation and then submits the Driver:

@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
  // prepareSubmitEnvironment() is fairly long; it mainly does two things:
  //  1. Decides the submission mode from the user's arguments, mostly --master and --deploy-mode
  //  2. Resolves the class to run for that mode (see the sketch after this code block)
  // childArgs:      the arguments to pass to that class
  // childClasspath: the classpath the JVM will run with
  // sysProps:       system properties
  // childMainClass: the main class to run next
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    // proxyUser is usually null, so in general we go straight to the else branch below
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      // With the values returned by prepareSubmitEnvironment, call runMain(),
      // which logs a little and then invokes the resolved main class
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //  (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //  (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    // After all the detours above, this ends up calling doRunMain()
    doRunMain()
  }
}
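The comments on prepareSubmitEnvironment() boil down to choosing childMainClass based on the deploy mode. The following is a minimal sketch, not the real implementation (the function name and parameters are illustrative), that captures the mapping described in step 5 below:

// Minimal sketch only: which main class runMain() will end up invoking
// in the cases this article walks through.
object ChildMainClassSketch {
  def resolveChildMainClass(deployMode: String, useRest: Boolean, userMainClass: String): String =
    if (deployMode == "client") {
      userMainClass                                        // run the user's main class directly
    } else if (useRest) {
      "org.apache.spark.deploy.rest.RestSubmissionClient"  // REST gateway, the default since Spark 1.3
    } else {
      "org.apache.spark.deploy.Client"                     // legacy RPC gateway, the path this article follows
    }
}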

4. This is the core of spark-submit: the call to doRunMain(). There is not much inside it; since we normally do not set a proxy user, it goes straight to the else branch:

/* In standalone cluster mode there are two submission gateways:
 * 1. The traditional RPC gateway, using org.apache.spark.deploy.Client as a wrapper
 * 2. The REST-based gateway introduced in Spark 1.3
 * The second one is the default behavior as of Spark 1.3, but spark-submit falls back
 * to the legacy RPC gateway if the master turns out not to be a REST server.
 */
def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      case e: Exception =>
        // Hadoop's AuthorizationException suppresses the exception's stack trace, which
        // makes the message printed to the output by the JVM not very helpful. Instead,
        // detect exceptions with empty stack traces here, and treat them differently.
        if (e.getStackTrace().length == 0) {
          // scalastyle:off println
          printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          // scalastyle:on println
          exitFn(1)
        } else {
          throw e
        }
    }
  } else {
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }
}

5. Finally, runMain() is called; the code is as follows:

// When deploy mode is client, this runs the user's own main class directly.
// When deploy mode is cluster, it depends on whether this is a REST submission: if so,
// it runs org.apache.spark.deploy.rest.RestSubmissionClient's main method; otherwise
// it runs org.apache.spark.deploy.Client's main method.
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println

  // By default, the jars shipped with Spark take precedence over the jars in the user's
  // application. To let the user's jars win, use spark.yarn.user.classpath.first (before
  // Spark 1.3) or the spark.driver.userClassPathFirst / spark.executor.userClassPathFirst
  // settings (see the configuration example after this code block).
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // Add the jars to the classpath through the URLClassLoader
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  var mainClass: Class[_] = null
  try {
    // Load the main class resolved by prepareSubmitEnvironment()
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      e.printStackTrace(printStream)
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        // scalastyle:off println
        printStream.println(s"Failed to load hive class.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }

  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }

  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }

  try {
    // Invoke the main method of the resolved class via reflection
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      findCause(t) match {
        case SparkUserAppException(exitCode) =>
          System.exit(exitCode)

        case t: Throwable =>
          throw t
      }
  }
}
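The class-loader choice above is driven by spark.driver.userClassPathFirst. A minimal example of turning this behavior on (the two property names are the real Spark 2.x settings referenced in the comment; the surrounding snippet is only illustrative, and the same values can be passed as --conf options to spark-submit):

import org.apache.spark.SparkConf

object UserClassPathFirstExample {
  // Illustrative only: make the driver and executors prefer the user's jars over the
  // jars shipped with Spark when resolving classes.
  val conf = new SparkConf()
    .set("spark.driver.userClassPathFirst", "true")
    .set("spark.executor.userClassPathFirst", "true")
}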

6. Since we are in standalone cluster mode here (following the legacy RPC gateway path), childMainClass maps to org.apache.spark.deploy.Client, whose main() function is invoked directly; the code is as follows:

def main(args: Array[String]) {
  // scalastyle:off println
  if (!sys.props.contains("SPARK_SUBMIT")) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
  }
  // scalastyle:on println
  val conf = new SparkConf()
  val driverArgs = new ClientArguments(args)

  if (!conf.contains("spark.rpc.askTimeout")) {
    conf.set("spark.rpc.askTimeout", "10s")
  }
  Logger.getRootLogger.setLevel(driverArgs.logLevel)

  // Prepare the RpcEnv, then resolve the Master endpoint references from the master
  // addresses and register a ClientEndpoint with the RpcEnv. Note that the Master
  // endpoint references are also passed into the ClientEndpoint constructor, which
  // means this ClientEndpoint will communicate with the Master endpoints.
  val rpcEnv =
    RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

  val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
    map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
  // As mentioned before, registering an endpoint via setupEndpoint triggers its onStart()
  // (illustrated by the small sketch after this code block). Since we are submitting a job,
  // onStart() goes straight into the "launch" handler.
  rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

  rpcEnv.awaitTermination()
}
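The comment above relies on one property of Spark's RPC layer: registering an endpoint causes the framework to invoke its onStart() callback. Spark's RpcEnv and RpcEndpoint classes are internal, so the following is only a generic, hypothetical illustration of that registration-triggers-onStart contract (the Endpoint and Registry names are made up for the sketch):

// Generic illustration only, not Spark's RpcEnv: registering an endpoint
// triggers its onStart() hook, which is why ClientEndpoint's onStart()
// runs (and handles "launch") right after setupEndpoint() above.
object EndpointLifecycleSketch {
  trait Endpoint { def onStart(): Unit }

  class Registry {
    def setupEndpoint(name: String, endpoint: Endpoint): Endpoint = {
      endpoint.onStart() // the framework calls onStart() as part of registration
      endpoint
    }
  }
}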

7. Now look directly at the onStart() function; its "launch" branch is shown below:

case "launch" =>

//這裏會啓動DriverWrapper類

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

//下面都是指定了一些classPath libpath 不再講解

val classPathConf = "spark.driver.extraClassPath"

val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>

cp.split(java.io.File.pathSeparator)

}


val libraryPathConf = "spark.driver.extraLibraryPath"

val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>

cp.split(java.io.File.pathSeparator)

}


val extraJavaOptsConf = "spark.driver.extraJavaOptions"

val extraJavaOpts = sys.props.get(extraJavaOptsConf)

.map(Utils.splitCommandString).getOrElse(Seq.empty)

val sparkJavaOpts = Utils.sparkJavaOpts(conf)

val javaOpts = sparkJavaOpts ++ extraJavaOpts

val command = new Command(mainClass,

Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,

sys.env, classPathEntries, libraryPathEntries, javaOpts)

//DriverWrapper封裝到command中,然後封裝到driverDescription中

val driverDescription = new DriverDescription(

driverArgs.jarUrl,

driverArgs.memory,

driverArgs.cores,

driverArgs.supervise,

command)

//這裏會調用RequestSubmitDriver,向Master發送RequestSubmitDriver消息

//注意這裏Master接收消息,處理完成後會進行回覆的,回覆SubmitDriverResponse消息

//會在receive()函數進行處理

ayncSendToMasterAndForwardReply[SubmitDriverResponse](

RequestSubmitDriver(driverDescription))
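For context, "{{WORKER_URL}}" and "{{USER_JAR}}" are placeholders that the Worker substitutes when it actually launches the driver process. The following is a rough, hypothetical sketch of what such a wrapper boils down to; it is plain Scala, not the real org.apache.spark.deploy.worker.DriverWrapper, which additionally sets up an RpcEnv to watch its Worker and uses Spark's own class loaders:

import java.net.URLClassLoader

// Hypothetical wrapper sketch: args = (workerUrl, userJar, userMainClass, userArgs...)
object DriverWrapperSketch {
  def main(args: Array[String]): Unit = {
    val Array(workerUrl, userJar, userMainClass, userArgs @ _*) = args
    // Load the user jar, then reflectively invoke the user's main class with the
    // remaining arguments, much like SparkSubmit.runMain() does on the client side
    val loader = new URLClassLoader(
      Array(new java.io.File(userJar).toURI.toURL), getClass.getClassLoader)
    val mainMethod = Class.forName(userMainClass, true, loader)
      .getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, userArgs.toArray)
  }
}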

8. The Master receives the RequestSubmitDriver message and handles it in its receiveAndReply() function; the code is as follows:

case RequestSubmitDriver(description) =>
  // If the Master is not in the ALIVE state (e.g. it is a standby or still recovering),
  // reject the submission
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    // Create the driver, add it to the persistence engine, and append it to the waiting queue
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    // The resource scheduling function was covered in the previous article:
    // "Mastering Spark 2.2.0: A Deep Dive into the Master-side schedule() Function"
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }

At this point, the entire flow from a user submitting a job with spark-submit to the Driver being launched has been traced through.
