go併發 - channel

来源:https://www.cnblogs.com/asdfzxv/archive/2023/11/19/17841629.html
-Advertisement-
Play Games

公眾號「架構成長指南」,專註於生產實踐、雲原生、分散式系統、大數據技術分享。 在這篇文章中,我們將通過示例來學習 Java 函數式介面。 函數式介面的特點 只包含一個抽象方法的介面稱為函數式介面。 它可以有任意數量的預設靜態方法,但只能包含一個抽象方法。它還可以聲明對象類的方法。 函數介面也稱為單一 ...


概述

併發編程是利用多核心能力,提升程式性能,而多線程之間需要相互協作、共用資源、線程安全等。任何併發模型都要解決線程間通訊問題,毫不誇張的說線程通訊是併發編程的主要問題。go使用著名的CSP(Communicating Sequential Process,通訊順序進程)併發模型,從設計之初 Go 語言就註重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程式員專註於分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。channel是線程簡通訊的具體實現之一,本質就是一個線程安全的 FIFO 阻塞隊列(先進先出),向隊列中寫入數據,在另一個線程從隊列讀取數據。很多語言都有類似實現,比如 Java 的線程池任務隊列。

基本使用

通道是引用類型,需要使用 make 創建,格式如下

通道實例 := make(chan 數據類型, 通道長度)
  • 數據類型:通道內傳輸的元素類型,可以基本數據類型,也可以使自定義數據類型。
  • 通道實例:通過make創建的通道句柄,與函數名稱一樣,指向通道的記憶體首地址。
  • 通道長度:通道本質是隊列,創建時候可指定長度,預設為0

創建通道

ch1 := make(chan int)                 // 創建一個整型類型的通道
ch2 := make(chan interface{})         // 創建一個空介面類型的通道, 可以存放任意格式
ch3 := make(chan *Equip)             // 創建Equip指針類型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10)         // 創建Equip指針類型的通道, 並指定隊列長度

通道本質就是線程安全的隊列,創建時候可以指定隊列長度,預設為0。

向通道寫入數據,使用語法非常形象,寫入channel <-,讀取<-channel

ch2 := make(chan interface{}, 10)
ch2<- 10			// 向隊列寫入
n := <-ch2 			// 從隊列讀取
fmt.Println(n)		// 10

箭頭語法雖然很形象,但是有些奇怪,也不利於擴展。使用函數方式感覺更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,擴展性也更強,通過參數可增加超時、同步、非同步等技能。

箭頭符號並沒有規定位置,與C指針一樣,如下兩個語句等效

ch1 := make(chan int)
i := <-ch1			
i := <- ch1

箭頭語法的讀寫有相對性,可讀性一般,有時候無法分辨是讀或寫,看起來很奇怪,如下偽代碼

func main() {
	input := make(chan int, 2)
	output := make(chan int, 2)

	go func() {
		input <- 10
	}()
	output<- <-input
	fmt.Println(<-output)
}

管道是用於協程之間通訊,主流使用方式如下

ch2 := make(chan interface{}, 10)

go func() {
    data := <-ch2			// 用戶協程讀取
    fmt.Println(data)
}()
     
ch2 <- "hello"				// 主協程寫入
time.Sleep(time.Second)

管道也支持遍歷,與箭頭符號一樣,無數據時候迴圈將被阻塞,迴圈永遠不會結束,除非關閉管道

chanInt := make(chan int, 10)

for chanInt, ok := range chanInts {
    fmt.Println(chanInt)
}

管道也支持關閉,關閉後的管道不允許寫入,panic 異常

chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1		// panic: send on closed channel

讀取則不同,已有數據可繼續讀取,無數據時返回false,不阻塞

if value, ok := <-chanInts; ok {			// 從管道讀取數據不在阻塞
    fmt.Println("從管讀取=", value)
} else {
    fmt.Println("從管道讀取失敗", ok)
    return
}

單向管道

管道也支持單向模式,僅允許讀取、或者寫入

var queue <-chan string = make(chan string)

函數形參也可以定義定向管道

func customer(channel <-chan string) {		// 形參為只讀管道
    for {		
        message := <-channel				// 只允許讀取數據
        fmt.Println(message)
    }
}
channel := make(chan string)
go customer(channel)

管道阻塞

Go管道的讀寫都是同步模式,當管道容量還有空間,則寫入成功,否則將阻塞直到寫入成功。從管道讀取也一樣,有數據直接讀取,否則將阻塞直到讀取成功。

var done = make(chan bool)

func aGoroutine() {
    fmt.Println("hello")
    done <- true			// 寫管道
}

func main() {
    go aGoroutine()
    <-done					// 讀阻塞
}

