導語

大數據處理技術現今已廣泛應用於各個行業,爲業務解決海量存儲和海量分析的需求。但數據量的爆發式增長,對數據處理能力提出了更大的挑戰,同時對時效性也提出了更高的要求。業務通常已不再滿足滯後的分析結果,希望看到更實時的數據,從而在第一時間做出判斷和決策。典型的場景如電商大促和金融風控等,基於延遲數據的分析結果已經失去了價值。

爲了同時滿足大數據量和高時效性的雙重要求,實時數倉和在線交互式(ad-hoc)分析技術,及相應的基礎組件應運而生,並快速發展。其中包括通用計算引擎(如Spark和Flink),交互式分析系統(如Presto,Druid和ClickHouse),數據湖框架(如Iceberg,Hudi和Delta Lake),和底層存儲(如Ozone)。

本文主要介紹基於Iceberg的特性,通過Spark和Flink,如何打造T+0實時數倉,以及相應功能在Iceberg社區的進展。

離線和實時數倉

傳統的離線數倉可以通過Hive加HDFS搭建。藉助Hive成熟和穩定的能力,以及豐富的上下游生態,構造數據處理和分析平臺。它通常遇到如下痛點:

流批混合的作業難以基於同一套基礎組件搭建; 難以保證端到端的”有且僅有一次“和”強一致“的語義; 流批銜接,即流式數據落地,通常環節多,流程長,時效性差; 難以保證ACID事務和讀寫分離,導致下游出現髒讀等錯誤;如果通過外部邏輯實現ACID事務和強一致性,會進一步加長整個流程; 已寫入的數據很難修正,或者只能以數據文件甚至整個分區這種較大的粒度進行操作,費時費力; 數據落地和處理過程難以實現端到端的增量處理等。

針對上述離線數倉的痛點,隨着流式計算引擎的發展,越來越多的公司引入實時數倉,或者實時和離線融合的數據分析平臺,以求達到秒級的實時響應。

基於Iceberg打造實時數倉

Iceberg最近已經順利畢業,晉升爲Apache頂級項目。它作爲新興的數據湖框架之一,開創性的抽象出”表格式“(table format)這一中間層,既獨立於上層的計算引擎(如Spark和Flink)和查詢引擎(如Hive和Presto),也和下層的文件格式(如Parquet,ORC和Avro)相互解耦。

同時,Iceberg還提供了許多額外的能力:

ACID事務; 時間旅行(time travel),以訪問之前版本的數據; 完備的自定義類型、分區方式和操作的抽象; 列和分區方式可以進化,而且進化對用戶無感,即無需重新組織或變更數據文件; 隱式分區,使SQL不用針對分區方式特殊優化; 面向雲存儲的優化等;

上述的抽象和能力使得Iceberg在流批銜接和實時數倉中可以發揮核心作用。

總體框架

如下圖所示:

使用Flink流式處理引擎消費數據總線,藉助ACID事務的能力強一致的導入Iceberg;讀寫分離使交互式查詢引擎可以第一時間讀取正確的數據;Row-level update和delete可以通過Spark對數據進行修正;增量消費使得已落地的數據可以進一步的返回流式處理引擎中,並只處理和向後傳遞變化的數據;Iceberg中的數據也可以同時被報表系統消費和進一步處理。

ACID事務

Iceberg實現了ACID事務機制,使得邊寫邊讀成爲可能,從而數據可以更快的被下游消費到。ACID事務機制保證下游只能看到已commit的snapshot所包含的數據,而不用擔心讀到部分或者未commit的數據。業務因此可以省去大量的用於保證ACID事務和失敗恢復的邏輯。

如上圖所示,虛線框代表即將被生成的snapshot,其中包含新寫入但尚未commit的數據;實線框表示已經被commit的snapshot,下游可以訪問最新的snapshot(S3)或者之前的snapshot(如S2等)中的數據。由ACID事務衍生出的row-level update和delete的能力將在後文中介紹。

Flink寫入和讀取Iceberg

Flink應用從總線中消費流式數據後,可以通過Flink sink匯聚並落地Iceberg,後續也可以通過Flink source繼續消費Iceberg中的數據。Flink的checkpoint機制和Iceberg的ACID事務特性是保證端到端的“有且僅有一次“語義的關鍵。

關於Flink sink寫入Iceberg,可以分爲相互解耦的兩個層次來實現:

相對底層的DataStream實現:實現SinkFunction和checkpoint相關接口。接入DataStream中,即可實現落地Iceberg。 相對高層的Table和SQL實現:按照對insert、delele和update的不同要求,實現StreamTableSink相關接口(AppendStreamTableSource,UpsertStreamTableSource或RetractStreamTableSource),以支持Table和SQL等高級語義和操作。但最終的寫入和commit操作還是通過上述的底層來實現。此處還需要處理Flink的schema以及類型到Iceberg的schema以及類型的轉換和映射。

關於Flink source讀取Iceberg,也可以分爲相互解耦的兩個層次來實現:

