Spring Cloud Alibaba(五)RocketMQ 非同步通信實現

来源:https://www.cnblogs.com/tqlin/archive/2019/12/04/11983702.html
-Advertisement-
Play Games

本文探討如何使用 RocketMQ Binder 完成 Spring Cloud 應用消息的訂閱和發佈。 介紹 "RocketMQ" 是一款開源的分散式消息系統,基於高可用分散式集群技術,提供低延時的、高可靠的消息發佈與訂閱服務,廣泛應用於多個領域,包括非同步通信解耦、企業解決方案、金融支付、電信、電 ...


本文探討如何使用 RocketMQ Binder 完成 Spring Cloud 應用消息的訂閱和發佈。

介紹

RocketMQ 是一款開源的分散式消息系統,基於高可用分散式集群技術,提供低延時的、高可靠的消息發佈與訂閱服務,廣泛應用於多個領域,包括非同步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等。

RocketMQ 是阿裡巴巴在2012年開源的分散式消息中間件,目前已經捐贈給 Apache 軟體基金會,並於2017年9月25日成為 Apache 的頂級項目。作為經歷過多次阿裡巴巴雙十一這種“超級工程”的洗禮並有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。

RocketMQ特點

  • 是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分散式等特點
  • Producer、Consumer、隊列都可以分散式
  • Producer 向一些隊列輪流發送消息,隊列集合稱為 Topic,Consumer 如果做廣播消費,則一個 Consumer 實例消費這個 Topic 對應的所有隊列,如果做集群消費,則多個 Consumer 實例平均消費這個 Topic 對應的隊列集合
  • 能夠保證嚴格的消息順序
  • 支持拉(pull)和推(push)兩種消息模式
  • 高效的訂閱者水平擴展能力
  • 實時的消息訂閱機制
  • 億級消息堆積能力
  • 支持多種消息協議,如 JMS、OpenMessaging 等
  • 較少的依賴

Spring Cloud Stream

Spring Cloud Stream 是一個構建消息驅動微服務的框架。

Spring Cloud Stream 提供了消息中間件配置的統一抽象,推出了 pub/sub,consumer groups,semantics,stateful partition 這些統一的模型支持。

Spring Cloud Stream 核心構件有:Binders、Bindings和Message,應用程式通過 inputs 或者 outputs 來與 binder 交互,通過我們配置來 binding ,而 binder 負責與中間件交互,Message為數據交換的統一數據規範格式。

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中間件與應用程式提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程式的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸。

  • Binder: 跟外部消息中間件集成的組件,用來創建 Binding,各消息中間件都有自己的 Binder 實現。

比如 Kafka 的實現 KafkaMessageChannelBinderRabbitMQ 的實現 RabbitMessageChannelBinder 以及 RocketMQ 的實現 RocketMQMessageChannelBinder

  • Message:是 Spring Framework 中的一個模塊,其作用就是統一消息的編程模型。

比如消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header。

spring-cloud-stream 官網

Window搭建部署RocketMQ

下載

當前最新版本為4.6.0

下載出來解壓到:D:\rocketmq 目錄,目錄最好不要帶空格和太深,否則服務運行可能會報錯

啟動NameServer服務

在啟動之前需要配置系統環境,不然會報錯。

Please set the ROCKETMQ_HOME variable in your environment! 

系統環境變數名:ROCKETMQ_HOME

根據你解壓的目錄配置環境變數,比如我的變數值為:D:\rocketmq

進入window命令視窗,進入D:\rocketmq\bin目錄下,執行

start mqnamesrv.cmd

如上則NameServer啟動成功。使用期間,視窗不要關閉。

啟動Broker服務

進入bin目錄下,輸入

start mqbroker.cmd -n localhost:9876

如上的 ip+port 是rocketmq的服務地址和埠。

運行如上命令,可能會報如下錯誤。找不到或無法載入主類

如果出此情況,打開bin-->runbroker.cmd,修改%CLASSPATH%成"%CLASSPATH%"

保存再次執行如上命令。執行成功後,提示boot success 代表成功。

示例

本示例實現三種消息的發佈以及訂閱接收。

創建 RocketMQ 消息生產者

創建 ali-rocketmq-producer 工程,埠為:28081

  • pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>ali-rocketmq-producer</artifactId>
    <packaging>jar</packaging>

    <dependencies>

        <!--rocketmq依賴-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <!--web依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • 配置 Output 的 Binding 信息並配合 @EnableBinding 註解使其生效

application.yml配置

server:
  port: 28081

spring:
  application:
    name: ali-rocketmq-producer
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 伺服器地址
          name-server: 127.0.0.1:9876
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArProduceApplication.java

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArProduceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArProduceApplication.class, args);
    }
}
  • 消息生產者服務

MySource.java

package com.easy.arProduce;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("output1")
    MessageChannel output1();

    @Output("output2")
    MessageChannel output2();
}

SenderService.java

package com.easy.arProduce;

import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class SenderService {

    @Autowired
    private MySource source;

    /**
     * 發送字元串
     *
     * @param msg
     */
    public void send(String msg) {
        Message message = MessageBuilder.withPayload(msg)
                .build();
        source.output1().send(message);
    }

    /**
     * 發送帶tag的字元串
     *
     * @param msg
     * @param tag
     */
    public void sendWithTags(String msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .build();
        source.output1().send(message);
    }

    /**
     * 發送對象
     *
     * @param msg
     * @param tag
     * @param <T>
     */
    public <T> void sendObject(T msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        source.output2().send(message);
    }
}

編寫 TestController.java 控制器方便測試

