6.併發編程,總結

来源:https://www.cnblogs.com/changxin7/archive/2019/08/24/11405931.html
-Advertisement-
Play Games

1. course 1.進程創建的兩種方式 1. 開啟進程的第一種方式: 2. 開啟進程的第二種方式: 3. 簡單應用 2.獲取進程pid 3.驗證進程之間的空間隔離 4. join 5.進程的其他參數 6.守護進程 7.僵屍進程孤兒進程 基於 環境( ) 主進程需要等待子進程結束之後,主進程才結束 ...


1. course

1.進程創建的兩種方式

  1. 開啟進程的第一種方式:

    from multiprocessing import Process
    import random
    import time
    
    
    def task(name):
        print(f'{name} is running')
        time.sleep(random.randint(1, 3))
        print(f'{name} is gone')
    
    
    if __name__ == '__main__':  # 在windows環境下, 開啟進程必須在 __name__ == '__main__' 下麵
        p = Process(target=task, args=('常鑫'))  # 創建一個進程對象
        p.start()  
        '''
        只是想操作系統發出一個開闢子進程的信號,然後就執行下一行 這個信號操作系統收到後,會從記憶體中開闢一個子進程空間,然後將主進程所有數據copy載入到子進程,然後再調用cpu去執行開闢子進程的開銷很大
        '''
        print('開始')
        time.sleep(2)
    # 所以永遠先執行主進程的代碼
  2. 開啟進程的第二種方式:

    from multiprocessing import Process
    import random
    import time
    
    
    class MyProcess(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run1(self):
            print(f'{self.name} is running')
            time.sleep(random.randint(1, 3))
            print(f'{self.name} is gone')
    
    
    if __name__ == '__main__':
        p = MyProcess('常鑫')
        p.start()
        print('==主')
  3. 簡單應用

    # 簡單應用/
    from multiprocessing import Process
    import time
    
    
    def task(name):
        print(f'{name} is running')
        time.sleep(1)
        print(f'{name} is gone')
    
    
    def task1(name):
        print(f'{name} is running')
        time.sleep(2)
        print(f'{name} is gone')
    
    
    def task2(name):
        print(f'{name} is running')
        time.sleep(3)
        print(f'{name} is gone')
    
    
    if __name__ == '__main__':
        p1 = Process(target=task, args=('常鑫一號英雄',))
        p2 = Process(target=task, args=('常鑫二號英雄',))
        start_time = time.time()
        task(1)
        task1(2)
        task2(3)
        print(f'結束時間{time.time() - start_time}')
    # 三個進程併發或者並行的執行三個任務
    # 創建進程是並行,不創建是串列

2.獲取進程pid

import os

print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')

cmd命令查看pid
tasklist 查看所有進程的pid
tasklist|findstr pycharm  查看pycharm的pid
from multiprocessing import Process
import os

print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')


def task(name):
    print(f'子進程:{os.getpid()}')
    print(f'主進程:{os.getppid()}')


if __name__ == '__main__':
    p = Process(target=task, args=('常鑫',))
    p.start()
    print('==主開始')
    print(f'==主{os.getpid()}')
    print(f'===主{os.getppid()}')

3.驗證進程之間的空間隔離

初始子進程的時候copy主進程,之後主進程和子進程沒有任何聯繫,不共同享用任何內容
from multiprocessing import Process
import time

name = '常鑫'


def task():
    global name
    name = '郭記'
    print(f'子進程的名字是: {name}')


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    time.sleep(1)
    print(f'主進程的名字是: {name}')
----------------------------分割線-----------------------------
lst = ['郭蘇慧', ]


def task1():
    lst.append('郭記')
    print(f'子進程的名字是: {lst}')


if __name__ == '__main__':
    p = Process(target=task1)
    p.start()
    time.sleep(2)
    print(f'主進程的名字是: {lst}')

4. join

join 讓主進程等待子進程結束之後再執行主進程
join 只針對主進程,如果join下麵多次join 他是不阻塞的
join 就是阻塞,主進程有join,主進程下麵的代碼一律不執行,直到進程執行完畢之後,在執行.
# 正確 重點
from multiprocessing import Process
import time


def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')


if __name__ == '__main__':
    start_time = time.time()
    l1 = []
    for i in range(1, 4):
        p = Process(target=task, args=(i,))
        l1.append(p)
        p.start()

    for i in l1:
        i.join()

    print(f'==主{time.time() - start_time}')

錯誤示範:
for i in range(1,4):
    p = Process(target=task,args=(i,))
    p.start()
    p.join()
'''
p1 = Process(target=task,args=(1,))
p1.start()
p1.join()
p2 = Process(target=task,args=(2,))
p2.start()
p2.join()
p3 = Process(target=task,args=(3,))
p3.start()
p3.join()

'''
# join讓主進程等待子進程結束之後再執行主進程
from multiprocessing import Process
import time


def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')


if __name__ == '__main__':
    p = Process(target=task, args=('常鑫',))
    p.start()
    p.join()
    print('==主進程開始')


# 多個進程使用join

def task(name, sec):
    print(f'{name} is running')
    time.sleep(sec)
    print(f'{name} is gone')


if __name__ == '__main__':
    star_time = time.time()
    start_time = time.time()
    p1 = Process(target=task, args=('常鑫', 1))
    p2 = Process(target=task, args=('李業', 2))
    p3 = Process(target=task, args=('海狗', 3))
    p1.start()
    p2.start()
    p3.start()
    # join 只針對主進程,如果join下麵多次join 他是不阻塞的.
    p1.join()
    p2.join()
    p3.join()
    print(f'==主{time.time()-start_time}')
# ----------------------------------------------------------
def task(name, sec):
    print(f'{name}is running')
    time.sleep(sec)
    print(f'{name} is gone')


if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task, args=('常鑫', 3))
    p2 = Process(target=task, args=('李業', 2))
    p3 = Process(target=task, args=('海狗', 1))

    p1.start()
    p2.start()
    p3.start()
    # join就是阻塞

    p1.join()  # 等2s
    print(f'==主1:{time.time() - start_time}')
    p2.join()
    print(f'===主2:{time.time() - start_time}')
    p3.join()
    print(f'==主3:{time.time() - start_time}')

