帶你熟悉3種AQS的線程併發工具的用法

来源:https://www.cnblogs.com/huaweiyun/archive/2023/01/31/17079017.html
-Advertisement-
Play Games

摘要:AQS 的全稱為(AbstractQueuedSynchronizer),AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器。 本文分享自華為雲社區《【高併發】AQS中的CountDownLatch、Semaphore與CyclicBarrier核 ...


摘要:AQS 的全稱為(AbstractQueuedSynchronizer),AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器。

本文分享自華為雲社區《【高併發】AQS中的CountDownLatch、Semaphore與CyclicBarrier核心用法》,作者: 冰 河。

AQS 的全稱為(AbstractQueuedSynchronizer),AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器。本文主要講述AQS中的CountDownLatch、Semaphore與CyclicBarrier核心用法。

CountDownLatch

概述

同步輔助類,通過它可以阻塞當前線程。也就是說,能夠實現一個線程或者多個線程一直等待,直到其他線程執行的操作完成。使用一個給定的計數器進行初始化,該計數器的操作是原子操作,即同時只能有一個線程操作該計數器。

調用該類await()方法的線程會一直阻塞,直到其他線程調用該類的countDown()方法,使當前計數器的值變為0為止。每次調用該類的countDown()方法,當前計數器的值就會減1。當計數器的值減為0的時候,所有因調用await()方法而處於等待狀態的線程就會繼續往下執行。這種操作只能出現一次,因為該類中的計數器不能被重置。如果需要一個可以重置計數次數的版本,可以考慮使用CyclicBarrier類。

CountDownLatch支持給定時間的等待,超過一定的時間不再等待,使用時只需要在await()方法中傳入需要等待的時間即可。此時,await()方法的方法簽名如下:

public boolean await(long timeout, TimeUnit unit)

使用場景

在某些業務場景中,程式執行需要等待某個條件完成後才能繼續執行後續的操作。典型的應用為並行計算:當某個處理的運算量很大時,可以將該運算任務拆分成多個子任務,等待所有的子任務都完成之後,父任務再拿到所有子任務的運算結果進行彙總。

代碼示例

調用ExecutorService類的shutdown()方法,並不會第一時間內把所有線程全部都銷毀掉,而是讓當前已有的線程全部執行完,之後,再把線程池銷毀掉。

示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample {
 private static final int threadCount = 200;
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newCachedThreadPool();
 final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 for (int i = 0; i < threadCount; i++){
 final int threadNum = i;
 exec.execute(() -> {
 try {
 test(threadNum);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }finally {
 countDownLatch.countDown();
 }
 });
 }
 countDownLatch.await();
        log.info("finish");
 exec.shutdown();
 }
 private static void test(int threadNum) throws InterruptedException {
 Thread.sleep(100);
 log.info("{}", threadNum);
 Thread.sleep(100);
 }
}

支持給定時間等待的示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchExample {
 private static final int threadCount = 200;
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newCachedThreadPool();
 final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 for (int i = 0; i < threadCount; i++){
 final int threadNum = i;
 exec.execute(() -> {
 try {
 test(threadNum);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }finally {
 countDownLatch.countDown();
 }
 });
 }
 countDownLatch.await(10, TimeUnit.MICROSECONDS);
        log.info("finish");
 exec.shutdown();
 }
 private static void test(int threadNum) throws InterruptedException {
 Thread.sleep(100);
 log.info("{}", threadNum);
 }
}

Semaphore

概述

控制同一時間併發線程的數目。能夠完成對於信號量的控制,可以控制某個資源可被同時訪問的個數。

提供了兩個核心方法——acquire()方法和release()方法。acquire()方法表示獲取一個許可,如果沒有則等待,release()方法則是在操作完成後釋放對應的許可。Semaphore維護了當前訪問的個數,通過提供同步機制來控制同時訪問的個數。Semaphore可以實現有限大小的鏈表。

使用場景

Semaphore常用於僅能提供有限訪問的資源,比如:資料庫連接數。

代碼示例

