作者·黃崇遠

『數據蟲巢』

全文共 9000

題圖ssyer.com

 Python訓練VS並行化預測,無痛人流般的解決方案,你值得擁有。

理解本文需要有一定的技術基礎,包括對於Xgboost的基本理解以及使用經驗,基本的Spark開發能力,如果對於Xgboost4j-spark有一定的瞭解就更好了。

01

訴求背景

在這裏,我就不做XGB的科普了,如果不清楚的,請自行谷歌,官網。

首先我們確定我們需要做的事情,那就是嘗試在Python單機的環境下訓練模型,獲取到模型文件,然後加載在Spark環境中做並行預測,涉及到並行預測會用到XGBoost4j-spark框架。

這看起來是一個僞訴求,爲什麼會存在使用單機來訓練,然後跑到Spark上預測的這種訴求,比如存在以下幾個問題。

(1) 爲什麼不直接在單機,比如Python的XGB上進行Train以及Predict?

(2) 爲什麼不直接在XGBoost4j-spark上做Train以及Predict?

上面兩個問題簡直就是靈魂拷問,看似合情合理,無法推翻。來,讓我們來逐一探討一下。

  • 先說單機Python的XGB。

我們使用XGB通常是分類或者回歸場景,是一種相對偏傳統的做法(非深度學習系列),所以帶標的數據樣本量級通常不會太大,幾萬是常見的,幾十萬也能接受,上百萬也能處理的了(單機的內存和核稍微大點),所以在Python單機上訓練是沒有太多壓力的。

並且,Python本來就是一個腳本式的語言,所以代碼非常簡潔,跑起來非常快,離線部署起來也不難。由於Python對應的很多數據處理的相關庫,對於數據探索,特徵挖掘,進而進行模型調優是非常便捷的。

如果是處於一種正常迭代的情況下,你可能會處於反覆調整採樣方式,不斷的增減模型,不斷的調參的這種循環中,這需要一個輕量級並且靈活的環境來支持,而Python的環境恰巧非常符合。這意味着Python對應的XGB環境是比較利於這種良好迭代節奏的。

說完了訓練,說預測,並且這裏說的不是服務化實時預測,說的是批量離線預測的話題,因爲如果是實時,就不存在使不使用Spark做並行預測的話題了。

如果說我們的預測場景是幾十上百萬,又甚至是上千萬的量級,其實單機都能擼的過來,再不行就做多線程嘛(Python無法做多線程,但是邏輯裏可以做多進程的方式來實現並行),再不行就切割數據,分散到多個節點嘛,最後再聚攏數據。

我曾把模型利用自己寫的調度腳本進行並行化,並將預處理和預測錯峯執行,再同時並行三個模型預測,調調一些調度參數,把128GB內存,以及64核的單機服務器打的滿滿的,2個小時3個模型分別做1億多數據的二分類。

OK,這裏是1億數據,如果是2億,3億,5億呢?然後如果不止3個模型,是十個八個模型呢?解決肯定是可以解決的,但是做資源分配,並行處理,甚至多機拆解任務會把人搞死。關鍵一旦跑起來之後這個機器就基本上幹不了其他的了,這意味着這壓根兒不存在啥資源調度的問題。

所以,需要解決這種大規模多模型預測對於資源的消耗問題,甚至是效率問題。

  • 說完了單機,說Spark的多機,指的是XGB的spark框架代表XGBoost4j-spark。

模型並行化,好處當然大家都知道的,大規模的數據不管是訓練還是預測,都“咻咻”的,並且不需要考慮訓練或者預測資源的問題,資源不夠多配點excute就好了,別說3億數據,只要集羣中有資源,10億我都給你分分鐘預測出來。

所以預測這層天然是符合這種大規模離線預測場景的。

