摘要:6.由于我们这里是Standalone模式,会映射到 org.apache.spark.deploy.Client 直接调用main()函数,代码如下:。接上篇文章: Spark2.2.0精通:详解Master端任务调度schedule()函数 ,这里从client出发,详细用户通过spark-submit提交任务后,Spark集群如何进行提交命令处理。

接上篇文章: Spark2.2.0精通:详解Master端任务调度schedule()函数 ,这里从client出发,详细用户通过spark-submit提交任务后,Spark集群如何进行提交命令处理;由于上篇文章主要只提到了Master端如何进行Executor的启动,没有讲解Driver的启动,这里结合spark-submit任务提交,把Driver的提交和启动一块讲了。

1.spark-submit提交任务,调用的脚本spark-submit.sh,脚本基本没啥操作,直接调用org.apache.spark.deploy.SparkSubmit类,比较简单;

if [ -z "${SPARK_HOME}" ]; then

source "$(dirname "$0")"/find-spark-home

fi

# disable randomized hash for string in Python 3.3+

export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

2.直接找到SparkSubmit的main()函数,代码如下:

override def main(args: Array[String]): Unit = {

val appArgs = new SparkSubmitArguments(args)

if (appArgs.verbose) {

// scalastyle:off println

printStream.println(appArgs)

// scalastyle:on println

}

appArgs.action match {

//调用submit()函数,参数是用户提交指定参数

case SparkSubmitAction.SUBMIT => submit(appArgs)

case SparkSubmitAction.KILL => kill(appArgs)

case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)

}

}

3.submit()函数,主要是进行了一些初始化,然后提交了Driver

@tailrec

private def submit(args: SparkSubmitArguments): Unit = {

//prepareSubmitEnvironment()函数代码比较多,主要干了以下几件事:

// 1.根据用户参数,主要是master和deploye-mode,设置任务的提交模式

// 2.根据提交模式实例化主类,

//childArgs: 主要就是一些参数的

//childClasspath:这个就是classPath,jvm运行的class路径

//sysProps:一些系统参数

// childMainClass:接下来将要运行的主类

val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)


def doRunMain(): Unit = {

// proxyUser 一般为null 直接看下面

if (args.proxyUser != null) {

val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,

UserGroupInformation.getCurrentUser())

try {

proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {

override def run(): Unit = {

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

})

} catch {

case e: Exception =>

// Hadoop's AuthorizationException suppresses the exception's stack trace, which

// makes the message printed to the output by the JVM not very helpful. Instead,

// detect exceptions with empty stack traces here, and treat them differently.

if (e.getStackTrace().length == 0) {

// scalastyle:off println

printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")

// scalastyle:on println

exitFn(1)

} else {

throw e

}

}

} else {

// 根据返回的数据,调用runMain,主要是打印了一些日志信息

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

}


// In standalone cluster mode, there are two submission gateways:

// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper

// (2) The new REST-based gateway introduced in Spark 1.3

// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over

// to use the legacy gateway if the master endpoint turns out to be not a REST server.

if (args.isStandaloneCluster && args.useRest) {

try {

// scalastyle:off println

printStream.println("Running Spark using the REST application submission protocol.")

// scalastyle:on println

doRunMain()

} catch {

// Fail over to use the legacy submission gateway

case e: SubmitRestConnectionException =>

printWarning(s"Master endpoint ${args.master} was not a REST server. " +

"Falling back to legacy submission gateway instead.")

args.useRest = false

submit(args)

}

// In all other modes, just run the main class as prepared

} else {

// 转了一大圈,最后调用和doRunMain()去执行

doRunMain()

}

}

4.这里才是spark-submit的核心,调用doRunMain(),他里面没啥东西,一般我们不设置代理,直接执行else:

/* 在standalone集群模式下,有两个提交网关:

* 1.使用org.apache.spark.deploy.Client作为包装器来使用传统的RPC网关

* 2.Spark 1.3中引入的基于rest的网关

* 第二种方法是Spark 1.3的默认行为,但是Spark submit将会失败

* 如果master不是一个REST服务器,那么它将无法使用REST网关。

*/

def doRunMain(): Unit = {

if (args.proxyUser != null) {

val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,

UserGroupInformation.getCurrentUser())

try {

proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {

override def run(): Unit = {

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

})

} catch {

case e: Exception =>

// Hadoop's AuthorizationException suppresses the exception's stack trace, which

// makes the message printed to the output by the JVM not very helpful. Instead,

// detect exceptions with empty stack traces here, and treat them differently.

if (e.getStackTrace().length == 0) {

// scalastyle:off println

printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")

// scalastyle:on println

exitFn(1)

} else {

throw e

}

}

} else {

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

}

5.最后调用runMain()函数,代码如下:

//当deploy mode为client时,执行用户自己编写的主方法

// 当deploy mode为cluster时,需要判断是否为REST提交,如果是则执行

// org.apache.spark.rest.RestSubmissionClient的主方法,如果不是则执行

// org.apache.spark.deploy.Client的主方法

