RabbitMQ個人實踐

来源:https://www.cnblogs.com/konghuanxi/archive/2022/11/24/16921862.html
-Advertisement-
Play Games

前言 MQ(Message Queue)就是消息隊列,其有點有很多:解耦、非同步、削峰等等,本文來聊一下RabbitMQ的一些概念以及使用。 RabbitMq 案例 Springboot整合RabbitMQ簡單案例 基本概念 Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。 Que ...


前言

MQ(Message Queue)就是消息隊列,其有點有很多:解耦、非同步、削峰等等,本文來聊一下RabbitMQ的一些概念以及使用。

RabbitMq

案例

Springboot整合RabbitMQ簡單案例

基本概念

rabbitmq.png

  • Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
  • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  • Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  • Producer:消息生產者,就是投遞消息的程式。
  • Consumer:消息消費者,就是接受消息的程式。

發佈消息到RabbitMQ需要經過兩步:

  1. producer → exchange
  2. exchange 根據 exchange 的類型和 routing key 確定將消息投遞到哪個隊列

工作流程

瞭解了RabbitMQ的一些概念,我們來捋捋使用RabbitMQ的流程:

  1. 創建Exchange
  2. 創建Queue
  3. 將Queue綁定進Exchange中(此處會設置routing key)
  4. 生產者發佈消息
  5. 消費者訂閱消息

交換機(Exchange)

交換機可以綁定隊列,綁定時可以給隊列指定路由(Routing key)和參數(Arguments)

所有的消息發送都是經過交換機轉發到隊列的,而不是直接到隊列中

交換機類型:

  • direct

    根據確定的路由(routing key)轉發消息到隊列中(一條消息可以發到多個隊列,只要路由相同)

  • fanout

    路由無效,只要和該交換機綁定的隊列,都能接收到消息

  • topic

    允許路由使用*和#來進行模糊匹配

    *表示一個單詞

    表示任意數量(零個或多個)單詞

    例如:如果隊列的路由為com.# 那麼往交換機發消息是,路由填com.ccc 隊列就可以收到消息

  • headers

    忽略路由,由參數(Arguments)來確定轉發的隊列

消息過期時間TTL

有兩種方式設置TTL,創建隊列時設置整個隊列的TTL或者在發送消息時單獨設置每條消息的TTL,消息存活時間取兩者的最小值。

  1. 創建隊列時設置

    是消息的存活時間,不是隊列的存活時間,別搞混了。

    @Bean
    public Queue queue(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000); // 設置隊列中的消息5秒過期
        return new Queue("queueName",true, false, false, args);
    }
    
  2. 發送消息時設置

    public void makeOrder(String userid,String productid,int num){
        String exchangeName = "ttl_exchange";
        String routingKey = "ttlmessage";
        //給消息設置過期時間
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                // 設置消息5秒過期
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        }
        rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);
    }
    

死信隊列

死信隊列也是一個正常隊列,只是當綁定了死信隊列的隊列滿足相應條件,就會將滿足條件的消息轉移到死信隊列中。

進入死信隊列的條件:

  1. 消息被拒絕
  2. 消息過期(超時)
  3. 隊列達到最大長度

死信隊列的配置:

  1. 按照正常步驟定義一個隊列(交換機、隊列、綁定)

  2. 給需要綁定死信隊列的隊列添加x-dead-letter-exchange(死信隊列的交換機)和x-dead-letter-routing-key(死信隊列的路由)參數

    @Bean
    public Queue queue(){
        Map<String, Object> args = new HashMap<>();
    		args.put("x-dead-letter-exchange", "死信隊列交換機名稱"); 
    		args.put("x-dead-letter-routing-key", "死信隊列路由"); 
        return new Queue("queueName",true, false, false, args);
    }
    

如何保證MQ消息正確送達與消費

可靠性生產和推送

步驟:

  1. 發送消息前資料庫保存MQ消息發送日誌
  2. MQ消息發送後使用回調更新日誌狀態

實現:

上面我們講了,發佈消息到RabbitMQ需要經過兩步:

producer → exchange
exchange 根據 exchange 的類型和 routing key 確定將消息投遞到哪個隊列