回到訓練,除了上述單機場景中的迭代領域這個優點也就是對應Spark的缺點。spark其實相對來說是一個比較笨重的框架,任務提交、任務的響應和執行需要比較費時,如果資源臨時不夠還得排排隊,這對於我們需要快速靈活迭代模型的訴求是相悖的。

除了上面的缺點還有缺點嗎?有的,XGBoost4j-spark的訓練過程在數據量少的情況下,其訓練帶來的精度有可能是低於單機的,相當於進一步稀釋了訓練樣本,這在於訓練樣本數本來就不算特別多的場景中,是有一定的影響的。

關於這一點,我們在後面拆解XGBoost4j-spark源碼的時候再來進一步說明。

所以,在這裏,在訓練數據量百萬級以內,離線預測量級在數億以上的場景中,單機訓練Spark並行預測的搭配太合適了,簡直就是天造地設的...

好吧,差點出口成章了。既然如此般配,那麼可以直接Python版的XGB訓練好的Model,直接丟到XGBoost4j-spark中load,然後愉快的預測呢?

答案是不行!竟然是不行。我也很納悶,爲啥不行。做個工作流每天讓Python定時訓練模型,然後丟到Spark環境中每天做預測,然後還一邊用Python來調模型,一旦模型指標非常OK的,離線驗證之後,直接丟到Spark流程中替換。

這個過程是多麼的自然,但結果竟然是不行。說好的XGBoost4j-spark是Xgboost的分支項目的呢,這也不像親兒子啊。

真的是不行,所以纔有了這個文章,和研究方向。我試圖從源碼中探索爲什麼不行,理論上一定行的事,然後找到這個路徑的解決方案。

02

XGBoost單機源碼拆解

代碼來源: git clone --recursivehttps://github.com/dmlc/xgboost

這是XGB在github上的開源代碼,其中也包括了XGBoost4j-spark這個分支的項目代碼,一箭雙鵰。

首先來看下XGB主體部分的代碼目錄:

|--xgboost

|--include

|--xgboost //定義了 xgboost 相關的頭文件

|--src

|--c_api              

|--common //一些通用文件,如對配置文件的處理

|--data //使用的數據結構,如 DMatrix

|--gbm //定義了若分類器,如 gbtree 和 gblinear

|--metric //定義評價函數

|--objective //定義目標函數

|--tree  //對樹的一些列操作

從目錄結構的角度看,代碼層次結構,代碼模塊是非常乾淨利落的。

由於我們的重點不在於XGB的單機代碼,如果對XGB感興趣的,可以沿着 cli_main.cc的執行入口,再到訓練的核心方法 CLITrain(param),再到Learner::UpdateOneIter()的實際樹更新邏輯,再到Learner裏頭實現 ObjFunction::GetGradient() 梯度求解的過程(包括Loss函數,一階和二階的導師計算)。

然後在Tree的執行邏輯裏,提供了上述所說的Updater邏輯,實際建樹的各種策略。

|--src

|--tree

|--updater_basemaker-inl.h  //定義了派生子TreeUpdater的類BaseMaker

|--updater_colmaker.cc  

//ColMaker使用基於枚舉貪婪搜索算法,通過枚舉所有特徵來尋找最佳分裂點

|--updater_skmaker.cc   

//SkMaker派生自BaseMaker的,使用,使用近似sketch方法尋找最佳分裂點

|--updater_refresh.cc  //TreeRefresher用於刷新數據集上樹的統計信息和葉子指

|--updater_prune.cc    //TreePruner是樹的兼職操作

|--updater_hismaker.cc  //HistMaker直方圖法

|--updater_fast_hist.cc  //快速直方圖

這裏對於單機版的源碼就不作過多分析和拆解,重點是Xgboost4j-spark。

03

XGBoost4j-spark的訓練過程

關於XGBoost基於Spark的分佈式實現,實際上是基於RABIT(dmlc開源的底層通信框架)實現。Spark版本的建樹邏輯基本和單機版一致,計算上區別在於:並行的特徵計算在不同的服務器上進行,計算結果需要通過消息在Tracker(負責控制計算邏輯)上進行彙總。這個動作是可以大大加快特徵層並行化的效率的。

