線程池的運行邏輯與你想象的不一樣,它是池族中的異類

来源:https://www.cnblogs.com/guzb/p/18108245/difference-of-thread-pool-implementation
-Advertisement-
Play Games

只要是 web 項目,程式都會直接或間接使用到線程池,它的使用是如此頻繁,以至於像空氣一樣,大多數時候被我們無視了。但有時候,我們會相當然地認為線程池與其它對象池(如:資料庫連接池)一樣,要用的時候向池子索取,用完後歸還給它即可。然後事實上,線程池獨樹一幟、鶴立雞群,它與普通的對象池就是不同。本文本 ...


只要是 web 項目,程式都會直接或間接使用到線程池,它的使用是如此頻繁,以至於像空氣一樣,大多數時候被我們無視了。但有時候,我們會相當然地認為線程池與其它對象池(如:資料庫連接池)一樣,要用的時候向池子索取,用完後歸還給它即可。然後事實上,線程池獨樹一幟、鶴立雞群,它與普通的對象池就是不同。本文本將先闡述這種差異,接著用最簡單的代碼實現一個線程池,最後再對 JDK 中與線程池相關的 Executor 體系做一個全面介紹。

線程池與普通資源池的差異

提到 pool 這個設計思想,第一反映是這樣的:從一個資源容器中獲取空閑的資源對象。如果容器中有空閑的,就直接從空閑資源中取出一個返回,如果容器中沒有空閑資源,且容器空間未用盡,就新創建一個資源對象,然後再返回給調用方。這個容器就是資源池,它看起來就像這樣:

pool-illustration-via-workman

圖中的工人隊伍里,有3人是空閑的,工頭(資源池的管理者)可以任選兩人來提供勞務服務。同時,隊隊伍尚未飽和,還可以容納一名工人。如果雇主要求一次性提供4名勞工服務,則工頭需要再招納一名工人加入隊伍,然後再向雇主提供服務。此時,這個團隊(資源池)已達到飽和,不能再對外提供勞務服務了,除非某些工人完成了工作。

以上是一個典型資源池的基本特點,那麼線程池是否也同樣如此呢。至少第一感覺是沒問題的,大概應該也是這樣吧,畢竟拿從池中取出一個線程,再讓它執行對應的代碼,這聽上去很科學嘛。等等,總感覺哪裡不對呢,線程這東西能像普通方法調用那樣,讓我們在主程式里隨意支配嗎?沒錯,問題就在這裡,線程一旦運行起來,就完全閉關鎖國了,除了按照運行前約定好的方式進行數據通信外,再也不能去打擾它老人家了。因此,線程池有點像發動機,池中的各個線程就對應發動機的各個汽缸。整個發動機一旦啟動(線程池激活),各個汽缸中的活塞便按照預定的設計,不停地來回運動,永遠也不停止,直到燃油耗盡,或人為地關閉油門。在此期間,我們是不能控制單個汽缸的活動方向的。就如同我們不能控制正在運行的線程,讓其停止正在執行的代碼,轉而去執行其它代碼一樣(利用 Thread.interrpt() 方法也達不到此目的,而 Thread.stop() 更是直接終止了線程)

four-stroke-engine-illustration

既然不能直接給線程池裡的單個線程明確指派任務,那線程池的意義何在呢?意義就在於,雖然不能一對一精確指派任務,但可以給整個線程池提交任務,至於這些任務由池中的哪個線程來執行,則是不可控的。此時,可以把線程池看作是生產流水線上的單個工序。這裡以給「老乾媽香辣醬」的玻璃瓶加蓋子為例,給瓶子加蓋就是要執行的任務,最初該工序上只設置了一個機械臂,加蓋子也順序操作的。但單個機械臂忙不過來,後來又加了一個機械臂,這樣效率就提高了。瓶子被加蓋的順序也是不確定的,但最終所有瓶子都會被加蓋。

手動編寫一個簡易的線程池

