Pulsar 是一個由 Yahoo 公司於 2016 年開源的消息中間件,2018 年成爲 Apache 的頂級項目。

圖片來自 Pexels

在開源的業界已經有這麼多消息隊列中間件了,Pulsar 作爲一個新勢力到底有什麼優點呢?

Pulsar 自從出身就不斷的再和其他的消息隊列(Kafka,RocketMQ 等等)做比較。

但是 Pulsar 的設計思想和大多數的消息隊列中間件都不同,具備了高吞吐,低延遲,計算存儲分離,多租戶,異地複製等功能。

所以 Pulsar 也被譽爲下一代消息隊列中間件,接下來我會一一對其進行詳細的解析。

Pulsar 架構原理

Pulsar 架構原理如下圖:

整體的架構和其他的消息隊列中間件差別不是太大,相信大家也看到了很多熟悉的名詞,接下來會給大家一一解釋這些名詞的含義。

名詞解釋:

  • Producer:消息生產者,將消息發送到 Broker。
  • Consumer:消息消費者,從 Broker 讀取消息到客戶端,進行消費處理。
  • Broker:可以看作是 Pulsar 的 Server,Producer 和 Consumer 都看作是 Client 消息處理的節點。
  • Pulsar 的 Broker 和其他消息中間件的都不一樣,他是無狀態的沒有存儲,所以可以無限制的擴展,這個後面也會詳解講到。

  • Bookie:負責所有消息的持久化,這裏採用的是 Apache Bookeeper。
  • ZK:和 Kafka 一樣 Pulsar 也是使用 ZK 保存一些元數據,比如配置管理,Topic 分配,租戶等等。
  • Service Discovery:可以理解爲 Pulsar 中的 Nginx,只用一個 URL 就可以和整個 Broker 進行打交道,當然也可以使用自己的服務發現。
  • 客戶端發出的讀取,更新或刪除主題的初始請求將發送給可能不是處理該主題的 Broker 。

    如果這個 Broker 不能處理該主題的請求,Broker 將會把該請求重定向到可以處理主題請求的 Broker。

    不論是 Kafka,RocketMQ 還是我們的 Pulsar 其實作爲消息隊列中間件最爲重要的大概就是分爲三個部分:

  • Producer 是如何生產消息,發送到對應的 Broker。
  • Broker 是如何處理消息,將高效的持久化以及查詢。
  • Consumer 是如何進行消費消息。
  • 而我們後面也會圍繞着這三個部分進行展開講解。

    Producer 生產消息

    先簡單看一下如何用代碼進行消息發送:

    PulsarClient client = PulsarClient.create(''pulsar://pulsar.us-west.example.com:6650'');  Producer producer = client.createProducer(                 ''persistent://sample/standalone/ns1/my-topic'');  // Publish 10 messages to the topic for (int i = 0; i Step1:首先使用我們的 URL 創建一個 Client 這個 URL 是我們 Service Discovery 的地址,如果我們使用單機模式可以進行直連。

    Step2:我們傳入了一個類似 URL 的參數,我們只需要傳遞這個就能指定我們到底在哪個 Topic 或者 Namespace 下面創建的,URL 的格式爲:

    {persistent|non-persistent}://tenant/namespace/topic 

    Step3:調用 Send 方法發送消息,這裏也提供了 sendAsync 方法支持異步發送。

    上面三個步驟中,步驟 1,2 屬於我們準備階段,用於構建客戶端,構建 Producer,我們真的核心邏輯在 Send 中。

    那這裏我先提幾個小問題,大家可以先想想在其他消息隊列中是怎麼做的,然後再對比 Pulsar 的看一下:

  • 我們調用了 Send 之後是會立即發送嗎?
  • 如果是多 PartitIOn,怎麼找到我應該發送到哪個 Broker 呢?
  • 發送模式

    我們上面說了 Send 分爲 Async 和 Sync 兩種模式,但實際上在 Pulsar 內部 Sync 模式也是採用的 Async 模式,在 Sync 模式下模擬回調阻塞,達到同步的效果。

    這個在 Kafka 中也是採用的這個模式,但是在 RocketMQ 中,所有的 Send 都是真正的同步,都會直接請求到 Broker。

    基於這個模式,在 Pulsar 和 Kafka 中都支持批量發送,在 RocketMQ 中是直接發送,批量發送有什麼好處呢?

    當我們發送的 TPS 特別高的時候,如果每次發送都直接和 Broker 直連,可能會做很多的重複工作,比如壓縮,鑑權,創建鏈接等等。

    比如我們發送 1000 條消息,那麼可能會做 1000 次這個重複的工作,如果是批量發送的話這 1000 條消息合併成一次請求,相對來說壓縮,鑑權這些工作就只需要做一次。

    有同學可能會問,批量發送會不會導致發送的時間會有一定的延誤?這個其實不需要擔心,在 Pulsar 中默認定時每隔 1ms 發送一次 Batch,或者當 batchsize 默認到了 1000 都會進行發送,這個發送的頻率都還是很快的。

    發送負載均衡

    在消息隊列中通常會將 Topic 進行水平擴展,在 Pulsar 和 Kafka 中叫做 Partition,在 RocketMQ 中叫做 Queue,本質上都是分區,我們可以將不同分區落在不同的 Broker 上,達到我們水平擴展的效果。

    在我們發送的時候可以自己制定選擇 Partition 的策略,也可以使用它默認輪訓 Partition 策略。

    當我們選擇了 Partition 之後,我們怎麼確定哪一個 Partition 對應哪一個 Broker 呢?

    可以先看看下面這個圖:

    Step1:我們所有的信息分區映射信息在 ZK 和 Broker 的緩存中都有進行存儲。

    Step2:我們通過查詢 Broker,可以獲取到分區和 Broker 的關係,並且定時更新。

    Step3:在 Pulsar 中每個分區在發送端的時候都被抽象成爲一個單獨的 Producer,這個和 Kafka,RocketMQ 都不一樣。

    在 Kafka 裏面大概就是選擇了 Partition 之後然後再去找 Partition 對應的 Broker 地址,然後進行發送。

    Pulsar 將每一個 Partition 都封裝成 Producer,在代碼實現上就不需要去關注他具體對應的是哪個 Broker,所有的邏輯都在 Producer 這個代碼裏面,整體來說比較乾淨。

    壓縮消息

    消息壓縮是優化信息傳輸的手段之一,我們通常看見一些大型文件都會是以一個壓縮包的形式提供下載。

    在我們消息隊列中我們也可以用這種思想,我們將一個 Batch 的消息,比如有 1000 條可能有 1M 的傳輸大小,但是經過壓縮之後可能就只會有幾十 KB,增加了我們和 Broker 的傳輸效率,但是與之同時我們的 CPU 也帶來了損耗。

    Pulsar 客戶端支持多種壓縮類型,如 lz4、zlib、zstd、snappy 等。

    client.newProducer()     .topic(“test-topic”)     .compressionType(CompressionType.LZ4)     .create(); 

    Broker

    接下來我們來說說第二個比較重要的部分 Broker,在 Broker 的設計中 Pulsar 和其他所有的消息隊列差別比較大,而正是因爲這個差別也成爲了他的特點。

    計算和存儲分離

    首先我們來說說他最大的特點:計算和存儲分離。

    我們在開始的說過 Pulsar 是下一代消息隊列,就非常得益於他這個架構設計,無論是 Kafka 還是 RocketMQ,所有的計算和存儲都放在同一個機器上。

    這個模式有幾個弊端:

  • 擴展困難:當我們需要擴展的集羣的時候,我們通常是因爲 CPU 或者磁盤其中一個原因影響,但是我們卻要申請一個可能 CPU 和磁盤配置都很好的機器,造成了資源浪費。並且 Kafka 這種進行擴展,還需要進行遷移數據,過程十分繁雜。
  • 負載不均衡:當某些 Partion 數據特別多的時候,會導致 Broker 負載不均衡,如下面圖,如果某個 Partition 數據特別多,那麼就會導致某個 Broker(輪船)承載過多的數據,但是另外的 Broker 可能又比較空閒。
  • Pulsar 計算分離架構能夠非常好的解決這個問題:

  • 對於計算:也就是我們的 Broker,提供消息隊列的讀寫,不存儲任何數據,無狀態對於我們擴展非常友好,只要你機器足夠,就能隨便上。
  • 擴容 Broker 往往適用於增加 Consumer 的吞吐,當我們有一些大流量的業務或者活動,比如電商大促,可以提前進行 Broker 的擴容。

  • 對於存儲:也就是我們的 Bookie,只提供消息隊列的存儲,如果對消息量有要求的,我們可以擴容 Bookie,並且我們不需要遷移數據,擴容十分方便。
  • 消息存儲

    名詞解析:

    上圖是 Bookie 的讀寫架構圖,裏面有一些名詞需要先介紹一下:

  • Entry:是存儲到 bookkeeper 中的一條記錄,其中包含 Entry ID,記錄實體等。
  • Ledger:可以認爲 ledger 是用來存儲 Entry 的,多個 Entry 序列組成一個 ledger。
  • Journal:其實就是 bookkeeper 的 WAL(write ahead log),用於存 bookkeeper 的事務日誌,journal 文件有一個最大大小,達到這個大小後會新起一個 journal 文件。
  • Entry log:存儲 Entry 的文件,ledger 是一個邏輯上的概念,entry 會先按 ledger 聚合,然後寫入 entry log 文件中。同樣,entry log 會有一個最大值,達到最大值後會新起一個新的 entry log 文件。
  • Index file:ledger 的索引文件,ledger 中的 entry 被寫入到了 entry log 文件中,索引文件用於 entry log 文件中每一個 ledger 做索引,記錄每個 ledger 在 entry log 中的存儲位置以及數據在 entry log 文件中的長度。
  • MetaData Storage:元數據存儲,是用於存儲 bookie 相關的元數據,比如 bookie 上有哪些 ledger,bookkeeper 目前使用的是 zk 存儲,所以在部署 bookkeeper 前,要先有 zk 集羣。
  • 整體架構上的寫流程:

  • Step1:Broker 發起寫請求,首先對 Journal 磁盤寫入 WAL,熟悉 MySQL 的朋友知道 redolog,journal 和 redolog 作用一樣都是用於恢復沒有持久化的數據。
  • Step2:然後再將數據寫入 index 和 ledger,這裏爲了保持性能不會直接寫盤,而是寫 pagecache,然後異步刷盤。
  • Step3:對寫入進行 ack。
  • 讀流程爲:

  • Step1:先讀取 index,當然也是先讀取 cache,再走 disk。
  • Step2:獲取到 index 之後,根據 index 去 entry logger 中去對應的數據。
  • 如何高效讀寫?在 Kafka 中當我們的 Topic 變多了之後,由於 Kafka 一個 Topic 一個文件,就會導致我們的磁盤 IO 從順序寫變成隨機寫。

    在 RocketMQ 中雖然將多個 Topic 對應一個寫入文件,讓寫入變成了順序寫,但是我們的讀取很容易導致我們的 Pagecache 被各種覆蓋刷新,這對於我們的 IO 的影響是非常大的。

    所以 Pulsar 在讀寫兩個方面針對這些問題都做了很多優化:

    寫流程:順序寫+Pagecache。在寫流程中我們的所有的文件都是獨立磁盤,並且同步刷盤的只有 Journal。

    Journal 是順序寫一個 journal-wal 文件,順序寫效率非常高。ledger 和 index 雖然都會存在多個文件,但是我們只會寫入 Pagecache,異步刷盤,所以隨機寫不會影響我們的性能。

    讀流程:broker cache+bookie cache,在 Pulsar 中對於追尾讀(tailing read)非常友好基本不會走 IO。

    一般情況下我們的 Consumer 是會立即去拿 Producer 發送的消息的,所以這部分在持久化之後依然在 Broker 中作爲 Cache 存在。

    當然就算 Broker 沒有 Cache(比如 Broker 是新建的),我們的 Bookie 也會在 Memtable 中有自己的 Cache,通過多重 Cache 減少讀流程走 IO。

    我們可以發現在最理想的情況下讀寫的 IO 是完全隔離開來的,所以在 Pulsar 中能很容易就支持百萬級 Topic,而在我們的 Kafka 和 RocketMQ 中這個是非常困難的。

    無限流式存儲

    一個 Topic 實際上是一個 ledgers流(Segment),通過這個設計所以 Pulsar 他並不是一個單純的消息隊列系統,他也可以代替流式系統,所以他也叫流原生平臺,可以替代 Flink 等系統。

    可以看見我們的 Event Stream(topic/partition),由多個 Segment 存儲組成,而每個 Segment 由 Entry 組成,這個可以看作是我們每批發送的消息通常會看作是一個 Entry。

    Segment 可以看作是我們寫入文件的一個基本維度,同一個 Segment 的數據會寫在同一個文件上面,不同 Segment 將會是不同文件,而 Segment 之間的在 Metadata 中進行保存。

    分層存儲

    在 Kafka 和 RocketMQ 中消息是會有一定的保存時間的,因爲磁盤會有空間限制。

    在 Pulsar 中也提供這個功能,但是如果你想讓自己的消息永久存儲,那麼可以使用分級存儲,我們可以將一些比較老的數據,定時的刷新到廉價的存儲中,比如 s3,那麼我們就可以無限存儲我們的消息隊列了。

    數據複製

    在 Pulsar 中的數據複製和 Kafka,RocketMQ 都有很大的不同,在其他消息隊列中通常是其他副本主動同步,通常這個時間就會變得不可預測。

    而在 Pulsar 採用了類似 Qurom 協議,給一組可用的 Bookie 池,然後併發的寫入其中的一部分 Bookie,只要返回部分成功(通常大於 1/2)就好。

  • Ensemble Size(E):決定給定 Ledger 可用的 Bookie 池大小。
  • Write Quorum Size(Qw):指定 Pulsar 向其中寫入 Entry 的 Bookie 數量。
  • Ack Quorum Size(Qa):指定必須 Ack 寫入的 Bookie 數量。
  • 採用這種併發寫的方式,會更加高效的進行數據複製,尤其是當數據副本比較多的時候。

    Consumer

    接下來我們來聊聊 Pulsar 中最後一個比較重要的組成 Consumer。

    訂閱模式

    訂閱模式是用來定義我們的消息如何分配給不同的消費者,不同消息隊列中間件都有自己的訂閱模式。

    一般我們常見的訂閱模式有:

  • 集羣模式:一條消息只能被一個集羣內的消費者所消費。
  • 廣播模式:一條消息能被集羣內所有的消費者消費。
  • 在 Pulsar 中提供了 4 種訂閱模式,分別是:

    獨佔:顧名思義只能由一個消費者獨佔,如果同一個集羣內有第二個消費者去註冊,第二個就會失敗,這個適用於全局有序的消息。

    災備:加強版獨佔,如果獨佔的那個掛了,會自動的切換到另外一個好的消費者,但是還是隻能由一個獨佔。

    共享模式:這個模式看起來有點像集羣模式,一條消息也是隻能被一個集羣內消費者消費,但是和 RocketMQ 不同的是,RocketMQ 是以 Partition 維度,同一個 Partition 的數據都會被髮到一個機器上。

    在 Pulsar 中消費不會以 Partition 維度,而是輪訓所有消費者進行消息發送。這有個什麼好處呢?

    如果你有 100 臺機器,但是你只有 10 個 Partition 其實你只有 10 臺消費者能運轉,但是在 Pulsar 中 100 臺機器都可以進行消費處理。

    鍵共享:類似上面說的 Partition 維度去發送,在 RocketMQ 中同一個 Key 的順序消息都會被髮送到一個 Partition。

    但是這裏不會有 Partition 維度,而只是按照 Key 的 Hash 去分配到固定的 Consumer,也解決了消費者能力限制於 Partition 個數問題。

    消息獲取模式

    不論是在 Kafka 還是在 RocketMQ 中我們都是 Client 定時輪訓我們的 Broker 獲取消息,這種模式叫做長輪訓(Long-Polling)模式。

    這種模式有一個缺點網絡開銷比較大,我們來計算一下 Consumer 被消費的時延,我們假設 Broker 和 Consumer 之間的一次網絡延時爲 R。

    那麼我們總共的時間爲:

  • 當某一條消息 A 剛到 Broker 的,這個時候 long-polling 剛好打包完數據返回,Broker 返回到 Consumer 這個時間爲 R。
  • Consumer 又再次發送 Request 請求,這個又爲 R。
  • 將我們的消息 A 返回給 Consumer 這裏又爲 R。
  • 如果只考慮網絡時延,我們可以看見我們這條消息的消費時延大概是 3R,所以我們必須想點什麼對其進行一些優化。

    有同學可能馬上就能想到,我們消息來了直接推送給我們的 Consumer 不就對了,這下我們的時延只會有一次 R,這個就是我們常見的推模式。

    但是簡單的推模式是有問題的,如果我們有生產速度遠遠大於消費速度,那麼推送的消息肯定會幹爆我們的內存,這個就是背壓。

    那麼我們怎麼解決背壓呢?我們就可以優化推送方式,將其變爲動態推送,我們結合 Long-polling,在 long-polling 請求時將 Buffer 剩餘空間告知給 Broker,由 Broker 負責推送數據。

    此時 Broker 知道最多可以推送多少條數據,那麼就可以控制推送行爲,不至於沖垮 Consumer。

    舉個例子:Consumer 發起請求時 Buffer 剩餘容量爲 100,Broker 每次最多返回 32 條消息。

    那麼 Consumer 的這次 long-polling 請求 Broker 將在執行 3 次 Push(共 Push 96 條消息)之後返回 Response 給 Consumer(Response 包含 4 條消息)。

    如果採用 long-polling 模型,Consumer 每發送一次請求 Broker 執行一次響應。

    這個例子需要進行 4 次 long-polling 交互(共 4 個 Request 和 4 個 Response,8 次網絡操作;Dynamic Push/Pull 中是 1 個 Request,3 次 Push 和 1 個 Response,共 5 次網絡操作)。

    所以 Pulsar 就採用了這種消息獲取模式,從 Consumer 層進一步優化消息達到時間。

    我覺得這個設計非常巧妙,很多中間件的這種 long-polling 模式都可以參考這種思想去做一個改善。

    總結

    Apache Pulsar 很多設計思想都和其他中間件不一樣,但無疑於其更加貼近於未來。

    大膽預測一下其他的一些消息中間件未來的發展也都會向其靠攏,目前國內的 Pulsar 使用者也是越來越多,騰訊雲提供了 Pulsar 的雲版本 TDMQ。

    當然還有一些其他的知名公司華爲,知乎,虎牙等等有都在對其做一個逐步的嘗試,我相信 Pulsar 真的是一個趨勢。

    最後也讓我想起了最近大江大河大結局的一句話:

    所有的變化,都可能伴隨着痛苦和彎路,開放的道路,也不會是闊野坦途,但大江大河,奔湧向前的趨勢,不是任何險灘暗礁,能夠阻擋的。道之所在,雖千萬人吾往矣。

    作者:咖啡拿鐵

    編輯:陶家龍

    出處:轉載自公衆號咖啡拿鐵(ID:close_3092860495)

    相關文章