摘要:我們通過spark.sql傳入一段SQL string即可完成數據的調用,需要注意的是,DataFrame也支持RDD的collect或者take等方法。當我們執行pyspark當中的RDD時,spark context會通過Py4j啓動一個使用JavaSparkContext的JVM,所有的RDD的轉化操作都會被映射成Java中的PythonRDD對象。

今天是 spark專題的第五篇 ,我們來看看DataFrame。

用過Python做過機器學習的同學對Python當中pandas當中的DataFrame應該不陌生,如果沒做過也沒有關係,我們簡單來介紹一下。DataFrame翻譯過來的意思是數據幀,但其實它指的是一種特殊的數據結構,使得數據 以類似關係型數據庫當中的表 一樣存儲。使用DataFrame我們可以非常方便地對整張表進行一些類似SQL的一些複雜的處理。Apache Spark在升級到了1.3版本之後,也提供了類似功能的DataFrame,也就是大名鼎鼎的SparkSQL。

關於SparkSQL的前世今生其實是有 一大段歷史 的,這一段歷史除了可以充當吹牛的談資之外,還可以幫助我們理清楚許多技術之間的內在關聯。

從優化到重構的血淚史

在程序開發這個行當, 優化和重構 註定是兩個無法擺脫的問題。

當一個項目啓動的時候,由於投入有限,可能招不到特別匹配的人才,或者是爲了快速滿足業務的需要。往往會採取一些 不是特別合理 的設計來構建項目,這個應該很好理解,爲了圖快犧牲一些性能或者是拓展性。而且有時候由於視野和能力的限制,早期的開發者可能也是無法意識到設計中的不合理性的。但是俗話說得好,出來混早晚是要還的。前面挖了坑,後來早晚也會暴露出來。問題就在於暴露了之後我們怎麼處理。

一般來說,無論是作爲公司也好,還是作爲開發者個人也罷。想的肯定都是怎麼樣 以最小的代價解決問題 ,也就是儘量優化,能不動核心代碼就不動。除了因爲核心代碼太久沒有維護或者是文檔缺失之外,也涉及到成本問題。現在的項目日進斗金,每天都在運行,一旦要下決心把核心代碼翻新一遍,那麼會付出巨大的代價,可能整個項目組要暫停一段時間。而且在上層管理層眼中,往往也是看不到重構的必要性的。因爲 上層都是以業務爲導向的 ,技術做得好不好不重要,能賺錢纔是王道。

但問題是優化並不是無止境的,很多時候核心設計的不合理纔是大頭,邊邊角角的修補只能聊勝於無。這個時候考驗的往往都是技術負責人的擔當了,是當個糊裱匠混一年是一年,還是壯士斷腕,敢叫日月換新天。一般來說 糊裱起到的效果都是有限的 ,總會有撐不下去要重構的那天。

SparkSQL早期的發展就非常好的印證了這點,SparkSQL誕生之初就是當做一個優化項目誕生的。目的是爲了優化Hive中在spark的效率。

這裏的Hive可能很多人不太熟悉,它是Hadoop家族結構化查詢的工具。將hadoop集羣中的數據以表結構的形式存儲,讓程序員可以以類SQL語句來查詢數據。看起來和數據庫有些近似,但原理不太一樣。Hive 底層是以MapReduce驅動的 ,也就是說會把我們寫好的SQL轉化成MapReduce執行。由於Hive易用性很好,使用的人很多,所以spark當中也支持Hive。

但其實那個時候spark興起,MapReduce時代已經逐漸走到了末期。那時的spark是基於前面介紹的RDD的結構處理數據的,性能比MapReduce好得多。但如果在spark上依然使用MapReduce的形式支持Hive,那麼就不能體現出spark計算性能的優越性。所以對於Hive on Spark的優化勢在必行。我個人覺得這有點搶市場的調調。

最好的辦法是對spark徹底重構,重建出一套支持結構化數據查詢的計算框架。但估計那時候主負責人沒能狠下心,或者是爲了趕時間。所以只是對Hive進行了一些優化,大概就是把一些使用MapReduce的計算想辦法儘量改成使用RDD,從而提升整體的效率。這樣做當然是能夠有提升的,但是核心的框架仍然是Hive的那一套機制,這樣的 提升是有限的 。大概過了三年左右的時間,基本上所有能壓榨出來的性能都被壓榨完了,開發組經過激烈的思想鬥爭之後,終於接受現實,徹底拋棄原本的框架,構建出一套新的架構來。

