RDD的轉換操作,分三種:單value,雙value交互,(k,v)對

来源:https://www.cnblogs.com/liangyan131/archive/2019/12/10/12019351.html
-Advertisement-
Play Games

import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext} object Transformation { def main(args: Array[String]): U ...


import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Transformation {

  def main(args: Array[String]): Unit = {

    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Transformation")

    val sc = new SparkContext(config)

    val listRDD = sc.makeRDD(1 to 10)
    val listRDD2 = sc.makeRDD(Array(List(1, 2), List(3, 4)))
    val listRDD3 = sc.makeRDD(5 to 14)

    /***************************單value*****************************/

    /**
      * map(func)
      * 每次處理1條數據
      */

//    val mapRDD = listRDD.map(_ * 2)

    /**
      * mapPartitions(func)
      * 每次處理一組分區數據,效率高,但可能出現記憶體溢出(因為處理完一組分區後再釋放)
      */

//     val mapPartitionsRDD = listRDD.mapPartitions(datas=>{
//       datas.map(data => data * 2)
//     })


    /**
      * mapPartitionsWithIndex(func)
      * 函數的輸入多了分區號
      */

//    val tupleRDD: RDD[(Int, String)] = listRDD.mapPartitionsWithIndex {
//      case (num, datas) => {
//        datas.map((_, " 分區號:" + num))
//      }
//    }

    /**
      *  flatMap(func)
      *  將map後的數據扁平
      */

//    val flatMAPRDD: RDD[Int] = listRDD2.flatMap(datas => datas)

    /**
      * glom()
      * 將一個分區的數據放在一個數組裡
      */

//    val glomRDD: RDD[Array[Int]] = listRDD.glom()

    /**
      * groupBy(func)
      * 按照函數的返回值進行分組,分組後的數據(K:分組的key,V:分組的集合)
      */

//    val groupByRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(i => i%2)
//    groupByRDD.collect().foreach(println)

    /**
      * filter(func)
      * 按照返回值為true的過濾
      */

//    val filterRDD: RDD[Int] = listRDD.filter(x => x % 2 ==0)
//    filterRDD.collect().foreach(println)

    /**
      * sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long)
      * 隨機抽樣
      */

//    val sampleRDD: RDD[Int] = listRDD.sample(false, 0.4, 1)
//    sampleRDD.collect().foreach(println)

    /**
      * distinct()
      * 去重,且去重後會shuffler,可以指定去重後的分區數
      */

//    val distinctRDD: RDD[Int] = listRDD.distinct()
//    distinctRDD.collect().foreach(println)

    /**
      * coalesce(n)
      * 縮減分區的數量,可以簡單的理解為合併分區,預設,沒有shuffler,可以加參數true指定shuffler
      */

//    println("縮減分區前 = " + listRDD.partitions.size)
//    val coalesceRDD: RDD[Int] = listRDD.coalesce(2)
//    println("縮減分區前 = " + coalesceRDD.partitions.size)

    /**
      * repartition()
      * 重新分區,有shuffler。它其實就是帶true的coalesce
      */

//    listRDD.glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })
//    val repartitionRDD: RDD[Int] = listRDD.repartition(2)
//    repartitionRDD.glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * sortBy(f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length))
      * 根據函數排序
      */

//    val sortByRDD: RDD[Int] = listRDD.sortBy(n => n % 2, false)
//    sortByRDD.collect().foreach(println)

    /**************************雙value交互*****************************/

    /**
      * 雙value交互
      * A.union(B)         對A、B合併。(不去重)
      * A.subtract(B)      對A減去和B中的相同的
      * A.cartesian(B)     對A、B求笛卡爾乘積
      * A.zip(B)           將A、B組成(k,v),個數、分區數要相等
      * A.union(B) 對A、B求並集
      */

//    listRDD.union(listRDD3).collect().foreach(println)
//    listRDD.subtract(listRDD3).collect().foreach(println)
//    listRDD.intersection(listRDD3).collect().foreach(println)
//    listRDD.cartesian(listRDD3).collect().foreach(println)
//    listRDD.zip(listRDD3).collect().foreach(println)


    /**************************(k,v)對*******************************/

    val pairRDD1: RDD[(Int, String)] = sc.parallelize(Array((1, "aa"), (1, "bb"), (3, "cc"), (3, "dd")),  4)
    val pairRDD2: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4),
                                                            ("b", 3), ("c", 6), ("c", 8)),  2)
    val pairRDD3: RDD[(Int, String)] = sc.parallelize(Array((1, "zzz"), (3, "xxx")))

    /**
      * partitionBy(partitioner: Partitioner)
      * 按照分區器進行分區
      */

//    pairRDD1.partitionBy(new org.apache.spark.HashPartitioner(2))
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

//    pairRDD1.partitionBy(new MyPartitioner(3))
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * groupByKey()
      * 單純把key相等的value放在一起,生成序列
      */
