【解決方案】MySQL5.7 百萬數據遷移到 ElasticSearch7.x 的思考

来源:https://www.cnblogs.com/Apluemxa/archive/2023/12/06/17879538.html
-Advertisement-
Play Games

在日常項目開發中,可能會遇到使用 ES 做關鍵詞搜索的場景,但是一般來說業務數據是不會直接通過 CRUD 寫進 ES 的。 因為這可能違背了 ES 是用來查詢的初衷,數據持久化的事情可以交給資料庫來做。那麼,這裡就有一個顯而易見的問題:ES 里的數據從哪裡來? 本文介紹的就是如何將 MySQL ... ...


目錄

前言

在日常項目開發中,可能會遇到使用 ES 做關鍵詞搜索的場景,但是一般來說業務數據是不會直接通過 CRUD 寫進 ES 的。

因為這可能違背了 ES 是用來查詢的初衷,數據持久化的事情可以交給資料庫來做。那麼,這裡就有一個顯而易見的問題:ES 里的數據從哪裡來?

本文介紹的就是如何將 MySQL 的表數據遷移到 ES 的全過程。

一、一次性全量

該方案的思路很簡單直接:將資料庫中的表數據一次性查出,放入記憶體,在轉換 DB 與 ES 的實體結構,遍歷迴圈將 DB 的數據 放入 ES 中。

但是對機器的性能考驗非常大:本地 MySQL 10w 條數據,電腦記憶體16GB,僅30秒鐘記憶體占用90%,CPU占用100%。太過於粗暴了,不推薦使用。

@Component05
@Slf4j
public class FullSyncArticleToES implements CommandLineRunner {

    @Resource
    private ArticleMapper articleMapper;

    @Resource
    private ArticleRepository articleRepository;

    /**
     * 執行一次即可全量遷移
     */
    //todo: 弊端太明顯了,數據量一大的話,對記憶體和 cpu 都是考驗,不推薦這麼簡單粗暴的方式
    public void fullSyncArticleToES() {
        LambdaQueryWrapper<Article> wrapper = new LambdaQueryWrapper<>();
        List<Article> articleList = articleMapper.selectList(wrapper);
        if (CollectionUtils.isNotEmpty(articleList)) {
            List<ESArticle> esArticleList = articleList.stream().map(ESArticle::dbToEs).collect(Collectors.toList());
            final int pageSize = 500;
            final int total = esArticleList.size();
            log.info("------------FullSyncArticleToES start!-----------, total {}", total);
            for (int i = 0; i < total; i += pageSize) {
                int end = Math.min(i + pageSize, total);
                log.info("------sync from {} to {}------", i, end);
                articleRepository.saveAll(esArticleList.subList(i, end));
            }
            log.info("------------FullSyncPostToEs end!------------, total {}", total);
        }
        else {
            log.info("------------DB no Data!------------");
        }
    }
    @Override
    public void run(String... args) {}
}

二、定時任務增量

這種方案的思想是按時間範圍以增量的方式讀取,比全量的一次性數據量要小很多。

也存在弊端:頻繁的資料庫連接 + 讀寫,對伺服器資源消耗較大。且在極端短時間內大量數據寫入的場景,可能會導致性能、數據不一致的問題(即來不及把所有數據都查到,同時還要寫到 ES)。

但還是有一定的可操作性,畢竟可能沒有那麼極端的情況,高併發寫入的場景不會時刻都有。

@Component
@Slf4j
public class IncSyncArticleToES {
    @Resource
    private ArticleMapper articleMapper;

    @Resource
    private ArticleRepository articleRepository;

    /**
     * 每分鐘執行一次
     */
    @Scheduled(fixedRate = 60 * 1000)
    public void run() {
        // 查詢近 5 分鐘內的數據,有 id 重覆的數據 ES 會自動覆蓋
        Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);
        List<Article> articleList = articleMapper.listArticleWithData(fiveMinutesAgoDate);
        if (CollectionUtils.isNotEmpty(articleList)) {
            List<ESArticle> esArticleList = articleList.stream().map(ESArticle::dbToEs).collect(Collectors.toList());
            final int pageSize = 500;
            int total = esArticleList.size();
            log.info("------------IncSyncArticleToES start!-----------, total {}", total);
            for (int i = 0; i < total; i += pageSize) {
                int end = Math.min(i + pageSize, total);
                log.info("sync from {} to {}", i, end);
                articleRepository.saveAll(esArticleList.subList(i, end));
            }
            log.info("------------IncSyncArticleToES end!------------, total {}", total);
        }
        else {
            log.info("------------DB no Data!------------");
        }
    }
}

三、強一致性問題