並且,由於項目結構上的關係,比如XGBoost4j-spark本來就是作爲XGBoost的分支項目而存在的,所以一定不會大規模造輪子,能用的輪子儘量用,這是項目構建原則,所以,我們要找到Spark分佈式版本與單機版本的關聯性。

這張圖,在分析很多XGBoost並行化原理時都用到,具體原圖出自於哪,無法考究了,先借用下。

我們可以看到這是一個典型的Spark任務的邏輯結構圖,以Tracker爲殼子的Task在最外層,內層是單個RABIT節點,然後下面掛着一個看着很熟悉的XGBoost訓練任務。沒錯,不是很熟悉,是真的單機XGBoost訓練任務。

trainWithRDD方法內部會檢查cpu、nWorkers等參數,並完成資源分配計劃。接着啓動rabit tracker(用於管理實際計算的rabit worker),調用buildDistributedBoosters進行分佈式初始化和計算。

buildDistributedBoosters方法是核心部分。先根據參數nWorkers對數據進行repartition(nWorkers)操作,進行數據重分佈,接着,直接調用mapPartion進行partition層面的數據邏輯處理。可以看出,nWorkers決定了最終數據的partition情況,以及rabit worker的數量。在RDD partition內部,數據會先進行稀疏化處理,然後再轉爲JXGBoost接受的數據結構。接着,啓動rabit worker 和JXGBoost進行訓練。

至此spark 的分佈式模型轉化爲JXGBoost的分佈式訓練。從結果上看,spark就像是數據和資源管理的框架,核心的訓練過程還是封裝在rabit中。上述的接口調用的整體流程如下圖:

這圖依然是借用的,甚至上面兩段話的總結大部分都是參考過來的,跟我直接看源碼的結論差不多,所以總結湊合着看。下面,我們來拆解代碼。

  • 訓練的入口是XGBoostClassifier.fit,但實際上調用的是XGBoostClassificationModel.train

在這一步裏,有幾個關鍵動作,先對一些參數進行判斷。

=>判斷一些關鍵參數

+判斷 evalMetric

+判斷objectiveType

+判斷numClass

+判斷weightCol

+判斷baseMarginCol

然後一個是如上面總結所說的,先對數據進行了轉換,轉換入口是DataUtils.convertDataFrameToXGBLabeledPointRDDs,輸出是trainingSet,相當於數據已經轉換ready好。

=>轉換RDD類型:

DataUtils def convertDataFrameToXGBLabeledPointRDDs(

labelCol: Column,

featuresCol: Column,

weight: Column,

baseMargin: Column,

group: Option[Column],

dataFrames: DataFrame*)

PS:統一轉換爲dataset爲float類型

+分爲有特徵權重與無特徵權重兩種

+內部轉換分爲sparseVector和DenseVector兩個處理 value.map(_.tofloat)

+XGBLabeledPoint  //序列化?

更重要的動作在於:

val (_booster, _metrics) = XGBoost.trainDistributed(trainingSet, derivedXGBParamMap,  hasGroup = false, evalRDDMap)

val model = new XGBoostClassificationModel(uid, _numClasses, _booster)

這意味着,通過這個函數調用,最終拿到了model。我們進一步來看 trainDistributed內部的實現。

  • trainDistributed(XGBoost類)

核心的解釋是:通過Spark的Partitions到Rabit分佈式執行,返回booster。

其中包括parameterFetchAndValidation做關鍵參數處理,包括這些參數num_workers,num_round,use_external_memory,custom_obj,custom_eval,missing。

還有結合了Spark以及Rabit的斷點管理類:CheckpointManager方法。

對於對trainDf進行repartitionForTraining的composeInputData方法。

通過checkpointManager.loadCheckpointAsBooster來構建一個Booster。