主協程從管道讀取數據時將被阻塞,直到用戶協程寫入數據。管道非常適合用於生產者消費者模式,需要平滑兩者的性能差異,可通過管道容量實現緩衝,所以除非特定場景,都建議管道容量大於零。

有些場景可以使用管道控制線程併發數

// 待補充

阻塞特性也帶來了些問題,程式無法控制超時(箭頭函數語法的後遺症),go 也提供瞭解決方案, 使用select關鍵,與網路編程的select函數類似,監測多個通道是否可讀狀態,都可讀隨機選擇一個,都不可讀進入Default分支,否則阻塞

select {
    case n := <-input:
        fmt.Println(n)
    case m := <-output:
        fmt.Println(m)
    default:
        fmt.Println("default")
}

當然也可以使用select向管道寫入數據,只要不關閉管道總是可寫入,此時加入default分支永遠不會被執行到,如下隨機石頭剪刀布

ch := make(chan string)
go func() {
    for {
        select {
            case ch <- "石頭":
            case ch <- "剪刀":
            case ch <- "布":
        }
    }
}()

for value := range ch {
    log.Println(value)
    time.Sleep(time.Second)
}

模擬線程池

由於go的管道非常輕量且簡潔,大部分直接使用,封裝線程池模式並不常見。案例僅作為功能演示,非常簡單幾十行代碼即可實現線程池的基本功能,體現了go併發模型的簡潔、高效。

type Runnable interface {
	Start()
}

// 線程池對象
type ThreadPool struct {
	queueSize int
	workSize  int
	channel   chan Runnable
	wait      sync.WaitGroup
}

// 工作線程, 執行非同步任務
func (pool *ThreadPool) doWorker(name string) {
	log.Printf("%s 啟動工作協程", name)
	for true {
		if runnable, ok := <-pool.channel; ok {
			log.Printf("%s 獲取任務, %v\n", name, runnable)
			runnable.Start()
			log.Printf("%s 任務執行成功, %v\n", name, runnable)
		} else {
			log.Printf("%s 線程池關閉, 退出工作協程\n", name)
			pool.wait.Done()
			return
		}
	}
}

// 啟動工作線程
func (pool *ThreadPool) worker() {
	pool.wait.Add(pool.workSize)
	for i := 0; i < pool.workSize; i++ {
		go pool.doWorker(fmt.Sprintf("work-%d", i))
	}
}

// Submit 提交任務
func (pool *ThreadPool) Submit(task Runnable) bool {
	defer func() { recover() }()
	pool.channel <- task
	return true
}

// Close 關閉線程池
func (pool *ThreadPool) Close() {
	defer func() { recover() }()
	close(pool.channel)
}

// Wait 等待線程池任務完成
func (pool *ThreadPool) Wait() {
	pool.Close()
	pool.wait.Wait()
}

// NewThreadPool 工廠函數,創建線程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
	pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
	pool.worker()
	return pool
}

使用線程池

type person struct {
	name string
}

func (p *person) Start() {
	fmt.Println(p.name)
}

func main() {
	threadPool := executor.NewThreadPool(10, 2)		// 創建線程池, 隊列長度10, 工作線程2

	for i := 0; i < 5; i++ {
		threadPool.Submit(&person{name: "xx"})		// 提交十個任務
	}
        
	threadPool.Wait()								// 阻塞等待所有任務完成
}

模擬管道

任何線程之間的通訊都依賴底層鎖機制,channel是對鎖機制封裝後的實現對象,與Java中線程池任務隊列機制幾乎一樣,但要簡潔很多。使用切片簡單模擬
介面聲明

type Queue interface {
	// Put 向隊列添加任務, 添加成功返回true, 添加失敗返回false, 隊列滿了則阻塞直到添加成功
	Put(task interface{}) bool

	// Get 從隊列獲取任務, 一直阻塞直到獲取任務, 隊列關閉且沒有任務則返回false
	Get() (interface{}, bool)

	// Size 查看隊列中的任務數
	Size() int

	// Close 關閉隊列, 關閉後將無法添加任務, 已有的任務可以繼續獲取
	Close()
}

基於切片實現

// SliceQueue 使用切片實現, 自動擴容屬性隊列永遠都不會滿, 擴容時候會觸發數據複製, 性能一般
type SliceQueue struct {
	sync.Mutex
	cond  *sync.Cond
	queue []interface{}
	close atomic.Bool
}

func (q *SliceQueue) Get() (data interface{}, ok bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if len(q.queue) == 0 {
			if q.close.Load() == true {
				return nil, false
			}
			q.cond.Wait()
		}
		if data := q.doGet(); data != nil {
			return data, true
		}
	}
	return
}

