供稿 |  eBay ADI-Carmel Team

作者 | 金瀾濤

編輯 | 顧欣怡

本文7309字,預計閱讀時間22分鐘

更多幹貨請關注“eBay技術薈”公衆號

導讀

本文介紹eBay Carmel團隊利用Delta Lake,使Spark SQL支持Teradata的Update/Delete語法 主要 從源碼角度介紹了CRUD操作的具體實現和優化,以及delta表的管理工作。希望對同業人員有所啓發和幫助。

大數據處理技術朝傳統數據庫領域靠攏已經成爲行業趨勢,目前開源的大數據處理引擎,如Apache Spark、Apache Hadoop、Apache Flink等等都已經支持SQL接口,且SQL的使用往往佔據主導地位。各個公司使用以上開源軟件構建自己的ETL框架和OLAP技術,但在OLTP技術上,仍然是傳統數據庫的強項。其中的一個主要原因是傳統數據庫對ACID的支持。具有ACID能力的傳統商用數據庫基本都實現了完整的CRUD操作。 而在大數據技術領域,由於缺少ACID的支持,基本只實現了C/R操作,對U/D操作很少涉及。

eBay 數據倉庫 的部分基礎設施是構建在商用數據產品 Teradata 之上的,近年來,隨着公司整體朝開源技術遷移,數據倉庫的基礎設施已基本遷移到Apache Hadoop、Apache Spark平臺。但要完全從Teradata上遷移下來,必須構建具有相同能力的SQL處理引擎。在Teradata上的分析型SQL,有超過5%的查詢使用Update/Delete操作,目前Apache Spark並不具備這個能力。

本文介紹eBay Carmel團隊利用Delta Lake,使Spark SQL支持Teradata的Update/Delete語法。 對比標準SQL的Update/Delete語法,以及目前尚未正式發佈的Apache Spark 3.0 提供的語法(不含實現),我們還實現了Teradata的擴展語法,可以進行跨表更新和刪除的SQL操作。

1.

簡介

Carmel Spark是Carmel團隊基於Apache Spark進行魔改的SQL-on-Hadoop引擎。主要改善了交互式分析的使用體驗,提供即席查詢(ad-hoc)服務。Carmel Spark是 “Teradata退出” 項目的重要組成部分,在功能性和性能上,都做了大量開發和優化。例如全新的CBO、併發調度、物化視圖、索引、臨時表、Extended Adaptive Execution、Range Partition、列級訪問權限控制,以及各類監控和管理功能等,目前已經在線上使用且滿足業務需求。

但由於Apache Spark缺少ACID事務能力,並沒有提供Update/Delete語法。去年年初,Databricks開源了 存儲層Delta Lake ,爲 Apache Spark 提供可伸縮的 ACID 事務 ,提供事務管理、統一流批、元數據管理、版本回溯等數據庫領域常見功能。一年過去了,Delta Lake的版本也更新到了 0.5.0 ,但開源版本始終沒有提供Update/Delete的SQL實現。目前只提供Dataframe API,用戶需通過編寫代碼來對數據進行更新和刪除等操作。此外,根據 Apache Spark 3.0 分支上提供的SQL語法接口,也只支持基本的單表Update/Delete操作,對於複雜的帶有join語義的跨表操作,則完全不支持。而Teradata用戶已經在廣泛使用擴展的SQL語法對數據進行更新和刪除操作。

基於Delta Lake存儲層提供的ACID事務能力, Carmel Spark實現了Update/Delete的SQL語法,且該語法完全兼容Teradata的擴展語義 ,即能進行跨表的更新和刪除。同時, 我們拓展了delta表的數據分佈, 支持bucket delta表 ,並對其進行了bucket join等優化。此外,由於Carmel Spark集羣部署是多租戶的,所以同一套代碼會長期運行在YARN的不同隊列中。雖然Delta Lake存儲層提供了良好的事務隔離性,但仍會出現重複操作的風險(非同一事務)。因此, 我們使用delta表本身來治理delta表 ,即將所有delta表的元信息存儲在一張delta表中,通過對該元數據表的增刪改查操作,來對用戶使用的所有delta表進行管理。