5.進程的其他參數

 p.terminate()  # 殺死子進程 ***
 print(p.is_alive())  # *** 判斷子進程 False True
 p.join()  # ***
from multiprocessing import Process
import time


def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')


if __name__ == '__main__':
    p = Process(target=task, args=('常鑫', ), name='Alex')
    p.start()
    time.sleep(1)
    p.terminate()  # 殺死子進程 ***
    p.join()  # ***
    time.sleep(1)
    print(p.is_alive())  # *** 判斷子進程 False
    print(p.name)
    p.name = 'sb'
    print(p.name)
    print('主程式開始')

6.守護進程

p.daemon = True
將p子進程設置成守護進程,只要主進程結束,守護進程馬上結束 一定要在子進程開啟之前設置
from multiprocessing import Process
import time


def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')


if __name__ == '__main__':
    p = Process(target=task, args=('常鑫',))  # 創建一個進程對象
    p.daemon = True  # 將p子進程設置成守護進程,只要主進程結束,守護進程馬上結束.
    p.start()
    # p.daemon = True  # 一定要在子進程開啟之前設置
    time.sleep(1)
    print('===主')

7.僵屍進程孤兒進程

基於unix環境(linux, macOS)

  • 主進程需要等待子進程結束之後,主進程才結束

    ==主進程時刻監測子進程的運行狀態,當子進程結束之後,一段時間之內,將子進程進行回收.==

  • 為什麼主進程不在子進程結束後馬上對其回收呢?

    1. 主進程與子進程是非同步關係.主進程無法馬上捕獲子進程什麼時候結束.
    2. 如果子進程結束之後馬上再記憶體中釋放資源,主進程就沒有辦法監測子進程的狀態了.
  • unix針對於上面的問題,提供了一個機制.

    所有的子進程結束之後,立馬會釋放掉文件的操作鏈接,記憶體的大部分數據,但是會保留一些內容: ==進程號,結束時間,運行狀態==,等待主進程監測,回收.

  • 僵屍進程:==所有的子進程結束之後,在被主進程回收之前,都會進入僵屍進程狀態.==

  • 僵屍進程有無危害???

    如果父進程不對僵屍進程進行回收(wait/waitpid),產生大量的僵屍進程,這樣就會占用記憶體,占用進程pid號.

  • 孤兒進程:

    父進程由於某種原因結束了,但是你的子進程還在運行中,這樣你的這些子進程就成了孤兒進程.你的父進程如果結束了,你的所有的孤兒進程就會被init進程的回收,init就變成了你的父進程,對你進行回收.

  • 僵屍進程如何解決???

    父進程產生了大量子進程,但是不回收,這樣就會形成大量的僵屍進程,解決方式==就是直接殺死父進程==,將所有的僵屍進程變成孤兒進程進程,init進行回收.

