Java 多線程共用模型之管程(下)

来源:https://www.cnblogs.com/lcha-coding/archive/2022/06/11/16365735.html
-Advertisement-
Play Games

介紹了 wait notify notifyAll park unpark ReentrantLock等相關知識 ...


共用模型之管程

wait、notify

wait、notify 原理

  • Owner 線程發現條件不滿足,調用 wait 方法,即可進入 WaitSet 變為 WAITING 狀態
  • BLOCKED 和 WAITING 的線程都處於阻塞狀態,不占用 CPU 時間片
  • BLOCKED 線程會在 Owner 線程釋放鎖時喚醒
  • WAITING 線程會在 Owner 線程調用 notify 或 notifyAll 時喚醒,但喚醒後並不意味者立刻獲得鎖,仍需進入EntryList 重新競爭

API 介紹

  • obj.wait() 讓進入 object 監視器的線程到 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 等待的線程中挑一個喚醒
  • obj.notifyAll() 讓 object 上正在 waitSet 等待的線程全部喚醒

它們都是線程之間進行協作的手段,都屬於 Object 對象的方法。必須獲得此對象的鎖,才能調用這幾個方法

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo2")
public class demo2 {

    static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock){
                log.debug("執行");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其他代碼");
            }
        },"t1").start();

        new Thread(() -> {
            synchronized (lock){
                log.debug("執行");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其他代碼");
            }
        },"t2").start();

        Thread.sleep(2000);
        log.debug("喚醒 lock 上其他線程");
        synchronized (lock){
            lock.notify();  //喚醒 lock 上的一個線程(隨機)
            //lock.notifyAll();   //喚醒 lock 上的所有線程
        }
    }
}
  • notify()

    20:20:58 [t1] c.demo2 - 執行
    20:20:58 [t2] c.demo2 - 執行
    20:21:00 [main] c.demo2 - 喚醒 lock 上其他線程
    20:21:00 [t1] c.demo2 - 其他代碼
    
  • notifyAll()

    20:22:04 [t1] c.demo2 - 執行
    20:22:04 [t2] c.demo2 - 執行
    20:22:06 [main] c.demo2 - 喚醒 lock 上其他線程
    20:22:06 [t2] c.demo2 - 其他代碼
    20:22:06 [t1] c.demo2 - 其他代碼
    

wait() 方法會釋放對象的鎖,進入 WaitSet 等待區,從而讓其他線程就機會獲取對象的鎖。無限制等待,直到notify 為止

wait(long n) 有時限的等待, 到 n 毫秒後結束等待,或是被 notify

wait、notify 正確使用

sleep vs. wait

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要強制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同時,不會釋放對象鎖的,但 wait 在等待的時候會釋放對象鎖
  • 它們狀態 TIMED_WAITING
step 1

思考下麵的解決方案好不好,為什麼?

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有煙沒?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("沒煙,睡會!");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有煙沒?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("開始幹活!");
                }
            }
        },"小南").start();

        for(int i=0;i<5;i++){
            new Thread(() -> {
                synchronized (room){
                    log.debug("開始幹活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            hasCigarette = true;
            log.debug("煙到了!");
        },"送煙的").start();
    }
}

輸出:

20:41:09 [小南] c.demo4 - 有煙沒?[false]
20:41:09 [小南] c.demo4 - 沒煙,睡會!
20:41:10 [送煙的] c.demo4 - 煙到了!
20:41:11 [小南] c.demo4 - 有煙沒?[true]
20:41:11 [小南] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
20:41:11 [其他人] c.demo4 - 開始幹活!
  • 其它幹活的線程,都要一直阻塞,效率太低
  • 小南線程必須睡足 2s 後才能醒來,就算煙提前送到,也無法立刻醒來
  • 加了 synchronized (room) 後,就好比小南在裡面反鎖了門睡覺,煙根本沒法送進門,main 沒加synchronized 就好像 main 線程是翻窗戶進來的
  • 解決方法,使用 wait - notify 機制
step 2

思考下麵的實現行嗎,為什麼?

package WaNo.step;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class step2 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有煙沒?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("沒煙,睡會!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有煙沒?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("開始幹活!");
                }
            }
        },"小南").start();

        for(int i=0;i<5;i++){
            new Thread(() -> {
                synchronized (room){
                    log.debug("開始幹活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            synchronized (room){
                hasCigarette = true;
                log.debug("煙到了!");
                room.notify();
            }
        },"送煙的").start();
    }
}