本文的組織結構如下:第二節介紹相關技術和產品;第三節闡述項目的整體架構和實現;第四節詳細介紹如何利用Delta Lake使SparkSQL支持CRUD操作;第五節介紹delta表的bucket優化;第六節介紹delta表的自治和管理;最後兩節分別談一下未來的工作和對本文的總結。

2.

相關工作

2.1

Spark SQL

Apache Spark [1] 是一款開源的分佈式計算框架,誕生於 2009年 加州大學伯克利分校AMPLab的一個研究項目,於 2013 捐贈給了Apache軟件基金會。在處理結構化數據上,Spark提供了DataFrame API和Spark SQL模塊。DataFrame API允許用戶通過表、行和列的概念對數據進行操作。

同樣,用戶可以使用SQL來操作它們。Spark SQL模塊將SQL查詢轉成一棵 查詢計劃樹 (query plan tree)。給定一個原始SQL查詢,該查詢首先經詞法分析和解析,轉換爲 邏輯查詢計劃 (logical plan)。該邏輯查詢計劃經過查詢優化器,產生 優化的查詢計劃 (optimized plan)。最終,優化的查詢計劃被轉換爲 物理計劃 (physical plan),物理計劃會被轉成 job和task 最終提交到集羣上執行。

Apache Spark 3.0開始,SQL模塊提供了Update/Delete的語法定義,定義在Antlr4的語法文件裏,但並沒有具體實現,而是交由第三方實現。如圖1所示:

圖1 Spark 3.0 的Update/Delete語法

(點擊可查看大圖)

2.2

Teradata

Teradata [2] 是Teradata Corp.開發的可橫向擴展的關係型數據庫管理系統,設計用於分析型查詢,主要用於數據倉庫領域,採用大規模並行處理(MPP)架構。 Teradata對Update/Delete等語法支持非常完備 ,除了ANSI SQL: 2011定義的標準Update/Delete語法,Teradata還做了大量擴展,如跨表更新和刪除。其所提供的豐富的語法也給我們遷移到Spark帶來了挑戰。圖2所示爲Teradata支持的更新和刪除語法:

圖2 Teradata的單表以及跨表 Update/Delete語法 (點擊可查看大圖)

2.3

Delta Lake

2018年初 ,Databricks開源了 存儲層 Delta Lake [3] ,爲Apache Spark 提供可伸縮的 ACID 事務,提供事務管理、統一流批、元數據管理、版本回溯等數據庫領域常見功能。Delta Lake將其數據存儲在Parquet文件中,並提供ACID事務。它使用名爲Delta Log的事務日誌來跟蹤對錶所做的所有更改。

與開源的Delta Lake相比,Databricks內部版本可以通過SQL來進行Update/Delete操作,而目前開源版本只支持DataFrame的API,只能通過 Parquet [4] 文件推斷表的Schema信息,對 Hiv e Metastore [5] 的支持較弱,且不支持bucket表等等。 Apache Iceberg [6]Apache Hudi [7] 雖然實現形式與Delta Lake不同,但在Update/Delete的SQL語法支持上,目前都不完善。表1給出了這三個系統的對比(截止2019年11月)。

表1 三個ACID庫的簡單對比

(點擊可查看大圖)

3.

項目概述

有了Delta Lake在存儲層提供ACID事務保障,我們的主要工作就是利用Delta Lake,在我們的Spark版本上實現和Teradata相同的Update/Delete功能。要達到這個目標,有以下任務有待完成:

1. Delta Lake目前只支持 Apache Spark 2.4+版本 ,而Carmel團隊使用的Spark版本是基於 2.3版本 的,所以我們改了Delta Lake的部分實現併爲我們的Spark版本打了一些補丁。

2. Spark 3.0中雖然沒有Update/Delete語法的具體實現,但仍然在 Catalyst [8] 中加入了相關的邏輯計劃節點。不過這些新增的接口都是基於 DataSourceV2 的,我們需要將這部分代碼在 DataSourceV1 上進行重寫:

(點擊可查看大圖)

3. Teradata支持跨表的Update/Delete語法,目前Delta Lake和Spark都不支持,我們需要自己實現 帶join的跨表連接更新和刪除操作

4. Delta Lake目前對 Catalog [9] 的訪問還不成熟,delta表的schema是通過Parquet文件推斷出來的, 通過Catalog訪問Hive Metastore 是使用SQL訪問delta表的重要一環。