到了關鍵之處,結合CheckPoint做樹訓練輪次的遍歷,checkpointManager.getCheckpointRounds,並且這個爲下一個關鍵實現節點,我們在拆細到內部來看。

  • checkpointRound循環內部

startTracker,實際上一個RabitTracker,啓停拿到work等待資源等操作。然後通過SparkParallelismTracker將core等資源給到tracker,總之是屬於Spark相關邏輯範疇。

然後到了關鍵有點迷的地方:

val boostersAndMetrics = if (hasGroup) {

trainForRanking(transformedTrainingData.left.get, overriddenParams, rabitEnv,

checkpointRound, prevBooster, evalSetsMap)

} else {

trainForNonRanking(transformedTrainingData.right.get, overriddenParams, rabitEnv,

checkpointRound, prevBooster, evalSetsMap)

}

我們看到有兩個分支,關鍵判斷節點在於hasGroup,我們追蹤這個變量,發現在XGBoost.trainDistributed方法調用是,默認就是false,然後在後續的處理邏輯中,壓根兒就沒有被賦值過了,那麼這裏做兩條分支的意義在哪?暫時沒有找到答案。

嚴格意義上說,我們看trainForNonRanking的核心執行邏輯就行了。

  • trainForNonRanking內部邏輯

首先是buildWatches,內部把trainPoints分爲train部分和test部分,根據外部傳參train_test_ratio控制,調用Watches(有點像是對train數據集的另外一個包裝)。

然後肉戲又來了,關鍵的分佈式預測過程,buildDistributedBooster,底層依賴Rabit,先拿到對應的train數據集的Partitions。

  • buildDistributedBooster內部邏輯

val booster = SXGBoost.train(watches.toMap("train"), overridedParams, round,watches.toMap, metrics, obj, eval,earlyStoppingRound = numEarlyStoppingRounds, prevBooster)

但實際上更底層調用的是JXGBoost.train。

xgboostInJava=JXGBoost.train(dtrain.jDMatrix,params.filter(_._2 != null).mapValues(_.toString.asInstanceOf[AnyRef]).asJava,round, jWatches, metrics, obj, eval, earlyStoppingRound, jBooster)

  • JXGBoost.train邏輯

更底層的邏輯:

+begin to train

{

+booster.update(dtrain, obj); //更新booster。

//調用Booster原生類,內部進行葉子節點的predict,然後算train數據和predict(XGBoostJNI.XGBoosterPredict)出來的梯度getGradient(梯度計算類IObjective)

//然後更新公式中的Grad和Hess,boost(dtrain, gradients.get(0)=grad, gradients.get(1)=hess),實際調用底層XGBoostJNI.XGBoosterBoostOneIter

}

+evaluation //評估,使用metricts

+booster.saveRabitCheckpoint()

+Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)  //在watches範圍內循環迭代

在這裏,比如XGB一階和二階求解的時候,調用了更加底層的原生接口XGBoostJNI.XGBoosterBoostOneIter。

在執行完train動作之後:

+val (booster, metrics) =def postTrackerReturnProcessing(trackerReturnVal: Int,distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],sparkJobThread: Thread): (Booster, Map[String, Array[Float]])

//重要:distributedBoostersAndMetrics最終整合成一個booster與metrics元組,處理過程val (booster, metrics) = distributedBoostersAndMetrics.first()

=>Any of "distributedBoostersAndMetrics" can be used to create the model.  ?

最讓我迷的地方來了,在PostTracker之後,其實就是各個Tracker通訊邏輯了,我們上面可以看到實際上每個Tracker都訓練了一個XGB,並且從返回值也可以看到是一個distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],這種數據結構的數據。

實際上就是一個Booster與混淆矩陣的集合,但該方法的返回值只有一個booster和metrics的元組對,所以一定在內部做了選擇或者合併或者相關邏輯。