輸出:

20:46:32 [小南] c.demo4 - 有煙沒?[false]
20:46:32 [小南] c.demo4 - 沒煙,睡會!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:32 [其他人] c.demo4 - 開始幹活!
20:46:33 [送煙的] c.demo4 - 煙到了!
20:46:33 [小南] c.demo4 - 有煙沒?[true]
20:46:33 [小南] c.demo4 - 開始幹活!
  • 解決了其它幹活的線程阻塞的問題
  • 但如果有其它線程也在等待條件呢?
step 3
package WaNo.step;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class step3 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有煙沒?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("沒煙,睡會!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有煙沒?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("開始幹活!");
                } else {
                    log.debug("沒乾成活...");
                }
            }
        },"小南").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外賣送到沒?[{}]", hasTakeOut);
                if (!hasTakeOut) {
                    log.debug("沒外賣,先歇會!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外賣送到沒?[{}]", hasTakeOut);
                if (hasTakeOut) {
                    log.debug("可以開始幹活了");
                } else {
                    log.debug("沒乾成活...");
                }
            }
        }, "小女").start();

        for(int i=0;i<5;i++){
            new Thread(() -> {
                synchronized (room){
                    log.debug("開始幹活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            synchronized (room){
                hasCigarette = true;
                log.debug("煙到了!");
                room.notify();
            }
        },"送煙的").start();
    }
}

輸出:

20:53:12.173 [小南] c.TestCorrectPosture - 有煙沒?[false] 
20:53:12.176 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:53:12.176 [小女] c.TestCorrectPosture - 外賣送到沒?[false] 
20:53:12.176 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:53:13.174 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有煙沒?[false] 
20:53:13.174 [小南] c.TestCorrectPosture - 沒乾成活...

notify 只能隨機喚醒一個 WaitSet 中的線程,這時如果有其它線程也在等待,那麼就可能喚醒不了正確的線程,稱之為【虛假喚醒】

解決方法,改為 notifyAll

step 4
new Thread(() -> {
 	synchronized (room) {
 		hasTakeout = true;
 		log.debug("外賣到了噢!");
 		room.notifyAll();
 	}
}, "送外賣的").start();

輸出:

20:55:23.978 [小南] c.TestCorrectPosture - 有煙沒?[false] 
20:55:23.982 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:55:23.982 [小女] c.TestCorrectPosture - 外賣送到沒?[false] 
20:55:23.982 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:55:24.979 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:55:24.979 [小女] c.TestCorrectPosture - 外賣送到沒?[true] 
20:55:24.980 [小女] c.TestCorrectPosture - 可以開始幹活了
20:55:24.980 [小南] c.TestCorrectPosture - 有煙沒?[false] 
20:55:24.980 [小南] c.TestCorrectPosture - 沒乾成活...

用 notifyAll 僅解決某個線程的喚醒問題,但使用 if + wait 判斷僅有一次機會,一旦條件不成立,就沒有重新判斷的機會了

解決方法,用 while + wait,當條件不成立,再次 wait

step 5

將 if 改為 while