8.互斥鎖

互斥鎖: 
    指散佈在不同進程之間的若幹程式片斷,當某個進程運行其中一個程式片段時,其它進程就不能運行它們之中的任一程式片段,只能等到該進程運行完這個程式片段後才可以運行的一種類似於"鎖"的機制

版本一:
    現在是所有的進程都併發的搶占印表機.
    併發是以效率優先的,但是目前我們的需求: 順序優先.
    多個進程共強一個資源時, 要保證順序優先: 串列,一個一個來.
版本二:
    我們利用join 解決串列的問題,保證了順序優先,但是這個誰先誰後是固定的.
    這樣不合理. 你在爭搶同一個資源的時候,應該是先到先得,保證公平.

lock與join的區別.
共同點: 都可以把併發變成串列, 保證了順序.
不同點: join人為設定順序,lock讓其爭搶順序,保證了公平性.
版本三:(for i in 迴圈)
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys


def task1(p, lock):
    lock.acquire()
    print(f'{p}開始列印了')
    time.sleep(random.randint(1, 3))
    print(f'{p}開始列印了')
    lock.release()


def task2(p, lock):
    lock.acquire()
    print(f'{p}開始列印了')
    time.sleep(random.randint(1, 3))
    print(f'{p}開始列印了')
    lock.release()


def task3(p, lock):
    lock.acquire()
    print(f'{p}開始列印了')
    time.sleep(random.randint(1, 3))
    print(f'{p}開始列印了')
    lock.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(1, 4):
        p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(i, mutex))
        p.start()

9.進程之間的通信

進程在記憶體級別是隔離的,但是文件在磁碟上

1.基於文件通信

搶票系統.
1. 先可以查票.查詢餘票數.  併發
2. 進行購買,向服務端發送請求,服務端接收請求,在後端將票數-1,返回到前端. [串列].

當很多進程共強一個資源(數據)時, 你要保證順序(數據的安全),一定要串列.
互斥鎖: 可以公平性的保證順序以及數據的安全.
基於文件的進程之間的通信:
    1.效率低.
    2.自己加鎖麻煩而且很容易出現死鎖.
from multiprocessing import Process
from multiprocessing import Lock
import random
import time
import json
import os


def search():
    time.sleep(random.randint(1, 3))  # 模擬網路延遲(查詢環節)
    with open('db.json', encoding='utf-8') as f1:
        dic = json.load(f1)
        print(f'{os.getpid()}查看了票的剩餘量,還剩餘{dic["count"]}')