相對底層的DataStream實現:實現SourceFunctin和checkpoint相關接口。接入DataStream中,即可被Flink引擎驅動發送ScanTask來讀取Iceberg中的數據。 相對高層的Table和SQL實現:實現StreamTableSource,並輔助以ProjectableTableSource以支持projection從Flink下推到Iceberg,以及FilterableTableSource以支持表達式下推到Iceberg,從而過濾不滿足條件的行。

社區正在積極推進 Flink Iceberg sink [1] 的開發和合入。同時Flink sink和source也有 不同的實現 [2] 。但目標都是希望Iceberg能夠成爲首個在社區官方支持Flink的數據湖框架。

Flink Iceberg sink後續改進主要包括:在現有隻能append寫入的基礎上,增加update和delete語義,使延遲數據可以得到正確的處理。由於整個流程不可避免的出現數據延遲到達的情況,而落地通常使用“事件時間(event time)”來聚合和分區,因此如何把延遲到達的數據合入正確的分區是需要解決的問題。實現此功能後,CDC(Change Data Capture)的場景即可被全面支持了。但這仍然依賴於 row-level delete的實現 [3]

基於Spark進行數據修正

支持對已入庫數據的修正,同時保證ACID的事務特性,是現代數倉的基本能力。Iceberg的重要特性之一就是在row-level這一細粒度下的update和delete的能力。

Row-level update和delete通常有Copy-on-Write和Merge-on-Read兩種方案。其中Copy-on-Write把生成新數據文件的壓力集中於寫入的時候,適合對讀有較高要求的場景;而Merge-on-Read把合併最終結果的壓力放在讀取的時候,適合於快速寫入的場景。

我們在內部已經實現了基於Copy-on-Write的方式。同時也將Iceberg作爲Spark 3.0的V2 Data Source和multi-catalog,和Spark進行了集成,用戶可以方便的通過Spark SQL進行update、delete和merge into等DML操作,以及建表刪表等DDL操作。

我們作爲社區中spark-3分支的維護者,正在積極推進相關功能的開發和合入,讓更多的人受益。

增量消費Iceberg中的數據

流式數據落地數倉以後,還可以通過增量消費的方式回到流式處理引擎當中,繼續向下遊傳遞,做進一步的處理。而且針對數據延遲到達的情況,增量消費也爲下游提供了僅獲取變化數據的方式(而非全部數據),提高了信息傳遞的效率。增量消費,配合Flink sink寫入支持update和delete語義,使端到端支持增量處理,可以進一步降低整個流程的延遲。Iceberg可以方便的基於snapshot的歷史實現增量消費。

基於Spark,指定開始和結束snapshot-id的 incremental scan [4] 已經合入社區,以此爲基礎,基於micro-batch的 Spark Structured Streaming Read [5] 也已經實現。

Flink也有類似的增量消費的實現。

數據和元數據的壓縮合並

爲了提高讀取時job planning的效率,小文件的壓縮合並(compaction)是數倉日常維護中的重要任務,特別是流式數據直接落地,和基於Merge-on-Read實現row-level update和delete的功能,更加劇了小文件的產生。這裏要注意的是壓縮合並的對象既包括數據文件,也包括元數據文件。

壓縮合並可以分爲三個級別:

Minor compaction:僅合併元數據文件(rewrite manifest),不操作數據文件; Major compaction:合併元數據和數據文件。未來還需要處理數據文件和Merge-on-Read產生的delete文件的合併; Optimization:合併元數據和數據文件的同時,清理過期的snapshot以及這些snapshot對應的元數據和數據文件。

對於天生就是小文件的元數據,Iceberg可以自動的通過MergeAppend進行合併;也可以通過RewriteManifests手動發起合併,但它使用起來不是很方便。社區已經開發出 對應的Spark Action [6] ,依靠外部的Spark計算資源,方便的進行元數據的合併。

而對於數據文件的合併,社區也正在積極推進 相應Spark Action [7] 的開發和合入。

總結

隨着數據量的持續增大,和業務對時效性的嚴苛要求,實時數倉的作用愈發的重要。而Iceberg憑藉ACID事務、時間旅行和優秀的抽象等特性,以及對Spark和Flink等計算引擎接入的廣泛支持,作爲實時數倉的核心組件,可以縮短導入流程,方便數據變更,加速數據讀取。

參考

[1] Iceberg作爲Flink sink:https://github.com/apache/iceberg/pull/856

[2] https://github.com/generic-datalake/iceberg-pro

[3] Row-level delete的里程碑任務分解:https://github.com/apache/iceberg/milestone/4

[4] 基於Spark實現incremental scan:https://github.com/apache/iceberg/pull/829

[5] Spark Structured Streaming讀取Iceberg:https://github.com/apache/iceberg/pull/796

[6] 用於合併元數據的Spark action:https://github.com/apache/iceberg/pull/875

[7] 用於合併數據文件的Spark action:https://github.com/apache/iceberg/pull/1083

相關文章