Spark 3.0 推出了 Catalog Plugin 特性。在 Release Note 裏面位於 Highlight 部分。我們這篇文章就來介紹一下 Catalog Plugin 機制。

Catalog Plugin 的設計文章在 Google Doc 上: SPIP: Spark API for Table Metadata 。本文部分參考於這篇設計文檔。

1. 背景

DataSourceV2 是 Spark 2.x 新推出的 API,主要目的是用來和外部數據存儲進行集成,比如數據讀寫。但是這裏缺少關鍵的一環:對錶的元數據進行操作,比如創建、修改、刪除表等。

SparkSQL 和 DataFrame 操作都支持 CTAS (Create Table AS Select) 用來創建一個表並向該表寫入數據,注意這裏是一個操作。缺少創建目標表的 API,CTAS 的實際行爲將完全取決於 DataSourceV2 的實現。比如寫表失敗,表可能被保存也可能被刪除。並且在某些 SaveMode 下,我們無法區分 CTAS 和普通的寫操作,那麼很有可能在 Append 模式下寫表的時候會因爲表被刪除而失敗。最後一點,Spark 沒有一種機制用來設置由 CTAS 創建的表,比如分區。

除此之外,數據工程師也希望類似 CTAS 的 high-level 操作在數據源上面進行操作的時候能保持行爲一致。 SPIP to Standardize SQL Logical Plans 介紹了一些 high-level 的操作,並且總結了這些操作的期望行爲,並期望 Spark 在內部實現上設計一種機制進行保證。這也要求 Catalog API 能對那些數據源進行創建、修改以及刪除等操作。

舉個例子,爲了實現 CTAS,Spark 會創建、寫入或者刪除表(寫入失敗時)。這樣的話,當元數據管理不可用或者 driver 自己失敗的時候,CTAS 可能會刪除表不成功。

除此之外,還有一個暴露 catalog API 的需求。我們使用 DataFrame 編寫 Spark 程序的時候可以使用 SQL 引擎,但是並沒有類似創建、修改以及刪除這種 catalog 的 API 提供。在 Spark 代碼中,Catalog 接口提供了一些操作,但是並不夠全面和強大,比如不支持 multi catalog。

這就是 Catalog Plugin 產生的背景。

所以 Catalog Plugin 的首要目標其實是提供一組 catalog API 用來創建、修改、加載和刪除表。

2. CatalogPlugin Interface

CatalogPlugin 在 Spark 代碼中是一個 Interface,代碼如下。

/**
 * A marker interface to provide a catalog implementation for Spark.
 * <p>
 * Implementations can provide catalog functions by implementing additional interfaces for tables,
 * views, and functions.
 * <p>
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(String, CaseInsensitiveStringMap)}.
 * <p>
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
 * in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
 * string map of options in initialization with the prefix removed.
 * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
 *
 * @since 3.0.0
 */
@Evolving
public interface CatalogPlugin {
  /**
   * Called to initialize configuration.
   * <p>
   * This method is called once, just after the provider is instantiated.
   *
   * @param name the name used to identify and load this catalog
   * @param options a case-insensitive string map of configuration
   */
  void initialize(String name, CaseInsensitiveStringMap options);

  /**
   * Called to get this catalog's name.
   * <p>
   * This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
   * called to pass the catalog's name.
   */
  String name();

  /**
   * Return a default namespace for the catalog.
   * <p>
   * When this catalog is set as the current catalog, the namespace returned by this method will be
   * set as the current namespace.
   * <p>
   * The namespace returned by this method is not required to exist.
   *
   * @return a multi-part namespace
   */
  default String[] defaultNamespace() {
    return new String[0];
  }
}

