SparkSQL——用之惜之

来源:https://www.cnblogs.com/followees/archive/2018/04/22/8909859.html
-Advertisement-
Play Games

SparkSql作為Spark的結構化數據處理模塊,提供了非常強大的API,讓分析人員用一次,就會為之傾倒,為之著迷,為之至死不渝。在內部,SparkSQL使用額外結構信息來執行額外的優化。在外部,可以使用SQL和DataSet 的API與之交互。本文筆者將帶你走進SparkSql的世界,領略Spa ...


  SparkSql作為Spark的結構化數據處理模塊,提供了非常強大的API,讓分析人員用一次,就會為之傾倒,為之著迷,為之至死不渝。在內部,SparkSQL使用額外結構信息來執行額外的優化。在外部,可以使用SQL和DataSet 的API與之交互。本文筆者將帶你走進SparkSql的世界,領略SparkSql之諸多妙處。

一、DataSet和DataFrame

  當使用編程語言對結構化數據進行操作時候,SparkSql中返回的數據類型是DataSet/DataFrame,因此開篇筆者就先對這兩種數據類型進行簡單的介紹。

  Dataset 是分散式的數據集合。是Spark 1.6中添加的一個新介面,是特定域對象中的強類型集合,它可以使用函數或者相關操作並行地進行轉換等操作,數據集可以由JVM對象構造,然後使用函數轉換(map、flatmap、filter等)進行操作。Dataset 支持Scala和javaAPI,不支持Python API。

  DataFrame是由列組成的數據集,它在概念上等同於關係資料庫中的表或R/Python中的data frame,但在查詢引擎上進行了豐富的優化。DataFrame可以由各種各樣的源構建,例如:結構化數據文件、hive中的表、外部資料庫或現有的RDD。

二、SparkSQL基於DataFrame的操作

 

import org.apache.spark.sql.SparkSession
 2val spark = SparkSession
 3  .builder()
 4  .appName("Spark SQL basic example")
 5  .getOrCreate()
 6//引入Spark的隱式類型轉換,如將RDD轉換成 DataFrame
 7import spark.implicits._
 8val df = spark.read.json("/data/tmp/SparkSQL/people.json")
 9df.show() //將DataFrame的內容進行標準輸出
10//+---+-------+
11//|age|   name|
12//+---+-------+
13//|   |Michael|
14//| 19|   Andy|
15//| 30| Justin|
16//+---+-------+
17
18df.printSchema()  //列印出DataFrame的表結構
19//root
20// |-- age: string (nullable = true)
21// |-- name: string (nullable = true)
22
23df.select("name").show() 
24//類似於select name from DataFrame的SQL語句
25
26df.select($"name", $"age" + 1).show()
27//類似於select name,age+1 from DataFrame的SQL語句
28//此處註意,如果對列進行操作,所有列名前都必須加上$符號
29
30df.filter($"age" > 21).show()
31//類似於select * from DataFrame where age>21 的SQL語句
32
33df.groupBy("age").count().show()
34//類似於select age,count(age) from DataFrame group by age;
35
36//同時也可以直接寫SQL進行DataFrame數據的分析
37df.createOrReplaceTempView("people")
38val sqlDF = spark.sql("SELECT * FROM people")
39sqlDF.show()

  

 

三、SparkSQL基於DataSet的操作

  由於DataSet吸收了RDD和DataFrame的優點,所有可以同時向操作RDD和DataFrame一樣來操作DataSet。看下邊一個簡單的例子。

 1case class Person(name: String, age: Long)
 2// 通過 case類創建DataSet
 3val caseClassDS = Seq(Person("Andy", 32)).toDS()
 4caseClassDS.show()
 5// +----+---+
 6// |name|age|
 7// +----+---+
 8// |Andy| 32|
 9// +----+---+