通過深入可以看到,實際上從這個方法註釋上有一句可以看到端倪“Any of "distributedBoostersAndMetrics" can be used to create the model. ”。這意味着他是選擇其中的一個booster作爲結果的,再一看,我擦,直接.first(),讓我有點難以接受。

Booster都拿到了,整個Train的過程也就結束了。我們可以看到分佈式的訓練過程並沒有想象中複雜,也沒有想象中起到那麼牛逼的作用,特徵並行化這部分沒有細說,但是從切分TrainSet到通過RDD進行類似單機的XGB訓練,這部分的邏輯是相對清晰的,只不過最終選擇Booster的過程是如此的隨意有點難以接受。

04

XGBoost4j-spark的預測過程

本質上是把加載或者訓練好的booster進行廣播,減少通訊的時間,然後rdd進行按row預測,實際調用的是JVM原生Predict接口,簡單的要命。

XGBoostJNI.XGBoosterPredict

dataset.map{

rowIterator =>

+missing值填充

+feature格式化

輸出=>val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) = producePredictionItrs(bBooster, dm)

過程{

+Broadcast[Booster]  //model=>booster對訓練好的booster進行廣播   =》 broadcastBooster.value.predict  =》 Booster:booster.predict(data.jDMatrix, outPutMargin, treeLimit) =》XGBoostJNI.XGBoosterPredict

}

}

結論:本質上通過RDD進行分佈式切割,然後逐行Row讀取,通過對Booster的廣播避免變量傳遞通訊,然後做逐個predict,其實我們自己也可以寫這個過程的,只要有了Booster之後,重點是如何解決Booster的生成。

05

XGBoost4j-spark的模型保存過程

重點關注Xgboost4j保存過程與單機保存過程不一致的地方,將有助於我們解決開頭部分無法直接加載單機訓練的model的問題。

class XGBoostClassificationModelWriter(instance: XGBoostClassificationModel) extends MLWriter {

override protected def saveImpl(path: String): Unit = {

// Save metadata and Params

implicit val format = DefaultFormats

implicit val sc = super.sparkSession.sparkContext

DefaultXGBoostParamsWriter.saveMetadata(instance, path, sc)

// Save model data

val dataPath = new Path(path, "data").toString

val internalPath = new Path(dataPath, "XGBoostClassificationModel")

val outputStream = internalPath.getFileSystem(sc.hadoopConfiguration).create(internalPath)

outputStream.writeInt(instance.numClasses)

instance._booster.saveModel(outputStream)

outputStream.close()

}

}

代碼解釋:

(1) instance,實際上就是train好的model

(2) save動作分三部分,第一部分保存saveMetadata,第二部分往4j版的model文件寫instance.numClasses,第三步,保存booster,instance._booster.saveModel(outputStream)

重點關注metadata的內容是什麼,booster保存的跟單機保存差別不大,實際也是調用的 outputStream.write(this.toByteArray())。

def saveMetadata(

instance: Params,   //instance重點也是取裏頭的Params

path: String,

sc: SparkContext,

extraMetadata: Option[JObject] = None,    

//實際上看調用,後面兩個參數使用的是默認參數

paramMap: Option[JValue] = None): Unit = {

val metadataPath = new Path(path, "metadata").toString

val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)   

//重點函數

sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)

}

=》getMetadataToSave方法:實際就是把模型的參數表(就是XGB的參數表,轉換成json格式)+uid,classname,timestamp,sparkVersion。

可以看到,實際上metadata保存的確實是一些元數據信息,並且其保存的內容確實“別具特色”,難怪單機版的model無法直接加載,但實際上我們只需要instance._booster.saveModel(outputStream)這部分。

06

XGBoost4j-spark的模型加載過程

說完保存,我們來研究xgboost4j具有獨特的模型加載過程,單本質上還是爲了獲取booster。