5. 由於上述原因,delta表無法識別bucket信息,更沒有考慮讀寫bucket表時的分佈(distribution)。

6. 在以上3,4,5步驟完成之後,還要對跨表操作進行優化,這裏將主要介紹 bucket join的優化

7. 開源版本的Delta Lake缺少一定的管理機制,需要實現一些自動化管理功能,如自動清理和合並文件等。

4.

CRUD的實現

4.1

前置工作

首先,要在我們的Spark 2.3內部版本中使用Delta Lake,就需要從社區打一些補丁。這裏重點說一下SPARK-28303。

SPARK-28303引入了基於DataSource V2的DELETE / UPDATE / MERGE語法。由於Spark 2.3不支持DataSource V2,因此我們需要將此功能移植到V1版本,在ddl.scala中增加了 UpdateTableStatement和DeleteFrom Statement。 Ant lr4 [10] 的語法結構如下所示:

(點擊可查看大圖)

4.2

實現單表更新

Delta Lake目前不支持Update/Delete SQL的解析,我們增加了兩個類:DeltaSqlResolution和PreprocessTableUpdateDelete,通過SparkSessionExtensions注入到Analyzer:

(點擊可查看大圖)

DeltaSqlResolution主要是用於解析condition和assignments表達式:

(點擊可查看大圖)

再由PreprocessTableUpdateDelete生成RunnableCommand。如果是delta表的話,這裏可以從LogicalRelation中拿出delta表的TahoeFileIndex(在DataSource.scala的resolveRelation中添加的),如果是非delta表,則會拋出AnalysisException。

(點擊可查看大圖)

UpdateCommand是Delta Lake自帶的類,我們對其改動不多,主要改了如下幾個地方:

一個是鑑於目前Update操作不會更新表的統計信息(Statistics),造成delta表在進行join等操作時無法正確判斷是走SortMergeJoin還是BroadcastJoin, 我們增加了catalog的訪問使delta表的CRUD操作都能更新表的統計信息。

第二個改動是增加了update/delete的row級別metrics信 息。 Delta Lake已經發布的 0.5.0 版本 update和delete缺少row級別的metrics。社區最新的代碼已經做了添加,但當更新或刪除單個partition或全表時仍舊是缺少的,而我們的實現在無論何種情況下都做了收集。

4.3

實現跨表更新

目前 Spar k3.0 定義的Update/Delete語法不支持跨表操作,而跨表更新和刪除操作卻十分普遍,比如更新目標表中具有(在inner join情況下)或可能沒有(在left outer join情況下)另一個表匹配行的行。

許多數據庫都提供跨表更新和刪除的語法。下面給出了幾種常用數據庫的跨表更新的例子。

MYSQL [11] 跨表更新:

Teradata的跨表更新:

PostgreSQL [12] 的跨表更新:

Teradata的語法和PostgreSQL的基本一致,只是FROM子句和SET子句順序調換了一下,而MYSQL支持在一條SQL裏同時更新多張表。 Carmel Spark 目前參考的是 Teradata 的語法,同時在DeltaSqlResolution中增加了帶join的解析:

(點擊可查看大圖)

和單表Update一樣,首先對condition和SET子句進行解析。不同的是,除了被更新的target是一個LogicalRelation以外,這裏的source可以是一個LogicalRelation,也可以是多張表連接在一起的join plan。

我們從WHERE條件的condition中分離出哪些是target和source之間的join criteria,哪些是source中自身的join criteria(source可以是多表join的plan),以及哪些是分別作用在target或source上的普通Filter。同樣地,再由PreprocessTableUpdateDelete生成Runnable Command:

(點擊可查看大圖)

上述代碼中,跨表更新和單表更新的區別是多構建了一個DeltaMergeAction。可見跨表更新的實現參考了MergeInto。

UpdateWithJoinCommand是跨表更新的主要執行類,一共分爲三步:

1. 通過將需要被更新的target表和source(可以是一個帶join的plan)進行 內連接 (inner join)找出所有會被更新的行所涉及的文件,標記爲 removeFiles 。這一步還能簡化後續的步驟,例如不涉及任何文件或者只涉及partition目錄時,不用全表執行第2步。