while (!hasCigarette) {
 	log.debug("沒煙,先歇會!");
 	try {
 		room.wait();
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
}

輸出:

20:58:34.322 [小南] c.TestCorrectPosture - 有煙沒?[false] 
20:58:34.326 [小南] c.TestCorrectPosture - 沒煙,先歇會!
20:58:34.326 [小女] c.TestCorrectPosture - 外賣送到沒?[false] 
20:58:34.326 [小女] c.TestCorrectPosture - 沒外賣,先歇會!
20:58:35.323 [送外賣的] c.TestCorrectPosture - 外賣到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外賣送到沒?[true] 
20:58:35.324 [小女] c.TestCorrectPosture - 可以開始幹活了
20:58:35.324 [小南] c.TestCorrectPosture - 沒煙,先歇會!
套路總結
synchronized(lock) {
 	while(條件不成立) {
 		lock.wait();
 	}
 	// 幹活
}

//另一個線程
synchronized(lock) {
 	lock.notifyAll();
}

同步模式之保護性暫停

定義

即 Guarded Suspension,用在一個線程等待另一個線程的執行結果

要點

  • 有一個結果需要從一個線程傳遞到另一個線程,讓他們關聯同一個 GuardedObject
  • 如果有結果不斷從一個線程到另一個線程那麼可以使用消息隊列(見生產者/消費者)
  • JDK 中,join 的實現、Future 的實現,採用的就是此模式
  • 因為要等待另一方的結果,因此歸類到同步模式

實現
package WaNo.step;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j(topic = "c.demo4")
public class demo4 {
    public static void main(String[] args) {
        //線程1 等待線程2 的下載結果
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            List<String> list = (List<String>) guardedObject.get();
            log.debug("結果的大小是:{}",list.size());
        },"t1").start();

        new Thread(() -> {
            log.debug("執行下載");
            try {
                Thread.sleep(5000);
                List<String> list = new ArrayList<>();
                list.add("1");
                guardedObject.complete(list);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}

class GuardedObject {
    //結果
    private Object response;

    //獲取結果
    public Object get() {
        synchronized (this){
            //還沒有結果
            while (response == null){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    //產生結果
    public void complete(Object response){
        synchronized (this){
            //給結果成員變數賦值
            this.response = response;
            this.notifyAll();
        }
    }
}

輸出:

16:47:15 [t2] c.demo4 - 執行下載
16:47:20 [t1] c.demo4 - 結果的大小是:1

非同步模式之生產者/消費者

要點

  • 與前面的保護性暫停中的 GuardObject 不同,不需要產生結果和消費結果的線程一一對應
  • 消費隊列可以用來平衡生產和消費的線程資源
  • 生產者僅負責產生結果數據,不關心數據該如何處理,而消費者專心處理結果數據
  • 消息隊列是有容量限制的,滿時不會再加入數據,空時不會再消耗數據
  • JDK 中各種阻塞隊列,採用的就是這種模式

package WaNo;

import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

@Slf4j(topic = "c.demo5")
public class demo5 {
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);

        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id,"值"+id));
            },"生產者" + i).start();
        }

        new Thread(() -> {
            while (true){
                try {
                    Thread.sleep(1000);
                    Message message = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消費者").start();
    }
}

//消息隊列類(線程間通信)
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    //消息隊列集合
    private LinkedList<Message> list = new LinkedList<>();
    //隊列容量
    private int capcity;

    public MessageQueue(int capcity){
        this.capcity = capcity;
    }

    //獲取消息
    public Message take(){
        //檢查隊列是否為空
        synchronized (list){
            while (list.isEmpty()){
                try {
                    log.debug("隊列為空,消費者線程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //從隊列頭部獲取消息返回
            Message message = list.removeFirst();
            log.debug("已消費消息 {}",message);
            list.notifyAll();
            return message;
        }
    }

    //存入消息
    public void put(Message message){
        synchronized (list){
            //檢查隊列是否已滿
            while (list.size() == capcity){
                try {
                    log.debug("隊列已滿,生產者線程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //將消息加入隊列的尾部
            list.addLast(message);
            log.debug("已生產消息 {}",message);
            list.notifyAll();
        }
    }
}

@Setter
@AllArgsConstructor
@ToString
@Slf4j(topic = "c.Message")
final class Message {
    private int id;
    private Object value;
}

輸出:

17:18:49 [生產者0] c.MessageQueue - 已生產消息 Message(id=0, value=值0)
17:18:49 [生產者2] c.MessageQueue - 已生產消息 Message(id=2, value=值2)
17:18:49 [生產者1] c.MessageQueue - 隊列已滿,生產者線程等待
17:18:50 [消費者] c.MessageQueue - 已消費消息 Message(id=0, value=值0)
17:18:50 [生產者1] c.MessageQueue - 已生產消息 Message(id=1, value=值1)
17:18:51 [消費者] c.MessageQueue - 已消費消息 Message(id=2, value=值2)
17:18:52 [消費者] c.MessageQueue - 已消費消息 Message(id=1, value=值1)
17:18:53 [消費者] c.MessageQueue - 隊列為空,消費者線程等待

park、unpark

基本使用

它們是 LockSupport 類中的方法

// 暫停當前線程
LockSupport.park(); 

// 恢復某個線程的運行
LockSupport.unpark(暫停線程對象)

先 park 再 unpark

Thread t1 = new Thread(() -> {
 	log.debug("start...");
 	sleep(1);
 	log.debug("park...");
 	LockSupport.park();
 	log.debug("resume...");
},"t1");
t1.start();

Thread.sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

輸出:

18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park... 
18:42:54.583 c.TestParkUnpark [main] - unpark... 
18:42:54.583 c.TestParkUnpark [t1] - resume...

先 unpark 再 park

Thread t1 = new Thread(() -> {
 	log.debug("start...");
     sleep(2);
     log.debug("park...");
     LockSupport.park();
     log.debug("resume...");
}, "t1");
t1.start();

sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

輸出:

18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark... 
18:43:52.769 c.TestParkUnpark [t1] - park... 
18:43:52.769 c.TestParkUnpark [t1] - resume...

特點

與 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必須配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以線程為單位來【阻塞】和【喚醒】線程,而 notify 只能隨機喚醒一個等待線程,notifyAll 是喚醒所有等待線程,就不那麼【精確】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

原理

每個線程都有自己的一個 Parker 對象,由三部分組成 _counter , _cond 和 _mutex

  1. 當前線程調用 Unsafe.park() 方法
  2. 檢查 _counter ,本情況為 0,這時,獲得 _mutex 互斥鎖
  3. 線程進入 _cond 條件變數阻塞
  4. 設置 _counter = 0

  1. 調用 Unsafe.unpark(Thread_0) 方法,設置 _counter 為 1
  2. 喚醒 _cond 條件變數中的 Thread_0
  3. Thread_0 恢復運行
  4. 設置 _counter 為 0

  1. 調用 Unsafe.unpark(Thread_0) 方法,設置 _counter 為 1
  2. 當前線程調用 Unsafe.park() 方法
  3. 檢查 _counter ,本情況為 1,這時線程無需阻塞,繼續運行
  4. 設置 _counter 為 0

重新理解六種狀態

假設有線程 Thread t

情況一

NEW --> RUNNABLE

當調用 t.start() 方法時,由 NEW --> RUNNABLE

情況二

​ RUNNABLE <--> WAITING

t 線程用 synchronized(obj) 獲取了對象鎖後

  • 調用 obj.wait() 方法時,t 線程從 RUNNABLE --> WAITING
  • 調用 obj.notify() , obj.notifyAll() , t.interrupt() 時
    • 競爭鎖成功,t 線程從 WAITING --> RUNNABLE
    • 競爭鎖失敗,t 線程從 WAITING --> BLOCKED

情況三

RUNNABLE <--> WAITING

  • 當前線程調用 t.join() 方法時,當前線程從 RUNNABLE --> WAITING
    • 註意是當前線程t 線程對象的監視器上等待
  • t 線程運行結束,或調用了當前線程的 interrupt() 時,當前線程從 WAITING --> RUNNABLE

情況四

RUNNABLE <--> WAITING

  • 當前線程調用 LockSupport.park() 方法會讓當前線程從 RUNNABLE --> WAITING
  • 調用 LockSupport.unpark(目標線程) 或調用了線程 的 interrupt() ,會讓目標線程從 WAITING --> RUNNABLE

情況五

​ RUNNABLE <--> TIMED_WAITING

t 線程用 synchronized(obj) 獲取了對象鎖後

  • 調用 obj.wait(long n) 方法時,t 線程從 RUNNABLE --> TIMED_WAITING
  • t 線程等待時間超過了 n 毫秒,或調用 obj.notify() , obj.notifyAll() , t.interrupt() 時
    • 競爭鎖成功,t 線程從 TIMED_WAITING --> RUNNABLE
    • 競爭鎖失敗,t 線程從 TIMED_WAITING --> BLOCKED

情況六

RUNNABLE <--> TIMED_WAITING

  • 當前線程調用 t.join(long n) 方法時,當前線程從 RUNNABLE --> TIMED_WAITING
    • 註意是當前線程t 線程對象的監視器上等待
  • 當前線程等待時間超過了 n 毫秒,或t 線程運行結束,或調用了當前線程的 interrupt() 時,當前線程從TIMED_WAITING --> RUNNABLE

情況七

RUNNABLE <--> TIMED_WAITING

  • 當前線程調用 Thread.sleep(long n) ,當前線程從 RUNNABLE --> TIMED_WAITING
  • 當前線程等待時間超過了 n 毫秒,當前線程從 TIMED_WAITING --> RUNNABLE

情況八

RUNNABLE <--> TIMED_WAITING

  • 當前線程調用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 時,當前線程從 RUNNABLE --> TIMED_WAITING
  • 調用 LockSupport.unpark(目標線程) 或調用了線程 的 interrupt() ,或是等待超時,會讓目標線程從TIMED_WAITING--> RUNNABLE

情況九

RUNNABLE <--> BLOCKED

  • t 線程用 synchronized(obj) 獲取了對象鎖時如果競爭失敗,從 RUNNABLE --> BLOCKED
  • 持 obj 鎖線程的同步代碼塊執行完畢,會喚醒該對象上所有 BLOCKED 的線程重新競爭,如果其中 t 線程競爭成功,從 BLOCKED --> RUNNABLE ,其它失敗的線程仍然 BLOCKED

情況十

RUNNABLE <--> TERMINATED

當前線程所有代碼運行完畢,進入 TERMINATED

多把鎖

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo6")
public class demo6 {
    public static void main(String[] args) {
        BigRoom bigRoom = new BigRoom();
        new Thread(() -> {
            bigRoom.study();
        },"r1").start();

        new Thread(() -> {
            bigRoom.sleep();
        },"r2").start();
    }
}

@Slf4j(topic = "c.BigRoom")
class BigRoom {
    private final Object studyRoom = new Object();
    private final Object bedRoom = new Object();

    public void sleep(){
        synchronized (bedRoom){
            log.debug("sleep two hours");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void study(){
        synchronized (studyRoom){
            log.debug("study one hour");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

輸出:

20:01:42 [r2] c.BigRoom - sleep two hours
20:01:42 [r1] c.BigRoom - study one hour

將鎖的粒度細分

  • 好處,是可以增強併發度
  • 壞處,如果一個線程需要同時獲得多把鎖,就容易發生死鎖

活躍性

死鎖

有這樣的情況:一個線程需要同時獲取多把鎖,這時就容易發生死鎖

t1 線程 獲得 A對象 鎖,接下來想獲取 B對象 的鎖 t2 線程 獲得 B對象 鎖,接下來想獲取 A對象 的鎖

Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
 	synchronized (A) {
 		log.debug("lock A");
 		sleep(1);
		synchronized (B) {
 			log.debug("lock B");
 			log.debug("操作...");
 		}
 	}
}, "t1");

Thread t2 = new Thread(() -> {
 	synchronized (B) {
 		log.debug("lock B");
 		sleep(0.5);
 		synchronized (A) {
 			log.debug("lock A");
  			log.debug("操作...");
 		}
 	}
}, "t2");
t1.start();
t2.start();

輸出:

12:22:06.962 [t2] c.TestDeadLock - lock B 
12:22:06.962 [t1] c.TestDeadLock - lock A

哲學家進餐問題

有五位哲學家,圍坐在圓桌旁。

  • 他們只做兩件事,思考和吃飯,思考一會吃口飯,吃完飯後接著思考。
  • 吃飯時要用兩根筷子吃,桌上共有 5 根筷子,每位哲學家左右手邊各有一根筷子。
  • 如果筷子被身邊的人拿著,自己就得等待

這種線程沒有按預期結束,執行不下去的情況,歸類為【活躍性】問題,除了死鎖以外,還有活鎖和饑餓者兩種情況

活鎖

活鎖出現在兩個線程互相改變對方的結束條件,最後誰也無法結束

public class TestLiveLock {
 	static volatile int count = 10;
 	static final Object lock = new Object();
 	public static void main(String[] args) {
 	new Thread(() -> {
 		// 期望減到 0 退出迴圈
 		while (count > 0) {
 			sleep(0.2);
 			count--;
 			log.debug("count: {}", count);
 		}
 	}, "t1").start();
 	
 	new Thread(() -> {
 		// 期望超過 20 退出迴圈
 		while (count < 20) {
 			sleep(0.2);
 			count++;
 			log.debug("count: {}", count);
 		}
 	}, "t2").start();
 }

饑餓

一個線程由於優先順序太低,始終得不到 CPU 調度執行,也不能夠結束

ReentrantLock

相對於 synchronized 它具備如下特點

  • 可中斷
  • 可以設置超時時間
  • 可以設置為公平鎖
  • 支持多個條件變數

與 synchronized 一樣,都支持可重入

基本語法

// 獲取鎖
reentrantLock.lock();
try {
 	// 臨界區
} finally {
 	// 釋放鎖
 	reentrantLock.unlock();
}

可重入

可重入是指同一個線程如果首次獲得了這把鎖,那麼因為它是這把鎖的擁有者,因此有權利再次獲取這把鎖如果是不可重入鎖,那麼第二次獲得鎖時,自己也會被鎖擋住

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo1")
public class demo1 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {

        lock.lock();
        try {
            log.debug("enter main");
            m1();
        }finally {
            lock.unlock();
        }
    }

    public static void m1(){
        lock.lock();
        try {
            log.debug("enter m1");
            m2();
        }finally {
            lock.unlock();
        }
    }

    public static void m2(){
        lock.lock();
        try {
            log.debug("enter m2");
        }finally {
            lock.unlock();
        }
    }
}

輸出:

20:19:19 [main] c.demo1 - enter main
20:19:19 [main] c.demo1 - enter m1
20:19:19 [main] c.demo1 - enter m2

可打斷

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo2")
public class demo2 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                //如果沒有競爭,此方法會獲取對象的鎖
                //如果有競爭,就進入阻塞隊列,可以被其他線程用 interrupt 打斷
                log.debug("嘗試獲得鎖");
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("未獲得鎖,返回");
                return;
            }
            try {
                log.debug("獲取到鎖");
            }finally {
                lock.unlock();
            }
        }, "t1");

        lock.lock();
        t1.start();
        Thread.sleep(1000);
        log.debug("打斷t1");
        t1.interrupt();
    }
}

輸出:

20:26:05 [t1] c.demo2 - 嘗試獲得鎖
20:26:06 [main] c.demo2 - 打斷t1
20:26:06 [t1] c.demo2 - 未獲得鎖,返回
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at ReentrantLockDemo.demo2.lambda$main$0(demo2.java:16)
	at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 0

註意如果是不可中斷模式,那麼即使使用了 interrupt 也不會讓等待中斷

鎖超時

立刻失敗

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("嘗試獲得鎖");
            if(!lock.tryLock()){
                log.debug("獲取不到鎖");
                return;
            }
            try {
                log.debug("獲得到鎖");
            }finally {
                lock.unlock();
            }
        },"t1");

        lock.lock();
        log.debug("獲得到鎖");
        t1.start();
    }
}

輸出:

20:31:15 [main] c.demo3 - 獲得到鎖
20:31:15 [t1] c.demo3 - 嘗試獲得鎖
20:31:15 [t1] c.demo3 - 獲取不到鎖

超時失敗

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;
import sun.reflect.generics.tree.Tree;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            log.debug("嘗試獲得鎖");
            try {
                if(!lock.tryLock(1, TimeUnit.SECONDS)){
                    log.debug("獲取不到鎖");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("獲取不到鎖");
                return;
            }
            try {
                log.debug("獲得到鎖");
            }finally {
                lock.unlock();
            }
        },"t1");

        lock.lock();
        log.debug("獲得到鎖");
        Thread.sleep(1000);
        lock.unlock();
        t1.start();
    }
}

