Spark源碼解析(一):RDD之Transfrom運算元

来源:https://www.cnblogs.com/river97/archive/2023/03/31/17277318.html
-Advertisement-
Play Games

一、延遲計算 RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations) 在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations ...


一、延遲計算

RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations)

在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations 類運算元,定義並描述數據形態的轉換過程,然後調用 Actions 類運算元,將計算結果收集起來、或是物化到磁碟。

在這樣的編程模型下,Spark 在運行時的計算被劃分為兩個環節。

  1. 基於不同數據形態之間的轉換,構建計算流圖(DAG,Directed Acyclic Graph)
  2. 通過 Actions 類運算元,以回溯的方式去觸發執行這個計算流圖

換句話說,開發者調用的各類 Transformations 運算元,並不立即執行計算,當且僅當開發者調用 Actions 運算元時,之前調用的轉換運算元才會付諸執行。在業內,這樣的計算模式有個專門的術語,叫作“延遲計算”(Lazy Evaluation)。

二、Spark運算元分類

在 RDD 的開發框架下,哪些運算元屬於 Transformations 運算元,哪些運算元是 Actions 運算元呢?

這裡給出一張自己在極客看的課程中的圖

img

三、Transform運算元執行流程(源碼)

Map轉換算是 RDD 的經典轉換操作之一了.就以它開頭.Map的源碼如下:

image-20230224103912741

1. sc.clean(f)

首先掉了一個sc.clean(f) , 我們進到clean函數里看下:

image-20230224104117191

註釋中明確提到了這個函數的功能:clean 整理一個閉包,使其可以序列化併發送到任務.

這裡的代碼有些多,大概知道這個函數的功能是這樣就ok了,閉包的問題會在另一篇文章里仔細介紹

2. MapPartitionsRDD

進入到函數後源碼如下:

image-20230224105719758

這是一個MapPartitionsRDD。我們仔細看它的構成,從而來理解它是如何描述MapPartitionsRDD的.

2.1 var prev:RDD[T]

這裡的 prev 就是父RDD,f 則是Map中傳入的處理函數,除了這兩個就沒有了,也就是說明 RDD中沒有存儲具體的數據本身

這再次印證了轉換不會產生任何數據.它只是單純了記錄父RDD以及如何轉換的過程就完了,不會在轉換階段產生任何數據集

2.2 preservesPartitioning

preservesPartitioning 表示是否保持父RDD的分區信息.
如果為false(預設為false),則會對結果重新分區.也就是Map系預設都會分區
如果為true,保留分區. 則按照 firstParent 保留分區   

image-20230224110557226

可以看到根據 dependencies 找到其第一個父 RDD

image-20230224110711910

2.3 compute 計算邏輯
2.3.1 compute方法

RDD 抽象類要求其所有子類都必須實現 compute 方法,該方法接受的參數之一是一個Partition 對象,目的是計算該分區中的數據。

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

可以看到,compute 方法調用當前 RDD 內的第一個父 RDD 的 iterator 方法,該方的目的是拉取父 RDD 對應分區內的數據。

iterator 方法會返回一個迭代器對象,迭代器內部存儲的每個元素即父 RDD 對應分區內已經計算完畢的數據記錄。得到的迭代器作為 f 方法的一個參數。fRDD 類的 map 方法中指定,即實際的轉換函數。

compute 方法會將迭代器中的記錄一一輸入 f 方法,得到的新迭代器即為所求分區中的數據。

其他 RDD 子類的 compute 方法與之類似,在需要用到父 RDD 的分區數據時候,就會調用 iterator 方法,然後根據需求在得到的數據之上執行粗粒度的操作。換句話說,compute 函數負責的是父 RDD 分區數據到子 RDD 分區數據的變換邏輯。

2.3.2 iterator方法

此方法的實現在 RDD 這個抽象類中

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementers of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

interator首先檢查 存儲級別 storageLevel:此處可參考RDD持久化

如果存儲級別不是NONE, 說明分區的數據說明分區的數據要麼已經存儲在文件系統當中,要麼當前 RDD 曾經執行過 cachepersise 等持久化操作,此時需要從存儲空間讀取分區數據,調用 getOrCompute 方法

