物聯網微消息隊列MQTT介紹-EMQX集群搭建以及與SpringBoot整合

来源:https://www.cnblogs.com/Tom-shushu/archive/2022/06/19/16390187.html
-Advertisement-
Play Games

項目全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 項目) 先看我們最後實現的一個效果 1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器) 2.控制台列印 MQTT基本簡介 MQTT ...


項目全部代碼地址:https://github.com/Tom-shushu/work-study.gitmqtt-emqt 項目)

先看我們最後實現的一個效果

1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器)

 2.控制台列印

MQTT基本簡介

MQTT 是用於物聯網 (IoT) 的 OASIS 標準消息傳遞協議。它被設計為一種極其輕量級的發佈/訂閱消息傳輸,非常適合連接具有小代碼足跡和最小網路帶寬的遠程設備。

MQTT協議簡介

MQTT 是客戶端伺服器發佈/訂閱消息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通信,其中需要小代碼足跡和/或網路帶寬非常寶貴。

該協議通過 TCP/IP 或其他提供有序、無損、雙向連接的網路協議運行。其特點包括:

·         使用發佈/訂閱消息模式,提供一對多的消息分發和應用程式的解耦。

·         與有效負載內容無關的消息傳輸。

·         消息傳遞的三種服務質量:

o    “最多一次”,根據操作環境的最大努力傳遞消息。可能會發生消息丟失。例如,此級別可用於環境感測器數據,其中單個讀數是否丟失並不重要,因為下一個讀數將很快發佈。

o    “至少一次”,保證消息到達但可能出現重覆。

o    “Exactly once”,保證消息只到達一次。例如,此級別可用於重覆或丟失消息可能導致應用不正確費用的計費系統。

·         最小化傳輸開銷和協議交換以減少網路流量。

·         發生異常斷開時通知相關方的機制。

EMQX簡介

通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連接任何設備。使用 EMQX Enterprise 集群輕鬆擴展到數千萬併發 MQTT 連接。

並且EMQX還是開源的,又支持集群,所以還是一個比較不錯的選擇

EMQX集群搭建

前期準備:

1.兩台伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿裡雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和

 mqtt_service_txyun 吧。 2.一個功能變數名稱: mqtt.zhouhong.icu

安裝開始

1.分別在兩台伺服器上執行以下操作進行安裝(如果是單機:只需要進行下麵1、2操作就安裝完成了)
## 1.下載
wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 2.安裝
sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 3.修改配置文件
vim /etc/emqx/emqx.conf
## 4.修改以下內容
## 註意node.name是當前這台伺服器名稱
node.name = [email protected]
cluster.static.seeds = [email protected],[email protected]
cluster.discovery = static
cluster.name = my-mqtt-cluster
2.分別啟動兩台伺服器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺都可以,預設賬號admin 密碼public),註意打開18083,1883 安全組

4.nginx負載均衡

nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可

stream {
  upstream mqtt.zhouhong.icu {
      zone tcp_servers 64k;
      hash $remote_addr;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;

  }

  server {
      listen 8883 ssl;
      status_zone tcp_server;
      proxy_pass mqtt.zhouhong.icu;
      proxy_buffer_size 4k;
      ssl_handshake_timeout 15s;
      ssl_certificate     /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem;
      ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key;
  }
}

與SpringBoot集成並實現伺服器端監控對應topic下的消息

1.項目搭建

  • 引入MQTT相關jar包
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • yml配置文件 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server:
  port: 8080

mqtt:
 ## 單機版--只需要把功能變數名稱改為ip既可  hostUrl: tcp:
//mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (發送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
  • 屬性配置
/**
 * description:
 * date: 2022/6/16 15:51
 * @author: zhouhong
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連接地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺伺服器下,不允許出現重覆的客戶端id
     */
    private String clientId;

    /**
     * 預設連接主題
     */
    private String topic;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端
     * 發送個消息判斷客戶端是否線上,但這個方法並沒有重連的機制
     */
    private int keepAlive;

    /**
     * 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連
     * 接記錄,這裡設置為true表示每次連接到伺服器都以新的身份連接
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 連接方式
     */
    private Integer qos;
}
  • 發送消息回調
/**
 * description: 發生消息成功後 的 回調
 * date: 2022/6/16 15:55
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttSendCallBack implements MqttCallbackExtended {

    /**
     * 客戶端斷開後觸發
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("發送消息回調: 連接斷開,可以做重連");
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("發送消息回調:  接收消息主題 : " + topic);
        log.info("發送消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("發送消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("發送消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
    }
}
  • 接收消息回調
/**
 * description: 接收消息後的回調
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptCallback implements MqttCallbackExtended {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開後觸發
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("接收消息回調:  連接斷開,可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            log.info("接收消息回調:  emqx重新連接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息回調:  接收消息主題 : " + topic);
        log.info("接收消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("接收消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("接收消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
        // 以/#結尾表示訂閱所有以test開頭的主題
        // 訂閱所有機構主題
        mqttAcceptClient.subscribe("topic111", 0);
    }
}
  • 發消息
/**
 * description: 發送消息
 * date: 2022/6/16 16:01
 *
 * @author: zhouhong
 */