如上小節所述,線程池與其它池類組件不一樣,調用方不可能直接從池中取出一個線程,然後讓它執行一段任務代碼。因為線程一旦啟動起來,就會在自己的頻軌道內獨立運行,不受外部控制。要讓這些線程執行外部提交的任務,需要提供一個數據通道,將任務打包成一個數據結構傳遞過去。而這些運行起來的線程,他們都執行一個相同的迴圈操作:讀取任務 → 執行任務 → 讀取任務 → ......

      ┌──────────┐    ┌──────────────┐
  ┌─→ │Take Task │ -→ │ Execute Task │ ─┐
  │   └──────────┘    └──────────────┘  │
  └─────────────────────────────────────┘

這個讀取任務的數據通道就是隊列,池中的所有線程都不斷地執行 ② 處的迴圈邏輯,這便是線程池運行的基本原理。

相對於線程池這個叫法,實際上「執行器 Executor」這個術語在實踐中使用得要更多些。因為在 jdk 的 java.util.concurrent 包下,有一個 Executor 介面,它只有一個方法:

public interface Executor {
    void execute(Runnable command);
}

這便是執行器介面,顧名思義,它接受一個 Runnable 對象,並能夠執行它。至於如何執行,交由具體的實現類負責,目前至少有以下四種執行方式

  • 在當前線程中同步執行
  • 總是新開線程來非同步執行
  • 只使用一個線程來非同步串列執行
  • 使用多個線程來併發執行

本小節將以一個簡易的線程池方式來實現 Executor。

編寫只有一個線程的線程池

這是線程池的最簡形式,實現代碼也非常簡單,如下所示

public class SingleThreadPoolExecutor implements Executor {
    // 任務隊列
    private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();

    // 直接將任務添加到隊列中
    @Override
    public void execute(Runnable task) {
        tasks.offer(task);
    }

    public SingleThreadPoolExecutor() {
        // 在構造函數中,直接創建一個線程,作為為線程池的唯一任務執行線程
        // 它將在被創建後立即執行,執行邏輯為:
        // 1. 從隊列中獲取任務
        // 2. 如果獲取到任務,則執行它,執行完後,返回第1步
        // 3. 如果未獲取到任務,則簡短休息,繼續第1步
        Thread taskRunner = new Thread(() -> {
            Runnable task;
            while (true) {
                task = tasks.poll();
                if (task != null) {
                    task.run();
                    continue;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        });
        taskRunner.start();
    }
}

上述的單線程執行器實現中,執行任務的線程是永遠不會停止的,獲取到任務時,就執行它,沒有獲取到,就一直不斷的獲取。下麵是這個執行器的測試代碼:

public class SingleThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        SingleThreadPoolExecutorstp stp= new SingleThreadPoolExecutor();
        // 連續添加 5 個任務
        for (int i = 1; i <= 5; i++) {
            stp.execute(new SpeakNameTask("Coding Change The World " + i));
        }
        System.out.println("主線程已結束");
    }

    // 一個模擬的任務:簡單地輸出名稱
    static class SpeakNameTask implements Runnable {
        private String name;

