MongoSpark爲入口類,調用MongoSpark.load,該方法返回一個MongoRDD類對象,Mongo Spark Connector框架本質上就是一個大號的自定義RDD,加了些自定義配置、適配幾種分區器規則、Sql的數據封裝等等,個人認爲相對核心的也就是分區器的規則實現;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

當前實現的分區器(Partitioner):

MongoPaginateByCountPartitioner 基於總數的分頁分區器

MongoPaginateBySizePartitioner 基於大小的分頁分區器

MongoSamplePartitioner 基於採樣的分區器

MongoShardedPartitioner 基於分片的分區器

MongoSinglePartitioner 單分區分區器

MongoSplitVectorPartitioner

基於分割向量的分區器

這裏根據源碼簡單介紹MongoSinglePartitioner與MongoSamplePartitioner分區器,這或許就是用得最多的兩種分區器,他的默認分區器(DefaultMongoPartitioner)就是MongoSamplePartitioner分區器;該分區默認的PartitionKey爲_id、默認PartitionSizeMB爲64MB、默認每個分區採樣爲10;

MongoSamplePartitioner

該類的 核心 也是 唯一的方法爲:partitions方法 ,下面爲該方法的執行流程與核心邏輯;

1、檢查執行buildInfo指令檢查Mongo版本用於判斷是否支持隨機採樣聚合運算,版本大於3.2。hasSampleAggregateOperator方法。Mongo3.2版本中才新增了數據採樣功能。

Mongodb中的語法爲:

db.cName.aggregate([
  {$sample:{ size: 10 } }
])

上示例N等於10,如果N大於collection中總數據的5%,那麼$sample將會執行collection掃描、sort,然後選擇top N條文檔;如果N小於5%,對於 wiredTiger 而言則會遍歷collection並使用“ 僞隨機 ”的方式選取N條文檔,對於 MMAPv1 引擎則在_id索引上隨機選取N條文檔。

2、執行collStats,用於獲取集合的存儲信息,如行數、大小、存儲大小等等信息;

matchQuery: 過濾條件

partitionerOptions: ReadConfig傳進去的分析器選項

partitionKey: 分區key,默認爲_id

partitionSizeInBytes: 分區大小,默認64MB

samplesPerPartition: 每個分區默認採樣數量,默認10

count: 集合總條數

avgObjSizeInBytes: 對象平均字節數

numDocumentsPerPartition: 每個分區文檔數, partitionSizeInBytes / avgObjSizeInBytes :分區大小/對象平均大小

numberOfSamples:

採樣數量,samplesPerPartition * count / numDocumentsPerPartition,每個分區採樣數*集合總數/每個分區文檔數

如每個分區文檔數大於集合總文檔數 ,則將直接創建單分區,不採取採樣數據方式創建分區,因爲此時數據量太少單個分區已經可以容得下無需多個分區;

一、創建單分區

MongoSinglePartitioner類中通過PartitionerHelper.createPartitions執行相關邏輯;

_id作爲partitionKey,

二、通過採樣數據創建分區

指定採樣條件、採樣數據量、PartitionKey、排序條件等,獲取採樣數據;

集合拆分:

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1

右側邊界:

val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}

獲取 右側邊界 ,使用採樣數據數組索引對每個分區採樣數求餘等於0對採樣數據進行過濾取右側邊界(如匹配條件不爲空則再取最後一條數據);

如採樣得到62條數據,並且沒有存在匹配條件,根據上述的採樣數據過濾條件最後取得7條數據,分別爲數據數組索引爲0、索引爲10、20、30、40、50、60的7條數據,數據的值爲PartitionKey默認就是集合中_id字段的值;

創建分區(Partitions)

獲取得到PartitionKey、rightHandBoundaries後就可以調用PartitionerHelper.createPartitions創建Partition;下面爲創建Partition的具體邏輯;

使用PartitionKey創建查詢邊界 ,每個分區具有不同的查詢邊界,有最大、最小邊界;此處創建分區Partition並在每個分區中指定了查詢邊界;

上面獲取得到了7條數據,此處將創建8個分區;下面給出了簡單數據用於說明該分區邊界條件的基本邏輯與實現;

1、創建Min、1、3、5、7、9、11、13、Max的序列

2、創建1、3、5、7、9、11、13、Max序列

3、使用zip將兩個序列拉鍊式的合併:合併後的數據爲:

4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max

Partition的邊界條件將使用上面的邊界條件,8條數據八個Partition一個對應;

0 Partition的邊界條件爲:小於1

1 Partition的邊界條件爲:大於等於1 小於 3

2 Partition的邊界條件爲:大於等於3 小於 5

3 Partition的邊界條件爲:大於等於5 小於 7

4 Partition的邊界條件爲:大於等於7小於 9

5 Partition的邊界條件爲:大於等於9 小於 11

6 Partition的邊界條件爲:大於等於11 小於 13

7 Partition的邊界條件爲:大於等於13

上面的8個Partition爲8個MongoPartition對象,每個對象的index、查詢邊界與上面所說的一一對應;

在MongoRDD類的compute方法

中可以看到根據對應的分區與上面創建分區時所建立的邊界條件用於計算(從Mongo中獲取對應數據);

分區1分區與邊界條件

MongoSinglePartitioner

創建單分區分區器時,直接調用PartitionerHelper.createPartitions方法創建分區,該類並無其他邏輯,並且固定的PartitionKey爲_id,右側邊界條件爲空集合,然後創建id爲0的MongoPartition對象,並無查詢邊界;

相關文章