從代碼中我們可以獲得幾點有用的信息:

  1. 自定義 catalog 必須實現這個 interface
  2. 然後通過 Catalog#load(String, SQLConf) 進行加載,加載時會調用具體 Catalog 的無參構造函數方法進行初始化
  3. 初始化之後會調用 CatalogPlugin 中的 initialize 方法進行初始化
  4. 使用 CatalogPlugin 需要添加如下配置,其中第二個配置就是我們傳遞給 CatalogPlugin 的 initialize 方法的參數
    • spark.sql.catalog.catalog-name=com.example.YourCatalogClass
    • spark.sql.catalog.catalog-name.(key)=(value)

我們查看一下 CatalogPlugin Interface 的實現和繼承關係可以看到如下圖。我們可以看到 TableCatalog Interfact 繼承了 CatalogPlugin,然後 V2SessionCatalog 和 JDBCTableCatalog 是兩個具體的 class,實現了 TableCatalog。所以我們可以有理由相信 TableCatalog 中實現了創建、修改、刪除表的 api。

3. TableCatalog

TableConfig 也是一個 Interface,代碼如下。

/**
 * Catalog methods for working with Tables.
 * <p>
 * TableCatalog implementations may be case sensitive or case insensitive. Spark will pass
 * {@link Identifier table identifiers} without modification. Field names passed to
 * {@link #alterTable(Identifier, TableChange...)} will be normalized to match the case used in the
 * table schema when updating, renaming, or dropping existing columns when catalyst analysis is case
 * insensitive.
 *
 * @since 3.0.0
 */
@Evolving
public interface TableCatalog extends CatalogPlugin {
  ...
}

根據註釋可以看出 TableCatalog 定義了 Catalog 和表進行交互的方法,其實就是前面說的增刪改。值得注意的是 TableCatalog 可以被實現成字符敏感或者字符不敏感的,實現方法是通過一個 alterTable 方法去對 field 做規範化,確實挺巧妙的。

TableCatalog 定義的方法非常的簡單,都是和 table 相關的,如下,這裏就不再細說的。

TableCatalog 的實現有 V2SessionCatalog 和 JDBCCatalog,其中 V2SessionCatalog 是爲了和之前的 SparkSession 中的 Catalog 做兼容,這裏就不再細說了。

4. CatalogManager

前面介紹 Catalog 使用的時候提供一個配置就可以了。

spark.sql.catalog.catalog-name=com.example.YourCatalogClass

那麼我們有理由懷疑所有的 catalog 都是通過一個 Map 映射關係來管理的,實際上確實差不多,這個管理的 Class 就是 CatalogManager。

/**
 * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
 * the caller to look up a catalog by name.
 *
 * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
 * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
 * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
 * current database of `SessionCatalog` when the current catalog is the session catalog.
 */
private[sql]
class CatalogManager(
  ...
  )

從 CatalogManager 的註釋中我們可以看出這就是一個 CatalogPlugin 的管理者,並且是線程安全的。我們簡單看一下 CatalogManager 內部的方法和成員。

如上所示,簡單介紹其中兩個。

  • catalogs: 一個 map: mutable.HashMap[String, CatalogPlugin],保存 catalog 名字和 Class 的隱射關係
  • catalog(String):用來查找特定名字的 Catalog,返回 CatalogPlugin 接口。

5. 使用舉例

使用舉例下面這篇文章寫的挺好的,copy 部分內容如下,全文可以移步: https://developer.aliyun.com/article/756968

基於 Spark 3.0 preview使用Iceberg + SparkSQL

在Spark DatasourceV2增加了multiple catalog等功能後,回到我們想要查詢的SQL,實現步驟如下:

1.在Iceberg側對CatalogPlugin/TableCatalog/SupportsRead等接口進行實現,實現類名如: org.apache.iceberg.spark.SparkCatalog

2.在spark的配置文件中設置:

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

3.基於配置的catalogName,調整SQL如下,就可以進行基於SQL的跨數據源查詢了。

select * 
from iceberg_catalog.ns1.t1
join hive_db.t2 on t1.k1 = t2.k1;

4.除了跨數據源數據分析以外,現在還可以對Iceberg的表進行DDL操作了,如,

create table iceberg_catalog.t1 ......
drop table iceberg_catalog.t1
相關文章