2. 將target和source使用 左外連接 (left outer join),對於join條件匹配的行,使用build side iterator的數據(右表),不匹配的行使用stream side iterator的數據(左表)。將數據寫出到target表,寫出的數據文件標記爲 addedFiles

3. 將1中removeFiles和2中的addedFiles寫入transaction log中,即 delta log

刪除操作和更新操作基本類似,可以視爲更新操作的簡化版,這裏就不展開了。

4.4

實現SELECT/INSERT

對delta表的讀操作(SELECT)實際上是對delta表的解析。Delta表是DataSource表的一種。在FindDataSourceTable這條rule中,通過resolveRelation方法對delta表進行特殊處理:

(點擊可查看大圖)

這裏我們把catalogTable對象傳入到DeltaDataSource的createRelation方法裏。補充一點,之所以這個case可以匹配到DeltaDataSource,是因爲我們在ConvertToDelta Command裏,通過alterTable,把provider從parquet改成了delta:

(點擊可查看大圖)

回到createRelation。通過傳入的catalogTable對象,我們在DeltaLog.scala裏將表的信息填到HadoopFsRelation裏面:

(點擊可查看大圖)

Delta表的INSERT操作也很簡單。在DataSourceStrategy中添加InsertIntoData SourceCommand:

(點擊可查看大圖)

普通delta表的insert我們沒有進行修改,這裏就不展開了,下一節講bucket表的insert時再詳細闡述這部分的改動。

4.5

創建Delta表

創建delta表(CREATE操作)目前完全複用了普通Parquet表的CREATE,只是需要在建完表後執行CONVERT TO DELTA命令。我們簡單做了一些修改,使其可以CONVERT一張空的Parquet表,目前社區版是不支持的。其他的修改主要是針對管理上的,在第六節會詳細介紹。

到此,CRUD功能的SQL實現已經基本完成。在這一節裏,我們引入了跨表更新操作,但是跨表更新涉及到join算子,這在大表之間進行更新操作時會有性能問題。在下一節中會介紹如何針對bucket表進行優化。

5.

Bucket優化

跨表更新操作中,會有多次連接算子,當進行連接操作的表是上 TB 數據量的大表時,整個更新操作就會變得非常慢。甚至,大量數據的SortMergeJoin可能拋出OutOfMemory。事實上,在我們實際的業務場景中,就存在着大量的大表更新。例如被更新的表往往是一張幾個 TB 的大表,然後和另一張或幾張中型表進行連接操作。 爲了優化這類SQL,最容易想到的方法是通過bucket join來避免大表數據的shuffle。 現實中,我們用戶的許多大表也的確做了分桶(bucket)。

然而目前delta表並不支持分桶表,相關代碼的BucketSpec都被默認填了None,對更新和刪除的操作也沒有考慮數據的分佈(Distribution)。那麼該如何實現bucket表的數據分佈呢?

5.1

創建delta bucket表和讀取

首先和Parquet表一樣,我們需要在建表時 指定分桶字段 。形如: CLUSTERED BY (col*) [SORTED BY (col*) ] INTO number BUCKETS。

4.3小節 中我們提到了在ResolveRelation時將CatalogTable對象傳入了HadoopFsRelation。 有了這個CatalogTable對象,就可以幫我們在後續的各類操作中識別bucket表了。

5.2

插入數據到delta bucket表

上一步只是告訴Spark,這是一張bucket表, 真正寫入數據的時候發現數據並沒有分桶分佈 。這是因爲Insert操作在delta表上是走InsertIntoDataSource -> InsertIntoDataSourceCommand的,而不是通過DataWritingCommand,所以也就走不到ensureDistributionAndOrdering的邏輯。以下代碼是社區版InsertIntoDataSourceCommand的實現:

(點擊可查看大圖)

如上代碼所示,它的實現非常簡單,將需要insert的 邏輯計劃“query” 封裝成一個data frame,然後傳入到實現類的insert方法裏。在Delta Lake中這個data frame會被傳入到TransactionalWrite的writeFiles方法中。最終從這個data frame中取出physical plan並傳入DataFormatWriter的write方法。之後就是真正的生成job並分發執行了。

從整個流程可以看出,從一開始的邏輯計劃對象“query”到最後的物理計劃,並沒有機會進行數據分佈的實現。所以 不管 在建表時是否指定分桶,插入數據時都不會滿足數據分佈。