所以,發佈消息的確認也分兩個部分,以下是確認步驟:

  1. 修改MQ應答機制(yml)

    spring:
      rabbitmq:
        username: rmq
        password: 123456
        virtual-host: /
        host: localhost
        port: 5672
        # 發送消息確認,producer -> exchange
        publisher-confirm-type: correlated
        # 發送消息確認,exchange -> queue
        publisher-returns: true
    
  2. 新增mq的回調方法

    /**
     * PostConstruct註解好多人以為是Spring提供的。其實是Java自己的註解。
     * Java中該註解的說明:@PostConstruct該註解被用來修飾一個非靜態的void()方法。
     * 被@PostConstruct修飾的方法會在伺服器載入Servlet的時候運行,並且只會被伺服器執行一次。
     * PostConstruct在構造函數之後執行,init()方法之前執行。
     * Constructor(構造方法) -> @Autowired(依賴註入) -> @PostConstruct(註釋的方法)
     */
    @PostConstruct
    private void regCallBack() {
        // producer -> exchange 成功或失敗都會觸發此回調
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // 這個id是在消息發送的時候傳入的
                String id = correlationData.getId();
                // 如果ack為true代表消息被mq成功接收
                if (!ack) {
                    // 應答失敗,修改日誌狀態
                    System.out.println("exchange 應答失敗,做失敗處理!");
                } else {
                    // 應答成功,修改日誌狀態
                    System.out.println("exchange 成功處理");
                }
            }
        });
    
        // 這個回調只有exchange -> queue 失敗時才會觸發
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("exchange -> queue 發送失敗");
            }
        });
    }
    
  3. 修改MQ發送消息的方法,增加日誌id的傳遞

    String correlationId = "這是日誌id";
    rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 消費者需要correlationId才做這個處理
            message.getMessageProperties().setCorrelationId(correlationId);
            return message;
        }
    }, new CorrelationData(correlationId));
    // 如果消費者不需要獲取correlationId,則用下麵這種即可
    rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));
    

可靠性消費

步驟:

  1. 開啟手動應答
  2. 監聽器增加手動應答邏輯

實現:

  1. 開啟手動應答

    spring:
      rabbitmq:
        username: rmq
        password: 123456
        virtual-host: /
        host: localhost
        port: 5672
        listener:
          simple:
            acknowledge-mode: manual # 將自動應答ack模式改成手動應答
    

    acknowledge-mode有三種類型:

    • nome:不進行ack,rabbitmq預設消費者正確處理所有請求
    • munual:手動確認
    • auto:自動確認消息(預設類型)。如果消費者拋出異常,則消息重回隊列。
  2. 監聽器增加手動應答邏輯

    @RabbitListener(queues = {"隊列名字"})
    public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{
        // 需要producer做相應處理,consumer才能拿到correlationId
        String correlationId = messages.getMessageProperties().getCorrelationId();
        System.out.println("消息為:" + orderMsg);
        long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());
        try {
            // 消費成功,進行確認
            channel.basicAck(tag, false);
        } catch (IOException e) {
            // 消費失敗,重發
            // requeue代表是否重發,為false則直接將消息丟棄,有死信就進入死信隊列
            channel.basicNack(tag, false, true);
        }
    }
    

總結

本文介紹了RabbitMQ的一些概念和簡單使用,有不少東西其實是沒有講清楚的,比如publisher-confirm-type和acknowledge-mode的幾種類型的區別等等。主要是在官方文檔找不到相關的細緻描述,查文檔的能力還是有待提高。。。


參考資料

RabbitMq 技術文檔 - 騰訊雲開發者社區-騰訊雲 (tencent.com)