func (q *SliceQueue) doGet() interface{} {
	if len(q.queue) >= 1 {
		data := q.queue[0]
		q.queue = q.queue[1:]
		return data
	}
	return nil
}

func (q *SliceQueue) Put(task interface{}) bool {
	q.Lock()
	defer func() {
		q.cond.Signal()
		q.Unlock()
	}()

	if q.close.Load() == true {
		return false
	}
	q.queue = append(q.queue, task)
	return true
}

func (q *SliceQueue) Size() int {
	return len(q.queue)
}

func (q *SliceQueue) Close() {
	if q.close.Load() == true {
		return
	}

	q.Lock()
	defer q.Unlock()

	q.close.Store(true)
	q.cond.Broadcast()
}

func NewSliceQueue() Queue {
	sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
	sliceQueue.cond = sync.NewCond(sliceQueue)
	return sliceQueue
}

基於環行數組實現

type ArrayQueue struct {
	sync.Mutex
	readCond     *sync.Cond
	writeCond    *sync.Cond
	readIndex    int
	writeIndex   int
	queueMaxSize int
	close        atomic.Bool
	queue        []interface{}
}

func (q *ArrayQueue) Put(task interface{}) bool {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.close.Load() == true {
			return false
		}
		if q.IsFull() {
			q.writeCond.Wait()
			if q.IsFull() {
				continue
			}
		}
		q.queue[q.writeIndex] = task
		q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
		q.readCond.Signal()
		return true
	}
	return true
}

func (q *ArrayQueue) Get() (interface{}, bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.IsEmpty() {
			if q.close.Load() == true {
				return nil, false
			}
			q.readCond.Wait()
			if q.IsEmpty() {
				continue
			}
		}
		task := q.queue[q.readIndex]
		q.readIndex = (q.readIndex + 1) % q.queueMaxSize
		q.writeCond.Signal()
		return task, true
	}
	return nil, true
}

func (q *ArrayQueue) Size() int {
	return q.queueMaxSize
}

func (q *ArrayQueue) Close() {
	if q.close.Load() == true {
		return
	}
	q.Lock()
	q.Unlock()
	q.close.Store(true)
	q.readCond.Broadcast()
}

func (q *ArrayQueue) IsFull() bool {
	return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}

func (q *ArrayQueue) IsEmpty() bool {
	return q.readIndex == q.writeIndex
}

func NewArrayQueue(size int) Queue {
	queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
	queue.readCond = sync.NewCond(queue)
	queue.writeCond = sync.NewCond(queue)
	return queue
}

測試用例