鑑於目前DataSource並沒有考慮數據分佈的問題,我們在resolution階段就需要進行處理。大體就是在Catalyst裏增加一個InsertIntoDataSource的邏輯計劃節點和一個InsertIntoDataSourceExec的物理計劃節點。在InsertIntoDataSourceExec這個物理計劃中實現了requiredChildDistribution和requiredChildOrdering方法(代碼可以參考InsertIntoHadoopFsRelationCommand的requiredDistribution和requiredOrdering方法)。

這裏說一下整體流程。首先,DataSourceStrategy 原本是匹配到了InsertIntoTable就會將邏輯計劃“query”原封不動地傳入InsertIntoDataSource Command。 我們現在做出如下改變:增加一個新的邏輯計劃節點InsertIntoDataSource,爲其添加partition,bucket等信息,並將“query”作爲該新節點的child:

(點擊可查看大圖)

然後在SparkStrategy.scala的BasicOperators裏將InsertIntoDataSource節點轉成物理計劃節點InsertIntoDataSourceExec,通過planLater(i.query)得到物理計劃作爲該物理節點的child。這樣InsertIntoDataSourceExec的requiredChildDistribution和requiredChildOrdering方法就可以對數據進行分佈了:

(點擊可查看大圖)

5.3

在跨表更新或刪除操作中利用bucket join

到目前爲止,對delta表的改造已經使其具有了bucketSpec字段和數據分佈的特性。在跨表更新或刪除時,無論是inner join還是left outer join,只要target和source都是bucket表且滿足bucket join條件,就能走bucket join而不是SortMergeJoin。這就解決了大表之間join產生大量shuffle帶來的性能問題。

下面這個例子是跨表更新一張 3.9TB 的表,source則是一張 5.2TB 的表。圖3所示是left outer join階段,右表雖然有一個Filter,但是仍然不滿足broadcast join閾值,這個更新操作在非bucket join的情況下,會造成大量Executor OOM,最終導致job失敗。通過引入bucket join,該job在 2分鐘 左右就能順利完成。從圖3可以看到在SortMergeJoin的前後,已經沒有ShuffleExchange了。

圖3 跨表更新中利用bucket join避免shuffle

(點擊可查看大圖)

但是,這裏仍然可能存在問題,因爲被更新的表仍然是一張bucket表,而圖3的輸出沒有考慮數據的分佈。對於bucket表尚不滿足數據分佈的情況,我們需要在SortMergeJoin之後增加一輪HashRepartition,以保證最終的結果輸出符合被更新表的數據分佈特性:

(點擊可查看大圖)

6.

Delta的自治和管理

介紹完CRUD的功能和相關優化,這一節講一下我們是如何管理delta表的,主要包括:如何統計delta的使用情況,如何自動進行文件清理,如何管理 TimeTraval [13] 等。

在這之前我們需要簡單介紹一下eBay Carmel Spark的基本架構。eBay的Carmel Spark平臺是計算存儲分離的。數據存儲有一個專門的Hadoop集羣(Apollo),Carmel Spark集羣(Hermes)主要是由大內存加SSD的計算節點組成,通過 YARN [14] 進行調度。除了本地SSD以外,也有一部分存儲容量搭建了一個小容量的HDFS,主要是拿來做Relation Cache和物化視圖,這部分以後有機會另起一篇文章進行介紹。

我們使用Spark Thriftserver來提供JDBC和ODBC服務,但所有的Thriftserver並不是固定在某個機器上的,而是通過YARN進行調度,通過cluster mode將Spark Thriftserver提交到集羣內部。同時,根據Budget Group對YARN集羣分queue,不同的Budget Group有一個YARN的queue,例如廣告部門有一個queue,數據部門有一個queue,每個queue可以有多個Spark Thriftserver。Carmel Spark對scheduler模塊做過大量併發優化,經過壓測,一個Driver調度起來的任務能把 200臺 物理機的所有CPU壓滿。所以Driver調度並不是瓶頸,目前最大的一個queue僅使用一個Thriftserver就可以調度近 7000個 executors。

圖4 Carmel Spark集羣部署

(點擊可查看大圖)

