摘要:通常,您希望在使用Spark時避免急切操作,如果我需要處理大型CSV文件,將首先把數據集轉換爲parquet格式,然後再執行其餘的管道。當將CSV文件讀入數據幀時,Spark以急切模式執行操作,這意味着在下一步開始執行之前將所有數據加載到內存中,而在讀取parquet格式的文件時使用惰性方法。

點擊上方關注,All in AI中國

手把手帶你入門PySpark!

PySpark數據科學入門

PySpark是一種很好的語言,可以大規模地進行探索性數據分析、構建機器學習管道以及爲數據平臺創建ETL。如果您已經熟悉Python和Pandas等庫,那麼PySpark是一種很好的語言,可以用來創建更具擴展性的分析和管道。這篇文章的目的是展示如何啓動和運行PySpark並執行常見任務。

我們將使用Databricks作爲Spark環境,將Kaggle的NHL數據集用作分析的數據源。這篇文章展示瞭如何在Spark 數據幀寫入數據,創建這些幀的轉換和聚合,可視化結果以及執行線性迴歸。我還將展示如何使用Pandas UDF以可擴展的方式將常規Python代碼與PySpark混合。爲了簡單起見,我們將專注於批處理並避免流數據管道出現的一些複雜問題。

這篇文章的完整筆記本可以在github上找到。(https://github.com/bgweber/StartupDataScience/blob/master/EDA/PySpark_NHL.ipynb)

環境

啓動和運行Spark有許多不同的選項:

  • 自託管:您可以使用裸機或虛擬機自行設置羣集。 Apache Ambari是這個選項的一個有用的項目,但它不是我推薦的快速啓動和運行的方法。
  • 雲提供商:大多數雲提供商都提供Spark集羣:AWS具有EMR,GCP具有DataProc,它可以比自託管更快地進入互動環境。
  • 供應商解決方案:包括Databricks和Cloudera在內的公司提供Spark解決方案,使Spark易於啓動和運行。

    使用的解決方案因安全性、成本和現有基礎架構而異。如果您正在嘗試使用並運行一個需要的環境來學習,那麼我建議您使用Databricks Community Edition。

    手把手帶你入門PySpark!

    在Databricks Community Edition中創建PySpark集羣

    使用此環境,可以輕鬆啓動並運行Spark羣集和筆記本環境。在本教程中,我使用Spark 2.4運行時和Python 3創建了一個集羣。要運行本文中的代碼,您至少需要Spark版本2.3,才能獲得Pandas UDF功能。

    Spark 數據幀

    PySpark中使用的關鍵數據類型是Spark 數據幀。此對象可以被視爲分佈在集羣中的表,其功能類似於R和Pandas中的數據幀。如果你想使用PySpark進行分佈式計算,那麼你需要對Spark 數據幀執行操作,而不是其他python數據類型。

    使用Spark時,也可以通過在Spark數據幀上調用toPandas()來使用Pandas數據幀,該數據幀返回一個pandas對象。但是,除了處理小型數據幀之外,通常應該避免使用此函數,因爲它將整個對象拉入單個節點的內存中。

    Pandas和Spark數據幀之間的主要區別之一是急切執行和延遲執行。在PySpark中,操作被延遲,直到管道中實際需要結果。例如,您可以指定從S3加載數據集並對數據幀應用多個轉換的操作,但不會立即應用這些操作。相反,記錄轉換圖,並且一旦實際需要數據,例如當將結果寫回S3時,則轉換被應用爲單個管道操作。此方法用於避免將完整數據幀拉入內存,並在整個計算機集羣中實現更有效的處理。使用Pandas數據幀,一切都被拉入內存,並立即應用每個Pandas操作。

    通常,如果可能的話,最好避免在Spark中進行急切操作,因爲它限制了有效分配的管道數量。

    閱讀數據

    使用Spark時學習的第一步是將數據集加載到數據幀中。將數據加載到數據幀後,您可以應用轉換、執行分析和建模,創建可視化並保留結果。在Python中,您可以使用Pandas直接從本地文件系統加載文件:

    手把手帶你入門PySpark!

    在PySpark中,加載CSV文件要複雜一些。在分佈式環境中,沒有本地存儲,因此需要使用分佈式文件系統(如HDFS,Databricks文件存儲(DBFS)或S3)來指定文件的路徑。

    通常,在使用PySpark時,我使用S3中的數據。許多數據庫都提供了對S3功能的卸載,並且還可以使用AWS控制檯將文件從本地計算機移動到S3。對於這篇文章,我將使用Databricks文件系統(DBFS),它以/ FileStore的形式提供路徑。第一步是上傳您要處理的CSV文件。

    手把手帶你入門PySpark!

    將文件上載到Databricks文件存儲

    下一步是將CSV文件讀入Spark數據幀,如下所示。此代碼段指定CSV文件的路徑,並將許多參數傳遞給read函數以處理該文件。最後一步顯示加載的數據幀的子集,類似於Pandas中的df.head()。

    手把手帶你入門PySpark!

    在使用Spark時,我更喜歡使用 parquet 格式,因爲它是一種文件格式,包含有關列數據類型的元數據,提供文件壓縮,並且是一種旨在與Spark配合使用的文件格式。 AVRO是另一種適用於Spark的格式。下面的代碼段顯示瞭如何從過去的代碼段中獲取數據幀並將其保存爲DBFS上的parquet文件,然後從保存的parquet文件中重新加載數據幀。

    手把手帶你入門PySpark!

    此步驟的結果是相同的,但執行流程明顯不同。當將CSV文件讀入數據幀時,Spark以急切模式執行操作,這意味着在下一步開始執行之前將所有數據加載到內存中,而在讀取parquet格式的文件時使用惰性方法。通常,您希望在使用Spark時避免急切操作,如果我需要處理大型CSV文件,將首先把數據集轉換爲parquet格式,然後再執行其餘的管道。

    通常,您需要處理大量文件,例如位於DBFS中某個路徑或目錄的數百個parquet文件。使用Spark,您可以在路徑中包含通配符來處理文件集合。例如,您可以從S3加載一批parquet文件,如下所示:

    手把手帶你入門PySpark!

    如果您每天都有一個單獨的parquet文件,或者如果您的管道中有一個先前的步驟會輸出數百個parquet文件,則此方法很有用。

    如果要從數據庫(例如Redshift)讀取數據,最好先將數據卸載到S3,然後再使用Spark進行處理。在Redshift中,卸載命令可用於將數據導出到S3進行處理:

    手把手帶你入門PySpark!

    還有用於數據庫的庫,例如spark-redshift,使這個過程更容易執行。

    寫數據

    與使用Spark讀取數據類似,不建議在使用PySpark時將數據寫入本地存儲。相反,您應該使用分佈式文件系統,如S3或HDFS。如果您要使用Spark處理結果,則parquet是用於保存數據框架的良好格式。下面的代碼段顯示瞭如何將數據幀保存爲DBFS和S3作爲parquet。

    手把手帶你入門PySpark!

    以parquet格式保存數據幀時,通常將其劃分爲多個文件,如下圖所示。

    手把手帶你入門PySpark!

    將數據幀保存到DBFS時生成的parquet文件

    如果您需要CSV文件中的結果,則需要稍微不同的輸出步驟。這種方法的主要區別之一是所有數據在輸出到CSV之前將被拉到單個節點。當您需要保存小型數據幀並在Spark之外的系統中處理它時,建議使用此方法。下面的代碼段顯示瞭如何將數據幀保存爲DBFS和S3上的單個CSV文件。

    手把手帶你入門PySpark!

    Spark腳本的另一個常見輸出是NoSQL數據庫,如Cassandra、DynamoDB或Couchbase。這超出了本文的範圍,但我過去看過的一種方法是將數據幀寫入S3,然後啓動一個加載過程,告訴NoSQL系統從S3上的指定路徑加載數據。

    我也省略了對流式輸出源的寫入,如Kafka或Kinesis。這些系統在使用Spark流時更有用。

    轉換數據

    可以在Spark數據幀上執行許多不同類型的操作,就像可以在Pandas數據幀上應用的各種操作一樣。在Spark數據幀上執行操作的方法之一是通過Spark SQL,它可以像查詢表一樣查詢數據幀。下面的代碼段顯示瞭如何在數據集中找到最高得分的玩家。

    手把手帶你入門PySpark!

    結果是玩家ID、遊戲次數和這些遊戲中的總進球數列表。如果我們想要顯示播放器的名稱,那麼我們需要加載一個額外的文件,使其可作爲臨時視圖,然後使用Spark SQL加入它。

    手把手帶你入門PySpark!

    數據集中得分最高的玩家

    在上面的代碼片段中,我使用display命令輸出數據集的樣本,但也可以將結果分配給另一個數據幀,這可以在管道的後續步驟中使用。下面的代碼顯示瞭如何執行這些步驟,其中第一個查詢結果被分配給新的數據幀,然後將其分配給臨時視圖並與一組播放器名稱連接。

    手把手帶你入門PySpark!

    這個過程的結果如下所示,根據Kaggle數據集確定Alex Ovechkin是NHL中的得分最高的球員。

    手把手帶你入門PySpark!

    使用Spark SQL連接數據幀的進程的輸出

    對於常見任務,有Spark數據幀操作,例如添加新列、刪除列、執行連接以及計算聚合和分析統計信息,但是在開始使用時,使用Spark SQL執行這些操作可能更容易。此外,如果您已經使用PandaSQL或framequery等庫來使用SQL操作Pandas數據幀,那麼將代碼從Python移植到PySpark會更容易。

    與Spark數據幀上的大多數操作一樣,Spark SQL操作以延遲執行模式執行,這意味着在需要結果之前不會評估SQL步驟。 Spark SQL提供了一種挖掘PySpark的好方法,而無需首先學習數據幀的新庫。

    如果您正在使用Databricks,您還可以直接在筆記本中創建可視化,而無需顯式使用可視化庫。例如,我們可以使用下面的Spark SQL代碼繪製每個遊戲的平均目標數。

    手把手帶你入門PySpark!

    Databricks筆記本中顯示的初始輸出是結果表,但我們可以使用繪圖功能將輸出轉換爲不同的可視化,例如下面顯示的條形圖。這種方法不支持數據科學家可能需要的每個可視化,但它確實使得在Spark中執行探索性數據分析變得更加容易。如果需要,我們可以使用toPandas()函數在驅動程序節點上創建Pandas數據幀,這意味着任何Python繪圖庫都可用於可視化結果。但是,這種方法應僅用於小型數據幀,因爲所有數據都被急切地提取到驅動程序節點上的內存中。

    手把手帶你入門PySpark!

    2月和3月的平均每場比賽進球數

    對於至少進5球的球員,我也考察了每次射門的平均進球數。

    手把手帶你入門PySpark!

    此轉換的結果如下圖所示。大多數至少有5個進球的球員在4%到12%的時間內完成投籃。

    手把手帶你入門PySpark!

    Kaggle數據集中玩家的每次射門目標

    MLlib

    爲了在MLib中使用一種監督算法,您需要使用特徵向量和標籤作爲標量來設置數據幀。準備好後,您可以使用fit函數來訓練模型。下面的代碼段顯示瞭如何使用VectorAssembler將數據框中的多個列組合成單個要素向量。我們使用結果數據幀調用fit函數,然後生成模型的摘要統計信息。

    手把手帶你入門PySpark!

    該模型根據射擊次數、遊戲時間和其他因素預測玩家將獲得多少目標。然而,該模型的性能較差,導致均方根誤差(RMSE)爲0.375,R平方值爲0.125。具有最大值的係數是鏡頭列,但是這並沒有提供足夠的信號以使模型準確。

    使用PySpark構建ML管道時需要考慮許多其他步驟,包括訓練和測試數據集,超參數調整和模型存儲。上面的代碼段只是開始使用MLlib的起點。

    Pandas UDF

    我最近使用的Spark中的一個功能是Pandas用戶定義函數(UDF),它使您能夠在Spark環境中使用Pandas數據幀執行分佈式計算。這些UDF的一般工作方式是首先使用groupby語句對Spark數據幀進行分區,並將每個分區發送到工作節點並轉換爲傳遞給UDF的Pandas數據幀。然後,UDF返回轉換後的Pandas數據幀,該數據幀與所有其他分區組合,然後轉換回Spark數據幀。最終結果非常有用,您可以使用需要Pandas的Python庫,現在也可以擴展到海量數據集,只要您有一種分區數據幀的好方法。 Pandas UDF是在Spark 2.3中引入的,我將討論在Spark Summit 2019期間我們如何在Zynga使用此功能。

    曲線擬合是我作爲數據科學家執行的常見任務。下面的代碼片段顯示瞭如何執行曲線擬合來描述玩家在遊戲過程中記錄的擊球次數和擊球次數之間的關係。該片段顯示了我們如何通過在過濾到單個播放器的數據集上調用toPandas()來爲單個玩家執行此任務。該步驟的輸出是兩個參數(線性迴歸係數),他們試圖描述這些變量之間的關係。

    手把手帶你入門PySpark!

    如果我們想爲每個玩家計算這條曲線並擁有一個海量數據集,那麼由於內存不足異常,toPandas()調用將失敗。我們可以通過調用player_id上的groupby(),然後應用下面顯示的Pandas UDF,將此操作擴展到整個數據集。該函數將Pandas數據框作爲輸入,描述單個玩家的遊戲統計數據,並返回包含player_id和擬合係數的摘要數據幀。然後將每個摘要Pandas數據幀組合成一個Spark數據幀,該數據幀顯示在代碼片段的末尾。使用Pandas UDF的另一個設置是爲結果數據幀定義模式,其中模式描述了從應用步驟生成的Spark數據幀的格式。

    手把手帶你入門PySpark!

    此過程的輸出如下所示。我們現在有一個數據幀,總結了每個玩家的曲線擬合,並且可以在海量數據集上運行此操作。在處理大量數據集時,選擇或生成分區密鑰,以在數據分區的數量和大小之間實現良好的權衡。

    手把手帶你入門PySpark!

    來自Pandas UDF的輸出,顯示每位玩家的曲線擬合

    最佳實踐

    我已經介紹了使用PySpark的一些常見任務,但也希望提供一些建議,使其更容易從Python到PySpark。以下是我根據在這些環境之間移植一些項目的經驗收集的一些最佳實踐:

  • 避免使用庫,使用數據幀:使用諸如字典之類的Python數據類型意味着代碼可能無法在分佈式模式下執行。與其使用鍵來索引字典中的值,不如考慮將另一列添加到可用作過濾器的數據幀中。
  • 謹慎使用toPandas:調用toPandas()將導致所有數據被加載到驅動程序節點上的內存中,並阻止在分佈式模式下執行操作。當數據已經聚合並且您想要使用熟悉的Python繪圖工具時,可以使用此函數,但它不應該用於大型數據幀。
  • 避免for循環:如果可能,最好使用groupby-apply模式重寫for循環邏輯以支持並行化代碼執行。我注意到,專注於在Python中使用這種模式也導致清理代碼更容易轉換爲PySpark。
  • 嘗試最小化急切操作:爲了使您的管道盡可能具有可擴展性,最好避免將整個數據幀拉入內存的急切操作。我注意到用CSV讀取是一個急切的操作,我的工作是將數據幀保存爲parquet,然後從parquet重新加載以構建更具可擴展性的管道。
  • 使用framequery / pandasql可以更輕鬆地進行移植:如果您正在使用其他人的Python代碼,那麼解讀一些Pandas操作正在實現的內容可能會很棘手。如果您計劃將代碼從Python移植到PySpark,那麼使用Pandas的SQL庫可以使這種轉換更容易。

    我發現在PySpark中編寫代碼的時間也因Python編碼技巧而得到改善。

    結論

    PySpark是數據科學家學習的理想語言,因爲它支持可擴展的分析和ML管道。如果您已經熟悉Python和Pandas,那麼您的大部分知識都可以應用於Spark。我已經展示瞭如何使用PySpark執行一些常見的操作來引導學習過程。我還展示了一些最近使用Pandas UDF的Spark功能,它使Python代碼能夠以分佈式模式執行。有很好的環境可以讓你輕鬆啓動和運行Spark集羣,現在正是學習PySpark的好時機!

    手把手帶你入門PySpark!

  • 相關文章