實戰乾貨|Spark 在袋鼠雲數棧的深度探索與實踐

来源:https://www.cnblogs.com/DTinsight/p/18159978
-Advertisement-
Play Games

Spark 是一個快速、通用、可擴展的大數據計算引擎,具有高性能、易用、容錯、可以與 Hadoop 生態無縫集成、社區活躍度高等優點。在實際使用中,具有廣泛的應用場景: · 數據清洗和預處理:在大數據分析場景下,數據通常需要進行清洗和預處理操作以確保數據質量和一致性,Spark 提供了豐富的 API ...


Spark 是一個快速、通用、可擴展的大數據計算引擎,具有高性能、易用、容錯、可以與 Hadoop 生態無縫集成、社區活躍度高等優點。在實際使用中,具有廣泛的應用場景:

· 數據清洗和預處理:在大數據分析場景下,數據通常需要進行清洗和預處理操作以確保數據質量和一致性,Spark 提供了豐富的 API,可以對數據進行清洗、過濾、轉換等操作

· 批處理分析:Spark 適用於各種應用場景下的批處理任務,包括統計分析、數據挖掘、特征提取等,用戶可以利用 Spark 強大的 API 和內置庫進行複雜的數據處理和分析,從而挖掘數據中的內在價值

· 互動式查詢:Spark 提供了支持 SQL 查詢的 Spark SQL 模塊,用戶可以使用標準的 SQL 語句進行互動式查詢和大規模數據分析

Spark 在袋鼠雲的使用

袋鼠雲數棧離線開發平臺,我們提供了三種使用 Spark 的方式:

● 創建 Spark SQL 任務

用戶可以直接通過編寫 SQL 的方式實現自己的業務邏輯。這種方式是目前數棧離線平臺使用 Spark 最廣泛的方式,也是最為推薦的一種方式。

● 創建 Spark Jar 任務

用戶需要在 IDEA 上使用 Scala 或者 Java 語言實現業務邏輯,然後對該項目進行編譯打包,並將得到的 Jar 包上傳到離線平臺,隨後在創建 Spark Jar 任務的時候引用這個 Jar 包,最後將任務提交到調度運行即可。

對於使用 SQL 難以實現或表達的需求,或者用戶有其他更深層次的需求,Spark Jar 任務無疑給用戶提供了一種更為靈活的使用 Spark 的方式。

● 創建 PySpark 任務

用戶可以直接編寫對應的 Python 代碼。在我們的客戶群體中,有相當一部分客戶,他們除了 SQL 之外,Python 可能是他們的主力語言。特別是針對有一定數據分析基礎、演算法基礎的用戶,他們往往會對處理好的數據進行更深層的分析,此時 PySpark 任務自然是他們的不二之選。

Spark 在袋鼠雲數棧離線開發平臺發揮著重要的作用,因此,我們內部對 Spark 做了也不少的優化,使客戶在使用 Spark 提交任務時更加方便。我們還基於 Spark 做了一些工具來增強整個數棧離線開發平臺的功能。

除此之外,在數據湖場景下,Spark 也發揮著相當重要的作用。在袋鼠雲的湖倉一體模塊中,已經支持了 Iceberg 和 Hudi 兩大數據湖,用戶可以使用 Spark 對湖表進行讀寫,湖表的治理底層也是通過使用 Spark 調用不同的存儲過程實現。

下文就將從引擎側和 Spark 本身兩個方面來闡述袋鼠雲內部所做的優化。

引擎端優化

袋鼠雲內部引擎端的功能主要是用於任務提交、任務狀態獲取、任務日誌獲取、停止任務、語法校驗等。每個功能點我們都做了不同程度的優化,下文通過兩個例子進行簡單介紹。

Spark on Yarn 提交速度提升

隨著引擎端 Spark 插件上新功能的不斷開發和完善,引擎端提交 Spark 任務所需的時間也在相應的增加,因此需要對提交 Spark 任務相關的代碼進行優化,以縮短 Spark 任務提交的時長,提升用戶體驗。

為此,我們做了以下工作,對於一些公用的配置文件,如 core-site.xml、yarn-site.xml、keytab 文件、spark-sql-application.jar 等,原來每次提交任務都需要預先從伺服器下載並提交這些配置文件。現在經過優化後,上述文件僅僅需要在客戶端 SparkYarnClient 初始化的時候下載一次,然後上傳到指定的 HDFS 路徑,後續提交 Spark 任務只需要通過參數的方式指定到對應的 HDFS 路徑即可。通過這種方式大大縮短了每次 Spark 任務的提交時間。