如果大家看完以上兩個方案,可能會有一個問題:

無論是增量還是全量, MySQL 和 ES 進行連接/讀寫是需要耗費時間的,如果這個過程中如果有大量的數據插到 MySQL 里,那麼有沒有可能寫入 ES 里的數據並不能和 MySQL 里的完全一致?

答案是:在數據量大和高併發的場景下,是很有可能會發生這種情況的。

如果需要我們自己寫代碼來保證一致性,可以怎麼做才能較好地解決呢?

思路:由於 ES 查詢做了分頁,每次查只有10 條,那麼每次調用查詢的時候,就拿這10條數據的唯一標識 id 再去 MySQL 中查一下,MySQL 里有的就會被查出來,那麼返回這些結果就好,就不直接返回 ES 的查詢結果了;同時刪除掉 ES 里那些在資料庫中被刪除的數據,做個”反向同步“。這個思路有幾個明顯的優點:

1、單次數據量很小,在記憶體中操作幾乎就是毫秒級的;

2、返回的是 MySQL 的源數據,不再 ”信任“ ES 了,保證強一致性;

3、反向刪除 ES 中的那些已經被 MySQL 刪除了的數據。

以下是代碼,註釋很詳細,應該很好理解:

@Override
public PageInfo<Article> testSearchFromES(ArticleSearchDTO articleSearchDTO){
    // 獲取查詢對象的結果, searchQuery 這裡忽略,就當查詢條件已經寫好了,可以查到數據
    SearchHits<ESArticle> searchHits = elasticTemplate.search(searchQuery, ESArticle.class);
    //todo: 以下考慮使用 MySQL 的源數據,不再以 ES 的數據為準
    List<Article> resultList = new ArrayList<>();
    // 從 ES 查出結果後,再與 db 獲的數據進行對比,確認後再組裝返回
    if (searchHits.hasSearchHits()) {
        // 收集 ES 里業務對象的 Id 成 List
        List<String> articleIdList = searchHits.getSearchHits().stream()
            .map(val -> val.getContent().getId())
            .collect(Collectors.toList());
        // 獲取資料庫的符合體條件的數據,由於是分頁的,一次性的數據量小(10條而已),剩下的都是記憶體操作,性能可以保證
        List<Article> articleList = baseMapper.selectBatchIds(articleIdList);
        if (CollectionUtils.isNotEmpty(articleList)) {
            //根據 db 里業務對象的 Id 進行分組
            Map<String , List<Article>> idArticleMap = articleList.stream().collect(Collectors.groupingBy(Article::getId));
            //對 ES 中的 Id 的集合進行 for 迴圈,經過對比後添加數據
            articleIdList.forEach(articleId -> {
                // 如果 ES 里的 Id 在資料庫里有,說明數據已經同步到 ES 了,兩邊的數據是一致的
                if (idArticleMap.containsKey(articleId)) {
                    // 則把符合的數據放入 page 對象中
                    resultList.add(idArticleMap.get(articleId).get(NumberUtils.INTEGER_ZERO));
                } else {
                    // 刪除 ES 中那些在資料庫中被刪除的數據;因為資料庫都沒有這條資料庫了,那麼 ES 里也不能有,算是一種反向同步吧
                    String delete = elasticTemplate.delete(String.valueOf(articleId), PostEsDTO.class);
                    log.info("delete post {}", delete);
                }
            });
        }
    }
    // 初始化 page 對象
    PageInfo<Article> pageInfo = new PageInfo<>();
    pageInfo.setList(resultList);
    pageInfo.setTotal(searchHits.getTotalHits());
    System.out.println(pageInfo);
    return pageInfo;
}

然而,以上的所有內容並不是今天文章的重點。只是為引入 canal 做的鋪墊,引入、安裝、配置好 canal 後可以解決以上的全部問題。對,就是全部。


四、canal 框架

4.1基本原理

canal 是 Alibaba 開源的一個用於 MySQL 資料庫增量數據同步工具。它通過解析 MySQL 的 binlog 來獲取增量數據,並將數據發送到指定位置。

canal 會模擬 MySQL slave 的交互協議,偽裝自己為 MySQL 的 slave ,向 MySQL master 發送 dump 協議。MySQL master 收到 dump 請求,開始推送 bin-log 給 slave (即 canal )。

canal 簡單原理

canal 的高可用分為兩部分:canal server 和 canal client。

canal server 為了減少對 MySQL dump 的請求,不同 server 上的實例要求同一時間只能有一個處於 running 狀態;

canal client 為了保證有序性,一份實例同一時間只能由一個 canal client 進行 get/ack/rollback 操作來保證順序。