func TestWith(t *testing.T) {
	q := NewSliceQueue()
	go func() {
		time.Sleep(time.Second * 2)
		q.Put(true)  // 向隊列寫入數據, 與 chan<- 功能相同
	}()

	q.Get()			// 阻塞直到讀取數據, 與 <-chan 功能相同
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • python 版本:3.6 win32 版本(因為一些特殊原因必須使用3.6) pymssql 版本:2.2.0 連接資料庫: import pymssql** def InitMssql(self): try: host = self.IniConfig.get('default','dbhost ...
  • 學習完基礎的圖像演算法,開始接觸OpenCV學習: 灰度圖中,一個像素點上的灰度級需要一個位元組(byte,2^8,8 bit)進行存儲,此時的灰度圖是二維的。而當我們需要轉換為彩色圖時,即三維,便會產生顏色通道(Channel),這個時候,一個像素點上的灰度級便會需要三個位元組來進行存儲。 可以藉助笛卡 ...
  • 本文只發佈於利用OpenCV實現尺度不變性與角度不變性的特征找圖演算法和知乎 一般來說,利用OpenCV實現找圖功能,用的比較多的是模板匹配(matchTemplate)。筆者比較喜歡裡面的NCC演算法。但是模板有個很明顯的短板,面對尺度改變,角度改變的目標就無能為力了。因此本文旨在做到模板匹配做不到的 ...
  • 目錄: Redis是什麼? Redis優缺點? Redis為什麼這麼快? 講講Redis的線程模型? Redis應用場景有哪些? Memcached和Redis的區別? 為什麼要用 Redis 而不用 map/guava 做緩存? Redis 數據類型有哪些? SortedSet和List異同點? ...
  • 14.1、概述 在實際工作中,一般使用配置類和註解代替web.xml和SpringMVC配置文件的功能; 在 Servlet3.0 環境中,容器會在類路徑中查找實現了 javax.servlet.ServletContainerInitializer 介面的類, 如果找到了的話,就會用它來配置 Se ...
  • int i=1; i=i++; int j=i++; int k=i + ++i * i++; System.out.println("i="+i); System.out.println("j="+j); System.out.println("k="+k); ...
  • 寫在前面 今天狀態很不好,我發現學這部分知識的時候,會出現溜號或者註意力無法集中的情況。 我能想到的是,大概率是這部分知識,應該是超出了我現在的水平了,也就是說我存在知識斷層了,整體感覺真的是一知半解。 那有同學會問了,那你能說明白嗎? 我理解的肯定能呀,來往下看! Flask的使用 1、消息閃現的 ...
  • rust中的枚舉有什麼用?枚舉可以嵌入類型的好處是什麼 你可以在同一個枚舉中既有單個值,也有元組或結構體。 枚舉的每個變體可以擁有不同數量和類型的關聯數據。 這增加了類型的靈活性和表達力,使你能夠更精確地建模你的數據。 我知道rust可以為枚舉創建方法,那在哪種場景下枚舉會比結構體會有優勢 表示多個 ...
一周排行
    -Advertisement-
    Play Games
  • MQTTnet 是一個高性能的MQTT類庫,支持.NET Core和.NET Framework。 MQTTnet 原理: MQTTnet 是一個用於.NET的高性能MQTT類庫,實現了MQTT協議的各個層級,包括連接、會話、發佈/訂閱、QoS(服務質量)等。其原理涉及以下關鍵概念: MqttCli ...
  • 在WPF中,源屬性(Source Property)指的是提供數據的屬性,通常是數據模型或者其他控制項的屬性,而目標屬性(Target Property)則是數據綁定的目標,通常是綁定到控制項的屬性,例如TextBlock的Text屬性。數據綁定將源屬性的值自動更新到目標屬性中。 主要包含以下幾個事件: ...
  • async/await 是 C# 中非同步編程的關鍵特性,它使得非同步代碼編寫更為簡單和直觀。下麵深入詳細描述了 async/await 的使用場景、優點以及一些高級使用方法,並提供了相應的實例源代碼。 使用場景: I/O 操作: 非同步編程特別適用於涉及 I/O 操作(如文件讀寫、網路請求等)的場景。在 ...
  • 使用過office的visio軟體畫圖的小伙伴都知道,畫圖軟體分為兩部分,左側圖形庫,存放各種圖標,右側是一個畫布,將左側圖形庫的圖標控制項拖拽到右側畫布,就會生成一個新的控制項,並且可以自由拖動。那如何在WPF程式中,實現類似的功能呢?今天就以一個簡單的小例子,簡述如何在WPF中實現控制項的拖拽和拖動,... ...
  • 1、Blazor Hybrid簡介 Blazor Hybrid 使開發人員能夠將桌面和移動本機客戶端框架與 .NET 和 Blazor 結合使用。在 Blazor Hybrid 應用中,Razor 組件在設備上是本機運行的。 這些組件通過本地互操作通道呈現到嵌入式 Web 視圖控制項。 組件不在瀏覽器 ...
  • 除了內置的數據集,scikit-learn還提供了隨機樣本的生成器。通過這些生成器函數,可以生成具有特定特性和分佈的隨機數據集,以幫助進行機器學習演算法的研究、測試和比較。 目前,scikit-learn庫(v1.3.0版)中有20個不同的生成樣本的函數。本篇重點介紹其中幾個具有代表性的函數。 1. ...
  • 從0到1,手把手帶你開發截圖工具ScreenCap------002實現通過文件對話框,選擇合適的文件夾,自定義預設的圖片保存位置,簡單易學 ...
  • 每次談到容器的時候,除了Docker之外,都會說起 Kubernetes,那麼什麼是 Kubernetes呢?今天就來一起學快速入門一下 Kubernetes 吧!希望本文對您有所幫助。 Kubernetes,一種用於管理和自動化雲中容器化工作負載的工具。 想象一下你有一個管弦樂隊,將每個音樂家視為 ...
  • 目錄 基本說明 安裝 Nginx 部署 VUE 前端 部署 Django 後端 Django admin 靜態文件(CSS,JS等)丟失的問題 總結 1. 基本說明 本文介紹了在 windows 伺服器下,通過 Nginx 部署 VUE + Django 前後端分離項目。本項目前端運行在 80 埠 ...
  • 從0到1,手把手帶你開發截圖工具ScreenCap------003實現最小化程式到托盤運行,- 為了方便截圖乾凈,實現最小化程式到托盤運行,簡潔,勿擾,實現最小化程式到托盤運行, 實現托盤菜單功能,實現回顯主窗體, 實現托盤開始截屏, 實現氣泡信息提示,實現托盤程式提示,實現托盤退出程式, 封裝完... ...