架構設計之NodeJS操作消息隊列RabbitMQ

来源:https://www.cnblogs.com/wukong-holmes/archive/2018/07/13/9306733.html
-Advertisement-
Play Games

一. 什麼是消息隊列? 消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字元串,也可以更複雜,可能包含嵌入對象。 消息隊列(Message Queue)是一種應用間的通信方式,消息發送後可以立即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而 ...


一. 什麼是消息隊列?

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字元串,也可以更複雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送後可以立即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。

二. 常用的消息隊列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至現在部分NoSQL也可做消息隊列,如Redis。

三. 消息隊列的使用場景?

  • 非同步處理

  • 應用解耦

  • 流量削峰

四. 使用案例

上規模的公司都會有自己的日誌分析系統,日誌系統是怎麼實現的呢?

 

圖解:用戶在訪問應用的時候,我們要記錄下用戶的操作記錄和系統的異常日誌,常規的做法是將系統產生的日誌保存到伺服器磁碟,在伺服器中開啟定時任務,定時將磁碟的日誌信息傳入mq中(生產者),也定時將mq中的消息取出並存到相應的資料庫,如ElasticSearch或Hive中。

五. 如何安裝RabbitMQ?

上面的案例介紹了MQ的一個使用場景,我這裡是用RabbitMQ舉例,現實項目中可能用到的是Kafka。

  1. 首先安裝brew(mac為例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 
  2. 安裝RabbitMQ

    brew install rabbitmq
  3. 運行RabbitMQ

    進入到 /usr/local/Cellar/rabbitmq/3.7.7,執行

    sbin/rabbitmq-server
  4. 啟動插件

    進入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management
  5. 登陸管理界面

    打開瀏覽器輸入:http://localhost:15672,RabbitMQ預設15672埠六. Nodejs操作RabbitMQ

     

 

網上可以找到好幾個相應的Node SDK,這裡推薦amqplib

1. 生產者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

 

2. 消費者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. 通過生產者向MQ發送一個消息,並創建隊列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})

執行之後,我們打開管理平臺,發現RabbbitMQ已經接受到了一條消息:

並且RabbbitMQ新增了一個隊列testQueue

4. 獲取指定隊列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg)
})
// 輸出結果:my first message複製代碼

此時打開RabbitMQ管理平臺,消息數量已經變為0

綜上:我們簡單講述了消息隊列及RabbitMQ相關的一些知識,以及我們如何通過nodejs來生產與消費消息,上面講的比較簡單,之後會發表更多文章講述消息隊列集群搭建及容災的實現。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 效果如下: ...
  • 今天在學習angularjs的分頁插件時遇到了一個前端的問題,谷歌瀏覽器開發者模式調試的時候發現每次點擊分頁刷新按鈕會觸發兩次後臺請求,ajax向後臺發送了兩次請求,這對於強迫症患者來說是一個比較噁心和感到不舒服的事情。 於是在網上也找到了靠譜的解決方案:http://jqvue.com/tm.pa ...
  • 在IT界已經混了5年了,5年中瀏覽了不少的網站,在上面查詢自己想要的東西,解決工作中遇到的問題,心裡總想有天自己能夠有自己的博客,能給分享一些自己在生活中、工作中遇到的問題,讓其他有類似經歷的朋友能夠少走彎路,今天終於鼓起勇氣在博客園寫下第一篇隨筆。其他不做過多的介紹,下麵將介紹今天在工作中遇到的一 ...
  • 全局安裝是把包安裝在Node安裝目錄下的node_modules文件夾中,一般在 \Users\用戶名\AppData\Roaming\ 目錄下,可以使用npm root -g查看全局安裝目錄 本地(局部)安裝是把包安裝在指定項目(要在指定的根目錄下輸入命令)的node_modules文件夾下(若沒 ...
  • 主要用於調試,顯示信息,重點看例子在瀏覽器 JavaScript 中,通常 window 是全局對象, Node.js 中的全局對象是 global####__filename__filename 表示當前正在執行的腳本的文件名。它將輸出文件所在位置的絕對路徑,且和命令行參數所指定的文件名不一定相同 ...
  • 第一階段: C/S(client server) B/S(browser server) 網頁製作 + 技術棧: PhotoShop、HTML、CSS 第二階段: 從靜態到動態,從後端到前端 前端開發工程師 前後端分離 + 後臺: 完成數據的分析和業務邏輯編寫(包含API介面編寫) + 前端: 網頁 ...
  • 先上源碼,版本是ES6 13行常規(700bytes) shortest snake game.html 壓縮後的500bytes(當然兩處document還是可以用eval壓縮的) index.500bytes.html 之前很火的20行代碼地址(有BUG)(900bytes) hj7jay/ar ...
  • 冒泡的概念就是 當子元素觸發事件的時候 相應的祖宗十八代素也會觸發相同的事件(前提父元素也添加了一樣的事件)eg:兒子 有一個onclick 祖宗十八代 也有onclick 當點擊兒子的時候 祖宗十八代的點擊事件也會被觸發 有時候這種情況會導致很多問題 所以要阻止冒泡 只有被點擊的元素才觸發事件 不... ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...