image-20230224114953570

getOrCompute 方法會根據 RDD 編號:id分區編號:partition.index 計算得到當前分區在存儲層對應的塊編號:blockId,通過存儲層提供的數據讀取介面提取出塊的數據。

代碼中的這幾句註釋給的非常到位,大致的判斷順序如下:

  • 塊命中的情況:也就是數據之前已經成功存儲到介質中,這其中可能是數據本身就在存儲介質中(比如通過讀取HDFS創建的RDD),也可能是 RDD 在經過持久化操作並且經歷了一次計算過程,這個時候我們就能成功讀取數據並將其返回
  • 塊未命中的情況:可能是數據已經丟失,或者 RDD 經過持久化操作,但是是當前分區數據是第一次被計算,因此會出現拉取得到數據為 None 的情況。這就意味著我們需要計算分區數據,繼續調用 RDDcomputeOrReadCheckpoint 方法來計算數據,並將計算得到的數據緩存到存儲介質中,下次就無需再重覆計算。

如果當前RDD的存儲級別為 None,說明為未經持久化的 RDD,需要重新計算 RDD 內的數據,這時候調用 RDD 類的 computeOrReadCheckpoint 方法,該方法也在持久化 RDD 的分區獲取數據失敗時被調用。

image-20230224142431572

computeOrReadCheckpoint 方法會檢查當前 RDD 是否已經被標記成檢查點,如果未被標記成檢查點,則執行自身的 compute 方法來計算分區數據,否則就直接拉取父 RDD 分區內的數據。

需要註意的是,對於標記成檢查點的情況,當前 RDD 的父 RDD 不再是原先轉換操作中提供數據的父 RDD,而是被 Apache Spark 替換成一個 CheckpointRDD 對象,該對象中的數據存放在文件系統中,因此最終該對象會從文件系統中讀取數據並返回給 computeOrReadCheckpoint 方法

參考文章:

Cache 和 Checkpoint


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言: https://www.cnblogs.com/LoveBB/p/17277662.html 什麼是範型 JDK 1.5開始引入Java泛型(generics)這個特性,該特性提供了編譯時類型安全檢測機制,允許程式員在編譯時檢測到非法的類型。停,業餘的客套話就不多說了,這些術語,講了N遍,不 ...
  • 還不會 Quartz?如果你還沒有接觸過Quartz,那麼你可能錯過了一個很棒的任務調度框架!Quartz 提供了一種靈活、可靠的方式來管理和執行定時任務,讓咱們的定時任務更加優雅。 ...
  • 1.標識符 程式中對類、變數等的命名,稱為標識符; 標識符命名規則: 由數字、字母、下劃線、美元符組成,不能以數字開頭; 嚴格區分大小寫; 不能與關鍵字或保留字重名; 標識符的命名最好能反應出其作用。 2.關鍵字 程式中對編譯器有特殊意義的詞,例如class被用來定義類,當程式執行遇到class時, ...
  • 模型之間的關係(Relations Between Models) 上一章介紹了為包含基本欄位的模型創建自定義視圖。然而,在任何真實的業務場景中,我們都需要不止一個模型。此外,模型之間的鏈接是必要的。人們可以很容易地想象一個模型包含客戶,另一個模型則包含用戶列表。你可能需要參考任何現有業務模型上的客 ...
  • ChatGPT是一個基於GPT-3.5架構的自然語言處理工具,它具有文本生成、文本分類、對話生成等多種能力。作為一種強大的自然語言處理工具,ChatGPT可以應用於智能客服、智能問答、內容創作等多個領域。如果您對ChatGPT感興趣,可以通過關註本公眾號瞭解更多信息,並體驗基於ChatGPT的小程式... ...
  • Spring Spring為簡化開發而生,讓程式員只關心核心業務的實現,儘可能的不在關註非業務邏輯代碼(事務控制,安全日誌等)。 1,Spring八大模塊 這八大模塊組成了Spring 1.1 Spring Core模塊 這是Spring框架的最基礎的部分,它提供了依賴註入(DependencyIn ...
  • React Native 備忘清單 適合初學者的綜合 React Native 備忘清單,在開始 React Native 之前需要先掌握 react 庫入門,為開發人員分享快速參考備忘單。 React Native (簡稱RN)是Facebook於2015年4月開源的跨平臺移動應用開發框架,是Fa ...
  • React 備忘清單 IT寶庫整理適合初學者入門的React開發速查備忘清單,為開發人員分享快速參考備忘單。 React是用於構建用戶界面的JavaScript庫,起源於Facebook的內部項目,該公司對市場上所有 JavaScript MVC框架都不滿意,決定自行開發一套,用於架設Instagr ...
一周排行
    -Advertisement-
    Play Games
  • 人臉識別技術在現代社會中扮演著越來越重要的角色,比如人臉識別門禁、人臉識別支付、甚至人臉識別網站登錄等。 最近有群友問.NET有沒有人臉識別的組件,小編查閱相關資料介紹下麵幾種.NET人臉識別組件供大家參考。 **1、Microsoft Azure Face API** 簡介:Microsoft A ...
  • # 1. 與 .NET Core 緩存的關係和差異 ABP 框架中的緩存系統核心包是 [Volo.Abp.Caching](https://www.nuget.org/packages/Volo.Abp.Caching) ,而對於分散式緩存的支持,abp 官方提供了基於 Redis 的方案,需要安裝 ...
  • 最近ET做熱更重載dll的時候,返回登陸會重新檢測新的dll,首次登錄之前已經Assembly.Load()過一次dll,第二次返回登陸再次load dll到記憶體中,Invoke執行方法的時候,異常了,有些方法執行了,有些未執行,於是查資料,看到些老資料說Assembly.Load重覆載入同名dll ...
  • 1. 擴展方法 擴展方法使你能夠向現有類型“添加”方法,而無需創建新的派生類型、重新編譯或以其他方式修改原始類型。 擴展方法是一種靜態方法,但可以像擴展類型上的實例方法一樣進行調用。 對於用 C#、F# 和 Visual Basic 編寫的客戶端代碼,調用擴展方法與調用在類型中定義的方法沒有明顯區別 ...
  • 以前在隨筆《Winform開發框架之客戶關係管理系統(CRM)的開發總結系列1-界面功能展示 》的幾篇隨筆中介紹過基於WInform開發框架開發的CRM系統,系統的功能主要也是圍繞著客戶相關信息來進行管理的。本篇隨筆介紹在最新的《SqlSugar開發框架》中整合CRM系統模塊的功能。 ...
  • 隨著技術的發展,ASP.NET Core MVC也推出了好長時間,經過不斷的版本更新迭代,已經越來越完善,本系列文章主要講解ASP.NET Core MVC開發B/S系統過程中所涉及到的相關內容,適用於初學者,在校畢業生,或其他想從事ASP.NET Core MVC 系統開發的人員。 經過前幾篇文章... ...
  • [toc] 這篇文章是我之前總結的一篇文章,因為整理博客的原因,原有博客已經註銷,但這篇文章對一些讀者很有用,所以現在新瓶裝舊酒重新整理回來分享給大家。 最近一段時間生產環境頻繁出問題,每次都會生成一個hs_err_pid*.log文件,因為工作內容的原因,在此之前並沒有瞭解過相關內容,趁此機會學習 ...
  • # 前言 在上一篇文章中,給大家講解了泛型的概念、作用、使用場景,以及泛型集合、泛型介面和泛型類的用法,但受限於篇幅,並沒有把泛型的內容講解完畢。所以今天我們會繼續學習泛型方法、泛型擦除,以及通配符等的內容,希望大家繼續做好學習的準備哦。 *** 全文大約【**4600】** 字,不說廢話,只講可以 ...
  • 昨天遇到參數key大小寫不一致導致校驗簽名失敗的問題,查了很長時間才找到原因。看了一下FastJson源碼,發現JSON.toObject中轉換成對象的時候會忽略大小寫。 所以,當使用了JSON.toObject將json轉成Java對象後,再用JSON.toObject轉成json,key值就變了 ...
  • 基於java的線上商城設計與實現,線上購物平臺,校園購物商城,商品銷售平臺,基於Java的電商平臺;電商平臺,買家和賣家可以在此平臺上進行銷售和交易,節約了大量的線下時間成本,購物車的功能,校園交易平臺等等; ...