canal 高可用

4.2安裝使用(重點)

  • 版本說明
    • Centos 7(這個關係不大)
    • JDK 11(這個很關鍵)
    • MySQL 5.7.36(只要5.7.x都可)
    • Elasticsearch 7.16.x(不要太高,比較關鍵)
    • cannal.server: 1.1.5(有官方鏡像,放心拉取)
    • canal.adapter: 1.1.5(無官方鏡像,但問題不大)

註:我這裡由於自己的個人伺服器的一些中間件版本問題,始終無法成功安裝上 canal-adapter,所以沒有最終將數據遷移到 ES 里去。

主要原因在於兩點:

  1. JDK 版本需要 JDK11及以上,我自己個人伺服器現用的是 JDK 8,但 canal 並不相容 JDK 8;
  2. 我的 ES 的版本太高用的是7.6.1,這可能導致 canal 版本與它不相容,可能實際需要降低到7.16.x 左右。

但是本人在工作中是有過項目實踐的,推薦使用 docker 安裝 canal,步驟參考:https://zhuanlan.zhihu.com/p/465614745

4.3引入依賴(測試)

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

4.4代碼示例(測試)

以下代碼 demo 來自官網,僅用於測試。

首先需要連接上4.2小節中的 canal-server 配置,然後啟動該類中的 main 方法後會不斷去監聽對應的 MySQL 庫-表數據是否有變化,有的話就列印出來。

public class CanalClientUtils {
    public static void main(String[] args) {
        // 創建連接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress
                ("你的公網ip地址", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 1000;
            while (emptyCount < totalEmptyCount) {
                // 獲取指定數量的數據
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                // 提交確認
                connector.ack(batchId);
                // 處理失敗, 回滾數據
                //connector.rollback(batchId);
            }
            System.out.println("empty too many times, exit");
        } finally {
            // 關閉連接
            connector.disconnect();
        }
    }
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of error-event has an error , data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.printf(
                    "-----------binlog[%s:%s] , name[%s,%s] , eventType:%s%n ------------",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("---------before data----------");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("---------after data-----------");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + ",update status:" + column.getUpdated());
        }
    }
}

預期的結果會表明涉及的庫、表名稱,以及操作的類型,同時還可以知道欄位的狀態:true 為有變化,false 為無變化。如下圖所示:

canal 監聽示例

以上的4.3和4.4小節都是用來測試效果的,在伺服器上安裝配置好 canal 以後,實際無需在項目中寫關於 canal 的操作代碼。

每一步的 MySQL 操作 binlog 都會被 canal 獲取到,然後將數據同步到 ES 中,這些操作都是在伺服器上進行的,基本上對於開發人員來說是無感的。

阿裡雲上有專門的產品來支持數據從 MySQL 遷移到 ES 的場景,真正的商業項目開發,還是可以選擇雲廠商現有的方案(我不是打廣告):

https://help.aliyun.com/zh/dts/user-guide/migrate-data-from-an-apsaradb-rds-for-mysql-instance-to-an-elasticsearch-cluster?spm=a2c4g.11186623.0.0.33626255Aql88M


五、文章小結

到這裡我就和大家分享完了關於數據從 MySQL 遷移到 ES 全過程的思考,如有錯誤和不足,期待大家的指正和交流。