目前有多少個queue,就有多少個Thriftserver,也就有多少個Application。但不同的Thriftserver仍然共享了一些組件,例如HDFS,Hive Metastore等。這就要求我們對所有的queue做一些管理。例如在物化視圖功能中,當對一張基礎表構建物化視圖後,所有的queue都需要在內存裏構建一些邏輯計劃樹。delta表的管理也類似,不過相比物化視圖簡單的多。例如我們要對所有的delta表進行自動化的文件清理工作,一種方式是起一個後臺線程遍歷Hive Metastore的所有表,對provider是delta的表進行處理。這樣的好處是不需要跨Thriftserver進行任何消息的同步,壞處自然是不斷遍歷Hive Metastore帶來的壓力(多集羣公用的Hive Metastore壓力已經比較大了)。 所以我們使用了一種更加直觀的方式進行管理,即用delta表來管理delta表。

我們創建了一張名爲carmel_system.carmel_ delta_meta的表,記錄瞭如表名、owner、deltalog路徑、是否自動清理、清理週期等元信息,並將其CONVERT成一張delta表。所以carmel_delta_meta表的第一條記錄就是自己的信息。然後我們提供了一套操作這張表的API,以調用靜態方法的方式放在 DeltaTableMetadata 類的半生對象中:

如下所示,當用戶對一張表執行CONVERT TO DELTA命令時,會生成一個事件,通過DeltaTableListener捕獲後將該delta表的元信息寫入carmel_delta_meta,當用戶刪除delta表時,DropTableEvent同樣可以觸發上圖的刪除操作API,從carmel_delta_meta刪除這條記錄:

(點擊可查看大圖)

另外在YARN的保留隊列(reserved queue只允許管理員權限連接)裏啓動一個DeltaValidate線程,通過讀取carmel_delta_meta中的數據進行驗證,觸發如刪除記錄等操作。同時,如果用戶在CONVERT TO DELTA時指定了Vacuum保留時間:

或是一開始沒有指定保留時間,後續通過命令VACUUM AUTO RUN進行修改:

DeltaValidate線程會自動生成Vacuum任務,並丟到Vacuum線程池調度執行。這裏就不貼代碼了。整個架構如圖5所示:

圖5 delta表的自治管理

(點擊可查看大圖)

此外,我們還增加了TimeTravel的SQL語義,用戶可以通過在SELECT命令裏增加AT關鍵字,單次讀取delta表某個version的快照。也可以通過ROLLBACK命令永久回到某個版本:

(點擊可查看大圖)

通過carmel_delta_meta中記錄的一些表的血緣信息,可以實現delta表的及聯回滾。在某個delta表rollback後,觸發器根據carmel_delta_meta的血緣信息,自動回滾其他相關表(這需要事先定義在carmel_delta_meta的rollback依賴樹和觸發器條件,該功能目前還未上線)。

上面介紹了通過delta表來管理delta表的方式,這一方法能很好地幫我們解耦隊列同步和外部系統依賴的問題,既方便靈活,又快捷安全。

7.

未來的工作

7.1

持續的性能優化

Carmel Spark項目經過兩年的技術迭代,已經具備非常多的功能和優化,例如Range Partation、Optimized Bucket Join、Broadcast/Local Cache、Extended Adaptive Execution、Parquet File Index、Materialized View、ACL、Volcano CBO、Adaptive Runtime Filter、Mutiple Files Scan等,如何讓新的功能如CRUD複用以上優化和特性,也變得越來越富有挑戰了。例如我在測試時發現Broadcast Cache和Mutiple Files Scan兩個功能在和CRUD功能集成時存在bug,又或者目前的Volcano CBO和Parquet File Index還不能應用在delta表上等。

此外,在跨表更新操作上,大表連接的優化目前只針對bucket表,但是當兩張非bucket表進行連接時,性能仍然不夠好。這裏就有多個優化點,比如Adaptive Runtime Filter,就是Join Pushdown,可以將join表的min/max或者join key的bloomfilter推到兩邊進行過濾,以減少參與連接的記錄數,目前只完成了在inner join下的部分功能。

7.2

更完備的語義

除了性能的優化,Carmel Spark作爲Teradata戰略代替品, 需要儘可能兼容Teradata的語義 ,後續如果有用戶需要MERGE INTO或者UPSERT操作,這部分還要繼續擴展。此外,目前UPDATE和DELETE的WHERE條件還不支持子查詢,CONVERT TO DELTA不支持Parquet Format的Hive表,這些都將是後續的工作。