輸出:

20:34:03 [main] c.demo3 - 獲得到鎖
20:34:04 [t1] c.demo3 - 嘗試獲得鎖
20:34:04 [t1] c.demo3 - 獲得到鎖

公平鎖

ReentrantLock 預設是不公平的

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(false);
        lock.lock();
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "t" + i).start();
        }
// 1s 之後去爭搶鎖
        Thread.sleep(1000);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " start...");
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " running...");
            } finally {
                lock.unlock();
            }
        }, "強行插入").start();
        lock.unlock();
    }
}

強行插入,有機會在中間輸出

註意該實驗不一定總能復現

t39 running... 
t40 running... 
t41 running... 
t42 running... 
t43 running... 
強行插入 start... 
強行插入 running... 
t44 running... 
t45 running... 
t46 running... 
t47 running... 
t49 running...

改為公平鎖後

ReentrantLock lock = new ReentrantLock(true);

強行插入,總是在最後輸出

t465 running... 
t464 running... 
t477 running... 
t442 running... 
t468 running... 
t493 running... 
t482 running... 
t485 running... 
t481 running... 
強行插入 running...

公平鎖一般沒有必要,會降低併發度

條件變數

ReentrantLock 的條件變數比 synchronized 強大之處在於,它是支持多個條件變數的,這就好比

  • synchronized 是那些不滿足條件的線程都在一間休息室等消息
  • 而 ReentrantLock 支持多間休息室,有專門等煙的休息室、專門等早餐的休息室、喚醒時也是按休息室來喚醒