private def runMain(

childArgs: Seq[String],

childClasspath: Seq[String],

sysProps: Map[String, String],

childMainClass: String,

verbose: Boolean): Unit = {

// scalastyle:off println

if (verbose) {

printStream.println(s"Main class:\n$childMainClass")

printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")

// sysProps may contain sensitive information, so redact before printing

printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")

printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")

printStream.println("\n")

}

// scalastyle:on println

//由于默认情况下,优先级SPARK_HOME/lib/jar包 > 用户程序中的jar包,

// 如果想让用户程序jar优先执行,那么要使用 spark.yarn.user.classpath.first (spark1.3以前)或者

// spark.executor.userClassPathFirst 和spark.driver.userClassPathFirst 参数。

val loader =

if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {

new ChildFirstURLClassLoader(new Array[URL](0),

Thread.currentThread.getContextClassLoader)

} else {

new MutableURLClassLoader(new Array[URL](0),

Thread.currentThread.getContextClassLoader)

}

Thread.currentThread.setContextClassLoader(loader)

//使用URLClassLoader加载jar包

for (jar <- childClasspath) {

addJarToClasspath(jar, loader)

}


for ((key, value) <- sysProps) {

System.setProperty(key, value)

}


var mainClass: Class[_] = null

try {

//获取用户指定的Main函数

mainClass = Utils.classForName(childMainClass)

} catch {

case e: ClassNotFoundException =>

e.printStackTrace(printStream)

if (childMainClass.contains("thriftserver")) {

// scalastyle:off println

printStream.println(s"Failed to load main class $childMainClass.")

printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")

// scalastyle:on println

}

System.exit(CLASS_NOT_FOUND_EXIT_STATUS)

case e: NoClassDefFoundError =>

e.printStackTrace(printStream)

if (e.getMessage.contains("org/apache/hadoop/hive")) {

// scalastyle:off println

printStream.println(s"Failed to load hive class.")

printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")

// scalastyle:on println

}

System.exit(CLASS_NOT_FOUND_EXIT_STATUS)

}


// SPARK-4170

if (classOf[scala.App].isAssignableFrom(mainClass)) {

printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")

}


val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

if (!Modifier.isStatic(mainMethod.getModifiers)) {

throw new IllegalStateException("The main method in the given main class must be static")

}


@tailrec

def findCause(t: Throwable): Throwable = t match {

case e: UndeclaredThrowableException =>

if (e.getCause() != null) findCause(e.getCause()) else e

case e: InvocationTargetException =>

if (e.getCause() != null) findCause(e.getCause()) else e

case e: Throwable =>

e

}


try {

//mainMethod.invoke 是通过反射来调用的 对应的主函数

mainMethod.invoke(null, childArgs.toArray)

} catch {

case t: Throwable =>

findCause(t) match {

case SparkUserAppException(exitCode) =>

System.exit(exitCode)


case t: Throwable =>

throw t

}

}

}

6.由于我们这里是Standalone模式,会映射到 org.apache.spark.deploy.Client 直接调用main()函数,代码如下:

def main(args: Array[String]) {

// scalastyle:off println

if (!sys.props.contains("SPARK_SUBMIT")) {

println("WARNING: This client is deprecated and will be removed in a future version of Spark")

println("Use ./bin/spark-submit with \"--master spark://host:port\"")

}

// scalastyle:on println

val conf = new SparkConf()

val driverArgs = new ClientArguments(args)


if (!conf.contains("spark.rpc.askTimeout")) {

conf.set("spark.rpc.askTimeout", "10s")

}

Logger.getRootLogger.setLevel(driverArgs.logLevel)


//准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,

// 通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints上

//端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信

val rpcEnv =

RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))


val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).

map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))

//之前说过只要调用setupEndpoint就会调用Onstart()函数,由于我们是提交任务

// 这里会直接调用“launch”事件处理函数

rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))


rpcEnv.awaitTermination()

}

7.我们直接看onStart()函数,代码如下:

case "launch" =>

//这里会启动DriverWrapper类

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

//下面都是指定了一些classPath libpath 不再讲解

val classPathConf = "spark.driver.extraClassPath"

val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>

cp.split(java.io.File.pathSeparator)

}


val libraryPathConf = "spark.driver.extraLibraryPath"

val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>

cp.split(java.io.File.pathSeparator)

}


val extraJavaOptsConf = "spark.driver.extraJavaOptions"

val extraJavaOpts = sys.props.get(extraJavaOptsConf)

.map(Utils.splitCommandString).getOrElse(Seq.empty)

val sparkJavaOpts = Utils.sparkJavaOpts(conf)

val javaOpts = sparkJavaOpts ++ extraJavaOpts

val command = new Command(mainClass,

Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,

sys.env, classPathEntries, libraryPathEntries, javaOpts)

//DriverWrapper封装到command中,然后封装到driverDescription中

val driverDescription = new DriverDescription(

driverArgs.jarUrl,

driverArgs.memory,

driverArgs.cores,

driverArgs.supervise,

command)

//这里会调用RequestSubmitDriver,向Master发送RequestSubmitDriver消息

//注意这里Master接收消息,处理完成后会进行回复的,回复SubmitDriverResponse消息

//会在receive()函数进行处理

ayncSendToMasterAndForwardReply[SubmitDriverResponse](

RequestSubmitDriver(driverDescription))

8.Master端接收到 RequestSubmitDriver 消息,在receiveAndReply()函数进行对应事件的处理,代码如下:

case RequestSubmitDriver(description) =>

//如果数据恢复状态,直接跳过

if (state != RecoveryState.ALIVE) {

val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

"Can only accept driver submissions in ALIVE state."

context.reply(SubmitDriverResponse(self, false, None, msg))

} else {

//创建driver,然后加入持久化引擎中,将driver添加到等待队列

logInfo("Driver submitted " + description.command.mainClass)

val driver = createDriver(description)

persistenceEngine.addDriver(driver)

waitingDrivers += driver

drivers.add(driver)

//资源调度函数已经在上一篇文章讲解:Spark2.2.0精通:详解Master端任务调度schedule()函数

schedule()


// TODO: It might be good to instead have the submission client poll the master to determine

// the current status of the driver. For now it's simply "fire and forget".

context.reply(SubmitDriverResponse(self, true, Some(driver.id),

s"Driver successfully submitted as ${driver.id}"))

}

至此,用户通过spark-submit提交任务到Driver启动的整个流程基本都梳理清楚了。

相关文章