7.3

高度自治的管理

第6節最後提到過的 及聯回滾功能 ,以及 對delta表的審計和監控 都屬於平臺管理的範疇。這些有的已經具備成熟的解決方案,如我們已經有完全和Teradata對標的列級訪問權限控制和審計功能。有的還在不斷完善,如用於File Index和Materialized View的Hive Metastore同步機制還沒有上線,目前用的還是過渡方案。這部分不止針對delta表,有些還可以應用於整個Carmel Spark。

8.

實施和總結

8.1

技術之外

最後簡單說一下項目的情況。這個項目找到我的時候是在 2019年的10月底 ,我剛上線完Spark臨時表功能,物化視圖項目也還陸陸續續有一些bug fix的工作要做,所以真正開始投入去做應該是在11月中旬。CRUD功能目標上線時間是在 2020年的2月份 ,不像物化視圖這類優化型項目,功能型項目承諾上線時間的要求往往更高一些。加之期間還有春節假期,oncall和各種bug fix的工作,對於該項目來說排期還是比較緊的。

此外,我們對Delta Lake的成熟度和性能也比較擔憂(現實也驗證了Delta Lake的開源版本在SQL成熟度上的確不足)。實踐中發現除了ACID這個核心功能不用操心以外,基本上都要二次開發。最後和我們使用的基於社區2.3版本進行魔改的Carmel Spark的集成相比,也存在許多挑戰。

再說一下爲什麼選擇Delta Lake。目前來看,除了Delta Lake之外,Apache Hudi和Apache Iceberg也能完成ACID的功能。 當時選擇Delta Lake一是因爲它是Databricks的產品 ,在Databricks內部版本比較成熟,長期來看其開源版本也會和Apache Spark更加緊密。 二是當時公司內部還有一個準實時數倉的項目,立項也是使用Delta Lake。 考慮到儘可能保持技術棧一致,我們選擇了Delta Lake,而且單從這個項目上Apache Hudi和Apache Iceberg並沒有特別的優勢。

最後說一下用戶支持,其實做一個項目最複雜也是最耗時的並不是編碼階段,而是上線後接受用戶的考驗。該功能的第一批用戶是來自eBay瑞士的財務部門分析師團隊,因爲不在同一個時區,春節假期裏幾乎每晚都會通過Zoom和我溝通。這種在用戶和開發者之間的持續交流,使得一些隱藏的問題即時浮現出來,用戶也得到了較好的使用體驗。我們的Carmel Spark每週都會有半個小時的例行發佈窗口,用戶遇到的bug幾乎都在下次發佈窗口時得到了修復。在這一週中,我們也會找出workaround方式,幫助用戶進度的推進。目前該功能已經在所有隊列上啓用,越來越多的用戶開始參與試用。

8.2

總結

本文從源碼角度講解如何利用Delta Lake使老版本的Spark SQL支持跨表的CRUD操作,以及我們所做的優化和管理工作。最後,簡單介紹了未來的工作方向以及項目實施上的一些感悟,希望能對閱讀者有所幫助。

參考文獻

[1] https://spark.apache.org/

[2] https://www.teradata.com/

[3] https://delta.io/

[4] https://parquet.apache.org/

[5] https://hive.apache.org/

[6] https://iceberg.apache.org/

[7] https://hudi.apache.org/

[8]https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

[9]http://blog.madhukaraphatak.com/introduction-to-spark-two-part-4/

[10] https://www.antlr.org/

[11] https://www.mysql.com/

[12] https://www.postgresql.org/

[13]https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

[14]https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

您可能還感興趣:

一探究竟 | eBay流量管理之DSR在基礎架構中的運用及優化

乾貨 | Rheos SQL: 高效易用的實時流式SQL處理平臺

分享 | “三高”產品設計的這些坑,你是不是也踩過?(上)

分享 | “三高”產品設計的這些坑,你是不是也踩過?(下)

一探究竟 | eBay流量管理之看不見的手

解密 | 一樁由數據潔癖引發的DNS懸案

分享 | eBay流量管理之Kubernetes網絡硬核排查案例

:point_down:點擊 閱讀原文 ,eBay ADI-Carmel Team 等你來!

We Are Seeking Talent Like You! 

相關文章