一個普通程式員眼中的AQS

来源:https://www.cnblogs.com/zhangweicheng/archive/2019/12/10/12000213.html
-Advertisement-
Play Games

AQS是JUC包中許多類的實現根基,這篇文章只是個人理解的產物,不免有誤,若閱讀過程中有發現不對的,希望幫忙指出[贊]! 1 AQS內臟圖 ​ 在開始瞭解 之前,我們先從上帝視角看看 是由幾個部分組成的。 ​ 內部維護了一個 修飾的 資源變數 ,裡面的所有操作都可以說跟這個變數有關係,因為它代表的就 ...


AQS是JUC包中許多類的實現根基,這篇文章只是個人理解的產物,不免有誤,若閱讀過程中有發現不對的,希望幫忙指出[贊]!

1 AQS內臟圖

​  在開始瞭解AQS之前,我們先從上帝視角看看AQS是由幾個部分組成的。

1575639520339

​  AQS內部維護了一個volatile修飾的資源變數,裡面的所有操作都可以說跟這個變數有關係,因為它代表的就是資源,這是一點;另外內部還有為公平爭奪資源而準備的同步隊列,說是同步隊列,實質上存放在AQS的也就head節點和tail節點;此外還有一個等待隊列,等待隊列是為了實現喚醒指定組線程爭奪資源而出現的,通過內部類ConditionObjectfirstWaiterlastWaiter實現,兩個隊列的概念圖如下。

1575905757547

​​  除去這些還有兩個內部類NodeConditionObjectNode是隊列的實現根基,裡面存放了許多重要的信息,如操作的線程、線程競爭的狀態等;而ConditionObject則是Condition介面的實現類,用來實現喚醒指定線程組的(等待隊列)。

state:資源變數,AQS重要組成成分,其內部的操作大多數都是對此資源的競爭。

headtail節點:這兩個Node節點其實就是AQS中的同步隊列,而NodeAQS的內部類,整個資源爭奪的過程就是Node同步隊列節點的調整和狀態變更的過程。

Node內部類:AQS兩個隊列的實現節點。

  • waitStatus :節點狀態,取值為-3~1(整數)。

​​  ​  0:初始狀態或者不代表任何意義時的取值。

​​  ​  -1SIGNAL狀態,個人理解是處於這個狀態的節點後方還有可用的節點,所以當其釋放資源 時要提醒後方節點參與競爭。

​​  ​  -2CONDITION狀態,這個狀態標識當前節點處於等待隊列中,等待隊列中的節點不會參與 競爭,必須從等待隊列出來後重新加入同步隊列才能參與競爭。

​  ​  -3PROPAGATE,表示處於共用模式,此時不僅只是喚醒下個節點,還可能喚醒下下個節 點。

​​  ​  1CANCELLED,廢棄節點,表示當前節點沒用了,處於該狀態的節點不會再改變,所以 AQS中經常會判斷節點狀態是否大於0來檢查節點是否還有用。

  • thread:爭奪資源的線程,存放在節點當中。
  • prev:同步隊列中的上一個節點。
  • next:同步隊列的下一個節點。
  • nextWaiter:下一個等待節點。AQS中的等待隊列,可以有多個等待隊列。

ConditionObjectAQS內部類,實現Condition介面,定義了兩個變數firstWaiterlastWaiter,這 兩個變數 組成等待隊列。可以簡單的理解為Condition介面的功能是​能讓一定數量的線程一起等待某個條件,這個條件就是condition,當condition喚醒的時候,那麼這些等待的線程就會被其喚醒,反之線程則一直等待其喚醒條件。而在AQS​中,ConditionObject可以維護多個等待隊列,當同步隊列中的節點使用了await()方法則將其移除同步隊列放入相應的等待隊列,在等待隊列中使用signal方法則從等待隊列​中移除放入同步隊列隊尾。

2 AQS的開放方法

​  現在我們對AQS的組成有了大概的瞭解,接下來看看其內部資源的競爭、獲取和釋放的實現。AQS採用模板設計模式實現,其定義了許多頂級方法例如acquirerelease等,這些方法子類不能重寫但是可以調用,而如果想讓其正確的調用則需要根據其規則實現開放出來的介面如tryAcquire等(頂級方法內部調用了開放方法)。

​  其開放的方法有tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively共五種,每個方法裡面沒有具體的實現,反而是直接拋出了異常,如下,所以子類需要重寫用到的方法。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

​  這些方法表示嘗試去獲取資源或者釋放資源,其實現必須要跟state資源狀態相關,舉個例子,tryAcquire方法表示以獨占的方式嘗試獲取資源,如果獲取到了那麼其他線程不得操作其資源,其中入參的arg則表示想要獲取到的資源數量,例如我tryAcquire(5)成功了,那麼狀態變數state變數則增加5,如果tryRelease(5)成功則state狀態變數減少5,等到state==0的時候則表示資源被釋放,即可以理解為鎖被釋放。