10
11// 通過基本類型創建DataSet
12importing spark.implicits._
13val primitiveDS = Seq(1, 2, 3).toDS()
14primitiveDS.map(_ + 1).collect() 
15// Returns: Array(2, 3, 4)
16
17// 將DataFrames轉換成DataSet
18val path = "examples/src/main/resources/people.json"
19val peopleDS = spark.read.json(path).as[Person]
20peopleDS.show()
21// +----+-------+
22// | age|   name|
23// +----+-------+
24// |null|Michael|
25// |  30|   Andy|
26// |  19| Justin|
27// +----+-------+

  在上邊的例子中能夠發現DataSet的創建是非常簡單的,但是筆者需要強調一點,DataSet是強類型的,也就是說DataSet的每一列都有指定的列標識符和數據類型。下邊的列子將進一步介紹DataSet與RDD的交互。

 1import spark.implicits._
 2//將RDD轉換成DataFrame
 3val peopleDF = spark.sparkContext
 4  .textFile("examples/src/main/resources/people.txt")
 5  .map(_.split(","))
 6  .map(attributes=>Person(attributes(0),attributes(1).trim.toInt))
 7  .toDF()
 8// 將RDD註冊為一個臨時視圖
 9peopleDF.createOrReplaceTempView("people")
10//對臨時視圖進行Sql查詢
11val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
12
13// 對teenagersDF 對應的DataFrame進行RDD的運算元map操作
14teenagersDF.map(teenager => "Name: " + teenager(0)).show()
15// +------------+
16// |       value|
17// +------------+
18// |Name: Justin|
19// +------------+
20
21// 與上一條語句效果一樣
22teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
23// +------------+
24// |       value|
25// +------------+
26// |Name: Justin|
27// +------------+

  

 

四、SparkSQL操作HIve表

  Spark SQL支持讀取和寫入存儲在Apache HIVE中的數據。然而,由於Hive具有大量的依賴關係,預設情況下這些依賴性不包含在Spark分佈中。如果能在classpath路徑找到Hive依賴文件,Spark將自動載入它們。另外需要註意的是,這些Hive依賴項須存在於所有Spark的Worker節點上,因為它們需要訪問Hive序列化和反序列化庫(SerDes),以便訪問存儲在Hive中的數據。

1import java.io.File
  2import org.apache.spark.sql.{Row, SaveMode, SparkSession}
  3
  4case class Record(key: Int, value: String)
  5
  6// 設置hive資料庫預設的路徑
  7val warehouseLocation = new File("spark-warehouse").getAbsolutePath
  8
  9val spark = SparkSession
 10  .builder()
 11  .appName("Spark Hive Example")
 12  .config("spark.sql.warehouse.dir", warehouseLocation)
 13  .enableHiveSupport()
 14  .getOrCreate()
 15
 16import spark.implicits._
 17import spark.sql
 18
 19//創建hive表,導入數據,並且查詢數據
 20sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
 21sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 22sql("SELECT * FROM src").show()
 23
 24// +---+-------+
 25// |key|  value|
 26// +---+-------+
 27// |238|val_238|
 28// | 86| val_86|
 29// |311|val_311|
 30// ...
 31
 32//對hive表數據進行聚合操作
 33sql("SELECT COUNT(*) FROM src").show()
 34// +--------+
 35// |count(1)|
 36// +--------+
 37// |    500 |
 38// +--------+
 39
 40// sql執行的查詢結果返回DataFrame類型數據,支持常用的RDD操作
 41val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
 42val stringsDS = sqlDF.map {
 43  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
 44}
 45stringsDS.show()
 46// +--------------------+
 47// |               value|
 48// +--------------------+
 49// |Key: 0, Value: val_0|
 50// |Key: 0, Value: val_0|
 51// |Key: 0, Value: val_0|
 52// ...
 53
 54// 通過DataFrames創建一個臨時視圖val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
 55recordsDF.createOrReplaceTempView("records")
 56
 57// 查詢操作可以將臨時的視圖與HIve表中數據進行關聯查詢
 58sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
 59// +---+------+---+------+
 60// |key| value|key| value|
 61// +---+------+---+------+
 62// |  2| val_2|  2| val_2|
 63// |  4| val_4|  4| val_4|
 64// |  5| val_5|  5| val_5|
 65// ...
 66
 67// 創建一個Hive表,並且以parquet格式存儲數據
 68sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
 69// 講DataFrame中數據保存到Hive表裡
 70val df = spark.table("src")
 71df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
 72sql("SELECT * FROM hive_records").show()
 73// +---+-------+
 74// |key|  value|
 75// +---+-------+
 76// |238|val_238|
 77// | 86| val_86|
 78// |311|val_311|
 79// ...
 80
 81// 在指定路徑創建一個Parquet文件並且寫入數據
 82val dataDir = "/tmp/parquet_data"
 83spark.range(10).write.parquet(dataDir)
 84// 創建HIve外部表
 85sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
 86sql("SELECT * FROM hive_ints").show()
 87// +---+
 88// |key|
 89// +---+
 90// |  0|
 91// |  1|
 92// |  2|
 93// ...
 94
 95// Turn on flag for Hive Dynamic Partitioning
 96spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
 97spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
 98// 通過DataFrame的API創建HIve分區表
 99df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