每次獲取並釋放一個許可,示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
 private static final int threadCount = 200;
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newCachedThreadPool();
 final Semaphore semaphore = new Semaphore(3);
 for (int i = 0; i < threadCount; i++){
 final int threadNum = i;
 exec.execute(() -> {
 try {
 semaphore.acquire(); //獲取一個許可
 test(threadNum);
 semaphore.release(); //釋放一個許可
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 });
 }
 exec.shutdown();
 }
 private static void test(int threadNum) throws InterruptedException {
 log.info("{}", threadNum);
 Thread.sleep(1000);
 }
}

每次獲取並釋放多個許可,示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
 private static final int threadCount = 200;
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newCachedThreadPool();
 final Semaphore semaphore = new Semaphore(3);
 for (int i = 0; i < threadCount; i++){
 final int threadNum = i;
 exec.execute(() -> {
 try {
 semaphore.acquire(3); //獲取多個許可
 test(threadNum);
 semaphore.release(3); //釋放多個許可
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 });
 }
        log.info("finish");
 exec.shutdown();
 }
 private static void test(int threadNum) throws InterruptedException {
 log.info("{}", threadNum);
 Thread.sleep(1000);
 }
}

假設有這樣一個場景,併發太高了,即使使用Semaphore進行控制,處理起來也比較棘手。假設系統當前允許的最高併發數是3,超過3後就需要丟棄,使用Semaphore也能實現這樣的場景,示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
 private static final int threadCount = 200;
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newCachedThreadPool();
 final Semaphore semaphore = new Semaphore(3);
 for (int i = 0; i < threadCount; i++){
 final int threadNum = i;
 exec.execute(() -> {
 try {
 //嘗試獲取一個許可,也可以嘗試獲取多個許可,
 //支持嘗試獲取許可超時設置,超時後不再等待後續線程的執行
 //具體可以參見Semaphore的源碼
 if (semaphore.tryAcquire()) { 
 test(threadNum);
 semaphore.release(); //釋放一個許可
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 });
 }
        log.info("finish");
 exec.shutdown();
 }
 private static void test(int threadNum) throws InterruptedException {
 log.info("{}", threadNum);
 Thread.sleep(1000);
 }
}

CyclicBarrier

概述

是一個同步輔助類,允許一組線程相互等待,直到到達某個公共的屏障點,通過它可以完成多個線程之間相互等待,只有當每個線程都準備就緒後,才能各自繼續往下執行後面的操作。

與CountDownLatch有相似的地方,都是使用計數器實現,當某個線程調用了CyclicBarrier的await()方法後,該線程就進入了等待狀態,而且計數器執行加1操作,當計數器的值達到了設置的初始值,調用await()方法進入等待狀態的線程會被喚醒,繼續執行各自後續的操作。CyclicBarrier在釋放等待線程後可以重用,所以,CyclicBarrier又被稱為迴圈屏障。

使用場景

可以用於多線程計算數據,最後合併計算結果的場景

CyclicBarrier與CountDownLatch的區別

  • CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法進行重置,並且可以迴圈使用
  • CountDownLatch主要實現1個或n個線程需要等待其他線程完成某項操作之後,才能繼續往下執行,描述的是1個或n個線程等待其他線程的關係。而CyclicBarrier主要實現了多個線程之間相互等待,直到所有的線程都滿足了條件之後,才能繼續執行後續的操作,描述的是各個線程內部相互等待的關係。
  • CyclicBarrier能夠處理更複雜的場景,如果計算發生錯誤,可以重置計數器讓線程重新執行一次。
  • CyclicBarrier中提供了很多有用的方法,比如:可以通過getNumberWaiting()方法獲取阻塞的線程數量,通過isBroken()方法判斷阻塞的線程是否被中斷。

代碼示例