使用要點:

  • await 前需要獲得鎖
  • await 執行後,會釋放鎖,進入 conditionObject 等待
  • await 的線程被喚醒(或打斷、或超時)取重新競爭 lock 鎖
  • 競爭 lock 鎖成功後,從 await 後繼續執行
package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo4")
public class demo4 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        //創建一個新的條件變數(休息室)
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();

        lock.lock();
        //進入休息室等待
        condition1.await();
        
        condition1.signal();
        //condition1.signalAll();
    }
}

同步模式之順序控制

固定運行順序

比如,必須先 2 後 1 列印

wait notify版

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    static final Object lock = new Object();
    //表示 t2 是否被運行過
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock){
                while (!t2runned){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            }
        },"t1");

        Thread t2 = new Thread(() -> {
            synchronized (lock){
                log.debug("2");
                t2runned = true;
                lock.notify();
            }
        },"t2");

        t1.start();
        t2.start();
    }
}

輸出:

20:49:28 [t2] c.demo4 - 2
20:49:28 [t1] c.demo4 - 1

park unpark版

可以看到,實現上很麻煩:

  • 首先,需要保證先 wait 再 notify,否則 wait 線程永遠得不到喚醒。因此使用了『運行標記』來判斷該不該wait
  • 第二,如果有些干擾線程錯誤地 notify 了 wait 線程,條件不滿足時還要重新等待,使用了 while 迴圈來解決此問題
  • 最後,喚醒對象上的 wait 線程需要使用 notifyAll,因為『同步對象』上的等待線程可能不止一個