def paid():
    with open('db.json', encoding='utf-8') as f1:
        dic = json.load(f1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', encoding='utf-8', mode='w') as f1:
            json.dump(dic, f1)
        print(f'{os.getpid()}購票成功, 還剩餘{dic["count"]}票')
    else:
        time.sleep(1)
        print(f'{os.getpid()}購票未成功')


def task(lock):
    search()
    lock.acquire()
    paid()
    lock.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(5):
        p = Process(target=task, args=(mutex,))
        p.start()

2.基於隊列通信

隊列: 把隊列理解成一個容器,這個容器可以承載一些數據,
隊列的特性: 先進先出永遠保持這個數據. FIFO 羽毛球筒.
    
q.put(5555)  當隊列滿了時,在進程put數據就會阻塞.
print(q.get())  當數據取完時,在進程get數據也會出現阻塞,直到某一個進程put數據.

print(q.get(timeout=3))  阻塞3秒,3秒之後還阻塞直接報錯.
print(q.get(block=False)) 只要遇到阻塞就會報錯.
利用隊列Queue改進選票系統作業:
    - 票數放入隊列中存儲
    - 開啟多個進程進行選票,查票為併發效果,買票為串列效果
    - 購買成功、失敗都需要提示
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
import os


def search(q):
    get = q.get()  # 取出
    print(f'{os.getpid()}查看了票的剩餘量,還剩餘{get["count"]}')
    q.put(get)  # 輸入


def paid(q):
    time.sleep(random.randint(1, 3))
    q_dic = q.get()  # 取出
    q.put(q_dic)  # 輸入
    if q_dic["count"] > 0:
        q_dic["count"] -= 1
        print(f"{os.getpid()}購買成功!{q_dic['count']} ")
        try:
            q.put(q_dic, block=False)
        except Exception:
            pass
    else:
        print(f"{os.getpid()}購買失敗")


def task(q):
    search(q)
    paid(q)


if __name__ == '__main__':
    q = Queue(1)
    q.put({"count": 3})
    for i in range(5):
        p = Process(target=task, args=(q,))
        p.start()
模擬雙十一排隊搶小米手機,多用戶搶購,只能選取前10個用戶:
    開啟多個用戶搶購買手機。
    只能限定10人購買。
    最終將10個用戶的排名展示出來。

import os
from multiprocessing import Queue
from multiprocessing import Process


def task(q):
    try:
        q.put(f'{os.getpid()}', block=False)
    except Exception:
        return


if __name__ == '__main__':
    q = Queue(10)
    for i in range(100):
        p = Process(target=task, args=(q,))
        p.start()
    for i in range(1, 10):
        print(f'排名第{i}的用戶是{q.get()}')

10.生產者消費者模塊

編程思想,模型,設計模式,理論等等,都是交給你一種編程的方法,以後你遇到類似的情況,套用即可.

生產者消費者模型三要素:

​ 生產者: 產生數據的

​ 消費者: 接收數據做進一步處理的

​ 容器: 盆(隊列)

那麼隊列容器起到什麼作用? 起到緩衝的作用,平衡生產力與消費力,解耦.

from multiprocessing import Process
from multiprocessing import Queue
import random
import time


def producer(q, name):
    for i in range(1, 6):
        time.sleep(random.randint(1, 2))
        res = f'{i}號包子'
        q.put(res)
        print(f'生產者{name}生產了{res}')


def consumer(q, name):
    while 1:
        try:
            food = q.get(timeout=3)
            time.sleep(random.randint(1, 3))
            print(f'消費者{name}吃了吃了吃了{food}')
        except Exception:
            pass


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q, '常鑫'))
    p2 = Process(target=consumer, args=(q, '常鑫鑫'))

    p1.start()
    p2.start()

2. thread

1.線程的理論知識

1. 什麼是線程:進程是資源單位, 線程是執行單位.
    進程:進程會在記憶體中開闢一個進程空間,將主進程的資料數據全部複製一份,線程會執行裡面的代碼.
        
2. 線程vs進程:
    1. 開啟進程的開銷非常大,比開啟線程的開銷大很多.
    2. 開啟線程的速度非常快,要快幾十倍到上百倍.
    3. 線程與線程之間可以共用數據,進程與進程之間需藉助隊列等方法實現通信.
    
3. 線程的應用:  數據共用, 開銷小,速度快,
    併發: 一個cpu 看起來像是同時執行多個任務,單個進程開啟三個線程,併發的執行任務
主線程子線程沒有地位之分,但是,一個進程誰在幹活? 一個主線程在幹活,當幹完活了,你得等待其他線程幹完活之後,才能結束本進程
  1. ==什麼是線程==

    一條流水線的工作流程.

    進程: 在記憶體中開啟一個進程空間,然後將主進程的所有的資源數據複製一份,然後調用cpu去執行這些代碼.

    之前的描述不夠具體:

    開啟一個進程:

    在記憶體中開啟一個進程空間,然後將主進程的所有的資源數據複製一份,然後調用線程去執行代碼

    進程是資源單位, 線程是執行單位.

    以後你描述開啟一個進程:

    ​ 開啟一個進程:進程會在記憶體中開闢一個進程空間,將主進程的資料數據全部複製一份,線程會執行裡面的代碼.

  2. ==線程vs進程==

    1. 開啟進程的開銷非常大,比開啟線程的開銷大很多.
    2. 開啟線程的速度非常快.要快幾十倍到上百倍.
    3. 線程線程之間可以共用數據,進程與進程之間需藉助隊列等方法實現通信.
  3. ==線程的應用==

    1. 併發: 一個cpu 看起來像是同時執行多個任務.

      單個進程開啟三個線程.併發的執行任務.

      開啟三個進程併發的執行任務.

      文本編輯器:

      1. 輸入文字.
      2. 在屏幕上顯示.
      3. 保存在磁碟中.

      開啟多線程就非常好了:

      ​ 數據共用, 開銷小,速度快.

    主線程子線程沒有地位之分,但是,一個進程誰在幹活? 一個主線程在幹活,當幹完活了,你得等待其他線程幹完活之後,才能結束本進程

