Spark2.x精通:從spark-submit提交到driver啓動
摘要:6.由於我們這裏是Standalone模式,會映射到 org.apache.spark.deploy.Client 直接調用main()函數,代碼如下:。接上篇文章: Spark2.2.0精通:詳解Master端任務調度schedule()函數 ,這裏從client出發,詳細用戶通過spark-submit提交任務後,Spark集羣如何進行提交命令處理。
接上篇文章: Spark2.2.0精通:詳解Master端任務調度schedule()函數 ,這裏從client出發,詳細用戶通過spark-submit提交任務後,Spark集羣如何進行提交命令處理;由於上篇文章主要只提到了Master端如何進行Executor的啓動,沒有講解Driver的啓動,這裏結合spark-submit任務提交,把Driver的提交和啓動一塊講了。
1.spark-submit提交任務,調用的腳本spark-submit.sh,腳本基本沒啥操作,直接調用org.apache.spark.deploy.SparkSubmit類,比較簡單;
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
2.直接找到SparkSubmit的main()函數,代碼如下:
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
//調用submit()函數,參數是用戶提交指定參數
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
3.submit()函數,主要是進行了一些初始化,然後提交了Driver
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
//prepareSubmitEnvironment()函數代碼比較多,主要乾了以下幾件事:
// 1.根據用戶參數,主要是master和deploye-mode,設置任務的提交模式
// 2.根據提交模式實例化主類,
//childArgs: 主要就是一些參數的
//childClasspath:這個就是classPath,jvm運行的class路徑
//sysProps:一些系統參數
// childMainClass:接下來將要運行的主類
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
// proxyUser 一般爲null 直接看下面
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
// 根據返回的數據,調用runMain,主要是打印了一些日誌信息
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
// 轉了一大圈,最後調用和doRunMain()去執行
doRunMain()
}
}
4.這裏纔是spark-submit的核心,調用doRunMain(),他裏面沒啥東西,一般我們不設置代理,直接執行else:
/* 在standalone集羣模式下,有兩個提交網關:
* 1.使用org.apache.spark.deploy.Client作爲包裝器來使用傳統的RPC網關
* 2.Spark 1.3中引入的基於rest的網關
* 第二種方法是Spark 1.3的默認行爲,但是Spark submit將會失敗
* 如果master不是一個REST服務器,那麼它將無法使用REST網關。
*/
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
5.最後調用runMain()函數,代碼如下:
//當deploy mode爲client時,執行用戶自己編寫的主方法
// 當deploy mode爲cluster時,需要判斷是否爲REST提交,如果是則執行
// org.apache.spark.rest.RestSubmissionClient的主方法,如果不是則執行
// org.apache.spark.deploy.Client的主方法
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
//由於默認情況下,優先級SPARK_HOME/lib/jar包 > 用戶程序中的jar包,
// 如果想讓用戶程序jar優先執行,那麼要使用 spark.yarn.user.classpath.first (spark1.3以前)或者
// spark.executor.userClassPathFirst 和spark.driver.userClassPathFirst 參數。
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
//使用URLClassLoader加載jar包
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
//獲取用戶指定的Main函數
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
//mainMethod.invoke 是通過反射來調用的 對應的主函數
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
6.由於我們這裏是Standalone模式,會映射到 org.apache.spark.deploy.Client 直接調用main()函數,代碼如下:
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
//準備rpcEnv環境,之後通過master的地址獲取masterEndpoints端點相關信息,
// 通過rpcEnv註冊相關clientEndPoint端點信息,同時需要注意,這裏會把masterEndpoints上
//端點信息也作爲構造ClientEndpoint端點的參數,也就是說這個ClientEndpoint會和masterEndpoints通信
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
//之前說過只要調用setupEndpoint就會調用Onstart()函數,由於我們是提交任務
// 這裏會直接調用“launch”事件處理函數
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
7.我們直接看onStart()函數,代碼如下:
case "launch" =>
//這裏會啓動DriverWrapper類
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
//下面都是指定了一些classPath libpath 不再講解
val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
//DriverWrapper封裝到command中,然後封裝到driverDescription中
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
//這裏會調用RequestSubmitDriver,向Master發送RequestSubmitDriver消息
//注意這裏Master接收消息,處理完成後會進行回覆的,回覆SubmitDriverResponse消息
//會在receive()函數進行處理
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
8.Master端接收到 RequestSubmitDriver 消息,在receiveAndReply()函數進行對應事件的處理,代碼如下:
case RequestSubmitDriver(description) =>
//如果數據恢復狀態,直接跳過
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
//創建driver,然後加入持久化引擎中,將driver添加到等待隊列
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
//資源調度函數已經在上一篇文章講解:Spark2.2.0精通:詳解Master端任務調度schedule()函數
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
至此,用戶通過spark-submit提交任務到Driver啓動的整個流程基本都梳理清楚了。