可以使用 LockSupport 類的 park 和 unpark 來簡化上面的題目:

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            LockSupport.park();
            log.debug("1");
        }, "t1");

        t1.start();

        new Thread(() -> {
            log.debug("2");
            LockSupport.unpark(t1);
        },"t2").start();
    }
}

交替輸出

線程 1 輸出 a 5 次,線程 2 輸出 b 5 次,線程 3 輸出 c 5 次。現在要求輸出 abcabcabcabcabc 怎麼實現

wait notify版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify(1,5);
        new Thread(() -> {
            wn.print("a",1,2);
        }).start();
        new Thread(() -> {
            wn.print("b",2,3);
        }).start();
        new Thread(() -> {
            wn.print("c",3,1);
        }).start();
    }
}
/*
    輸出內容    等待標記    下一個標記
    a           1           2
    b           2           3
    c           3           1
 */
@AllArgsConstructor
class WaitNotify{
    //等待標記
    private int flag;
    //迴圈次數
    private int loopNumber;

    //列印
    public void print(String str,int waitFlag,int nextFlag){
        for (int i = 0; i < loopNumber; i++) {
            synchronized (this){
                while (flag != waitFlag){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                flag = nextFlag;
                this.notifyAll();
            }
        }
    }
}

輸出:

abcabcabcabcabc

ReentrantLock版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo6")
public class demo6 {
    public static void main(String[] args) throws InterruptedException {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();
        new Thread(() -> {
            awaitSignal.print("a", a, b);
        }).start();
        new Thread(() -> {
            awaitSignal.print("b", b, c);
        }).start();
        new Thread(() -> {
            awaitSignal.print("c", c, a);
        }).start();

        Thread.sleep(1000);
        awaitSignal.lock();
        try {
            System.out.println("開始。。。");
            a.signal();
        }finally {
            awaitSignal.unlock();
        }
    }
}

@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
    private int loopNumber;
    /**
     * @param str 列印內容
     * @param current   進入哪一間休息室
     * @param next  下一間休息室
     */
    public void print(String str,Condition current,Condition next){
        for (int i = 0; i < loopNumber; i++) {
            lock();
            try {
                try {
                    current.await();
                    System.out.print(str);
                    next.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                unlock();
            }
        }
    }
}

輸出:

開始。。。
abcabcabcabcabc

park unpark版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;

import java.util.concurrent.locks.LockSupport;

public class demo7 {