2.開啟線程的兩張方式

第一種方式:
    
from threading import Thread
import time


def task(name):
    print(f'{name} is running')
    time.sleep(1)
    print(f'{name} in gone')


if __name__ == '__main__':
    p1 = Thread(target=task, args=('常鑫',))
    p1.start()
    print('===主線程')
第二種方式:
    
from threading import Thread
import time


class MyThread(Thread):
    def __init__(self, name, l1, s1):
        super().__init__()
        self.name = name
        self.l1 = l1
        self.s1 = s1

    def run(self):
        print(f'{self.name} is running')
        print(f'{self.l1} is running')
        print(f'{self.s1} is running')
        time.sleep(1)
        print(f'{self.name} is gone')
        print(f'{self.l1} is gone')
        print(f'{self.s1} is gone')


if __name__ == '__main__':
    p1 = MyThread('常鑫', [1, 2, 3], '100')
    p1.start()
    print('===主線程')

3.線程vs進程的代碼對比

  1. 開啟速度對比,線程比進程

    from multiprocessing import Process
    
    
    def work():
        print('hello')
    
    
    if __name__ == '__main__':  # 在主進程下開啟線程
        t = Process(target=work)
        t.start()
        print('主線程/主進程')
    from threading import Thread
    import time
    
    
    def task(name):
        print(f'{name} is running')
        time.sleep(1)
        print(f'{name} is gone')
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('海狗',))
        t1.start()
        print('===主線程')  # 線程是沒有主次之分的.
  2. 對比pid ==同一個pid==

    from threading import Thread
    import os
    
    
    def task():
        print(os.getpid())
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task)
        t2 = Thread(target=task)
        t1.start()
        t2.start()
        print(f'===主線程{os.getpid()}')
  3. 同一個進程內線程共用內部數據

    同一進程內的資源數據對於這個進程的多個線程來說是共用的.
    
    from threading import Thread
    
    x = 3
    
    
    def task():
        global x
        x = 100
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task)
        t1.start()
        t1.join()
        print(f'===主線程{x}')

4.線程的相關其他方法(瞭解)

    # Thread實例對象的方法
    p1.setName('子線程1')  # 設置線程名
    p1.getName()  # 返回線程名
 ---print(p1.name)  # 獲取線程名  ***
    print(p1.isAlive())  # 返回線程是否活動的。

    # threading模塊提供的一些方法:
    print(current_thread())  # 獲取當前線程的對象
    print(currentThread())  # 獲取當前線程的對象
    print(enumerate())  # 返回一個列表,包含所有的線程對象
 ---print(activeCount())  # *** 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
from threading import currentThread
from threading import enumerate
from threading import activeCount
import os
import time

x = 9


def task():
    print(currentThread())
    time.sleep(1)
    print('666')


if __name__ == '__main__':
    p1 = Thread(target=task, name='p1')  # name 設置線程名
    p2 = Thread(target=task, name='p2')  # name 設置線程名
    p1.start()
    p2.start()

    # Thread實例對象的方法
    p1.setName('子線程1')  # 設置線程名
    p2.setName('子線程1')  # 設置線程名
    p1.getName()  # 返回線程名
    p2.getName()  # 返回線程名
    print(p1.name)  # 獲取線程名  ***
    print(p2.name)  # 獲取線程名  ***
    print(p1.isAlive())  # 返回線程是否活動的。
    print(p2.isAlive())  # 返回線程是否活動的。

    # threading模塊提供的一些方法:
    print(currentThread())  # 獲取當前線程的對象
    print(enumerate())  # 返回一個列表,包含所有的線程對象
    print(activeCount())  # *** 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
    print(f'主線程{os.getpid()}')

