Eureka應用註冊與集群數據同步源碼解析

来源:https://www.cnblogs.com/zhixiang-org-cn/archive/2019/10/21/11716916.html
-Advertisement-
Play Games

在之前的 "EurekaClient自動裝配及啟動流程解析" 一文中我們提到過,在構造 類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程 客戶端發起註冊 這個方法中包含的 和`instanceInfo`兩個對象在之前的文章中都已經單獨拿出來了: "Eureka中重要的對象" 服務端接受註冊 ...


在之前的EurekaClient自動裝配及啟動流程解析一文中我們提到過,在構造DiscoveryClient類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程

客戶端發起註冊
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

這個方法中包含的registrationClientinstanceInfo兩個對象在之前的文章中都已經單獨拿出來了:Eureka中重要的對象

服務端接受註冊

服務端接受註冊的Controller在ApplicationResource類中,這裡需要註意的是普通客戶端註冊時其中參數isReplication為false,這個參數就是控制集群是否同步的表示

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // 一系列的參數校驗
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS的一些東西,不用細看
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
    //註冊
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

接著往下看註冊的處理邏輯

 public void register(final InstanceInfo info, final boolean isReplication) {
   // 租約過期時間
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
   // 註冊
        super.register(info, leaseDuration, isReplication);
   //集群複製
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

Lease租約這個類之前也分析過了,不再展開了

服務端保存註冊信息

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
          //獲取鎖
            read.lock();
          //獲取該實例的註冊信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
              //如果不存在則添加  
              gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 當存在這個應用的註冊信息時
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                //使用註冊時間長的一方的應用信息
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {

                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
          //創建租約
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
          //如果存在租約則更新開始時間
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
              //添加到最近註冊隊列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

           // 獲得應用實例最終狀態
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  1. 實例信息是由一個map對象保存的,在這個map中,key是應用的appName,value是另外一個map,在這個map中key是應用的id,而value則是應用的租約信息,map對象如下:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 
registry        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  1. 大體流程為,先查看是否存在該appName的註冊信息,如不存在則創建。
  2. 接著查看是否存在該id的註冊信息,如果存在判斷客戶端的最後修改時間,記錄最後修改的實例,如果不存在則設置自我保護模式的幾個參數
  3. 根據實例創建對應的租約信息,然後添加到map對象中
  4. 添加到最近註冊隊列
  5. 添加到應用實例覆蓋狀態映射
  6. 設置應用實例最終狀態,添加最近租約變更隊列,設置緩存等
集群數據同步

isReplication屬性為true的時候,就會牽扯到集群信息同步了

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus , boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

關於PeerEurekaNode在之前的文章也提到了,保存了集群節點信息,這裡可以看到是迴圈所有的集群節點,然後排除本身的信息之後調用了replicateInstanceActionsToPeers方法

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

這裡重點關註Register分支

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

在不關心Eureka自有的調度處理相關的前提下,核心代碼是replicationClient.register(info),也就是說,如果當客戶端註冊時指定需要同步集群信息時,Eureka會把這個客戶端再註冊到它所在的集群其它的節點上

服務端發起集群數據同步

在之前得EurekaServer自動裝配及啟動流程解析一文中,我們提到過在初始化服務端的時候會從EurekaServer集群中同步數據,也就是下麵這段代碼:

public class EurekaServerBootstrap {   
    protected void initEurekaServerContext() throws Exception {
  //xxxx
    
                EurekaServerContextHolder.initialize(this.serverContext);
    
                log.info("Initialized server context");
    
                // 從其他 Eureka-Server 拉取註冊信息
                int registryCount = this.registry.syncUp();
                this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
                // Register all monitoring statistics.
                EurekaMonitors.registerAllStats();
        }

數據同步的代碼在syncUp

 public int syncUp() {
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
              //未讀取到註冊信息則開始等待
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
           // 獲取註冊信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                      //判斷是否可以註冊,這裡基於AWS環境的判斷,如果不是AWS環境直接返回true,故不需要關心這個問題
                        if (isRegisterable(instance)) {
                          //註冊
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

這裡的處理流程就是當前EurekaServer獲取到客戶端的註冊信息之後,就會再次調用上邊咱們提到的服務端Controller調用的register方法,當然這次調用時isReplication參數就變為true了

本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Python是一種面向對象的編程語言,語法簡潔而清晰,具有豐富和強大的類庫。Python在設計上堅持了清晰劃一的風格,這使得Python成為一門易讀、易維護,並且被大量用戶所歡迎的、用途廣泛的語言。對於初學編程者來說,首選Python是個非常棒的選擇。 零基礎學編程 零基礎學編程,用python入門 ...
  • 兩種併發編程模型 多進程 進程間通信常用的幾種方式: 文件 管道 消息隊列 多線程 一個進程中存在的多個線程,通常通過共用記憶體來通信,(說的非常非常粗俗,就是通過類似“全局變數”的一些數據對象來通信。不知道這種說對不對) 兩者優缺點 多線程優點 線程被稱為“輕量級進程”,一般啟動更快,而開啟一個進程 ...
  • 庸置疑,Spring 早已成為 Java 後端開發事實上的行業標準,無數的公司選擇 Spring 作為基礎的開發框架,大部分 Java 後端程式員在日常工作中也會接觸到 Spring ,因此,如何用好 Spring ,也就成為 Java 程式員的必修課之一。 同時,Spring Boot 和 Spr ...
  • 本文接著上一篇寫的《Java微服務(二):服務消費者與提供者搭建》,上一篇文章主要講述了消費者與服務者的搭建與簡單的實現。其中重點需要註意配置文件中的幾個坑。 本章節介紹一些零散的內容:服務的負載均衡,序列化和熔斷 1.服務負載均衡 負載均衡可分為軟體負載均衡和硬體負載均衡。在我們日常開發中,一般很 ...
  • 最近用到NodeInstantiator批量加入實體 剛開始用的時候一直程式崩潰 錯誤代碼大致如下: 大體上代碼結構類似上面這樣,簡單起見NodeInstantiator的model就寫成2 manage_system是c++寫的一個QObject子類,存儲一些載入進來的數據 manage_syst ...
  • 表單驗證分為前端驗證和伺服器端驗證。 伺服器端驗證方面,Java提供了主要用於數據驗證的JSR 303規範,而Hibernate Validator實現了JSR 303規範。 項目依賴加入spring-boot-starter-thymeleaf時,預設就會加入Hibernate Validat... ...
  • ASCII碼:只有英文和拉丁字元,一個字元占一個位元組,8位 gb2312:只有6700個中文 1980年 gbk10:存了2萬多個中文 1995年 gb18030:27000中文 2000年 utf-32:一個字元占4個位元組 utf-16:一個字元占2個位元組或2個位元組以上 utf-8:英文用ASCI ...
  • 本文源碼: "GitHub·點這裡" || "GitEE·點這裡" 一、生活場景描述 1、請假審批流程 公司常見的請假審批流程:請假天數 2、流程圖解 3、代碼實現 二、責任鏈模式 1、基礎概念 責任鏈模式是一種對象的行為模式。在責任鏈模式里,很多對象由每一個對象對其下個的引用而連接起來形成一條鏈式 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 插件化的需求主要源於對軟體架構靈活性的追求,特別是在開發大型、複雜或需要不斷更新的軟體系統時,插件化可以提高軟體系統的可擴展性、可定製性、隔離性、安全性、可維護性、模塊化、易於升級和更新以及支持第三方開發等方面的能力,從而滿足不斷變化的業務需求和技術挑戰。 一、插件化探索 在WPF中我們想要開 ...
  • 歡迎ReaLTaiizor是一個用戶友好的、以設計為中心的.NET WinForms項目控制項庫,包含廣泛的組件。您可以使用不同的主題選項對項目進行個性化設置,並自定義用戶控制項,以使您的應用程式更加專業。 項目地址:https://github.com/Taiizor/ReaLTaiizor 步驟1: ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • Channel 是乾什麼的 The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consume ...
  • efcore如何優雅的實現按年分庫按月分表 介紹 本文ShardinfCore版本 本期主角: ShardingCore 一款ef-core下高性能、輕量級針對分表分庫讀寫分離的解決方案,具有零依賴、零學習成本、零業務代碼入侵適配 距離上次發文.net相關的已經有很久了,期間一直在從事java相關的 ...
  • 前言 Spacesniffer 是一個免費的文件掃描工具,通過使用樹狀圖可視化佈局,可以立即瞭解大文件夾的位置,幫助用戶處理找到這些文件夾 當前系統C盤空間 清理後系統C盤空間 下載 Spacesniffer 下載地址:https://spacesniffer.en.softonic.com/dow ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • 一、ReZero簡介 ReZero是一款.NET中間件 : 全網唯一開源界面操作就能生成API , 可以集成到任何.NET6+ API項目,無破壞性,也可讓非.NET用戶使用exe文件 免費開源:MIT最寬鬆協議 , 一直從事開源事業十年,一直堅持開源 1.1 純ReZero開發 適合.Net Co ...
  • 一:背景 1. 講故事 停了一個月沒有更新文章了,主要是忙於寫 C#內功修煉系列的PPT,現在基本上接近尾聲,可以回頭繼續更新這段時間分析dump的一些事故報告,有朋友微信上找到我,說他們的系統出現了大量的http超時,程式不響應處理了,讓我幫忙看下怎麼回事,dump也抓到了。 二:WinDbg分析 ...
  • 開始做項目管理了(本人3年java,來到這邊之後真沒想到...),天天開會溝通整理需求,他們講話的時候忙裡偷閑整理一下常用的方法,其實語言還是有共通性的,基本上看到方法名就大概能猜出來用法。出去打水的時候看到外面太陽好好,真想在外面坐著曬太陽,回來的時候好兄弟三年前送給我的鍵盤D鍵不靈了,在打"等待 ...