在新版本的數棧中,對於臨時查詢,我們還會根據自定義的規則判斷待執行 SQL 的複雜度,將複雜度不高的 SQL 發送到引擎端啟動的 SparkSQLEngine 運行,以加快運行速度。這個內部的 SparkSQLEngine 在以前僅僅用於語法校驗,現在也承擔了一部分 SQL 執行的功能,並且 SparkSQLEngine 還可以根據運行的整體情況,動態擴縮資源,實現資源的有效利用。

語法校驗

在較老的數棧版本,對於 SQL 進行語法校驗,引擎端會先把 SQL 發送到 Spark Thrift Server。這個 Spark Thrift Server 是以 local 模式部署,不僅僅需要用於語法校驗,其他平臺上所有元數據的獲取都是通過發送 SQL 到這個 Spark Thrift Server 執行來獲取。這種方式弊端較大,為此我們做了一些優化。在 Engine 端以 local 模式啟動了一個 Spark 任務,在進行語法校驗的時候不再將 SQL 發送到 Spark Thrift Server,而是內部維護了一個 SparkSession,直接對 SQL 進行語法校驗。

這種方式雖然可以不需要再跟外部的 Spark Thrift server 強關聯,但是會給調度組件帶來一定的壓力,在實現的過程中 Engine-Plugins 的整體複雜度也增大了不少。

為了優化以上問題,我們做了更進一步的優化,調度組件在啟動的時候,提交了一個 Spark 任務 SparkSQLEngine 到 Yarn 上。可以理解為是一個遠程的運行在 Yarn 上的 Spark Thrift Server,引擎端時刻監控這個 SparkSQLEngine 的健康狀態。這樣,每次執行語法校驗的時候,引擎端將 SQL 通過 JDBC 的形式發送給 SparkSQLEngine 進行語法校驗。

通過上述的優化,使得離線開發平臺與 Spark Thrift Server 解耦合,EasyManager 不需要額外部署 Spark Thrift Server,使部署更輕量化。調度側也不用維護一個 local 模式的 Spark 常駐進程。也為離線開發平臺上 Spark SQL 任務互動式查詢增強做鋪墊。

離線開發平臺與 EasyManager 部署的 Spark Thrift Server 解耦合後會有以下好處:

· 能夠真正意義上的實現 Spark 多集群多版本共存

· EasyManager 標準部署可以去除 Spark Thrift Server,為一線運維減負

· Spark SQL 語法校驗變得更輕量,不用緩存 SparkContext,減少 Engine 的資源占用

Spark 功能優化

隨著業務的發展深入,我們發現開源的 Spark 在一些場景並沒有對應的功能實現。因此我們在開源 Spark 的基礎上開發了更多新的插件,以支持數棧更多的功能應用。

任務診斷

首先,我們對 Spark 的 metric sink 做了增強。Spark 內部提供了各種 Sink,除了 ConsoleSink 之外,還有 CSVSink、JmxSink、MetricsServlet、GraphiteSink、Slf4jSink、StatsdSink 等。在 Spark3.0 之後還新增了 PrometheusServlet,但這些還不能滿足我們的需求。

在開發任務診斷功能的時候,我們需要通過把 Spark 內部的指標統一推送到 PushGateway,由 Prometheus Server 周期性的從 PushGateway 中拉取指標,最後通過調用 Prometheus 提供的查詢介面可以近實時地查詢到 Spark 內部的指標。

file

但是 Spark 並沒有實現將內部指標 sink 到 PushGateway。因此我們新增了 spark-prometheus-sink 插件,並且自定義了 PrometheusPushGatewaySink 用於將 Spark 內部的指標 push 到 PushGateway。

file

除此之外,我們還自定義了一個新的指標用來描述 Spark SQL 臨時查詢展示任務執行進度。具體步驟如下:

· 通過自定義 JobProgressSource 來新增用於描述離線任務進度的指標,將該指標註冊到 Spark 內部管理系統中的指標管理系統中

· 自定義 JobProgressListener,並將 JobProgressListener 註冊到 Spark 內部管理系統中的 ListenerBus。其中,JobProgressListener 的 onJobStart 方法的邏輯是計算當前 Job 下所有的 Task 數量;onTaskEnd 方法的邏輯是在每個 Task 完成後計算並更新當前離線任務進度;onJobEnd 方法的邏輯是在每個 Job 完成後計算並更新當前離線任務進度

對接商業版 Hadoop 集群

隨著袋鼠雲客戶越來越多,客戶的環境也是各不相同。有的客戶使用的是開源版本的 Hadoop 集群,也有相當一部分客戶使用的是 HDP、CDH、CDP、TDH 等。我們在對接這些客戶的集群的時候,開發側往往需要進行新的適配,運維側每次部署升級的時候也需要配置額外的參數或者有其他額外的操作。