5.守護線程(考點)

join: 阻塞 告知主線程要等待我子線程執行完畢之後再執行主線程
主線程什麼時候結束???
守護線程 等待非守護子線程以及主線程結束之後,結束.
from threading import Thread
import time


def foo():
    print(123)  # 1
    time.sleep(1)
    print("end123")  # 4


def bar():
    print(456)  # 2
    time.sleep(2)
    print("end456")  # 5


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon = True
t1.start()
t2.start()
print("main-------")  # 3
# 結果:
# 123
# 456
# main-------
# end123
# end456

6.互斥鎖(考點)

正常情況加鎖之後編程串列
鎖之後加上延遲就不一定,有的可能就會出現插隊現象
from threading import Thread
from threading import Lock
import time
import random

x = 10


def task(lock):
    lock.acquire()
    time.sleep(random.randint(1, 3))  # 卡點
    global x
    temp = x
    time.sleep(0.1)
    temp = temp - 1
    x = temp
    lock.release()


if __name__ == '__main__':
    mutex = Lock()
    l1 = []
    for i in range(10):
        t = Thread(target=task, args=(mutex,))
        l1.append(t)
        t.start()

        time.sleep(1)
        print(f'主線程{x}')

7.死鎖現象與遞歸鎖

  1. 死鎖現象就是: A進程那著A鑰匙去找B鑰匙,B進程拿著B鑰匙去找A鑰匙
  2. 遞歸鎖: 可以解決死鎖現象,業務需要多個鎖時,優先考慮遞歸鎖
  3. 鎖必須寫成==lock_A = lock_B = RLock()==格式,原理是==pid==都一樣,每鎖一次,鎖的數量加一,解開的時候減一.鎖的數量如果不為零其他線程不能搶鎖==from threading import RLock==導入模塊
死鎖現象:
    
from threading import Thread
from threading import Lock
import time

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到的A')
        lock_B.acquire()
        print(f'{self.name}拿到的B')

        lock_B.release()
        lock_A.release()

    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到的B')
        time.sleep(0.1)
        lock_A.acquire()
        print(f'{self.name}拿到的A')

        lock_A.release()
        lock_B.release()


if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()
遞歸鎖:
    
from threading import Thread
from threading import RLock
import time

lock_A = lock_B = RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到的A')
        lock_B.acquire()
        print(f'{self.name}拿到的B')

        lock_B.release()
        lock_A.release()

    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到的B')
        lock_A.acquire()
        print(f'{self.name}拿到的A')

        time.sleep(1)
        lock_A.release()
        lock_B.release()


if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()

8.信號量

也是一種鎖, 控制併發數量

==from threading import current_thread==獲取當前線程的對象模塊

==from threading import Semaphore==導入信號量模塊

==sem = Semaphore(5)==實例化信號量 不寫時無窮大

==sem.acquire()==函數裡面獲取信號量

from threading import Thread
from threading import Semaphore
from threading import current_thread
import random
import time

sem = Semaphore(5)


def task():
    sem.acquire()
    print(f'{current_thread().name} 房間')
    time.sleep(random.randint(1, 3))
    sem.release()


if __name__ == '__main__':
    for i in range(30):
        t = Thread(target=task, )
        t.start()

9. ==GIL==全局解釋器鎖

  1. 好多自稱大神的說,GIL鎖就是python的致命缺陷,Python不能多核,併發不行等等 .....

  2. ==理論上來說:單個進程的多線程可以利用多核.==

    但是,開發Cpython解釋器的程式員,給進入解釋器的線程加了鎖.

  3. 為什麼加鎖?

    1. 當時都是單核時代,而且cpu價格非常貴.
    2. 如果不加全局解釋器鎖, 開發Cpython解釋器的程式員就會在源碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.他為了省事兒,直接進入解釋器時給線程加一個鎖.
    3. ==優點: 保證了Cpython解釋器的數據資源的安全.==
    4. ==缺點: 單個進程的多線程不能利用多核.==
  4. Jpython沒有GIL鎖. pypy也沒有GIL鎖.

  5. 現在多核時代, 我將Cpython的GIL鎖去掉行麽?

    因為Cpython解釋器所有的業務邏輯都是圍繞著單個線程實現的,去掉這個GIL鎖,幾乎不可能.

  6. ==單個進程的多線程可以併發,但是不能利用多核,不能並行. 多個進程可以併發,並行.==