參考文檔:

  1. 阿裡巴巴 canal 的 GitHub 開源項目地址:https://github.com/alibaba/canal
  2. 安裝以及配置步驟:https://zhuanlan.zhihu.com/p/465614745

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 針對遇到的各種複雜形狀的主體,大多情況下,我們可以求得一個近似的多邊形來簡化視覺圖像處理,因為多邊形是由直線組成的,這樣就可以準確的劃分區域來便捷後續的操作。 cv2.arcLength() Method: 參數: curve:要計算周長的輪廓,可以是一個矩形、圓形、多邊形等封閉曲線。 closed ...
  • 寫在前面 這篇文章被擱置真的太久了,不知不覺拖到了周三了,當然,也算跟falsk系列說再見的時候,真沒什麼好神秘的,就是個資料庫操作,就大家都知道的CRUD吧。 Flask SQLAlchemy的使用 1、Flask SQLAlchemy簡介 Flask SQLAlchemy 是基於 Flask w ...
  • MinTray 說明 實現程式關閉時最小化托盤的功能 托盤實現顯示主頁面和退出的功能 支持擴展,直接引用TrayIcon類即可,對外暴露介面 單例實現,可復用 警告 註:博主所有資源永久免費,若有幫助,請點贊轉發是對我莫大的幫助 註:博主本人學習過程的分享,引用他人的文章皆會標註原作者 註:本人文章 ...
  • 變數、運算符、表達式、輸入與輸出 tip:[start]編程是一種控制電腦的方式,和我們平時雙擊打開文件、關機、重啟沒有任何區別——閆學燦tip:[end] 1.編寫一個簡單的Java程式–手速練習 /* step1:創建一個java源文件:HelloWorld.java 將編寫的java代碼保存 ...
  • 你是否曾想過為什麼在 Spring Boot 應用中緩存是如此重要?答案在於它通過減少數據檢索時間來提高性能。在本文中,我們將深入探討緩存對微服務模式的影響,並探討根據操作易用性、速度、可用性和可觀測性等因素選擇正確緩存的重要性。我們還將探討如何最大程度地提高緩存性能和可用性。 1 緩存實現 1.1 ...
  • 項目背景 原有的啟動平臺公共組件庫comm-util的瀏覽器工具類BrowserUtils是基於UserAgentUtils的,但是該項目最後一個版本發佈於 2018/01/24,之至今日23年底,已有5年沒有維護更新,會造成最新版本的部分瀏覽器不能正確獲取到瀏覽器信息。(至於為什麼停更了獲取不到最 ...
  • 現象描述:Spring Boot項目,啟動的時候卡住了,一直卡在那裡不動,沒有報錯,也沒有日誌輸出 但是,奇怪的是,本地可以正常啟動 好吧,姑且先不深究為什麼本地可以啟動而部署到伺服器上就無法啟動的問題,這個不是重點,重點是怎麼讓它啟動起來。(PS:我猜測可能是環境不同造成的,包括操作系統不同和JD ...
  • 如圖所示,項目中定義了這樣幾個模塊: pdd-workflow-build :定義項目版本,及全局配置 pdd-workflow-dependencies :外部依賴管理,統一管理所有用到的外部依賴的版本 pdd-workflow-service :項目service模塊 pdd-workflow- ...
一周排行
    -Advertisement-
    Play Games
  • 1、預覽地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 隨著網路的發展,企業對於信息系統數據的保密工作愈發重視,不同身份、角色對於數據的訪問許可權都應該大相徑庭。 列如 1、不同登錄人員對一個數據列表的可見度是不一樣的,如數據列、數據行、數據按鈕 ...
  • 前言 上一篇文章寫瞭如何使用RabbitMQ做個簡單的發送郵件項目,然後評論也是比較多,也是準備去學習一下如何確保RabbitMQ的消息可靠性,但是由於時間原因,先來說說設計模式中的簡單工廠模式吧! 在瞭解簡單工廠模式之前,我們要知道C#是一款面向對象的高級程式語言。它有3大特性,封裝、繼承、多態。 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 介紹 Nodify是一個WPF基於節點的編輯器控制項,其中包含一系列節點、連接和連接器組件,旨在簡化構建基於節點的工具的過程 ...
  • 創建一個webapi項目做測試使用。 創建新控制器,搭建一個基礎框架,包括獲取當天日期、wiki的請求地址等 創建一個Http請求幫助類以及方法,用於獲取指定URL的信息 使用http請求訪問指定url,先運行一下,看看返回的內容。內容如圖右邊所示,實際上是一個Json數據。我們主要解析 大事記 部 ...
  • 最近在不少自媒體上看到有關.NET與C#的資訊與評價,感覺大家對.NET與C#還是不太瞭解,尤其是對2016年6月發佈的跨平臺.NET Core 1.0,更是知之甚少。在考慮一番之後,還是決定寫點東西總結一下,也回顧一下.NET的發展歷史。 首先,你沒看錯,.NET是跨平臺的,可以在Windows、 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 添加節點(nodes) 通過上一篇我們已經創建好了編輯器實例現在我們為編輯器添加一個節點 添加model和viewmode ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...
  • 類型檢查和轉換:當你需要檢查對象是否為特定類型,並且希望在同一時間內將其轉換為那個類型時,模式匹配提供了一種更簡潔的方式來完成這一任務,避免了使用傳統的as和is操作符後還需要進行額外的null檢查。 複雜條件邏輯:在處理複雜的條件邏輯時,特別是涉及到多個條件和類型的情況下,使用模式匹配可以使代碼更 ...
  • 在日常開發中,我們經常需要和文件打交道,特別是桌面開發,有時候就會需要載入大批量的文件,而且可能還會存在部分文件缺失的情況,那麼如何才能快速的判斷文件是否存在呢?如果處理不當的,且文件數量比較多的時候,可能會造成卡頓等情況,進而影響程式的使用體驗。今天就以一個簡單的小例子,簡述兩種不同的判斷文件是否... ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...