        public SpeakNameTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            Random random = new Random();
            int milliseconds = 500 + random.nextInt(1000);
            try {
                TimeUnit.MILLISECONDS.sleep(milliseconds);
                System.out.println("["+Thread.currentThread().getName()+"]: I believe " + name);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

下麵是輸出結果:

主線程已結束
[Thread-0]: I believe Coding Change The World 1
[Thread-0]: I believe Coding Change The World 2
[Thread-0]: I believe Coding Change The World 3
[Thread-0]: I believe Coding Change The World 4
[Thread-0]: I believe Coding Change The World 5

可以看到:作為測試程式的主線程,已經先執行結束了,而線程池還在順序地執行主線程添加的任務。並且線程池在執行完所有任務後,並沒有退出,jvm 進程會一直存在。

改進為擁有多個線程的線程池

多線程版本的線程池執任務執行器,只是在單線程版本上,增加了執行線程的數量,其它的變化不是很大。但為了更好的組織代碼,需要將任務執行線程的邏輯單獨抽取出來。另外,為了模擬得更像一個池,本示例代碼還增加了以下特性

  • 支持核心線程數功能
    核心線程數在執行器創建時,一起創建,並永不結束

  • 支持最大線程數功能
    當核心線程執行任務效率變慢時,增加執行線程

  • 支持空閑線程移除功能
    當非核心線程空閑時長超過限定值時,結束該線程,並從池中移除

主要代碼如下:

MultiThreadPoolExecutor.java (點擊查看代碼)
public class MultiThreadPoolExecutor implements Executor {

    // 線程池
    private final Set<TaskRunner> runnerPool = new HashSet<>();

    // 任務隊列
    private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();

    // 單個線程最大空閑毫秒數
    private int maxIdleMilliSeconds = 3000;

    // 核心線程數
    private int coreThreadCount = 1;

    // 最大線程數
    private int maxThreadCount = 3;

    public MultiThreadPoolExecutor() {
        // 初始化核心線程
        for (int i = 0; i < coreThreadCount; i++) {
            addRunner(true);
        }
    }

    private void addRunner(boolean isCoreRunner) {
        TaskRunner runner = new TaskRunner(isCoreRunner);
        runnerPool.add(runner);
        runner.start();
    }

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        addRunnerIfRequired();
    }

    // 視情況增加線程數,這裡簡化為當任務數超過線程數的兩倍時,就增加線程
    private void addRunnerIfRequired() {
        if (tasks.size() <= 2 * runnerPool.size()) {
            return;
        }
        // 未達到最大線程數時,可增加執行線程
        if (runnerPool.size() < maxThreadCount) {
            synchronized (this) {
                if (runnerPool.size() < maxThreadCount) {
                    addRunner(false);
                }
            }
        }
    }

    class TaskRunner extends Thread {
        // 是否為核心線程
        private final boolean coreRunner;

        // 已空閑的毫秒數
        private long idleMilliseconds = 0;

        TaskRunner(boolean coreRunner) {
            this.coreRunner = coreRunner;
        }

        @Override
        public void run() {
            Runnable task;
            while (true) {
                task = tasks.poll();
                if (task != null) {
                    task.run();
                    continue;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    idleMilliseconds += 10;
                    if(coreRunner) {
                        continue;
                    }
                    if (idleMilliseconds > maxIdleMilliSeconds) {
                        // 超過最大空間時間,線程結束,並從池中移徐本線程
                        runnerPool.remove(this);
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }
}

完整代碼已上傳至 thread-pool-sample

其實多線程版本的主要難點,是判定增加新線程來執行任務的演算法,即如何確定當前需要添加新線程,而不是保持當前的線程數量來執行任務,以保證最高的效率。以這個粗糙的原始版本為基準,不斷豐富細節和增強健壯性,就可以慢慢演進出 Jdk 中的 Executor 體系。

JDK 線程池任務執行器淺析

Executor 體系類結構

Executor 介面是任務執行器的頂級介面,它僅定義了一個方法,但並未限制如何執行傳遞過來的任務。正如第③處所述,「線程池執行」也只是多種方式中的一種,也是用得最多的一種。由於 Executor 介面定義的功能過於單一,於是在 JDK 的併發包下,又對它進行了擴展,這個擴展就是 ExecutorService,如下所示:

public interface ExecutorService extends Executor {
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Callable<T> task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
            throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;

    void shutdown();
    List<Runnable> shutdownNow();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    boolean isShutdown();
    boolean isTerminated();
}

這些擴展方法共分為三組,分別是:任務提交類、狀態控制類、狀態檢查類。從分類上可以看出,ExecutorService 增加了「提交任務」的概念(相對於 Executor 的「執行任務」)。另外,還有「關閉」操作,以及檢測執行器當前的狀態,這些都是 Exector 不具備的。下麵這個分類列表更加清晰:

  • 任務提交

    方法 非同步提交 批量提交 超時等待
    submit(Runnable task)
    submit(Callable task)
    invokeAll(Collection<? extends Callable> tasks)
    invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
    invokeAll(Collection<? extends Callable> tasks)
    invokeAny(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
  • 狀態控制

    • shutdown()
    • shutdownNow()
    • awaitTermination(long timeout, TimeUnit unit)
  • 狀態檢查

    • isShutdown()
    • isTerminated()

除了增加了新的方法外,還新增加了一種任務類型,即:java.util.concurrent.Callable,而 Executor 介面定義的任務介面是 java.lang.Runnable。二者的區別是,Callable#call() 方法有返回值,而後者沒有。一般而言,任務提交給執行器後,通常都會非同步執行。提交任務的線程是拿不到這個 call() 方法執行完畢後的返回值的,既然這樣,那定義這個有返回值的方法還有什麼意義呢?

為了拿到返回值,引入了 java.util.concurrent.Future 介面,它定義了獲取單個非同步任務執行結果的方法,不僅如此,它還定義了其它一些訪問和控制單個任務的方法,見下表:

方法 解釋
get() 阻塞調用線程,直到所關聯的任務執行結束,拿到返回值,或任務執行結束(取消操作和發生異常均會導致結)
get(long timeout, TimeUnit unit) 同上,但會有一個最大等待時長,若超過該時長後,任務依然未執行結束,則結束等待,並拋出 TimeoutException
cancel(boolean mayInterruptIfRunning) 嘗試取消關聯的任務,只是嘗試,遇到以下情況,均無法取消
· 任務已經取消
· 任務已完成
· 其它原因

通常任務一旦開始執行,就無法取消,
除非是極其特定的任務,這類任務的代碼本身會與外界通信,判斷是否應該取消自己的執行。
因此本方法提供了一個 mayInterruptIfRunning 參數,用來做這種信息傳達,
但也僅僅是一個信息傳達,表達了期望已運行的任務能自我終止,
但能否真的終止,取決於任務本身的代碼邏輯
isCancelled() 檢測關聯的任務是否已「取消」
isDone() 檢測關聯的任務是否已「結束」,任務正常執行完畢、遭遇異常和被取消均視為任務已「結束」


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 這學期剛剛接觸面向對象程式設計,使用的是java語言。在此之前只接觸過c語言。以我目前的學習進程來看二者的差別更多體現在面向對象的其中一個基本特性上,即封裝性。在c語言中幾乎所有內容都是公開的,java可以有效得規避這點。 學習的知識點 1.知道了類間關係。面向對象程式設計中要根據實際情況合理 ...
  • 大家好,我是老貓。今天和大家分享一下程式員日常的繪圖思路,以及一些老貓日常使用的繪圖工具。 為什麼要畫圖? 我們在進行系統設計的時候,為了更加具象地呈現系統的輪廓以及各個組件或者系統之間的關係和邊界以及工作流程。我們就會畫邏輯架構圖,模塊圖、流程圖、時序圖等等。 在日常開發中,軟體設計圖是一種非常好 ...
  • 發佈訂閱模式是怎樣的? 現在市面上流行的很多消息中間件就是採用的該種模式,這種模式 在實際業務中 將 事件發佈者(Publisher) 與 事件訂閱者 (Subscriber)通過額外的事件通道(Event Channel)來解耦,其基本原理與先前提到的觀察者模式有些許類似,但發佈訂閱模式額外存在了 ...
  • 1 不具備記憶能力的 它是零狀態的,我們平常在使用一些大模型產品,尤其在使用他們的API的時候,我們會發現那你和它對話,尤其是多輪對話的時候,經過一些輪次後,這些記憶就消失了,因為它也記不住那麼多。 2 上下文視窗的限制 大模型對其input和output,也就是它的輸入輸出有數量限制。為了保護它的 ...
  • C++ 構造函數 構造函數是 C++ 中一種特殊的成員函數,當創建類對象時自動調用。它用於初始化對象的狀態,例如為屬性分配初始值。構造函數與類同名,且沒有返回值類型。 構造函數類型 C++ 支持多種類型的構造函數,用於滿足不同的初始化需求: 預設構造函數: 不帶參數的構造函數,通常用於初始化對象的默 ...
  • 單向順序鏈表的創建,增,刪,減,查 /******************************************************************* * * file name: 單向順序鏈表的創建,增,刪,減,查 * author : [email protected] ...
  • 作者:青石路 來源:https://www.cnblogs.com/youzhibing/p/18019399 MyBatis 替換成 MyBatis-Plus 背景介紹 一個老項目,資料庫用的是 MySQL 5.7.36 , ORM 框架用的 MyBatis 3.5.0 , mysql-conne ...
  • C++對象在經過類的封裝後,存取對象中的數據成員的效率是否相比C語言的結構體訪問效率要低下?本篇將從C++類的不同定義形式來一一分析C++對象的數據成員的訪問在編譯器中是如何實現的,以及它們的存取效率如何? ...
一周排行
    -Advertisement-
    Play Games
  • 概述:本文代碼示例演示瞭如何在WPF中使用LiveCharts庫創建動態條形圖。通過創建數據模型、ViewModel和在XAML中使用`CartesianChart`控制項,你可以輕鬆實現圖表的數據綁定和動態更新。我將通過清晰的步驟指南包括詳細的中文註釋,幫助你快速理解並應用這一功能。 先上效果: 在 ...
  • openGauss(GaussDB ) openGauss是一款全面友好開放,攜手伙伴共同打造的企業級開源關係型資料庫。openGauss採用木蘭寬鬆許可證v2發行,提供面向多核架構的極致性能、全鏈路的業務、數據安全、基於AI的調優和高效運維的能力。openGauss深度融合華為在資料庫領域多年的研 ...
  • openGauss(GaussDB ) openGauss是一款全面友好開放,攜手伙伴共同打造的企業級開源關係型資料庫。openGauss採用木蘭寬鬆許可證v2發行,提供面向多核架構的極致性能、全鏈路的業務、數據安全、基於AI的調優和高效運維的能力。openGauss深度融合華為在資料庫領域多年的研 ...
  • 概述:本示例演示了在WPF應用程式中實現多語言支持的詳細步驟。通過資源字典和數據綁定,以及使用語言管理器類,應用程式能夠在運行時動態切換語言。這種方法使得多語言支持更加靈活,便於維護,同時提供清晰的代碼結構。 在WPF中實現多語言的一種常見方法是使用資源字典和數據綁定。以下是一個詳細的步驟和示例源代 ...
  • 描述(做一個簡單的記錄): 事件(event)的本質是一個委托;(聲明一個事件: public event TestDelegate eventTest;) 委托(delegate)可以理解為一個符合某種簽名的方法類型;比如:TestDelegate委托的返回數據類型為string,參數為 int和 ...
  • 1、AOT適合場景 Aot適合工具類型的項目使用,優點禁止反編 ,第一次啟動快,業務型項目或者反射多的項目不適合用AOT AOT更新記錄: 實實在在經過實踐的AOT ORM 5.1.4.117 +支持AOT 5.1.4.123 +支持CodeFirst和非同步方法 5.1.4.129-preview1 ...
  • 總說周知,UWP 是運行在沙盒裡面的,所有許可權都有嚴格限制,和沙盒外交互也需要特殊的通道,所以從根本杜絕了 UWP 毒瘤的存在。但是實際上 UWP 只是一個應用模型,本身是沒有什麼許可權管理的,許可權管理全靠 App Container 沙盒控制,如果我們脫離了這個沙盒,UWP 就會放飛自我了。那麼有沒... ...
  • 目錄條款17:讓介面容易被正確使用,不易被誤用(Make interfaces easy to use correctly and hard to use incorrectly)限制類型和值規定能做和不能做的事提供行為一致的介面條款19:設計class猶如設計type(Treat class de ...
  • title: 從零開始:Django項目的創建與配置指南 date: 2024/5/2 18:29:33 updated: 2024/5/2 18:29:33 categories: 後端開發 tags: Django WebDev Python ORM Security Deployment Op ...
  • 1、BOM對象 BOM:Broswer object model,即瀏覽器提供我們開發者在javascript用於操作瀏覽器的對象。 1.1、window對象 視窗方法 // BOM Browser object model 瀏覽器對象模型 // js中最大的一個對象.整個瀏覽器視窗出現的所有東西都 ...