Spring AMQP


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在vue2中,提供了provide和inject配置,可以讓開發者在高層組件中註入數據,然後在後代組件中使用 除了相容vue2的配置式註入,vue3在composition api 中添加了provide和inject方法,可以在setup函數中註入 和使用數據 基本使用 provide('key' ...
  • 自定義 封裝單列模式! global state 由於vue3的響應式系統本身可以脫離組件而存在,因此可以充分利用這一點,輕鬆製造多個全局響應式數據, 並且通過和vuex一樣 通過某個模塊指定方法修改數據,不能直接修改數據,並且讓數據成為全局響應式 並且在代碼體積上絕對的輕量級!比市面上的任何第三方 ...
  • 一.小結 1.程式模塊化和可重用性是軟體工程的中心目標之一。java提供了很多有助於完成這一目標的有效結構。方法就是一個這樣的結構。 2.方法指定方法的修飾符,返回值類型,方法名和參數。比如靜態修飾符static。 3.方法可以返回一個值。返回值類型returnValueType是方法要返回的值數據 ...
  • 伺服器端渲染技術01 為什麼需要jsp? 在之前的開發過程中,我們可以發現servlet做界面非常不方便: 引出jsp技術=> jsp=html+java代碼+標簽+javascript+css 1.JSP基本介紹 JSP全稱是Java Server Pages,Java的伺服器頁面,就是伺服器端渲 ...
  • 目錄 一.OpenGL 褐色 1.IOS Object-C 版本 1.Windows OpenGL ES 版本 2.Windows OpenGL 版本 二.OpenGL 褐色 GLSL Shader 三.猜你喜歡 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> Op ...
  • 多線程理解 繼承Thread類 子類繼承Thread類具備多線程能力 啟動線程:子類對象.start() 不建議使用:避免oop單繼承局限性 實現Runnable介面 實現介面Runnable具有多線程能力 啟動線程:傳入目標對象+Thread對象.start() 推薦使用:避免單繼承局限性,可能一 ...
  • 之前我們已經知道什麼是 數組(一維數組)java 基礎——數組,數組的存取 這裡補充一點: 數組本身是引用數據類型 ,數組的元素 可以是 基本數據類型 跟 引用數據類型 那麼?什麼是二維數組 ? 官方定義:以一維數組作為一維數組元素的數組 要是有點繞,不好理解,沒關係,簡單來說,就是一維數組裡面存一 ...
  • 一、介紹 Java由Sun Microsystems發明併在1995年發佈,是世界上使用最廣泛的編程語言之一。Java是一個通用編程語言。由於它擁有功能強大的庫、運行時、簡單的語法、平臺無關(Write Once, Run Anywhere - WORA)以及令人敬畏的社區從而吸引了很多的開發者。 ...
一周排行
    -Advertisement-
    Play Games
  • C#TMS系統代碼-基礎頁面BaseCity學習 本人純新手,剛進公司跟領導報道,我說我是java全棧,他問我會不會C#,我說大學學過,他說這個TMS系統就給你來管了。外包已經把代碼給我了,這幾天先把增刪改查的代碼背一下,說不定後面就要趕鴨子上架了 Service頁面 //using => impo ...
  • 委托與事件 委托 委托的定義 委托是C#中的一種類型,用於存儲對方法的引用。它允許將方法作為參數傳遞給其他方法,實現回調、事件處理和動態調用等功能。通俗來講,就是委托包含方法的記憶體地址,方法匹配與委托相同的簽名,因此通過使用正確的參數類型來調用方法。 委托的特性 引用方法:委托允許存儲對方法的引用, ...
  • 前言 這幾天閑來沒事看看ABP vNext的文檔和源碼,關於關於依賴註入(屬性註入)這塊兒產生了興趣。 我們都知道。Volo.ABP 依賴註入容器使用了第三方組件Autofac實現的。有三種註入方式,構造函數註入和方法註入和屬性註入。 ABP的屬性註入原則參考如下: 這時候我就開始疑惑了,因為我知道 ...
  • C#TMS系統代碼-業務頁面ShippingNotice學習 學一個業務頁面,ok,領導開完會就被裁掉了,很突然啊,他收拾東西的時候我還以為他要旅游提前請假了,還在尋思為什麼回家連自己買的幾箱飲料都要叫跑腿帶走,怕被偷嗎?還好我在他開會之前拿了兩瓶芬達 感覺感覺前面的BaseCity差不太多,這邊的 ...
  • 概述:在C#中,通過`Expression`類、`AndAlso`和`OrElse`方法可組合兩個`Expression<Func<T, bool>>`,實現多條件動態查詢。通過創建表達式樹,可輕鬆構建複雜的查詢條件。 在C#中,可以使用AndAlso和OrElse方法組合兩個Expression< ...
  • 閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
  • 1. 案例 成某三甲醫預約系統, 該項目在2024年初進行上線測試,在正常運行了兩天後,業務系統報錯:The connection pool has been exhausted, either raise MaxPoolSize (currently 800) or Timeout (curren ...
  • 背景 我們有些工具在 Web 版中已經有了很好的實踐,而在 WPF 中重新開發也是一種費時費力的操作,那麼直接集成則是最省事省力的方法了。 思路解釋 為什麼要使用 WPF?莫問為什麼,老 C# 開發的堅持,另外因為 Windows 上已經裝了 Webview2/edge 整體打包比 electron ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...