線程池就是提前創(chuàng)建若干個線程,如果有任務需要處理,線程池里的線程就會處理任務,處理完之后線程并不會被銷毀,而是等待下一個任務。由于創(chuàng)建和銷毀線程都是消耗系統(tǒng)資源的,所以當你想要頻繁的創(chuàng)建和銷毀線程的時候就可以考慮使用線程池來提升系統(tǒng)的性能。
java中經(jīng)常需要用到多線程來處理一些業(yè)務,我們非常不建議單純使用繼承Thread或者實現(xiàn)Runnable接口的方式來創(chuàng)建線程,那樣勢必有創(chuàng)建及銷毀線程耗費資源、線程上下文切換問題。同時創(chuàng)建過多的線程也可能引發(fā)資源耗盡的風險,這個時候引入線程池比較合理,方便線程任務的管理。java中涉及到線程池的相關類均在jdk1.5開始的java.util.concurrent包中,涉及到的幾個核心類及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。
線程池提供了一種限制和管理資源(包括執(zhí)行一個任務)。 每個線程池還維護一些基本統(tǒng)計信息,例如已完成任務的數(shù)量。
線程池的好處如下:
1.降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
2.可有效的控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時避免過多資源競爭,避免堵塞。
3.提供定時執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
1)線程提交到線程池
2)判斷核心線程池是否已經(jīng)達到設定的數(shù)量,如果沒有達到,則直接創(chuàng)建線程執(zhí)行任務
3)如果達到了,則放在隊列中,等待執(zhí)行
4)如果隊列已經(jīng)滿了,則判斷線程的數(shù)量是否已經(jīng)達到設定的最大值,如果達到了,則直接執(zhí)行拒絕策略
5)如果沒有達到,則創(chuàng)建線程執(zhí)行任務。
RUNNING :能接受新提交的任務,并且也能處理阻塞隊列中的任務;
SHUTDOWN:關閉狀態(tài),不再接受新提交的任務,但卻可以繼續(xù)處理阻塞隊列中已保存的任務。在線程池處于 RUNNING 狀態(tài)時,調用 shutdown()方法會使線程池進入到該狀態(tài)。(finalize() 方法在執(zhí)行過程中也會調用shutdown()方法進入該狀態(tài));
STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時,調用 shutdownNow() 方法會使線程池進入到該狀態(tài);
TIDYING:如果所有的任務都已終止了,workerCount (有效線程數(shù)) 為0,線程池進入該狀態(tài)后會調用 terminated() 方法進入TERMINATED 狀態(tài)。
TERMINATED:在terminated() 方法執(zhí)行完后進入該狀態(tài),默認terminated()方法中什么也沒有做。
Executor就是一個線程池框架,Executor 位于java.util.concurrent.Executors ,提供了用于創(chuàng)建工作線程的線程池的工廠方法。它包含一組用于有效管理工作線程的組件。Executor API 通過 Executors 將任務的執(zhí)行與要執(zhí)行的實際任務解耦。
Executor 接口對象能執(zhí)行我們的線程任務;
Executors 工具類的不同方法按照我們的需求創(chuàng)建了不同的線程池,來滿足業(yè)務的需求。
ExecutorService 接口繼承了Executor接口并進行了擴展,提供了更多的方法,我們能夠獲得任務執(zhí)行的狀態(tài)并且可以獲取任務的返回值。
1.newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
2.newFixedThreadPoo
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
3.newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
4.newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
1)newFixedThreadPool和newSingleThreadExecutor: 主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool: 主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
3)線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓程序員更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。
ThreadPoolExecutor是線程池的核心實現(xiàn)類,在JDK1.5引入,位于java.util.concurrent包。
通過下面的demo來了解ThreadPoolExecutor創(chuàng)建線程的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 測試ThreadPoolExecutor對線程的執(zhí)行順序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心線程數(shù)
int corePoolSize = 3;
//最大線程數(shù)
int maximumPoolSize = 6;
//超過 corePoolSize 線程數(shù)量的線程最大空閑時間
long keepAliveTime = 2;
//以秒為時間單位
TimeUnit unit = TimeUnit.SECONDS;
//創(chuàng)建工作隊列,用于存放提交的等待執(zhí)行任務
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//創(chuàng)建線程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環(huán)提交任務
for (int i = 0; i < 8; i++) {
//提交任務的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程打印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模擬線程執(zhí)行時間,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個任務提交后休眠500ms再提交下一個任務,用于保證提交順序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
當一個新任務被提交時:
1. 當前活躍線程數(shù)<corePoolSize,則創(chuàng)建一個新線程執(zhí)行新任務;
2. 當前活躍線程數(shù)>corePoolSize,且隊列(workQueue)未滿時,則將新任務放入隊列中;
3. 當前活躍線程數(shù)>corePoolSize,且隊列(workQueue)已滿,且當前活躍線程數(shù)<maximumPoolSize,則繼續(xù)創(chuàng)建一個新線程執(zhí)行新任務;
4. 當前活躍線程數(shù)>corePoolSize,且隊列(workQueue)已滿,且當前活躍線程數(shù)=maximumPoolSize,則執(zhí)行拒絕策略(handler);
當任務執(zhí)行完成后:
1. 超出corePoolSize的空閑線程,在等待新任務時,如果超出了keepAliveTime,則線程會被銷毀;
2. 如果allowCoreThreadTimeOut被設置為true,那么corePoolSize以內的空閑線程,如果超出了keepAliveTime,則同樣會被銷毀。
corePoolSize就是線程池中的核心線程數(shù)量,這幾個核心線程在沒有用的時候,也不會被回收
maximumPoolSize就是線程池中可以容納的最大線程的數(shù)量
keepAliveTime就是線程池中除了核心線程之外的其他的最長可以保留的時間,因為在線程池中,除了核心線程即使在無任務的情況下也不能被清 除,其余的都是有存活時間的,意思就是非核心線程可以保留的最長的空閑時間
util就是計算這個時間的一個單位。
workQueue就是等待隊列,任務可以儲存在任務隊列中等待被執(zhí)行,執(zhí)行的是FIFIO原則(先進先出)。
threadFactory就是創(chuàng)建線程的線程工廠。
handler是一種拒絕策略,我們可以在任務滿了之后,拒絕執(zhí)行某些任務。
當線程充滿了ThreadPool的有界隊列時,飽和策略開始起作用。飽和策略可以理解為隊列飽和后,處理后續(xù)無法入隊的任務的策略。ThreadPoolExecutor可以通過調用setRejectedExecutionHandler來修改飽和策略。
當請求任務不斷的過來,而系統(tǒng)此時又處理不過來的時候,我們需要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經(jīng)包含四種處理策略。
AbortPolicy策略:該策略會直接拋出異常,阻止系統(tǒng)正常工作。
CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
DiscardOleddestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執(zhí)行的任務,并嘗試再次提交當前任務。
DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。
除了JDK默認提供的四種拒絕策略,我們可以根據(jù)自己的業(yè)務需求去自定義拒絕策略,自定義的方式很簡單,直接實現(xiàn)RejectedExecutionHandler接口即可。
1.execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執(zhí)行成功與否;
2.submit()方法用于提交需要返回值的任務。線程池會返回一個 Future 類型的對象,通過這個 Future 對象可以判斷任務是否執(zhí)行成功,并且可以通過 Future 的 get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執(zhí)行完。
線程組ThreadGroup對象中的stop,resume,suspend會導致安全問題,主要是死鎖問題,已經(jīng)被官方廢棄,多以價值已經(jīng)大不如以前。
線程組ThreadGroup不是線程安全的,在使用過程中不能及時獲取安全的信息。
shutdownNow():立即關閉線程池(暴力),正在執(zhí)行中的及隊列中的任務會被中斷,同時該方法會返回被中斷的隊列中的任務列表;
shutdown():平滑關閉線程池,正在執(zhí)行中的及隊列中的任務能執(zhí)行完成,后續(xù)進來的任務會被執(zhí)行拒絕策略;
isTerminated():當正在執(zhí)行的任務及對列中的任務全部都執(zhí)行(清空)完就會返回true;
線程池將線程和任務進行解耦,線程是線程,任務是任務,擺脫了之前通過 Thread 創(chuàng)建線程時的一個線程必須對應一個任務的限制。在線程池中,同一個線程可以從阻塞隊列中不斷獲取新任務來執(zhí)行,其核心原理在于線程池對 Thread 進行了封裝,并不是每次執(zhí)行任務都會調用 Thread.start() 來創(chuàng)建新線程,而是讓每個線程去執(zhí)行一個“循環(huán)任務”,在這個“循環(huán)任務”中不停的檢查是否有任務需要被執(zhí)行,如果有則直接執(zhí)行,也就是調用任務中的 run 方法,將 run 方法當成一個普通的方法執(zhí)行,通過這種方式將只使用固定的線程就將所有任務的 run 方法串聯(lián)起來。
1.ArrayBlockingQueue是一個基于數(shù)組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2.LinkedBlockingQueue一個基于鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列
3.SynchronousQueue 一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool()使用了這個隊列。
4.PriorityBlockingQueue 一個具有優(yōu)先級的無限阻塞隊列。
首先是利用好SpringBoot的自動裝配功能,配置好線程池的一些基本參數(shù)。
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/*
* 線程池名前綴
*/
private static final String threadNamePrefix = "Api-Async-";
/**
* bean的名稱, 默認為首字母小寫的方法名
* @return
*/
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 默認情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務,
* 當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
* 當隊列滿了,就繼續(xù)創(chuàng)建線程,當線程數(shù)量大于等于maxPoolSize后,開始使用拒絕策略拒絕
*/
/*
* 核心線程數(shù)(默認線程數(shù))
*/
executor.setCorePoolSize(corePoolSize);
//最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//緩沖隊列數(shù)
executor.setQueueCapacity(queueCapacity);
//允許線程空閑時間(單位是秒)
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
//用來設置線程池關閉時候等待所有任務都完成再繼續(xù)銷毀其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//線程池對拒絕任務的處理策略,CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
}
配置好線程池的基本參數(shù)時候,我們就可以使用線程池了, 只要在一個限定域為public的方法頭部加上@Async注解即可。
@Async
public void createOrder() {
System.out.println("執(zhí)行任務");
}
1)分析任務的特性
任務的性質:CPU 密集型任務、IO 密集型任務和混合型任務。
任務的優(yōu)先級:高、中、低。
任務的執(zhí)行時間:長、中、短。
任務的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
2)具體策略
[1]CPU 密集型任務配置盡可能小的線程,如配置N(CPU核心數(shù))+1個線程的線程池。
[2]IO 密集型任務則由于線程并不是一直在執(zhí)行任務,則配置盡可能多的線程,如2*N(CPU核心數(shù))。
[3]混合型任務如果可以拆分,則將其拆分成一個 CPU 密集型任務和一個 IO 密集型任務。只要這兩個任務執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率;如果這兩個任務執(zhí)行時間相差太大,則沒必要進行分解。
[4]優(yōu)先級不同的任務可以使用優(yōu)先級隊列 PriorityBlockingQueue 來處理,它可以讓優(yōu)先級高的任務先得到執(zhí)行。但是,如果一直有高優(yōu)先級的任務加入到阻塞隊列中,那么低優(yōu)先級的任務可能永遠不能執(zhí)行。
[5]執(zhí)行時間不同的任務可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務先執(zhí)行。
[6]依賴數(shù)據(jù)庫連接池的任務,因為線程提交 SQL 后需要等待數(shù)據(jù)庫返回結果,線程數(shù)應該設置得較大,這樣才能更好的利用 CPU。
[7]建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預警能力。可以根據(jù)需要設大一點,比如幾千。使用無界隊列,線程池的隊列就會越來越大,有可能會撐滿內存,導致整個系統(tǒng)不可用。