private class XGBoostClassificationModelReader extends MLReader[XGBoostClassificationModel] {

/** Checked against metadata when loading model */

private val className = classOf[XGBoostClassificationModel].getName

override def load(path: String): XGBoostClassificationModel = {

implicit val sc = super.sparkSession.sparkContext

val metadata = DefaultXGBoostParamsReader.loadMetadata(path, sc, className)

val dataPath = new Path(path, "data").toString

val internalPath = new Path(dataPath, "XGBoostClassificationModel")

val dataInStream = internalPath.getFileSystem(sc.hadoopConfiguration).open(internalPath)

val numClasses = dataInStream.readInt()

val booster = SXGBoost.loadModel(dataInStream)

val model = new XGBoostClassificationModel(metadata.uid, numClasses, booster)

DefaultXGBoostParamsReader.getAndSetParams(model, metadata)

model

}

}

XGBoostClassificationModelReader類的load方法入口。傳入的Model路勁,實際上是個文件夾:

+metadata    //保存了元數據信息

+data/XGBoostClassificationModel   //保存了二進制的Booster

Booster的實際load過程:

val booster = SXGBoost.loadModel(dataInStream)

=》new Booster(JXGBoost.loadModel(in))

=> Booster.loadModel(in);

=>XGBoostJNI.XGBoosterLoadModelFromBuffer

PS:最終可以看到,實際調用的依然是XGBoostJNI原生接口。

metadata的作用:

入口:DefaultXGBoostParamsReader.getAndSetParams(model, metadata)

/**

* Extract Params from metadata, and set them in the instance.

* This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]].

* TODO: Move to [[Metadata]] method

*/