以 HDP 為例,在對接 HDP 的時候,我們使用的 Spark 是 HDP 自帶的 Spark2.3,並且我們還需要在運維側新增一些參數,並將 HDP 自帶的 Spark 的所有 Jar包 移動到指定目錄。這些操作其實會給運維帶來一定的困惑和麻煩,不同類型的集群,運維需要維護不同的運維文檔,部署的過程也比較容易出錯。並且我們其實對 Spark 的源碼做了功能增強和 bug 的修複,如果使用的是 HDP 自帶的 Spark,那麼就享受不到我們內部維護的 Spark 帶來的所有好處。

為瞭解決上面這些問題,我們內部的 Spark 對現有市場上已有的、常見的發行商都做了適配。換句話來說,我們內部的 Spark 可以在所有不同的 Hadoop 集群上運行。這樣,無論對接哪一種類型的 Hadoop 集群,運維只需要部署同一個 Spark 即可,這大大減輕了運維部署的壓力。更重要的是,客戶可以直接使用我們內部的 Spark 穩定版本,享受到更多的新特性和更大的性能提升。

Spark3.2 新特性-AQE

較老的數棧版本中,預設的 Spark 版本是 2.1.3,後來我們將 Spark 的版本升級到 2.4.8,從數棧6.0開始,Spark3.2 也可以使用了。這裡著重介紹一下 AQE,這也是 Spark3.x 中最重要的新特性。

AQE 概述

Spark3.2 之前,AQE 預設是關閉的,需要通過將 spark.sql.adaptive.enabled 設置為 true,才能開啟 AQE。Spark3.2 之後,AQE 預設是開啟的,任務在運行過程中只要滿足 AQE 的觸發條件,即可享受 AQE 帶來的優化。

需要註意的是,AQE 的優化只會發生在 shuffle 階段,如果 SQL 在運行過程中並沒有涉及到 shuffle 操作,那麼即使 spark.sql.adaptive.enabled 的值為 true,AQE 也不會發揮作用。更準確來說,只有物理執行計劃包含 exchange 節點或者包含子查詢,AQE 才會生效。

AQE 在運行期間,會收集 shuffle map 階段所生成的中間文件的信息,並將這些信息進行統計,結合已有的規則動態的調整尚未執行的 Optimized Logical Plan 和 Spark Plan,從而對原來的 SQL 語句進行運行時優化。

file

從 Spark 源碼來看,AQE 涉及到以下4個優化規則:

file

我們知道,RBO 是根據一系列的規則(rule)來對 SQL 進行優化,包括謂詞下推、列剪枝、常量替換等。這些靜態規則本身已經內置在 Spark 中,Spark 在執行 SQL 的過程中,這些 rule 會一一作用到 SQL 中。

AQE 的優勢

CBO 這個特性是 Spark2.2 之後才有的,相比於 RBO,CBO 會結合表的統計信息,並根據這些統計信息和代價模型(Cost Model)選擇出較為優化的執行計劃。

但是,CBO 僅僅支持註冊到 Hive Metastore 的表。對於存儲在分散式文件系統的 parquet、orc 等文件,CBO 是不支持的。並且,如果 Hive 表缺少元數據信息,CBO 收集統計信息的時候就會收集不到,這可能會導致 CBO 失效。

CBO 的另外一個劣勢在於 CBO 在優化之前需要先執行 ANALYZE TABLE COMPUTE STATISTICS 來收集統計信息。該語句在執行過程中如果碰到大表則會較為耗時,收集效率較低。

無論是 CBO 還是 RBO,它們都屬於靜態優化。在物理執行計劃提交後,如果任務在運行過程中,數據量、數據分佈情況發生變化,CBO 也不會對已有的物理執行計划進行優化。

與 CBO、RBO 不同的是,AQE 在運行過程中,會對 shuffle map 過程中所產生的中間文件進行分析,動態的調整並優化尚未開始執行的邏輯執行計劃和物理執行計劃,相對靜態優化的 CBO 和 RBO 而言,AQE 的處理能得到更加優化的物理執行計劃

AQE 三大特性

● 自動分區合併

Shuffle 過程分為 Map 階段和 Reduce 兩個階段,Reduce 階段會將 Map 階段產生的中間臨時文件拉取到對應的 Executor 下,如果 Map 階段所處理的數據分佈非常不均勻,有很多 key 其實僅僅只有幾條數據,數據經過處理後可能會形成比較多的小文件。

為了避免上述情況,可以開啟 AQE 的自動分區合併功能,可以避免啟動過多的 reduce task 去拉取 Map 階段生成的小文件。

file

● 自動數據傾斜處理