這套新開發出的架構就是SparkSQL,也就是DataFrame。

SparkSQL的架構

我們來簡單看下SparkSQL的架構,大概知道內部是怎麼運行的。

整個SparkSQL的模型大概分爲三層,最上面是編程模型層,中間是執行優化層,最後是任務執行引擎。

這些都是術語,我們簡單介紹一下,編程模型層主要有兩塊一塊是SparkSQL一種是DataFrame,這兩者只是語法不一樣,底層執行的邏輯是一樣的。主要做的是對我們寫的一些語法進行解析以及一些基本的處理。執行計劃層是將SQL語句轉化成具體需要執行的邏輯執行計劃,根據一些策略進行優化之後輸出物理執行策略。最後一層是執行層,負責將物理計劃轉化成RDD或者是DAG進行執行。

我們觀察一下這個架構,可能還有很多細節不是很清楚,但是至少整個執行的過程已經很明白了。進一步可以發現,整個架構當中已經 完全沒有MapReduce 的影子了,底層的執行單元就是RDD。也就是說SparkSQL其實是進一步更高層次的封裝。

RDD和DataFrame

我們來簡單看下DataFrame和RDD的差別,最大最直觀的差別就是DataFrame多了schema的概念。也就是多了數據格式的概念,我們拿到DataFrame可以很輕鬆地獲取它其中數據的結構信息。

我們看下下圖做個對比,同樣一份數據在RDD和DataFrame的樣子:

不要小瞧這個schema,有了它之後,我們就 可以做一些結構化數據才支持的操作 了。比如groupby、where、sum等等。這些結構化數據操作的靈活度要比RDD的map、filter等操作大得多。

另外一個好處就是效率,如果我們自己寫RDD來操作數據的話,那麼Python是一定幹不過scala和java的。因爲spark底層是依託Java實現的,spark的所有計算都執行在JVM當中。scala和java都是直接在JVM當中直接運行的語言,而Python不行,所以之前我們使用Python調用RDD處理spark的速度也會慢很多。因爲我們 需要經過多層中轉 ,我們可以看下下面這張圖。

當我們執行pyspark當中的RDD時,spark context會通過Py4j啓動一個使用JavaSparkContext的JVM,所有的RDD的轉化操作都會被映射成Java中的PythonRDD對象。當我們的任務被傳輸到Workder進行執行的時候,PythonRDD會啓動Python的子進程來傳輸代碼和執行的結果。

上面這段話說起來有點繞,簡單理解就是當pyspark調用RDD的時候,Python會轉化成Java調用spark集羣分發任務。每一個任務具體在機器上執行的時候,還是以Python程序的方式執行。執行結束之後,還是通過Python拿回數據給spark中的JVM。JVM執行結束之後,再把結果包裝成Python的類型返回給調用端。

本來Python的執行效率就低,加上中間又經過了若干次轉換以及通信開銷(佔大頭),這就導致了pyspark中的RDD操作效率更低。

而現在有了Catalyst優化器之後,會自動幫助我們進行底層的計算優化。並且即使是非原生的Python語言,也可以使用它,因此會帶來性能的極大提升。甚至經過官方的測量,使用pyspark寫DataFrame的效率已經和scala和java平起平坐了。

所以如果我們要選擇Python作爲操作spark的語言,DataFrame一定是首選。不過Catalyst優化器也有短板,它無法解決跨語言本身帶來的問題。比如我們使用Python寫一些udf(user defined function),還是會帶來性能的損耗。這個時候的整體效率還是會比scala低一些。

寫了這麼多廢話,下面就讓我們實際一點,看看究竟pyspark當中的DataFrame要如何使用吧。

創建DataFrame

和RDD一樣,DataFrame的創建方法有很多,我們可以基於內存當中的數據進行創建,也可以從本地文件或者是HDFS等其他雲存儲系統當中進行讀取。但怎麼讀取不重要,使用方法纔是關鍵,爲了方便演示,我們先來看看如何從內存當中創建DataFrame。