def getAndSetParams(instance: Params, metadata: Metadata): Unit = {

implicit val format = DefaultFormats

metadata.params match {

case JObject(pairs) =>

pairs.foreach { case (paramName, jsonValue) =>

val param = instance.getParam(handleBrokenlyChangedName(paramName))

val value = param.jsonDecode(compact(render(jsonValue)))

instance.set(param, handleBrokenlyChangedValue(paramName, value))

}

case _ =>

throw new IllegalArgumentException(

s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")

}

}

所以,由於booster內部已經保存了我們所需要的各種樹狀結構,以及相關的參數,所以 instance.set(param, handleBrokenlyChangedValue(paramName, value))的動作實際上是沒有本質意義的。

07

基於 XGBoost4j-spark的解決方案

從上面幾個大步驟的分析,我們可以得出以下幾個結論。

首先,從目前看,在數據量不大的時候,還對樣本進行RDD的切分,在做分佈式的Booster訓練,在做選擇,其實是把樣本進一步切分小了,在樣本本來就不多的情況下,並不是一個好事情,還不如用單機來訓練。

其次,單機預測的Booster到底能不能在Spark中用,答案這回是肯定的。我們通過源碼分析可以知道,XGBoost4j-spark只要涉及到底層的邏輯,都是調用原生的 XGBoostJNI入口,不管是計算一階二階導數,還是直接predict還是train,包括求解梯度等等,都是如此。

我就說嘛,XGBoost4j-spark不可能脫離開XGBoost單獨造這麼多輪子的,,所以更多的是利用Spark的資源調度的能力,相當於包了層殼子。

基於這個結論,單機拿到的Booster一定能在XGBoost4j-spark上用來predict。

結合模型save和load的分析,其實要解決加載的問題很簡單,那就是構造一個load入口。

如果要做侵入式的解決,那就修改源碼中的load函數,直接把什麼metadata虛頭巴腦的過程給咔嚓掉,只留下SXGBoost.loadModel(dataInStream)的精華部分。然後重新編譯打包,做成你的第三方外部依賴。

我個人認爲這種方案是一個次的方案,比較死板,並且這種侵入式的更改邏輯非常麻煩,需要編譯打包。

還有一種無侵入式的解決方案就是,我叫他“劫持接口”,即我們不調用XGBoost4j-spark的load方法,我們調用我們自己的load方法,然後修改邏輯在這裏修改,然後剩下的底層部分使用XGBoost4j-spark的邏輯,相當於我們只是劫持了這一部分,然後按需修改。

如果需要達到這個目的,我們就需要寫一個類似XGBoostClassificationModelReader的類,比如我們稱之爲XGBoostClassificationModelReaderBlogchong類,然後同樣繼承MLReader[XGBoostClassificationModel]。這樣對於底層的實現就不需要再重複實現了。

private class XGBoostClassificationModelReader extends MLReader[XGBoostClassificationModel] {

private val className = classOf[XGBoostClassificationModel].getName

override def load(path: String): XGBoostClassificationModel = {

implicit val sc = super.sparkSession.sparkContext

val metadata = DefaultXGBoostParamsReader.loadMetadata(path, sc, className)

val dataPath = new Path(path, "data").toString

val internalPath = new Path(dataPath, "XGBoostClassificationModel")

val dataInStream = internalPath.getFileSystem(sc.hadoopConfiguration).open(internalPath)

val numClasses = dataInStream.readInt()

val booster = SXGBoost.loadModel(dataInStream)

val model = new XGBoostClassificationModel(metadata.uid, numClasses, booster)

DefaultXGBoostParamsReader.getAndSetParams(model, metadata)

model

}

}

其中,我們把metadata部分移除,還需要注意XGBoost4j-spark在二進制文件的頭部“摻了點沙子”,即分類的類別數numClasses,統統幹掉。只留下loadModel的部分。

然後調用預測的transform方法,愉快的玩耍吧。

08

總結

整個優化方案其實非常簡單,但是要做出這個方案需要很多的判斷,比如Booster是否一致性,這決定了根本性的二進制文件是否能遷移的問題。

此外,對於實際上兩種方式的模型保存的差異性要搞清楚,搞清楚了差異才能回到統一的解決路子上,所以纔會拆解Save和Load的過程。

不過話又說回來,本質上Predict的過程並不難,也是拆解完其執行邏輯之後,發現其實就是RDD切割數據,然後Booster廣播,然後類似單機的predict預測。

我們自己也可以不用框架實現這個目標,比如使用Java的XGB,然後在RDD內部做單機預測,其實邏輯也是類似的,結論真是有點讓人悲傷。

不過,既然提供了XGBoost4j-spark框架,我們還是可以用起來的,畢竟是官方推薦版本,內部做了流程上的優化,而且要我們自己寫一大段代碼也是很複雜的(目測了下transform的邏輯近千行代碼還是有的),何必呢。

至此,本文核心內容就結束了。

其實這個研究的起點,正式之前我所說的,大規模離線預測單機受限,且實操下來發現XGBoost4j-spark的預測過程對於網絡通信要求非常高,並不是十分的穩定,經常出現RPC通訊相關的錯誤。此外就是,對於train的資源配置,需要花點耐心。總之,這些都是不太好的地方。

基於這種特殊的場景訴求,所以把源碼擼了一遍,然後解決方案竟然是如此簡單,大有喫力不討好的味道?其實不是的,任何一步的改動都需要有足夠的理論支撐,而看源碼,尋找代碼的邏輯結構是找理論依據,依據尋找解決方案。

至於上面說到的“接口劫持”這種無侵入的解決方案,其實非常適用類似的場景,比如需要修改一些知名框架的時候,又不想大動干戈的修改編譯源碼,這種方式就非常輕量級了,簡直就是代碼界的“無痛人流”。

參考文獻

【01】 https://github.com/dmlc/xgboost

文章都看完了,來個賞ba~

--凌晨1點,求知使我快樂並動力十足。

對於新手朋友,或者一些處於開發分析狀態想了解一些數據挖掘相關技能的朋友,或者是剛畢業將要畢業想從事數據方向的應屆朋友,以及對廣告感興趣的朋友(職業廣告數據挖掘從業人員),歡迎加入我的知識星球。

相關文章