基於非同步隊列的生產者消費者C#併發設計

来源:https://www.cnblogs.com/zhiyong-ITNote/archive/2018/01/19/8315621.html
-Advertisement-
Play Games

繼上文<<基於阻塞隊列的生產者消費者C#併發設計>>的併發隊列版本的併發設計,原文code是基於<<.Net中的並行編程-4.實現高性能非同步隊列>>修改過來的,前面的幾篇文章也詳細介紹了併發實現的其它方案及實現。直接給code: 調用code: 併發系列應該就這樣完了,回頭整理成目錄,自己查起來也方 ...


繼上文<<基於阻塞隊列的生產者消費者C#併發設計>>的併發隊列版本的併發設計,原文code是基於<<.Net中的並行編程-4.實現高性能非同步隊列>>修改過來的,前面的幾篇文章也詳細介紹了併發實現的其它方案及實現。直接給code:

public class MyAsyncQueue<T>
    {
        //隊列是否正在處理數據
        private int isProcessing;
        //有線程正在處理數據
        private const int Processing = 1;
        //沒有線程處理數據
        private const int UnProcessing = 0;
        //隊列是否可用
        private volatile bool enabled = true;
        // 消費者線程
        private Task currentTask;
        // 消費者線程處理事件
        public event Action<T> ProcessItemFunction;
        //
        public event EventHandler<EventArgs<Exception>> ProcessException;
        // 併發隊列
        private ConcurrentQueue<T> queue;
        // 消費者的數量
        private int _internalTaskCount;
        // 存儲消費者隊列
        List<Task> tasks = new List<Task>();

        public MyAsyncQueue()
        {
            _internalTaskCount = 3;
            queue = new ConcurrentQueue<T>();
            Start();
        }

        public int Count
        {
            get
            {
                return queue.Count;
            }
        }
        // 開啟監聽線程
        private void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

        // 生產者生產
        public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

            queue.Enqueue(items);
            DataAdded();
        }

        //數據添加完成後通知消費者線程處理
        private void DataAdded()
        {
            if (enabled)
            {
                if (!IsProcessingItem())
                {
                    // 開啟消費者消費隊列
                    ProcessRangeItem();
                }
            }
        }

        //判斷是否隊列有線程正在處理 
        private bool IsProcessingItem()
        {
            return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
        }

        private void ProcessRangeItem()
        {
            for(int i=0; i< _internalTaskCount; i++)
            {
                currentTask = Task.Factory.StartNew(() => ProcessItemLoop());
                tasks.Add(currentTask);
            }
        }
        // 消費者處理事件
        private void ProcessItemLoop()
        {
            Console.WriteLine("正在執行的Task的Id: {0}", Task.CurrentId);
            // 隊列為空,並且隊列不可用
            if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }
            //處理的線程數 是否小於當前最大任務數
            //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)
            //{
            T publishFrame;

            while(enabled)
            {
                if (queue.TryDequeue(out publishFrame))
                {
                    try
                    {
                        // 消費者處理事件
                        ProcessItemFunction(publishFrame);
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex);
                    }
                }
                else
                {
                    Console.WriteLine("線程Id{0}取隊列失敗,跳出迴圈", Task.CurrentId);
                    break;
                }
            }
        }

        /// <summary>
        ///定時處理線程調用函數  
        ///主要是監視入隊的時候線程 沒有來的及處理的情況
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount = 0;
            int sleepTime = 1000;
            while (enabled)
            {
                //如果隊列為空則根據迴圈的次數確定睡眠的時間
                if (queue.IsEmpty)
                {
                    // Task消費者消費完了隊列中的數據....註銷掉消費者線程
                    if(tasks.Count==_internalTaskCount)
                    {
                        Flush();
                    }
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount <= 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判斷是否隊列有線程正在處理 
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            currentTask = Task.Factory.StartNew(ProcessItemLoop);
                            tasks.Add(currentTask);
                        }
                        else
                        {
                            //隊列為空,已經取完了
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

        //更新並關閉消費者
        public void Flush()
        {
            Stop();
            foreach(var t in tasks)
            {
                if (t != null)
                {
                    t.Wait();
                    Console.WriteLine("Task已經完成");
                }
            }

            // 消費者未消費完
            while (!queue.IsEmpty)
            {
                try
                {
                    T publishFrame;
                    if (queue.TryDequeue(out publishFrame))
                    {
                        ProcessItemFunction(publishFrame);
                    }
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }
            currentTask = null;
            tasks.Clear();
        }

        public void Stop()
        {
            this.enabled = false;
        }

        private void OnProcessException(System.Exception ex)
        {
            var tempException = ProcessException;
            Interlocked.CompareExchange(ref ProcessException, null, null);

            if (tempException != null)
            {
                ProcessException(ex, new EventArgs<Exception>(ex));
            }
        }
}

調用code:

class ComInfo
    {
        public int ComId { get; set; }

        public DateTime Date { get; set; }
    }
    class Program
    {
        static MyAsyncQueue<ComInfo> queue = new MyAsyncQueue<ComInfo>();
        static void Main(string[] args)
        {
            Console.WriteLine("開始======");
            queue.ProcessItemFunction += A;
            queue.ProcessException += C; //new EventHandler<EventArgs<Exception>>(C);

            ComInfo info = new ComInfo();

            for (int i = 1; i < 50; i++)
            {
                Task.Factory.StartNew((param) =>
                {
                    info = new ComInfo();
                    info.ComId = int.Parse(param.ToString());
                    info.Date = DateTime.Now.Date;
                    queue.Enqueue(info);
                }, i);
            }

            Console.WriteLine("結束======");
            
            Console.ReadKey();
        }

        static void A(ComInfo info)
        {
            Console.WriteLine(info.ComId + "====" + queue.Count);
        }

        static void C(object ex, EventArgs<Exception> args)
        {
            Console.WriteLine("出錯了");
        }
    }

併發系列應該就這樣完了,回頭整理成目錄,自己查起來也方便


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...