只要是 web 項目,程式都會直接或間接使用到線程池,它的使用是如此頻繁,以至於像空氣一樣,大多數時候被我們無視了。但有時候,我們會相當然地認為線程池與其它對象池(如:資料庫連接池)一樣,要用的時候向池子索取,用完後歸還給它即可。然後事實上,線程池獨樹一幟、鶴立雞群,它與普通的對象池就是不同。本文本 ...
只要是 web 項目,程式都會直接或間接使用到線程池,它的使用是如此頻繁,以至於像空氣一樣,大多數時候被我們無視了。但有時候,我們會相當然地認為線程池與其它對象池(如:資料庫連接池)一樣,要用的時候向池子索取,用完後歸還給它即可。然後事實上,線程池獨樹一幟、鶴立雞群,它與普通的對象池就是不同。本文本將先闡述這種差異,接著用最簡單的代碼實現一個線程池,最後再對 JDK 中與線程池相關的 Executor 體系做一個全面介紹。
線程池與普通資源池的差異
提到 pool 這個設計思想,第一反映是這樣的:從一個資源容器中獲取空閑的資源對象。如果容器中有空閑的,就直接從空閑資源中取出一個返回,如果容器中沒有空閑資源,且容器空間未用盡,就新創建一個資源對象,然後再返回給調用方。這個容器就是資源池,它看起來就像這樣:
圖中的工人隊伍里,有3人是空閑的,工頭(資源池的管理者)可以任選兩人來提供勞務服務。同時,隊隊伍尚未飽和,還可以容納一名工人。如果雇主要求一次性提供4名勞工服務,則工頭需要再招納一名工人加入隊伍,然後再向雇主提供服務。此時,這個團隊(資源池)已達到飽和,不能再對外提供勞務服務了,除非某些工人完成了工作。
以上是一個典型資源池的基本特點,那麼線程池是否也同樣如此呢。至少第一感覺是沒問題的,大概應該也是這樣吧,畢竟拿從池中取出一個線程,再讓它執行對應的代碼,這聽上去很科學嘛。等等,總感覺哪裡不對呢,線程這東西能像普通方法調用那樣,讓我們在主程式里隨意支配嗎?沒錯,問題就在這裡,線程一旦運行起來,就完全閉關鎖國了,除了按照運行前約定好的方式進行數據通信外,再也不能去打擾它老人家了。因此,線程池有點像發動機,池中的各個線程就對應發動機的各個汽缸。整個發動機一旦啟動(線程池激活),各個汽缸中的活塞便按照預定的設計,不停地來回運動,永遠也不停止,直到燃油耗盡,或人為地關閉油門。在此期間,我們是不能控制單個汽缸的活動方向的。就如同我們不能控制正在運行的線程,讓其停止正在執行的代碼,轉而去執行其它代碼一樣(利用 Thread.interrpt() 方法也達不到此目的,而 Thread.stop() 更是直接終止了線程)①。
既然不能直接給線程池裡的單個線程明確指派任務,那線程池的意義何在呢?意義就在於,雖然不能一對一精確指派任務,但可以給整個線程池提交任務,至於這些任務由池中的哪個線程來執行,則是不可控的。此時,可以把線程池看作是生產流水線上的單個工序。這裡以給「老乾媽香辣醬」的玻璃瓶加蓋子為例,給瓶子加蓋就是要執行的任務,最初該工序上只設置了一個機械臂,加蓋子也順序操作的。但單個機械臂忙不過來,後來又加了一個機械臂,這樣效率就提高了。瓶子被加蓋的順序也是不確定的,但最終所有瓶子都會被加蓋。
手動編寫一個簡易的線程池
如上小節所述,線程池與其它池類組件不一樣,調用方不可能直接從池中取出一個線程,然後讓它執行一段任務代碼。因為線程一旦啟動起來,就會在自己的頻軌道內獨立運行,不受外部控制。要讓這些線程執行外部提交的任務,需要提供一個數據通道,將任務打包成一個數據結構傳遞過去。而這些運行起來的線程,他們都執行一個相同的迴圈操作:讀取任務 → 執行任務 → 讀取任務 → ...... ②
┌──────────┐ ┌──────────────┐
┌─→ │Take Task │ -→ │ Execute Task │ ─┐
│ └──────────┘ └──────────────┘ │
└─────────────────────────────────────┘
這個讀取任務的數據通道就是隊列,池中的所有線程都不斷地執行 ② 處的迴圈邏輯,這便是線程池運行的基本原理。
相對於線程池這個叫法,實際上「執行器 Executor」這個術語在實踐中使用得要更多些。因為在 jdk 的 java.util.concurrent 包下,有一個 Executor 介面,它只有一個方法:
public interface Executor {
void execute(Runnable command);
}
這便是執行器介面,顧名思義,它接受一個 Runnable 對象,並能夠執行它。至於如何執行,交由具體的實現類負責,目前至少有以下四種執行方式 ③
- 在當前線程中同步執行
- 總是新開線程來非同步執行
- 只使用一個線程來非同步串列執行
- 使用多個線程來併發執行
本小節將以一個簡易的線程池方式來實現 Executor。
編寫只有一個線程的線程池
這是線程池的最簡形式,實現代碼也非常簡單,如下所示
public class SingleThreadPoolExecutor implements Executor {
// 任務隊列
private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
// 直接將任務添加到隊列中
@Override
public void execute(Runnable task) {
tasks.offer(task);
}
public SingleThreadPoolExecutor() {
// 在構造函數中,直接創建一個線程,作為為線程池的唯一任務執行線程
// 它將在被創建後立即執行,執行邏輯為:
// 1. 從隊列中獲取任務
// 2. 如果獲取到任務,則執行它,執行完後,返回第1步
// 3. 如果未獲取到任務,則簡短休息,繼續第1步
Thread taskRunner = new Thread(() -> {
Runnable task;
while (true) {
task = tasks.poll();
if (task != null) {
task.run();
continue;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
taskRunner.start();
}
}
上述的單線程執行器實現中,執行任務的線程是永遠不會停止的,獲取到任務時,就執行它,沒有獲取到,就一直不斷的獲取。下麵是這個執行器的測試代碼:
public class SingleThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
SingleThreadPoolExecutorstp stp= new SingleThreadPoolExecutor();
// 連續添加 5 個任務
for (int i = 1; i <= 5; i++) {
stp.execute(new SpeakNameTask("Coding Change The World " + i));
}
System.out.println("主線程已結束");
}
// 一個模擬的任務:簡單地輸出名稱
static class SpeakNameTask implements Runnable {
private String name;
public SpeakNameTask(String name) {
this.name = name;
}
@Override
public void run() {
Random random = new Random();
int milliseconds = 500 + random.nextInt(1000);
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
System.out.println("["+Thread.currentThread().getName()+"]: I believe " + name);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
下麵是輸出結果:
主線程已結束
[Thread-0]: I believe Coding Change The World 1
[Thread-0]: I believe Coding Change The World 2
[Thread-0]: I believe Coding Change The World 3
[Thread-0]: I believe Coding Change The World 4
[Thread-0]: I believe Coding Change The World 5
可以看到:作為測試程式的主線程,已經先執行結束了,而線程池還在順序地執行主線程添加的任務。並且線程池在執行完所有任務後,並沒有退出,jvm 進程會一直存在。
改進為擁有多個線程的線程池
多線程版本的線程池執任務執行器,只是在單線程版本上,增加了執行線程的數量,其它的變化不是很大。但為了更好的組織代碼,需要將任務執行線程的邏輯單獨抽取出來。另外,為了模擬得更像一個池,本示例代碼還增加了以下特性
-
支持核心線程數功能
核心線程數在執行器創建時,一起創建,並永不結束 -
支持最大線程數功能
當核心線程執行任務效率變慢時,增加執行線程 -
支持空閑線程移除功能
當非核心線程空閑時長超過限定值時,結束該線程,並從池中移除
主要代碼如下:
MultiThreadPoolExecutor.java (點擊查看代碼)
public class MultiThreadPoolExecutor implements Executor {
// 線程池
private final Set<TaskRunner> runnerPool = new HashSet<>();
// 任務隊列
private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
// 單個線程最大空閑毫秒數
private int maxIdleMilliSeconds = 3000;
// 核心線程數
private int coreThreadCount = 1;
// 最大線程數
private int maxThreadCount = 3;
public MultiThreadPoolExecutor() {
// 初始化核心線程
for (int i = 0; i < coreThreadCount; i++) {
addRunner(true);
}
}
private void addRunner(boolean isCoreRunner) {
TaskRunner runner = new TaskRunner(isCoreRunner);
runnerPool.add(runner);
runner.start();
}
@Override
public void execute(Runnable task) {
tasks.add(task);
addRunnerIfRequired();
}
// 視情況增加線程數,這裡簡化為當任務數超過線程數的兩倍時,就增加線程
private void addRunnerIfRequired() {
if (tasks.size() <= 2 * runnerPool.size()) {
return;
}
// 未達到最大線程數時,可增加執行線程
if (runnerPool.size() < maxThreadCount) {
synchronized (this) {
if (runnerPool.size() < maxThreadCount) {
addRunner(false);
}
}
}
}
class TaskRunner extends Thread {
// 是否為核心線程
private final boolean coreRunner;
// 已空閑的毫秒數
private long idleMilliseconds = 0;
TaskRunner(boolean coreRunner) {
this.coreRunner = coreRunner;
}
@Override
public void run() {
Runnable task;
while (true) {
task = tasks.poll();
if (task != null) {
task.run();
continue;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
idleMilliseconds += 10;
if(coreRunner) {
continue;
}
if (idleMilliseconds > maxIdleMilliSeconds) {
// 超過最大空間時間,線程結束,並從池中移徐本線程
runnerPool.remove(this);
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
}
完整代碼已上傳至 thread-pool-sample
其實多線程版本的主要難點,是判定增加新線程來執行任務的演算法,即如何確定當前需要添加新線程,而不是保持當前的線程數量來執行任務,以保證最高的效率。以這個粗糙的原始版本為基準,不斷豐富細節和增強健壯性,就可以慢慢演進出 Jdk 中的 Executor 體系。
JDK 線程池任務執行器淺析
Executor 體系類結構
Executor 介面是任務執行器的頂級介面,它僅定義了一個方法,但並未限制如何執行傳遞過來的任務。正如第③處所述,「線程池執行」也只是多種方式中的一種,也是用得最多的一種。由於 Executor 介面定義的功能過於單一,於是在 JDK 的併發包下,又對它進行了擴展,這個擴展就是 ExecutorService,如下所示:
public interface ExecutorService extends Executor {
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
void shutdown();
List<Runnable> shutdownNow();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
boolean isShutdown();
boolean isTerminated();
}
這些擴展方法共分為三組,分別是:任務提交類、狀態控制類、狀態檢查類。從分類上可以看出,ExecutorService 增加了「提交任務」的概念(相對於 Executor 的「執行任務」)。另外,還有「關閉」操作,以及檢測執行器當前的狀態,這些都是 Exector 不具備的。下麵這個分類列表更加清晰:
-
任務提交
方法 非同步提交 批量提交 超時等待 submit(Runnable task) √ submit(Callable task) √ invokeAll(Collection<? extends Callable > tasks) √ invokeAll(Collection<? extends Callable > tasks,long timeout, TimeUnit unit) √ √ invokeAll(Collection<? extends Callable > tasks) √ invokeAny(Collection<? extends Callable > tasks,long timeout, TimeUnit unit) √ √ -
狀態控制
- shutdown()
- shutdownNow()
- awaitTermination(long timeout, TimeUnit unit)
-
狀態檢查
- isShutdown()
- isTerminated()
除了增加了新的方法外,還新增加了一種任務類型,即:java.util.concurrent.Callable,而 Executor 介面定義的任務介面是 java.lang.Runnable。二者的區別是,Callable#call() 方法有返回值,而後者沒有。一般而言,任務提交給執行器後,通常都會非同步執行。提交任務的線程是拿不到這個 call() 方法執行完畢後的返回值的,既然這樣,那定義這個有返回值的方法還有什麼意義呢?
為了拿到返回值,引入了 java.util.concurrent.Future 介面,它定義了獲取單個非同步任務執行結果的方法,不僅如此,它還定義了其它一些訪問和控制單個任務的方法,見下表:
方法 | 解釋 |
---|---|
get() | 阻塞調用線程,直到所關聯的任務執行結束,拿到返回值,或任務執行結束(取消操作和發生異常均會導致結) |
get(long timeout, TimeUnit unit) | 同上,但會有一個最大等待時長,若超過該時長後,任務依然未執行結束,則結束等待,並拋出 TimeoutException |
cancel(boolean mayInterruptIfRunning) | 嘗試取消關聯的任務,只是嘗試,遇到以下情況,均無法取消 · 任務已經取消 · 任務已完成 · 其它原因 通常任務一旦開始執行,就無法取消, 除非是極其特定的任務,這類任務的代碼本身會與外界通信,判斷是否應該取消自己的執行。 因此本方法提供了一個 mayInterruptIfRunning 參數,用來做這種信息傳達, 但也僅僅是一個信息傳達,表達了期望已運行的任務能自我終止, 但能否真的終止,取決於任務本身的代碼邏輯 |
isCancelled() | 檢測關聯的任務是否已「取消」 |
isDone() | 檢測關聯的任務是否已「結束」,任務正常執行完畢、遭遇異常和被取消均視為任務已「結束」 |