應用場景主要在 Data Joins 中,當發生數據傾斜,AQE 能夠自動檢測到傾斜分區,並對傾斜分區按照一定的規則進行拆分。目前,在 Spark3.2 中,對 SortMergeJoin 和 ShuffleHashJoin 都支持自動數據傾斜處理。

● Join 策略調整

AQE 會動態的將 Hash Join、Sort Merge Join,降級調整為 Broadcast Join。

我們知道,Spark 任務一旦開始執行,並行度就已經確定。比如說,shuffle map 階段,並行度為分區的個數;shuffle reduce 階段並行度則為 spark.sql.shuffle.partitions 的值,預設為200。如果 Spark 任務在運行的過程中數據量變小導致大部分的分區的大小變小,這時如果仍然啟動那麼多的線程去處理小的數據集就會導致資源的浪費。

而 AQE 在執行過程會根據 shuffle 後生成的中間臨時結果,在一定條件下,通過應用 CoalesceShufflePartitions 規則,結合用戶提供的參數自動合併分區,其實就是調整 reducer 的數量。原來一個 reduce 線程只會拉取一個處理後的分區的數據,現在一個 reduce 線程會根據實際情況拉取更多的分區的數據,這樣就能減少資源的浪費,提高任務執行效率。
《行業指標體系白皮書》下載地址:https://www.dtstack.com/resources/1057?src=szsm

《數棧產品白皮書》下載地址:https://www.dtstack.com/resources/1004?src=szsm

《數據治理行業實踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=szsm

想瞭解或咨詢更多有關大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 數據結構 雙向迴圈鏈表 雙向迴圈鏈表的增刪改查 /***************************************************************************************************************** * * file na ...
  • 題目: ​ 設計一個進位轉換程式,使用順序棧設計一個把十進位數轉換為十六進位數的介面,實現當通過鍵盤輸入一個非負的十進位數,可以在終端輸出對應的十六進位數。 例如: 100 --> 0x64 --> 100%16 = 4 --> 100/16 = 6 6%16 = 6 題目分析: ​ 該題需要對用戶 ...
  • 棧(stack) 原理說明: ​ 學習數據結構的目的是為了更好的處理和存儲數據,對於順序表而言改查比較容易,增刪比較麻煩,對於鏈式表而言,增刪比較簡單,改查比較麻煩,所以每種數據結構都有不同的特點,用戶需要選擇合適的數據結構。 ​ 棧記憶體自頂向下進行遞增,其實棧和順序表以及鏈式表都一樣,都屬於線性結 ...
  • Kafka通過一系列機制來確保數據不丟失,這些機制涵蓋了生產者、Broker和消費者等關鍵環節。以下是Kafka保證數據不丟失的主要方式: 生產者生產數據不丟失: 同步方式:生產者發送數據給Kafka後,會等待Kafka的確認。如果在一定時間內(如10秒)沒有收到Broker的ack響應,生產者會認 ...
  • 聲明: 以下內容為個人筆記,內容不完全正確,請謹慎參考。 文本處理工具 cut: cut 工作是“剪”,具體來說就是在文件中負責剪切數據。cut 命令從文件的每個行剪切位元組、字元和欄位輸出。 1、基本語法: cut [選項參數] filename 說明:預設分隔符是副表符 2、選項參數說明 選項參數 ...
  • 雙向迴圈鏈表 原理與應用 雙向迴圈鏈表與雙向鏈表的區別:指的是雙向迴圈鏈表的首結點中的prev指針成員指向鏈表的尾結點,並且雙向迴圈鏈表的尾結點里的next指針成員指向鏈表的首結點,所以雙向迴圈鏈表也屬於環形結構。 雙向迴圈鏈表各功能實現 (1)為了管理雙向迴圈鏈表,需要構造頭結點的數據類型以及構造 ...
  • 1.綜述 本文以HiveSQL語法進行代碼演示。 對於其他資料庫來說同樣也適用,比如SparkSQL,FlinkSQL以及Mysql8,Oracle,SqlServer等傳統的關係型資料庫。 已更新第一類聚合函數類,點擊這裡閱讀 ①SQL視窗函數系列一之聚合函數類 ②SQL視窗函數系列二之分組排序窗 ...
  • 隨著企業數據規模的增長和業務多元化發展,海量數據實時、多維地靈活查詢變成業務常見訴求。同時多套資料庫系統成為常態,這既帶來了數據管理的複雜性,又加大了數據使用的難度,面對日益複雜的數據環境和嚴格的數據安全要求,需要解決多資料庫系統並存、數據孤島嚴重、許可權管理混亂和數據查詢提取困難等問題。與此同時,企 ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...