package com.easy.arProduce;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "test")
public class TestController {
    @Autowired
    SenderService senderService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send(String msg) {
        senderService.send(msg);
        return "字元串消息發送成功!";
    }

    @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET)
    public String sendWithTags(String msg) {
        senderService.sendWithTags(msg, "tagStr");
        return "帶tag字元串消息發送成功!";
    }

    @RequestMapping(value = "/sendObject", method = RequestMethod.GET)
    public String sendObject(int index) {
        senderService.sendObject(new Foo(index, "foo"), "tagObj");
        return "Object對象消息發送成功!";
    }
}

創建 RocketMQ 消息消費者

創建 ali-rocketmq-consumer 工程,埠為:28082

  • pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>ali-rocketmq-consumer</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

-配置 Input 的 Binding 信息並配合 @EnableBinding 註解使其生效

application.yml配置

server:
  port: 28082

spring:
  application:
    name: ali-rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #rocketmq 服務地址
        bindings:
          input1: {consumer.orderly: true}  #是否排序
          input2: {consumer.tags: tagStr}   #訂閱 帶tag值為tagStr的字元串
          input3: {consumer.tags: tagObj}   #訂閱 帶tag值為tabObj的字元串
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}
        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArConsumerApplication.java

package com.easy.arConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArConsumerApplication.class, args);
    }
}
  • 消息消費者服務

MySource.java

package com.easy.arConsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

    @Input("input3")
    SubscribableChannel input3();
}

ReceiveService.java

package com.easy.arConsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ReceiveService {

    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        log.info("input1 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        log.info("input2 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input3")
    public void receiveInput3(@Payload Foo foo) {
        log.info("input3 接收到了消息:" + foo);
    }
}

使用示例

示例關聯項目

本示例我們創建了兩個項目實現

  • ali-rocketmq-producer:RocketMQ 消息服務生產者,服務名:ali-rocketmq-producer,埠:28081

  • ali-rocketmq-consumer:RocketMQ 消息服務消費者,服務名:ali-rocketmq-producer,埠:28082

運行示例測試

首先要啟動ali-rocketmq-producer服務及ali-rocketmq-consumer服務

  • 訪問消息服務生產者地址: http://localhost:28081/test/send?msg=yuntian

查看服務消費者控制台,輸出

2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:yuntian
2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms

表示字元串消費成功被input1消費了

  • 訪問消息服務生產者地址: http://localhost:28081/test/sendWithTags?msg=tagyuntian

查看服務消費者控制台,輸出

2019-12-04 15:38:09.586  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input2 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms

表示帶tag的字元串成功被input2和input1消費了,因為input1也訂閱了test-topic1,並且沒有我們沒有加tag過濾,預設表示接收所有消息,所以也能成功接收tagyuntian字元串

  • 訪問消息服務生產者地址: http://localhost:28081/test/sendObject?index=1

查看服務消費者控制台,輸出

2019-12-04 15:41:15.285  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input3 接收到了消息:Foo{id=1, bar='foo'}

表示input3成功接收到了tag帶tagObj的對象消息了,而input1卻沒有輸出消息,這是因為sendObject發佈的消息走的是test-topic2消息管道,所以不會發佈給input1及input2訂閱者

資料


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • --部門表 create table dept( deptno int primary key,--部門編號 dname nvarchar(30),--部門名 loc nvarchar(30)--地址 ); --雇員表 create table emp( empno int primary key, ...
  • 基本查詢: 實例表 1 示例表 2 --部門表 3 4 create table dept( 5 6 deptno int primary key,--部門編號 7 8 dname nvarchar(30),--部門名 9 10 loc nvarchar(30)--地址 11 12 ); 13 14 ...
  • 表: 學生(*學號,姓名,性別,年齡,專業) create table student( sno char(13) primary key, sname varchar(20) not null, ssex char(2), sage smallint, sdept varchar(30) ); 課 ...
  • 很多用於當SQL Server2017 安裝完成後開始菜單找不到啟動項無法啟動SQL Server2017 其實你只需要安裝一下SSMS-Setup-CHS就可以了 安裝完成之後就有了 SSMS-Setup-CHS 下載鏈接: 鏈接:https://pan.baidu.com/s/18EcH16Ok ...
  • 函數式編程(Functional Programming)是一種編程風格,它是相對於指令式編程風格而言的,常見的面向對象編程就是指令式編程風格。 指令式編程是面向電腦硬體的抽象,有變數(對應著存儲單元),賦值語句(獲取、存儲指令),表達式(記憶體引用和算術運算)和控制語句(跳轉語句)。 而函數式編程 ...
  • 1、到Navicat官網下載使用版本進行安裝,具體操作不再詳述。Navcat官網下載鏈接:http://www.navicat.com.cn/download/navicat-premium ; 2、到GitHub下載DoubleLabyrinth大神發佈的Navicat Keygen,具體操作不再 ...
  • 一 資料庫初識 1.1 什麼是資料庫 資料庫(DataBase,簡稱DB),簡而言之可視為電子化的文件櫃 存儲電子文件的處所,用戶可以對文件中的數據運行新增,截取,更新,刪除等操作. 所謂資料庫是以一定方式儲存在一起,能予多個用戶 共用,具有儘可能小的冗餘度,與應用程式彼此獨立的數據集合. 資料庫的 ...
  • 最近項目遇到根據關鍵字匹配度排序,要求關鍵字匹配相等排在第一,關鍵字匹配最左邊排第二,關鍵字匹配最右邊排第三,關鍵字匹配中間排最後;遇到這樣查詢排序場景,用MySQL如何實現?用搜索引擎Elasticsearch如何實現? 方法一:按照上面需求用聯合查詢,可以實現方案,但是當數據量很大時,聯合查詢效 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...