    static  Thread t1;
    static  Thread t2;
    static  Thread t3;

    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        t1 = new Thread(() -> {
            pu.print("a", t2);
        },"t1");
        t2 = new Thread(() -> {
            pu.print("b", t3);
        },"t2");
        t3 = new Thread(() -> {
            pu.print("c", t1);
        },"t3");

        t1.start();
        t2.start();
        t3.start();

        LockSupport.unpark(t1);
    }
}

@AllArgsConstructor
class ParkUnpark{
    private int loopNumber;

    public void print(String str,Thread next){
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(next);
        }
    }
}

輸出:

abcabcabcabcabc

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 日常開發過程過程中。樹形結構運用的非常頻繁。 例如:公司組織結構、各種分類結構、分組結構等等。 SET FOREIGN_KEY_CHECKS = 0; CREATE TABLE IF NOT EXISTS `tbl_sapo_group` ( `id` int(10) unsigned NOT NU ...
  • 引子 把大象裝進冰箱需要3步:打開冰箱門,把大象裝入冰箱,關閉冰箱門。 擴展一下,我們考慮把動物裝進冰箱的場景。比如,把豬🐷裝進冰箱,把狗🐶裝進冰箱,等等。 怎麼利用面向對象的思想來進行程式設計呢? talk is cheap, show me the code. 把大象裝進冰箱的程式設計及實現 ...
  • Spring容器包含兩個重要的特性:面向切麵編程(AOP)和控制反轉(IOC)。面向切麵編程是面向對象(OOP)的一種補充,在面向對象編程的過程中編程針對的目標是一個個對象,而面向切麵編程中編程針對的目標是一個個切麵。切麵支持跨類型跨對象(如事務的切麵可以加在任何地方)進行模塊化。 前言 AOP是S ...
  • 目錄 一.簡介 二.效果演示 三.源碼下載 四.猜你喜歡 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 轉場 零基礎 O ...
  • 1.背景 在大型項目開發過程中,經常會遇到列印大量日誌,輸出信息和在源碼中寫註釋的情況。對於軟體開發來說,我們一般都是列印輸出英文的日誌(主要考慮軟體在各種環境下的相容性,如果列印中文日誌可能會出現亂碼,另外英文日誌更容易搜索,更容易後續做國際化),但是對於我們中國人來說,很容易就把中文全形的中文標 ...
  • 每天都會分享幾個有趣的Python小知識,現在給大家分享幾個適合新手練習的小項目,好玩不燒腦,提升技能不在話下。等會就叫你的室友跟你一起VS,輕輕鬆松成為捲王。 但是問題有三個: 1、你不知道已經有哪些輪子已經造好了,哪個適合你用。有名有姓的的著名輪子就400多個,更別說沒名沒姓自己在製造中的輪子。 ...
  • 工作很多年後,才發現有很多工具類庫,可以大大簡化代碼量,提升開發效率,初級開發者卻不知道。而這些類庫早就成為了業界標準類庫,大公司的內部也都在使用,如果剛工作的時候就有人告訴我使用這些工具類庫,該多好! 一塊看一下有哪些工具類庫你也用過。 1. Java自帶工具方法 1.1 List集合拼接成以逗號 ...
  • 又到每天Python小技巧分享的時候了,今天給大家分享的是怎麼樣去爬取清純小姐姐照片(沒有人會拒絕美女吧,小聲說),這篇文章好像有點刺激,未成年的小伙伴就不要進來了。快來看看這些清純的小姐姐的容顏,話不多說,上教程。 先來看看效果圖 不好意思,圖片有點辣眼睛,被攔截了,還沒有還給我..... imp ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...