作者: 李呈祥,花名司麟),阿里巴巴計算平臺事業部EMR團隊的高級技術專家,Apache Hive Committer, Apache Flink Committer,深度參與了Hadoop,Hive,Spark,Flink等開源項目的研發工作,對於SQL引擎,分佈式系統有較爲深入的瞭解和實踐,目前主要專注於EMR產品中開源計算引擎的優化工作。

背景

Count Distinct是SQL查詢中經常使用的聚合統計方式,用於計算非重複結果的數目。由於需要去除重複結果,Count Distinct的計算通常非常耗時。

以如下查詢爲例,Count Distinct的實現方式主要有兩種:

SELECT region, COUNT(DISTINCT userId) FROM orders GROUP BY region
  1. 對訂單表的數據按照region進行shuffle分區,在每個分區中使用一個類似HashTable的數據結構,存儲所有的非重複userId的值,最後統計所有key的數量。

  2. 對錶t的數據按照(region, userId)進行shuffle分區,第一步的結果即爲非重複的(region, userId)對,對於第一步的結果再按照region分區,統計每個分區中的Row數量。

第一種方式只需要一次shuffle,但是需要在內存中維護一個數據結構,佔用大量內存,甚至導致OOM。第二種方式多了一次shuffle,但是更加穩定可靠。Spark採用第二種方式實現Count Distinct。在多維分析或報表等場景中,用戶可能需要秒級的交互響應,在大數據量的情況下,很難通過單純地擴充資源滿足要求。本文主要介紹在Spark中如何基於重聚合實現交互式響應的COUNT DISTINCT支持。

預聚合和重聚合

預計算是數據倉庫領域常見的一種提升查詢效率的方式,通過將全部或部分計算結果提前計算好並存儲下來,對於後續的相關的查詢可以直接重用之前的預計算結果,從而加速查詢速度。在多維分析或報表等查詢模式相對比較固定的場景中,我們可以通過預聚合,將需要處理的數據量下降成百上千倍。此外對於預計算來說,由於用戶的查詢維度,過濾條件,統計方式非常多,考慮到預計算的計算和存儲代價,不太可能把每種可能的查詢條件都進行預計算,通常的方式是按照較細粒度進行分組聚合,然後對於後續更粗粒度的分組聚合查詢,可以使用預計算的結果進行重聚合,如下所示:

// 原始SQL

SELECT SUM(userId) AS sum_user FROM orders;


// 預計算SQL

CREATE TABLE pre_sum_orders AS SELECT region, sum(userId) as pre_sum_user FROM orders GROUP BY region;


// 重聚合SQL

SELECT sum(pre_sum_user) AS sum_user FROM pre_sum_orders;

由於pre_sum_orders中的數據已經經過了region的分組聚合,數據量相對於原始表可能會從100萬條下降到100條,重聚合可以非常快的完成。

Count Distinct的重聚合

SUM/COUNT/MIN/MAX等聚合函數都滿足結合律,可以非常容易的支持重聚合,例如Count可以在通過SUM再聚合,最小值可以通過MIN再聚合,最大值也可以通過MAX再聚合。但是對於AVERAGE或COUNT DISTINCT等許多聚合函數卻並非如此。以AVG爲例,每個region平均訂單金額和平均值並不等於全國的訂單平均值,對於COUNT DISTINCT也是如此。對於這類函數,我們也需要找到一種支持重聚合的中間結果。對於AVG,這個中間結果可以是(SUM, COUNT),可以通過如下方式重聚合,

// 原始SQL

SELECT avg(col2) FROM t;


// 預計算SQL

CREATE TABLE pre_avg_t AS SELECT col1, sum(col2) as pre_sum_col2, count(col2)as pre_count_col2 FROM t GROUP BY col1;


// 重聚合SQL

SELECT sum(pre_sum_col2)/sum(pre_count_col2) FROM pre_avg_t

對於COUNT DISTINCT來說,由於需要計算非重複結果的數目,Bitset存儲與計算效率很高,而且可以用來去重,是一個比較合適存儲COUNT DISTINCT預計算結果的數據結構。

Bitmap

Bitmap是一個常見的數據結構,存儲元素到bit的映射關係,邏輯上可以理解爲一個bit數組,每個bit有0和1兩個狀態,將元素映射到數組下標,在插入新元素時,修改該元素對應的數組下標的bit位置爲1,重複元素出現時也不會有影響,由於每個元素僅需1個bit位表示,可以大大減少存儲空間。此外Bitmap還可以通過位運算支持高效的計算,例如使用或運算合併兩個Bitmap。實際實踐中,爲了操作效率,通常會用long/int array作爲Bitmap的存儲,也有很多高效的開源實現可以直接使用,例如Google的EWAHCompressedBitmap,RoaringBitmap等等,還會對bitmap進行壓縮,對較爲稀疏的bitmap節省更多的存儲空間。COUNT DISTINCT會使用的Bitmap接口主要包括:

  1. add(element: Int): 向Bitmap中插入新的值。

  2. or(other: Bitmap): 將兩個Bitmap按bit位進行或操作。

  3. getCardinality(): 獲取Bitmap中1出現的次數。

