手寫JDK組件之阻塞隊列BlockedQueue
摘要:我們準備了20個元素入隊,然後併發進行出隊,等待兩個線程出隊完成之後,在主線程進行隊列元素的遍歷操作,結果如下:。通過Lock+Condition實現 “等待-通知” 機制與synchronized類似,我們本文實現阻塞隊列BlockedQueue的方式就是通過Lock+Condition實現。
研究了一段時間框架,有點審美疲勞,今天講點輕鬆的,手寫一個阻塞隊列,實踐一把lock+condition。
“等待通知”機制
首先複習一下經典的 “等待通知”機制。
線程首先獲取互斥鎖,當線程要求的條件不滿足時,釋放互斥鎖,進入等待狀態;當要求的條件滿足時,通知等待的線程,重新獲取互斥鎖 –《極客時間-Java併發編程實戰》
在Java中實現 “等待通知” 機制一般有兩種方式,synchronized/Lock+Condition。
通過synchronized實現 “等待-通知” 機制
synchronized同步原語(或稱:管程)配合wait()、notify()、notifyAll()就可以實現“等待通知”機制。
機理是怎樣的呢?
當使用synchronized管程對某一塊臨界區進行加鎖,同一時刻,只能允許一個線程進入synchronized保護的臨界區中。
當該遠程進入臨界區之後,其他的線程如果來訪問臨界區就需要進入等待隊列中進行等待。
這裏要注意,等待隊列與鎖是一一對應關係,每個互斥鎖都有自己的獨立的等待隊列。
Java對象的wait()方法就能夠讓線程進入等待狀態,此時線程被阻塞。
當線程進入等待隊列時,會釋放當前持有的互斥鎖。當它釋放鎖之後,其他的線程就有機會獲得該互斥鎖並進入臨界區。
那如何通知滿足條件的線程呢?
通過Java對象的notify()和notifyAll()方法就能夠實現。當條件滿足時調用notify(),會通知等待隊列中的線程,通知它 條件曾經滿足過 。
notify()只能保證在通知的那一時間點,條件是滿足的。也就是,有可能被通知線程執行的時間點與通知的時間點是不相等的;即:線程執行的時候,條件已經不滿足了(可能有其他的線程滿足了該條件而插隊)
另外,就算線程被通知而喚醒,在進入臨界區前依舊需要獲取互斥鎖,因爲這把需要獲取的鎖在調用wait()的時候已經被釋放了。
需要注意的是:
wait()、notify()、notifyAll()被調用的前提是獲取到了響應的互斥鎖,也就是調用這三個方法的位置都是在 synchronized{} 內部。如果調用的位置在synchronized外部或者不是使用同一把互斥鎖,JVM會拋出 java.lang.IllegalMonitorStateException 異常。
關於synchronized實現 “等待-通知” 機制我們就講到這裏。
通過Lock+Condition實現 “等待-通知” 機制與synchronized類似,我們本文實現阻塞隊列BlockedQueue的方式就是通過Lock+Condition實現。
Lock+Condition原理講解
Condition 定義了等待/通知兩種類型的方法:await()/signal()/signalAll()。線程調用這些方法之前需要獲取Condition關聯的鎖。 Condition對象是由Lock對象通過newCondition()方法創建的,也就是說,Condition是依賴Lock對象的。
類比上文中講到的synchronized實現 “等待-通知” 機制,Lock/Condition涉及到的方法與synchronized方式涉及到的方法的語義是一一對應的,具體如下表:
synchronized | Lock/Condition | 描述 |
---|---|---|
wait() | await() | 等待 |
notify() | signal() | 通知單個等待隊列中的線程 |
notifyAll() | signalAll() | 通知所有等待隊列中的線程 |
實現阻塞隊列BlockedQueue
瞭解並複習了 管程中的“等待/通知機制”,我們開始實現阻塞隊列BlockedQueue。
在編寫過程中參考了JUC中的ArrayBlockingQueue源碼實現。
public class BlockedQueue<T> { final Lock lock = new ReentrantLock(); // 條件變量:隊列不滿 final Condition notFull = lock.newCondition(); // 條件變量:隊列不空 final Condition notEmpty = lock.newCondition(); // 阻塞單列最大長度 int capacity = 0; // 當前已經存在下標:入隊 int putIndex = 0; // 當前已經存在下標:出隊 int takeIndex = 0; // 元素總數 int elementsSize = 0; // 元素數組 Object[] items; // 構造方法 public BlockedQueue(int capacity) { this.capacity = capacity; items = new Object[capacity]; System.out.println("capacity=" + capacity + ",items.size=" + items.length); }
這段代碼中我們聲明瞭阻塞隊列,支持泛型。聲明瞭需要的成員變量以及有參構造方法。構造方法中根據外界輸入的隊列最大長度初始化了內部的元素數組。
提前聲明並初始化了Lock(實現方式爲ReentrantLock可重入鎖),並在Lock基礎上初始化了兩個Condition條件變量,分別標記隊列不滿、隊列不空。
// 入隊 void enq(T x) { lock.lock(); try { // 隊列已滿 while (items.length == elementsSize) { // 等待隊列不滿 notFull.await(); } // 入隊操作... items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; ++elementsSize; // 入隊後, 通知可出隊 notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println(x.toString() + "--入隊完成"); } }
這段代碼爲入隊邏輯。
首先獲取可重入鎖,如果加鎖成功則進入臨界區邏輯,否則嘗試解鎖。
當隊列已經滿時,則進入阻塞狀態,等待隊列不滿。
如果隊列不滿則進行入隊,當前下標的元素即爲要入隊的元素,元素總長度增1。
// 出隊 T deq() { lock.lock(); T x = null; try { // 隊列已空 while (items.length == 0) { // 等待隊列不空 notEmpty.await(); } // 出隊操作... x = (T) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; elementsSize--; // 出隊後,通知可入隊 notFull.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return x; }
這段代碼爲出隊邏輯。
首先獲取可重入鎖,如果加鎖成功則進入臨界區邏輯,否則嘗試解鎖。
當隊列已經空,則進入阻塞狀態,等待隊列不空。
如果隊列不空則進行出隊操作,先暫存當前下標的元素,並將當前下標的元素標記爲空(NULL);元素總長度減1,解鎖後返回當前已經出隊的元素。
public T get(int index) { return (T) items[index]; }
這段代碼爲獲取對應下標的元素,如果元素不存在則返回空。
測試阻塞隊列:單線程操作
開發完基本邏輯之後,我們寫一個demo來測試一下BlockedQueue。
public static void main(String[] args) { BlockedQueue<String> blockedQueue = new BlockedQueue<>(20); for (int i = 0; i < 20; i++) { blockedQueue.enq("snowalker:" + i); } System.out.println("入隊結束:-------------------------"); for (int i = 0; i < 20; i++) { System.out.println(blockedQueue.get(i)); } for (int i = 0; i < 20; i++) { blockedQueue.deq(); } System.out.println("出隊結束:-------------------------"); for (int i = 0; i < 20; i++) { System.out.println(blockedQueue.get(i)); } }
邏輯很好理解,我們構造了一個BlockedQueue,添加了20個元素進行入隊。入隊之後遍歷元素,查看入隊結果。
接着進行20次出隊,並遍歷出隊後的結果。
運行結果如下:
capacity=20,items.size=20 入隊結束:------------------------- snowalker:0 snowalker:1 snowalker:2 snowalker:3 snowalker:4 snowalker:5 snowalker:6 snowalker:7 snowalker:8 snowalker:9 snowalker:10 snowalker:11 snowalker:12 snowalker:13 snowalker:14 snowalker:15 snowalker:16 snowalker:17 snowalker:18 snowalker:19 出隊結束:------------------------- null null null null null null null null null null null null null null null null null null null null
可以看到,進行了20次入隊之後元素共有20個;
進行了20次出隊操作之後,元素全部爲空,表示出隊成功。
測試阻塞隊列:多線程操作
我們接着測試一下多線程併發操作下,BlockedQueue的表現。
BlockedQueue<String> blockedQueue = new BlockedQueue<>(20); CountDownLatch begin = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(2); Thread thread0 = new Thread(new Runnable() { @Override public void run() { try { begin.await(); System.out.println("線程0準備完畢"); for (int i = 0; i < 10; i++) { blockedQueue.enq("線程0-snowalker-" + i); } System.out.println("線程0入隊結束:-------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } finally { end.countDown(); } } }); Thread thread1 = new Thread(new Runnable() { @Override public void run() { try { begin.await(); System.out.println("線程1準備完畢"); for (int i = 10; i < 20; i++) { blockedQueue.enq("線程1-snowalker-" + i); } System.out.println("線程1入隊結束:-------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } finally { end.countDown(); } } }); thread0.start(); thread1.start(); begin.countDown(); end.await(); System.out.println("主線程準備完畢!"); System.out.println("主線程遍歷開始!"); for (int i = 0; i < 20; i++) { System.out.println(blockedQueue.get(i)); } System.out.println("Bingo!"); }
我們定義了兩個線程,每個線程添加10個元素,通過閉鎖CountDownLatch進行併發添加,添加完成之後遍歷添加結果。打印如下:
capacity=20,items.size=20 線程0準備完畢 線程1準備完畢 線程0-snowalker-0--入隊完成 線程1-snowalker-10--入隊完成 線程0-snowalker-1--入隊完成 線程1-snowalker-11--入隊完成 線程0-snowalker-2--入隊完成 線程1-snowalker-12--入隊完成 線程0-snowalker-3--入隊完成 線程1-snowalker-13--入隊完成 線程0-snowalker-4--入隊完成 線程1-snowalker-14--入隊完成 線程0-snowalker-5--入隊完成 線程1-snowalker-15--入隊完成 線程1-snowalker-16--入隊完成 線程1-snowalker-17--入隊完成 線程1-snowalker-18--入隊完成 線程0-snowalker-6--入隊完成 線程1-snowalker-19--入隊完成 線程1入隊結束:------------------------- 線程0-snowalker-7--入隊完成 線程0-snowalker-8--入隊完成 線程0-snowalker-9--入隊完成 線程0入隊結束:------------------------- 主線程準備完畢! 主線程遍歷開始! 線程0-snowalker-0 線程1-snowalker-10 線程0-snowalker-1 線程1-snowalker-11 線程0-snowalker-2 線程1-snowalker-12 線程0-snowalker-3 線程1-snowalker-13 線程0-snowalker-4 線程1-snowalker-14 線程0-snowalker-5 線程1-snowalker-15 線程0-snowalker-6 線程1-snowalker-16 線程1-snowalker-17 線程1-snowalker-18 線程1-snowalker-19 線程0-snowalker-7 線程0-snowalker-8 線程0-snowalker-9 Bingo!
可以看到結果符合預期,我們接着測試一下併發出隊,接着上面的添加結果進行併發出隊操作。
CountDownLatch begin = new CountDownLatch(1); CountDownLatch dequeue = new CountDownLatch(2); for (int i = 0; i < 20; i++) { blockedQueue.enq("snowalker:" + i); } Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { begin.await(); System.out.println("線程2準備完畢"); for (int i = 0; i <= 10; i++) { blockedQueue.deq(); } System.out.println("線程2出隊結束:-------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } finally { dequeue.countDown(); } } }); Thread thread3 = new Thread(new Runnable() { @Override public void run() { try { begin.await(); System.out.println("線程3準備完畢"); for (int i = 0; i <= 10; i++) { blockedQueue.deq(); } System.out.println("線程3出隊結束:-------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } finally { dequeue.countDown(); } } }); thread2.start(); thread3.start(); begin.countDown(); dequeue.await(); System.out.println("主線程準備完畢!"); System.out.println("主線程遍歷開始!"); for (int i = 0; i < 20; i++) { System.out.println(blockedQueue.get(i)); } System.out.println("Bingo!"); }
我們準備了20個元素入隊,然後併發進行出隊,等待兩個線程出隊完成之後,在主線程進行隊列元素的遍歷操作,結果如下:
capacity=20,items.size=20 snowalker:0--入隊完成 snowalker:1--入隊完成 snowalker:2--入隊完成 snowalker:3--入隊完成 snowalker:4--入隊完成 snowalker:5--入隊完成 snowalker:6--入隊完成 snowalker:7--入隊完成 snowalker:8--入隊完成 snowalker:9--入隊完成 snowalker:10--入隊完成 snowalker:11--入隊完成 snowalker:12--入隊完成 snowalker:13--入隊完成 snowalker:14--入隊完成 snowalker:15--入隊完成 snowalker:16--入隊完成 snowalker:17--入隊完成 snowalker:18--入隊完成 snowalker:19--入隊完成 線程2準備完畢 線程2出隊結束:------------------------- 線程3準備完畢 線程3出隊結束:------------------------- 主線程準備完畢! 主線程遍歷開始! null null null null null null null null null null null null null null null null null null null null Bingo!
結果如上圖所示,可以看到併發出隊結果滿足預期。
小結
本文我們利用JUC中的Lock+Condition管程實現了自定義BlockedQueue阻塞隊列的開發,並通過測試用例測試了併發條件下的出隊入隊,結果符合預期。
版權聲明:
原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以鏈接形式標明本文地址。