//    pairRDD1.groupByKey().collect().foreach(println)


    /**
      * reduceByKey(func)
      * 按key聚合,並且按函數對key相等的value進行操作
      */

//    pairRDD1.reduceByKey(_ + _)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })


    /**
      * aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
      * zeroValue:每個分區的每一個key的初始值
      * seqOp:每個分區里的聚合函數
      * seqOp:分區間的聚合函數
      */


    // 取出每個分區相同對key的最大值,在相加
//    pairRDD2.aggregateByKey(0)(math.max(_,_), _+_)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * foldByKey(zeroValue: V)(func: (V, V) => V)
      * 其實就是aggregateByKey的簡化版,seqOp和seqOp相同
      */

//    pairRDD2.foldByKey(0)(_ + _)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * combineByKey[C](
      * createCombiner: V => C,
      * mergeValue: (C, V) => C,
      * mergeCombiners: (C, C) => C,
      * partitioner: Partitioner,
      * mapSideCombine: Boolean = true,
      * serializer: Serializer = null)
      *
      * 主要就是比aggregateByKey多了一個createCombiner,用於計算初始值
      */

    // 計算相同key的value的均值
//    pairRDD2.combineByKey(
//      (_, 1),
//      (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1),
//      (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
//      .map{case (key, value) => (key, value._1 / value._2.toDouble)}
//      .collect().foreach(println)

    /**
      * sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      * 按key排序
      */

//    pairRDD1.sortByKey(true)
//      .collect().foreach(println)


    /**
      * mapValues(func)
      * 只對value做轉換
      */

//    pairRDD1.mapValues(value => value + "|||")
//      .collect().foreach(println)

    /**
      * A.join(B, numP)
      * 把key相同的value組合在一起(性能較低)
      */

//    pairRDD1.join(pairRDD3)
//      .collect().foreach(println)

    /**
      * A.cogroup(B)
      * (k, v1) 和 (k, v2)cogroup 後,得到(k, v1集合,v2集合)
      */

    pairRDD1.cogroup(pairRDD3)
      .collect().foreach(println)

    sc.stop()

  }
}

// 自定義分區器
class MyPartitioner (partitions: Int) extends Partitioner {
  override def numPartitions: Int = {
    partitions
  }

  override def getPartition(key: Any): Int = {
    1
  }
}

  //只寫代碼不讓我發出來--忽略這一行


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • '''''' ''' 1、簡述面向對象三大特性並用示例解釋說明?【背寫】 1、封裝 狹義的封裝:把一組屬性封裝到一個對象,創建對象的時候 廣義的封裝:代碼塊,函數、對象、類、模塊-py文件都是封裝 把封裝後的對象看成一個黑盒子,只需要關註輸入和輸出,不必關註黑盒子內部的實現 2、繼承 1、避免代..... ...
  • 今天在使用go與php的AES加解密交互中,一直有個問題那就是在go中加密後,在php端始終都是無法解密,經過排查最後發現是加密key長度引起的問題, 這裡簡單記錄下。 go的AES使用的是第三方的庫, "openssl" ,因為用的匆忙,沒註意看文檔,所以就直接弄了示例代碼,才發現和php端無法解 ...
  • 此模式通過一個模板方法來定義程式的框架或演算法,通常模板方法定義在基類中,即原始的模板,然後子類就可以根據不同的需要實現或重寫模板方法中的某些演算法步驟或者框架的某部分,最後達到使用相同模板實現不同功能的效果。 核心思想: 使用一個模板方法定義好總的演算法框架。 子類中根據需要重新定義某些操作,但是不能修 ...
  • 有個需求,從某個介面下載的一個zip壓縮包,往裡面添加一個說明文件。搜索了一下,沒有找到往zip直接添加文件的方法,最終解決方法是先解壓、再壓縮。具體過程如下: ...
  • 屬性 語法格式:修飾符 類型 屬性名 = 初值; 說明: 修飾符:public、protected、private:用於表示成員變數的訪問許可權。static:表示該成員變數為類變數,也稱為靜態變數。final:表示將該成員變數聲明為常量,其值無法更改。 類型:表示變數的類型。 屬性名:表示變數名稱。 ...
  • Java類的初始化順序 多說無益,以下是本人親自試驗的代碼,一目瞭然: 1 package test1; 2 3 public class Test { 4 public static void main(String[] argc) { 5 new Child(); 6 System.out.pr ...
  • composer install thinkphp6 報錯 Parse error: syntax error, unexpected ':', expecting '{' in vendor\topthink\think-helper\src\helper.php on line 233 ...
  • 前言本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。作者:萬能搜吧 都是copy的百度SDK文檔,簡單說說怎麼用。 1、沒安裝Python的參見此文:Python學習筆記系列 1 ——安裝調試Python開發軟體 2、win+r輸 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...