使用Bitmap進行重聚合

我們可以基於Bitmap實現對COUNT DISTINCT的重聚合,這主要包含兩部分工作:1. 在預聚合時將COUNT DISTINCT字段值寫入Bitmap,並將Bitmap序列化成Binary類型字段保存。2. 在重聚合時,讀取並反序列化Bitmap字段,在合併後統計最終count數量。這兩步工作都需要新增自定義Spark UDF實現:

  1. bit_mapping : 接受Integer類型字段作爲參數,內部維護Bitmap數據結構,將輸入數據插入Bitmap中,並把Bitmap序列化二進制數據作爲輸出結果。

  2. re_count_distinct : 接受二進制數據作爲參數,反序列化位Bitmap,merge同一分區的多個Bitmap,把Bitmap的cardinality作爲結果輸出。

下面的SQL展示瞭如何使用這兩個UDF:

// 原始SQL

SELECT region, COUNT(DISTINCT userId) as dist_count_user FROM orders GROUP BY region;


// 預計算SQL

CREATE TABLE pre_count_orders AS SELECT region, bit_mapping(userId) as userId_bitmap FROM orders GROUP BY region;


// 改寫後SQL

SELECT region, re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders GROUP BY region;

原始SQL需要全量掃描orders表的數據,並經過partition key爲(region, userId)和region的兩輪shuffle。而改寫後的SQL,不需要任何shuffle操作,而且pre_count_orders由於經過了region的分組聚合,表的行數相對於orders表大大降低,整體執行速度可能有十倍甚至百倍以上的提升。

對於更粗粒度的COUNT DISTINCT,也可以通過重聚合支持:

// 原始SQL

SELECT COUNT(DISTINCT userId) as dist_count_user FROM orders;


// 改寫後SQL

SELECT re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders;

Global Dictionary

在上面bit_mapping的實現中,由於Bitmap的輸入要求是從0開始的自然數類型,以對應bit數組的下標,但是我們實際統計的字段,例如上面的userId的具體值很可能不是一個簡單的自然數,所以需要一個映射函數,能夠將統計字段的取值範圍映射成自然數。此外,由於Spark SQL分佈式執行的特性,這個映射必須是一致的,即同一個字段值在不同的spark task必須映射成同一個自然數,否則在merge Bitmap的時候,同一個字段對應的值就會在合併後的Bitmap中出現兩次,違反COUNT DISTINCT的語義要求。一個維護了統計字段到自然數的全局字典可以幫助我們滿足這個要求

Spark在1.5版本後加入了對開窗函數的支持,其中的row_number()函數的作用是根據表中字段分組排序,然後給排序好的記錄添加一個從1開始的序號,我們可以使用這個函數實現一個全局字典的功能。例如,對於任意類型的userId,我們可以通過如下的方式映射爲從1開始的自然數。

SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId;

有了userId到bitmap_index的映射,我們就可以通過Join把全局字典和orders表關聯起來,使用bitmap_index字段去構建bitmap了,COUNT DISTINCT預聚合的SQL示例如下:

WITH globalDict AS (SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId)

CREATE TABLE pre_count_orders

AS SELECT region, bit_mapping(bitmap_index) as userId_bitmap

FROM orders, globalDict

WHERE orders.userId = globalDict.userId

GROUP BY region;

最後,構建全局字典是一個較爲耗時的工作,當有新的訂單加入orders表後,如果我們構建新的預聚合結果時能夠重用已有的全局字典,只是增加新的userId到字典中,顯然可以提高執行預聚合的效率。

其他

Approximate Count Distinct可以用更小的代價統計count distinct的估計值,在不需要精確值的場景中被大量使用,Approximate Count Distinct主要基於Hyperloglog數據結構實現,HLL sketch作爲hll的中間結果,也是可以重聚合的。Swoop https://www.swoop.com/ 開源了高性能的 HLL native 函數工具包,作爲 spark-alchemy  https://github.com/swoop-inc/spark-alchemy 項目的一部分,提供了預聚合和重聚合的函數,用戶可以直接使用,實現Approximate Count Distinct的重聚合。詳細信息可以參考:Spark-Alchemy: HyperLogLog的使用介紹

在EMR Spark中通過Relational Cache支持了Count Distinct的預聚合和重聚合,提供了pre_count_distinct和re_count_distinct函數的實現,還提供了自定義的優化規則,將pre_count_distinct函數自動轉化爲基於Global Dictionary和bit_mapping的執行計劃,不需要用戶手工拼寫複雜的預聚合SQL邏輯。

總結

本文主要介紹了在Spark中實現Count Distinct重聚合的基本思路和實現方式,通過基於Bitmap的重聚合,可以實現幾個數量級的性能提升,滿足交互式分析場景統計Count Distinct的需求。

阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區數個Spark技術同學每日在線答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

相關文章