100sql("SELECT * FROM hive_part_tbl").show()
101// +-------+---+
102// |  value|key|
103// +-------+---+
104// |val_238|238|
105// | val_86| 86|
106// |val_311|311|
107// ...
108
109spark.stop()

  當然SparkSql的操作遠不止這些,它可以直接對文件快執行Sql查詢,也可以通過JDBC連接到關係型資料庫,對關係型資料庫中的數據進行一些運算分析操作。如果讀者感覺不過癮,可以留言與筆者交流,也可以通過Spark官網查閱相關例子進行學習。下一篇關於Spark的文章,筆者將詳細的介紹Spark的常用運算元,以滿足渴望進行數據分析的小伙伴們的求知的欲望。

 

 

更多精彩內容,歡迎掃碼關註以下微信公眾號:大數據技術宅。大數據、AI從關註開始

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、硬碟介面類型 硬碟的介面主要有IDE、SATA、SCSI 、SAS和光纖通道等五種類型。其中IDE和SATA介面硬碟多用於家用產品中,也有部分應用於伺服器,SATA是一種新生的硬碟介面類型,已經取代了大部分IDE介面應用。SCSI 、SAS主要應用於伺服器上,普通家用設備一般不支持SCSI和SA ...
  • 一、淺談id、whoami、su、chage 本篇是續寫上一篇<Linux 用戶篇——用戶管理命令之useradd、passwd、userdel、usermod>。 (1)id命令 命令格式:id username(用戶名) 命令解釋:查看用戶的UID(用戶ID)、GID(組ID)。 (2)whoa ...
  • 我的系統是unbuntu14.04,我先是按照官方教程的安裝,後來也百度了一點別人的教程,算是一個雜燴。 註意,為什麼要使用privoxy? 因為如果不使用的話,就是全局代理,使用全局代理會使所有的連接通過shadowsocks伺服器中轉,一般不建議使用全局代理。另外,gnome桌面的代理設置無法正 ...
  • 7.1 關機&重啟命令 基本介紹: shutdown -h now 立刻進行關機 shutdown -h 1 1分鐘後關機 shutdown -r now 現在重啟電腦 halt 關機,作用和上面一樣 reboot 重啟 sync 把記憶體的數據同步到磁碟 註意細節: 不管是重啟系統還是關閉系統,首 ...
  • Xshell5和Xftp5的安裝包 鏈接:https://pan.baidu.com/s/1q3-ch75TW3lvC3KX25klNQ 密碼:m31n 說明: 公司開發的時候,具體情況是這樣的: 1、linux伺服器是開發小組共用的; 2、正式上線的項目是運行在公網的; 3、因此程式員需要遠程登錄 ...
  • 結構化異常處理(**structured exception handling**,下文簡稱:**SEH**),是作為一種系統機制引入到操作系統中的,本身與語言無關。在我們自己的程式中使用**SEH**可以讓我們集中精力開發關鍵功能,而把程式中所可能出現的異常進行統一的處理,使程式顯得更加簡潔且增加... ...
  • 本文目錄:1.update語句2.delete語句 2.1 單表刪除 2.2 多表刪除3.truncate table 1.update語句 update用於修改表中記錄。 先簡單介紹下各子句和關鍵字相關的功能,後文將詳細解釋它們。 low_priority只對使用表級鎖的存儲引擎有效(如MyISA ...
  • MySQL 8 正式版 8.0.11 已發佈,官方表示 MySQL 8 要比 MySQL 5.7 快 2 倍,還帶來了大量的改進和更快的性能! 以下為本人2018.4.23日安裝過程的記錄。整個過程大概需要一個小時,make && make install過程需要的時間較長。 一.環境 CentOS ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...