更新時間:2022-08-24 10:26:36 來源:動力節(jié)點 瀏覽3139次
在Java多線程應用中,隊列的使用率很高,多數(shù)生產(chǎn)消費模型的首選數(shù)據(jù)結構就是隊列。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據(jù)實際需要選用阻塞隊列或者非阻塞隊列。
注:什么叫線程安全?這個首先要明確。線程安全的類 ,指的是類內(nèi)共享的全局變量的訪問必須保證是不受多線程形式影響的。如果由于多線程的訪問(比如修改、遍歷、查看)而使這些變量結構被破壞或者針對這些變量操作的原子性被破壞,則這個類就不是線程安全的。
本文來講講這兩種Queue,本文分為以下兩個部分,用分割線分開:
BlockingQueue 阻塞算法
ConcurrentLinkedQueue,非阻塞算法
首先來看看BlockingQueue:
Queue是什么就不需要多說了吧,一句話:隊列是先進先出。相對的,棧是后進先出。如果不熟悉的話先找本基礎的數(shù)據(jù)結構的書看看吧。
BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。
首先,看看BlockingQueue提供的常用方法:
可能報異常 | 返回布爾值 | 可能阻塞 | 設定等待時間 | |
---|---|---|---|---|
入隊 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
出隊 | remove() | poll() | take() | poll(timeout, unit) |
查看 | element() | peek() | 無 | 無 |
從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:
add(e) remove() element() 方法不會阻塞線程。當不滿足約束條件時,會拋出IllegalStateException 異常。例如:當隊列被元素填滿后,再調用add(e),則會拋出異常。
offer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當隊列被元素填滿后,再調用offer(e),則不會插入元素,函數(shù)返回false。
要想要實現(xiàn)阻塞功能,需要調用put(e) take() 方法。當不滿足約束條件時,會阻塞線程。
BlockingQueue 阻塞算法
BlockingQueue作為線程容器,可以為線程同步提供有力的保障。
BlockingQueue定義的常用方法如下:
拋出異常 | 特殊值 | 阻塞 | 超時 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
1. ArrayBlockingQueue
基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象,這是一個常用的阻塞隊列,除了一個定長數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費者獲取數(shù)據(jù),都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現(xiàn)原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現(xiàn)生產(chǎn)者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時,我們還可以控制對象的內(nèi)部鎖是否采用公平鎖,默認采用非公平鎖。
2. LinkedBlockingQueue
基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構成),當生產(chǎn)者往隊列中放入一個數(shù)據(jù)時,隊列會從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊列內(nèi)部,而生產(chǎn)者立即返回;只有當隊列緩沖區(qū)達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數(shù)指定該值),才會阻塞生產(chǎn)者隊列,直到消費者從隊列中消費掉一份數(shù)據(jù),生產(chǎn)者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
作為開發(fā)者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
阻塞隊列:線程安全
按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,并且隊列檢索操作會獲得位于隊列頭部的元素。鏈接隊列的吞吐量通常要高于基于數(shù)組的隊列,但是在大多數(shù)并發(fā)應用程序中,其可預知的性能要低。
注意:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingDeque {
//阻塞隊列,F(xiàn)IFO
private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " 生產(chǎn): " + i);
//concurrentLinkedQueue.add(i);
try {
concurrentLinkedQueue.put(i);
Thread.sleep(200); //模擬慢速的生產(chǎn),產(chǎn)生阻塞的效果
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
//必須要使用take()方法在獲取的時候阻塞
System.out.println(name+"消費: " + concurrentLinkedQueue.take());
//使用poll()方法 將產(chǎn)生非阻塞效果
//System.out.println(name+"消費: " + concurrentLinkedQueue.poll());
//還有一個超時的用法,隊列空時,指定阻塞時間后返回,不會一直阻塞
//但有一個疑問,既然可以不阻塞,為啥還叫阻塞隊列?
//System.out.println(name+" Consumer " + concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
ConcurrentLinkedQueue,非阻塞算法
非阻塞隊列
基于鏈接節(jié)點的、無界的、線程安全。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操作從隊列頭部獲得元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當?shù)倪x擇。此隊列不允許 null 元素。
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class NoBlockQueue {
private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " start producer " + i);
concurrentLinkedQueue.add(i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//System.out.println(name+"end producer " + i);
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
System.out.println(name+" Consumer " + concurrentLinkedQueue.poll());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// System.out.println();
// System.out.println(name+" end Consumer " + i);
}
}
}
}
在并發(fā)編程中,一般推薦使用阻塞隊列,這樣實現(xiàn)可以盡量地避免程序出現(xiàn)意外的錯誤。阻塞隊列使用最經(jīng)典的場景就是socket客戶端數(shù)據(jù)的讀取和解析,讀取數(shù)據(jù)的線程不斷將數(shù)據(jù)放入隊列,然后解析線程不斷從隊列取數(shù)據(jù)解析。還有其他類似的場景,只要符合生產(chǎn)者-消費者模型的都可以使用阻塞隊列。
使用非阻塞隊列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回為空的情況處理(以及消費重試等問題)。
另外他們都是線程安全的,不用考慮線程同步問題。