@Component
public class MqttSendClient {

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設置回調
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 發佈消息
     * 主題格式: server:report:$orgCode(參數實際使用機構代碼)
     *
     * @param retained    是否保留
     * @param pushMessage 消息體
     */
    public void publish(boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關閉連接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 接收消息
/**
 * description: 伺服器段端連接訂閱消息、監控topic
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptClient {

    @Autowired
    @Lazy
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連接
     */
    public void connect() {
        MqttClient client;
        try {
            // clientId 使用伺服器 yml裡面配置的 clientId
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設置回調
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題
     *
     * @param topic 主題
     * @param qos   連接方式
     */
    public void subscribe(String topic, int qos) {
        log.info("==============開始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        log.info("==============開始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 服務端啟動時連接訂閱主題並監控
/**
 * description: 啟動後連接 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic發送的消息
 * date: 2022/6/16 15:57
 * @author: zhouhong
 */
@Configuration
public class MqttConfig {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}
  • 發消息控制類
/**
 * description: 發消息控制類
 * date: 2022/6/16 15:58
 *
 * @author: zhouhong
 */
@RestController
public class SendController {

    @Resource
    private MqttSendClient mqttSendClient;

    @PostMapping("/mqtt/sendmessage")
    public void sendMessage(@RequestBody SendParam sendParam) {
        mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent());
    }
}

2.測試

  • postman調用發消息介面

  •  控制台日誌

  •  使用另外一個移動端MQTT調試工具測試
  1. 手機端向主題 topic111 發送消息,並接收。

 

 

  2. 控制台列印

 

本文來自博客園,作者:Tom-shushu,轉載請註明原文鏈接:https://www.cnblogs.com/Tom-shushu/p/16390187.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • C++ 標準庫提供了原子操作。(我已經懶得寫序言了) 先來說原子操作的概念: 原子操作是多線程當中對資源進行保護的一種手段,主要作用是和互斥量(Mutex)一樣,避免對資源的併發訪問、修改。 互斥量的粒度衡量是作用域(哪怕作用域內只有一個變數),而原子的粒度衡量則是以一個變數或對象為單位。因此,原子 ...
  • 介紹 這是很久之前的一個項目了,最近剛好有些時間,就來總結一下吧! 推薦初步熟悉項目後閱讀本文: https://gitee.com/smalldyy/easy-msg-cpp 從何而來 這要從我從事Qt開發的那些日子說起了,項目說大不大,說小也不小,人倒是一茬又一茬,需求也換了又換,後來的事情大家 ...
  • 引言:沒想到2022年還有很多工業軟體公司依然使用MFC,微軟也一直在更新MFC的庫,這次使用MFC封裝的CFileDialog類,寫一個獲得選定文件路徑,名稱,擴展名的程式。 個人技術博客(文章整理+源碼): https://zobolblog.github.io/LearnWinAPI/ 最終效 ...
  • 一個挺著啤酒肚,身穿格子衫,髮際線嚴重後移的中年男子,手拿著保溫杯,胳膊夾著MacBook向你走來,看樣子是架構師級別。 面試開始, 直入正題。 面試官: 你有沒有參與過秒殺系統的設計? 我: 沒有,我平時都是開發後臺管理系統、OA辦公系統、內部管理系統,從來沒有開發過秒殺系統。 面試官: 嗯... ...
  • 大佬理解->Java集合之ArrayList 1、ArrayList的特點 存放的元素有序 元素不唯一(可以重覆) 隨機訪問快 插入刪除元素慢 非線程安全 2、底層實現 底層初始化,使用一個Object類型的空對象數組,初始長度為0; 源碼 //Object類型對象數組引用 transient Ob ...
  • 大佬的理解->《Java IO(五) -- 字元流進階及BufferedWriter,BufferedReader》 1、BufferedReader BufferedReader高效字元流讀取文件基本用法,自帶緩衝區,讀取文件效率高,支持逐行讀取; 1.1 初始化 BufferedReader(R ...
  • 游戲的世界精彩紛呈,有動作類、策略類、角色扮演類等諸多類型,還有很多難以分類的小游戲,讓人玩起來往往愛不釋手 ...
  • 大佬的理解->《Java IO(四) -- 字元流》 FileReader字元流讀取文件,更適合用於讀取文件,可以讀取中文 1、FileReader 1.1 初始化 FileReader(File file) FileReader(String fileName) 1.2 讀取文件內容 read() ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...