示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample {
 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
 public static void main(String[] args) throws Exception {
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < 10; i++){
 final int threadNum = i;
 Thread.sleep(1000);
 executorService.execute(() -> {
 try {
 race(threadNum);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 }
executorService.shutdown();
 }
 private static void race(int threadNum) throws Exception{
 Thread.sleep(1000);
 log.info("{} is ready", threadNum);
 cyclicBarrier.await();
 log.info("{} continue", threadNum);
 }
}

設置等待超時示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CyclicBarrierExample {
 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
 public static void main(String[] args) throws Exception {
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < 10; i++){
 final int threadNum = i;
 Thread.sleep(1000);
 executorService.execute(() -> {
 try {
 race(threadNum);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 }
 executorService.shutdown();
 }
 private static void race(int threadNum) throws Exception{
 Thread.sleep(1000);
 log.info("{} is ready", threadNum);
 try{
 cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
 }catch (BrokenBarrierException | TimeoutException e){
 log.warn("BarrierException", e);
 }
 log.info("{} continue", threadNum);
 }
}

在聲明CyclicBarrier的時候,還可以指定一個Runnable,當線程達到屏障的時候,可以優先執行Runnable中的方法。

示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample {
 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
 log.info("callback is running");
 });
 public static void main(String[] args) throws Exception {
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < 10; i++){
 final int threadNum = i;
 Thread.sleep(1000);
 executorService.execute(() -> {
 try {
 race(threadNum);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 }
 executorService.shutdown();
 }
 private static void race(int threadNum) throws Exception{
 Thread.sleep(1000);
 log.info("{} is ready", threadNum);
 cyclicBarrier.await();
 log.info("{} continue", threadNum);
 }
}

 

點擊關註,第一時間瞭解華為雲新鮮技術~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 程式設計基礎 基礎知識 什麼是程式? 為進行某項活動的步驟,電腦的程式,為得到某種結果,通過電腦語言表達的指令序列。 什麼是程式設計? 計算思維,是運用電腦科學的基礎概念進行問題求解,系統設計,以及人類行為理解等涵蓋電腦科學之廣度的一系列思維活動。 計算思維的特點: 1.滿足電腦程式執行的 ...
  • Gin框架實戰——HTML渲染 最近使用Go的Gin框架做了個簡單的前端網頁,記錄一下細節~ 1.載入靜態文件 由於網頁需要使用css、圖片等渲染,而靜態文件必須先聲明:否則模板中調用載入不出來,這個很重要,即使你把文件放到對應路徑下,html中也寫了相應的路徑,但是開啟go服務端的網頁,會顯示不出 ...
  • 1、shutil高級文件操作模塊 shutil模塊提供了大量的文件的高級操作。特別針對文件拷貝和刪除,主要功能為目錄和文件操作以及壓縮操作。對單個文件的操作也可參見os模塊。 2、shutil模塊的拷貝方法 >>> import shutil >>> shutil.chown('test.txt', ...
  • 一、DDS工作原理 以正弦信號為例,DDS大概就是將M個點的一個周期的正弦序列存入ROM中,序列數據的地址就是正弦信號的相位; 通過修改頻率控制字(Fword)來改變每隔多少個地址取ROM里的數據進行輸出。頻率控制字越大,從ROM取出的數據點就越少,點數越少,輸出一個周期信號的時間就越短,從而改變了 ...
  • 在做SpringBoot項目的過程中,有時客戶會提出按照指定時間執行一次業務的需求。 在單一使用ScheduledTaskRegistrar類解決定時任務問題的時候,可能會達不到預期的動態調整定時任務的效果。 ...
  • 概要 前端時間做尺規作圖相關的動畫的時候,封裝了一個圓規的動畫,順便研究了下 manim 庫的動畫函數。 manim 本身就是做動畫的庫,所以,基於它封裝自定義的動畫非常方便。 動畫原理 對於單個的元素,manim本身就提供了非常多的動畫函數。 比如:創建/消除的動畫,移動元素的動畫,旋轉元素的動畫 ...
  • 本文記錄一次線上 GC 問題的排查過程與思路,希望對各位讀者有所幫助。過程中也走了一些彎路,現在有時間沉澱下來思考並總結出來分享給大家,希望對大家今後排查線上 GC 問題有幫助。 ...
  • 本文已收錄至Github,推薦閱讀 👉 Java隨想錄 微信公眾號:Java隨想錄 CSDN: 碼農BookSea 人的一切痛苦,本質上都是對自己的無能的憤怒。——王小波 ZGC有人稱它為Zero GC,其實“Z”並非什麼專業名詞的縮寫,這款收集器的名字就叫作Z Garbage Collector ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...