==io密集型: 單個進程的多線程合適,併發執行==

==計算密集型:多進程的並行==

10.==GIL==鎖與==lock==鎖的區別

  1. 相同點: 都是同種鎖,互斥鎖.
  2. 不同點:
    1. GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
    2. GIL鎖 上鎖,釋放無需手動操作.
    3. 自己代碼中定義的互斥鎖保護進程中的資源數據的安全.
    4. 自己定義的互斥鎖必須自己手動上鎖,釋放鎖.

11.驗證計算密集型IO密集型的效率

  1. ==io密集型: 單個進程的多線程的併發效率高合適.併發執行==

  2. ==計算密集型:多進程的併發並行效率高.並行==

  3. 代碼驗證:

    計算密集型: 單個進程的多線程併發 vs 多個進程的併發並行
    
    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    def task():
        count = 0
        for i in range(30000000):  # (三千萬)
            count += 1
    
    
    if __name__ == '__main__':
    
        # 多進程的併發,並行  2.3737263679504395秒
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for i in l1:
            i.join()
        print(f'執行時間:{time.time()-start_time}')
    
        # 多線程的併發  6.290118932723999秒
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
        for i in l1:
            i.join()
        print(f'執行時間:{time.time()-start_time}')
    
    計算密集型: 多進程的併發並行效率高.
    # IO密集型: 單個進程的多線程併發 vs 多個進程的併發並行
    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    def task():
        count = 0
        time.sleep(1)
        count += 1
    
    
    if __name__ == '__main__':
    
        # 多進程的併發,並行  3.0123958587646484秒
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Process(target=task, )
            l1.append(p)
            p.start()
    
        for p in l1:
            p.join()
    
        print(f'執行效率:{time.time() - start_time}')
    
        # 多線程的併發  1.0087950229644775秒
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
    
        for p in l1:
            p.join()
    
        print(f'執行效率:{time.time()- start_time}')
    
    對於IO密集型: 單個進程的多線程的併發效率高.

12.多線程實現==socket==通信

無論是多線程還是多進程,如果按照之前的寫法,來一個客戶端請求,我就開一個線程,來一個請求開一個線程,應該是這樣: 你的電腦允許範圍內,開啟的線程進程數量越多越好.

服務端:
    
from threading import Thread
import socket


def communicate(conn, addr):
    while 1:
        try:
            from_client_data = conn.recv(1024)
            print(f'來{addr[1]}的信息{from_client_data.decode("utf-8")}')
            to_client_data = input('>>>').strip()
            conn.send(to_client_data.encode('utf-8'))
        except Exception:
            break
    conn.close()


def _accket():
    server = socket.socket()
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    while 1:
        conn, addr = server.accept()
        t = Thread(target=communicate, args=(conn, addr))
        t.start()


if __name__ == '__main__':
    _accket()
客戶端:
    
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while 1:
    try:
        to_server_data = input('>>>').strip()
        client.send(to_server_data.encode('utf-8'))
        from_server_data = client.recv(1024)
        print(f'來自伺服器的消息: {from_server_data.decode("utf-8")}')
    except Exception:
        break
client.close()

13.進程池線程池

from concurrent.futures import ProcessPoolExecutor  # 線程池模塊
from concurrent.futures import ThreadPoolExecutor  # 進程池模塊
p = ProcessPoolExecutor()  # 預設不寫,進程池裡面的進程數與cpu核個數相等(並行(並行+併發))
t = ThreadPoolExecutor()  # 預設不寫, cpu核個數*5 線程數  (併發)
print(os.cpu_count())  # 查看電腦幾核
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import random
import time
import os


print(os.cpu_count())  # 查看電腦幾核


def task():
    print(f'pid號: {os.getpid()} 來了')

    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # 開啟進程池 (並行(並行+併發))
    p = ProcessPoolExecutor()  # 預設不寫,進程池裡面的進程數與cpu個數相等
    for i in range(20):
        p.submit(task, )

    # 開啟線程池 (併發)
    t = ThreadPoolExecutor()  # 預設不寫, cpu個數*5 線程數
    for i in range(40):
        t.submit(task, )