前文當中曾經說過,DataFrame當中的數據以表結構的形式存儲。也就是說我們讀入的一般都是結構化的數據,我們經常使用的結構化的存儲結構就是json,所以我們先來看看如何 從json字符串當中創建DataFrame

首先,我們創建一個json類型的RDD。

jsonstr = sc.parallelize(("""
{'name': 'xiaoming', 'age': 13, 'score': 100}""",
"""{'name': 'xiaohong', 'age': 15, 'score': 98}"""
))

接着,我們用 spark.read.json 將它轉化成一個DataFrame。需要注意的是,如果數據量很大,這個執行會需要一點時間,但是它仍然是一個轉化操作。數據其實並沒有真正被我們讀入,我們讀入的只是它的schema而已,只有當我們執行執行操作的時候,數據纔會真正讀入處理。

studentDf = spark.read.json(jsonstr)

執行完這一句之後,RDD轉DataFrame的工作就完成了。嚴格說起來這是讀取操作,並不是真正的轉化操作。RDD轉DataFrame稍微複雜一些,我們晚點再說。

如果我們想要查看DataFrame當中的內容,我們可以執行show方法,這是一個行動操作。和pandas中的head類似,執行之後,會展示出DataFrame當中前20條數據。我們也可以傳入參數,指定我們要求展示的數據條數。

我們來運行一下,看看展示出來的結果:

我們也collect一下原本的RDD作爲一下對比:

這下一對比我們就發現了,json格式的字符串果然可以被解析,並且RDD被轉化成了表格格式的DataFrame。

查詢

我們再來看下DataFrame的簡單查詢功能,其實Dataframe當中的查詢功能很多。我們今天先來看其中用得比較多的兩種。

先來看第一種,第一種是 通過select接口 查詢數據。這裏的select其實對應的是SQL語句當中的select,含義也基本相同,不同的是我們是通過函數進行調用的而已。

我們可以在select當中傳入我們想要查找的列名。

我們可以加上where或者filter函數進行條件判斷, where和filter函數是一個意思 ,兩者的用法也完全一樣。官方提供了兩個名字,爲了不同習慣的人使用方便而已。我們把下圖當中的函數換成filter結果也是一樣的。

另外一種操作方式稍稍複雜一些,則是將DataFrame 註冊成pyspark中的一張視圖 。這裏的視圖和數據庫中的視圖基本上是一個概念,spark當中支持兩種不同的視圖。第一種是臨時視圖,第二種是全局視圖。兩者的用法基本一致,不同的是作用範圍。臨時視圖的作用範圍是當前的session,如果當前的session關閉,或者是另外開啓了新的session,這個視圖就會作廢。而全局視圖則是跨session的,所有session都可以使用。

如果搞不清楚session的概念也沒有關係,在之後的文章當中我們還會遇到的。我們先有這麼個印象即可。

我們調用createOrReplaceTempView方法創建一個臨時視圖,有了視圖之後,我們就可以通過 SQL語句 來查詢數據了。

studentDf.createOrReplaceTempView("student")

我們通過spark.sql傳入一段SQL string即可完成數據的調用,需要注意的是,DataFrame也支持RDD的collect或者take等方法。如果這裏的結果我們調用的是collect,那麼spark會將所有數據都返回。如果數據集很大的情況下可能會出現問題,所以要 注意show和collect的使用範圍和區別 ,在一些場景下搞錯了會很危險。

結尾

今天這篇文章我們一起來看了pyspark當中目前爲止最常用的數據處理工具——DataFrame,還簡單瞭解了一下它和RDD相比的性能優勢以及它簡單的查詢語法的使用方法。

從上面的方法我們也看得出來,相比之前RDD中介紹的那些方法,DataFrame中封裝的API提供了更多高級的功能,比寫RDD處理數據也要方便很多。再加上性能原因,我們在處理數據時必然首選使用DataFrame。相信大家通過本文對於DataFrame也應該有了一個最初的印象,後續還會有更多文章詳細地介紹DataFrame的使用以及內部機制的一些細節,敬請期待吧。

今天的文章就到這裏,原創不易, 掃碼關注我 ,獲取更多精彩文章。

相關文章