​  如果只是使用AQS的話,再加上幾個變更狀態的方法就可以了,我們不需要瞭解更多的東西,如同AQS的文檔給出的案例一般,簡單的重寫幾個方法便可以實現一種鎖,如下,一個不可重入鎖的簡單實現。

class Mutex implements Lock, java.io.Serializable {

   // 同步內部類,鎖的真正操作都是通過該類的操作 
   private static class Sync extends AbstractQueuedSynchronizer {
     // 檢查當前是否已經處於鎖定的狀態
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // 如果資源變數為0,則獲取鎖(資源)
     public boolean tryAcquire(int acquires) {
       // acquires的值只能是1,否則的話不進入下麵代碼
       assert acquires == 1;
       if (compareAndSetState(0, 1)) {
         // 設置持有當前鎖的線程
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // 通過將狀態變數state設定為0來表示鎖的釋放
     protected boolean tryRelease(int releases) {
       // 傳入的參數只能是1,否則是無效操作
       assert releases == 1; 
       // 如果狀態狀態等於0,說明不是鎖定狀態
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // 提供Condition,返回其AQS內部類ConditionObject
     Condition newCondition() { return new ConditionObject(); }

     // 反序列化
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // 內部類已經實現了所有需要的方法,我們只要封裝一層就行
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

進行一個小測試

public static void main(String[] args) {
    Lock lock = new Mutex();
    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "釋放鎖");
        }
    }).start();

    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "釋放鎖");
        }
    }).start();
}

最終的結果圖如下

1575643000409

​  這樣就實現了一個不可重入鎖,是不是看起來很簡單?那肯定啊,難的都被AQS團隊的大佬們封裝完了。

AQS的那些頂級方法

首先來看acquire方法:

// 代碼邏輯不複雜,首先嘗試獲取資源,如果失敗了則將封裝節點放入同步隊列中直到獲取到資源
public final void acquire(int arg) {
    // 嘗試獲得鎖,如果失敗了則增加節點放入等待隊列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

​  其中addWaiter將當前線程封裝成一個節點放入等待隊列中,而acquireQueued方法則是在一個迴圈中嘗試獲取資源,如果獲取資源的過程中被線程被打斷不會進行任何形式的相應,只是記錄一下當前節點被打斷過,在獲取到資源後再把被打斷的邏輯補上。

​  我們看看addWaiter做了什麼。

private Node addWaiter(Node mode) {
    // 將當前線程封裝入一個節點之中
    Node node = new Node(Thread.currentThread(), mode);
    
    // 首先嘗試一次快速的入隊,如果失敗的話則採用正常方式入隊
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 入隊操作
    enq(node);
    return node;
}

再看下入隊操作的實現

private Node enq(final Node node) {
    // 迴圈直到將節點放入同步隊列中
    for (;;) {
        Node t = tail;
        // 如果同步隊列是空的話則進行隊列的初始化
        if (t == null) { 
            // 這裡註意初始化的時候head是一個新增的Node,其waitStatus為0
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 否則的話嘗試設置尾節點,失敗的話重新迴圈
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

​  到這裡我們可以知道addWaiter方法首先將當前線程封裝為節點,然後嘗試快速的將節點放入隊列尾,如果失敗的話則進行正常的入隊操作,而入隊操作的則是不斷的迴圈將當前節點設置為尾節點,其中如果一開始隊列為空的話則進行隊列的初始化。

​  再回到acquire方法中這一段代碼中

if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

​​  節點入隊之後還有一個acquireQueued操作,這個方法就是線程不斷自旋的去獲取資源的過程,在一定嘗試後進入阻塞。我們進入此方法

final boolean acquireQueued(final Node node, int arg) {
    // 預設獲取失敗
    boolean failed = true;
    try {
        /*
         * 線程打斷標識,我們知道使用interrupt()方法只是改變了線程中的打斷標識變數,
         * 並不能打斷正在運行的線程,而對於這個打斷變數的處理一般有兩種方式,
         * 一種是記錄下來,一種是拋出異常,這裡選擇前者,而可打斷的acquire則是選擇後者
         */
        boolean interrupted = false;
        // 這裡就是自旋的過程了
        for (;;) {
            // 拿到前一個節點
            final Node p = node.predecessor();
            // 如果前節點為頭節點則嘗試一次獲取
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 沒有拿到資源,根據前節點決定線程是否進入阻塞狀態,兩個方法解釋在下方
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如上方所說,記錄打斷標識
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 檢查是否需要阻塞線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前節點的狀態
        int ws = pred.waitStatus;
        // 如果前節點狀態為SIGNAL,我對這個狀態的理解是其後續還有處於同步隊列中的節點
        if (ws == Node.SIGNAL)
            // 表示下方代碼已經執行過一次了,所以直接返回
            return true;
        if (ws > 0) {
            // 狀態大於0,則表示節點已經取消作廢,那麼需要一直往前找直到找到有效的節點
            // 在AQS中經常使用狀態>0來表示無效,<0表示有效
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 剩下的狀態只能是0或者PROPAGATE(CONDITION(-2)狀態不會出現在同步隊列中)
             * 這裡並沒有返回true,說明還要進行一次迴圈
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

// 在上方方法判斷需要掛起線程之後,調用parkAndCheckInterrupt方法將線程掛起
private final boolean parkAndCheckInterrupt() {
        // 使用park方法將線程掛起
        LockSupport.park(this);
        // 在上面我們提到線程的打斷標識,interrupted()方法返回後會重置這個標識
        return Thread.interrupted();
}

​  獨占式獲取資源的主要方法差不多就是這樣,還有可打斷的獨占式獲取方法acquireInterruptibly,代碼如下,其實現基本相同,只是對於我們方纔說的打斷標識的處理從記錄改成了拋出異常,所以才是可打斷的,有興趣可以自己再看下,基本邏輯相同,看起來也就耗費不了多少時間。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 拋出異常處理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

​  瞭解完獲取資源自然知道釋放資源的過程,相對來說釋放資源要相對容易一些,大致邏輯為嘗試釋放資源,如果成功了,則改變節點的狀態並且喚醒下一個可用節點(一般是下一個,但是可能出現下一個節點已經被取消的情況)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 修改線程的狀態,並且喚醒下一個節點進行資源競爭
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
    // 改變節點狀態
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 喚醒下一個可用節點,一般來說是下一個節點,但是可能出現下個節點被取消
     * 或者為空的情況,這個時候就要從尾結點向前遍歷直到找到有效的節點(從尾節點向前遍歷
     * 是因為無論下個節點是空還是取消的節點,正向遍歷都不可能走得通了,取消的節點的next
     * 就是其本身,所以只能從後面開始往前遍歷)
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到下個節點之後將其喚醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

​  到這裡AQS獨占式的獲取釋放資源的過程大致就結束了,雖然只是簡單的將其獲取和釋放過程過了一遍,但是知道這些腦子裡對AQS也有了大概的框架模型,在這個模型中繼續去理解其他方法相信也不會太難,總之先記錄到這裡。

總結

​  首先從一開始我們用一張圖瞭解了AQS的大致構造,接下來又瞭解了組成成分的作用,其中主要圍繞兩個隊列(同步隊列等待隊列)和兩個同步類(NodeConditionObject)說明瞭其實現。

​  接著進入資源獲取和釋放,用獨占式的acquirerelease方法來說明整個過程,acquire方法包含了嘗試獲取資源、入隊和休眠等操作,而release方法相對簡易的改變了狀態,並且喚醒後方節點,從而分別表示鎖的獲取和釋放,到這裡就算過了一遍AQS的獨占式流程。

I wish I could write better.


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 今天在使用go與php的AES加解密交互中,一直有個問題那就是在go中加密後,在php端始終都是無法解密,經過排查最後發現是加密key長度引起的問題, 這裡簡單記錄下。 go的AES使用的是第三方的庫, "openssl" ,因為用的匆忙,沒註意看文檔,所以就直接弄了示例代碼,才發現和php端無法解 ...
  • 此模式通過一個模板方法來定義程式的框架或演算法,通常模板方法定義在基類中,即原始的模板,然後子類就可以根據不同的需要實現或重寫模板方法中的某些演算法步驟或者框架的某部分,最後達到使用相同模板實現不同功能的效果。 核心思想: 使用一個模板方法定義好總的演算法框架。 子類中根據需要重新定義某些操作,但是不能修 ...
  • 有個需求,從某個介面下載的一個zip壓縮包,往裡面添加一個說明文件。搜索了一下,沒有找到往zip直接添加文件的方法,最終解決方法是先解壓、再壓縮。具體過程如下: ...
  • 屬性 語法格式:修飾符 類型 屬性名 = 初值; 說明: 修飾符:public、protected、private:用於表示成員變數的訪問許可權。static:表示該成員變數為類變數,也稱為靜態變數。final:表示將該成員變數聲明為常量,其值無法更改。 類型:表示變數的類型。 屬性名:表示變數名稱。 ...
  • Java類的初始化順序 多說無益,以下是本人親自試驗的代碼,一目瞭然: 1 package test1; 2 3 public class Test { 4 public static void main(String[] argc) { 5 new Child(); 6 System.out.pr ...
  • composer install thinkphp6 報錯 Parse error: syntax error, unexpected ':', expecting '{' in vendor\topthink\think-helper\src\helper.php on line 233 ...
  • 前言本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。作者:萬能搜吧 都是copy的百度SDK文檔,簡單說說怎麼用。 1、沒安裝Python的參見此文:Python學習筆記系列 1 ——安裝調試Python開發軟體 2、win+r輸 ...
  • import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext} object Transformation { def main(args: Array[String]): U ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...