14.阻塞 非阻塞 非同步 同步

問題出在哪裡?
1. 分析結果的過程是串列,效率低.
2. 你將所有的結果全部都爬取成功之後,放在一個列表中,分析.
問題1解決:
在開進程池,再開進程,耗費資源.

'''
爬取一個網頁需要2s,併發爬取10個網頁:2.多s.
分析任務: 1s.    10s. 總共12.多秒.

現在這個版本的過程:
    非同步發出10個爬取網頁的任務,然後4個進程併發(並行)的先去完成4個爬取網頁的任務,然後誰先結束,誰進行下一個
    爬取任務,直至10個任務全部爬取成功.
    將10個爬取結果放在一個列表中,串列的分析.
    
爬取一個網頁需要2s,分析任務: 1s,總共3s,總共3.多秒(開啟進程損耗).
.    10s.
下一個版本的過程:
    非同步發出10個 爬取網頁+分析 的任務,然後4個進程併發(並行)的先去完成4個爬取網頁+分析 的任務,
    然後誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務全部完成成功.
    '''
回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網路爬取.
分析任務: 回調函數執行了.對函數之間解耦.

極值情況: 如果回調函數是IO任務,那麼由於你的回調函數是主進程做的,所以有可能影響效率.

回調不是萬能的,如果回調的任務是IO,
那麼非同步 + 回調機制 不好.此時如果你要效率只能犧牲開銷,再開一個線程進程池.


非同步就是回調! 這個是錯的!! 非同步,回調是兩個概念.
'''
如果多個任務,多進程多線程處理的IO任務.
# 1. 剩下的任務 非IO阻塞.  非同步 + 回調機制
# 2. 剩下的任務 IO << 多個任務的IO  非同步 + 回調機制
# 3. 剩下的任務 IO >= 多個任務的IO  第二種解決方式,或者兩個進程線程池.
'''

15.非同步 調用機制

16.事件==Event==

17.協程的初識

18.協程


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在項目開發中,日誌系統是必不可少的,用`AOP`在Web的請求做入參和出參的參數列印,同時對異常進行日誌列印,避免重覆的手寫日誌,完整案例見文末源碼。 ...
  • 本文主要以一些簡單的小例子,簡述在SpringMVC開發過程中,經常用到的Request方面的內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • - False bool 類型的假值。 給 False 賦值是非法的並會引發 SyntaxError。 - True bool 類型的真值。 給 True 賦值是非法的並會引發 SyntaxError。 - None NoneType 類型的唯一值。 我理解為空值。 - NotImplemented ...
  • Tinymce富文本 前臺和後臺的使用 一,後臺 Admin 1. 於 settings.py 文件中修改 INSTALLED_APPS 2. 於 settings.py 文件中增添如下配置 1 # 富文本配置 2 TINYMCE_DEFAULT_CONFIG = { 3 'theme': 'adv ...
  • 修改列表中的元素: output: ['history','Chinese','English'] 列表中添加元素: 在末尾添加: output:['math','English'] ['math','English','Chinese'] 插入元素: output:['math','English ...
  • 在 IntelliJ IDEA 中,沒有類似於 Eclipse 工作空間(Workspace)的概念,而是提出了Project和Module這兩個概念。多module有一個父maven工程,多個子工程。在多個子工程中,可能有一個web工程,也可能有多個web工程。這樣的好處在於大大解耦各個modul... ...
  • 會話技術: 會話是什麼? 瀏覽器和伺服器交互,瀏覽器打開網頁訪問伺服器,會話開始,正常交互. 瀏覽器關閉,會話結束. 會話能幹什麼? 會話可以共用數據. Cookie和session將數據保存在不同的位置 進行數據共用 Cookie入門案例 1.創建一個cookie對象 a. Cookie cook ...
  • 1.iter補充 2.ntp_client和ntp_server 3.time複習 4.udp的客戶端與服務端通信 5.解決粘包 # from socket import * # ip_port=('127.0.0.1',8080) # back_log=5 # buffer_size=1024 # ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...