Spark: 單詞計數(Word Count)的MapReduce實現(Java/Python)

来源:https://www.cnblogs.com/orion-orion/archive/2022/05/26/16314837.html
-Advertisement-
Play Games

我們在上一篇博客中學習瞭如何用Hadoop-MapReduce實現單詞計數,現在我們來看如何用Spark來實現同樣的功能。Spark框架也是MapReduce-like模型,採用“分治-聚合”策略來對數據分佈進行分佈並行處理。不過該框架相比Hadoop-MapReduce,具有以下兩個特點:對大數據... ...


1 導引

我們在博客《Hadoop: 單詞計數(Word Count)的MapReduce實現 》中學習瞭如何用Hadoop-MapReduce實現單詞計數,現在我們來看如何用Spark來實現同樣的功能。

2. Spark的MapReudce原理

Spark框架也是MapReduce-like模型,採用“分治-聚合”策略來對數據分佈進行分佈並行處理。不過該框架相比Hadoop-MapReduce,具有以下兩個特點:

  • 對大數據處理框架的輸入/輸出,中間數據進行建模,將這些數據抽象為統一的數據結構命名為彈性分散式數據集(Resilient Distributed Dataset),併在此數據結構上構建了一系列通用的數據操作,使得用戶可以簡單地實現複雜的數據處理流程。

  • 採用了基於記憶體的數據聚合、數據緩存等機制來加速應用執行尤其適用於迭代和互動式應用。

Spark社區推薦用戶使用Dataset、DataFrame等面向結構化數據的高層API(Structured API)來替代底層的RDD API,因為這些高層API含有更多的數據類型信息(Schema),支持SQL操作,並且可以利用經過高度優化的Spark SQL引擎來執行。不過,由於RDD API更基礎,更適合用來展示基本概念和原理,後面我們的代碼都使用RDD API。

Spark的RDD/dataset分為多個分區。RDD/Dataset的每一個分區都映射一個或多個數據文件, Spark通過該映射讀取數據輸入到RDD/dataset中。

Spark的分區數和以下參數都有關係:

  • spark.default.parallelism (預設為CPU的核數)

  • spark.sql.files.maxPartitionBytes (預設為128 MB)讀取文件時打包到單個分區中的最大位元組數)

  • spark.sql.files.openCostInBytes (預設為4 MB) 該參數預設4M,表示小於4M的小文件會合併到一個分區中,用於減小小文件,防止太多單個小文件占一個分區情況。這個參數就是合併小文件的閾值,小於這個閾值的文件將會合併。

我們下麵的流程描述中,假設每個文件對應一個分區(實際上因為文件很小,導致三個文件都在同一個分區中,大家可以通過調用RDD對象的getNumPartitions()查看)。

Spark的Map示意圖如下:

Spark的Reduce示意圖如下:

3. Word Count的Java實現

項目架構如下圖:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ pom.xml
├─ src
│  ├─ main
│  │  └─ java
│  │     └─ WordCount.java
│  └─ test
└─ target

WordCount.java文件如下:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;

public class WordCount {
	private static Pattern SPACE = Pattern.compile(" ");

	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: WordCount <intput directory> <output directory>");
			System.exit(1);
		}
        String input_path = args[0];
        String output_path = args[1];

		SparkSession spark = SparkSession.builder()
			.appName("WordCount")
			.master("local")
			.getOrCreate();

		JavaRDD<String> lines = spark.read().textFile(input_path).javaRDD();

		JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
		JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
		JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

		List<Tuple2<String, Integer>> output = counts.collect();

        String filePath = Paths.get(output_path, "result.txt").toString();
        BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
		for (Tuple2<?, ?> tuple : output) {
			out.write(tuple._1() + ": " + tuple._2() + "\n");
		}
		out.close();
        spark.stop();
	}
}

pom.xml文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.WordCount</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>WordCount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <!-- 集中定義版本號 -->
  <properties>
    <scala.version>2.12.10</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <project.timezone>UTC</project.timezone>
    <java.version>11</java.version>
    <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
    <site.plugin.version>3.7.1</site.plugin.version>
    <scalatest.version>3.1.2</scalatest.version>
    <scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
    <scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
    <maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
    <maven.source.plugin.version>3.2.1</maven.source.plugin.version>
    <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
    <nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
    <maven.help.plugin.version>3.2.0</maven.help.plugin.version>
    <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
    <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <spark.version>3.2.1</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--======SCALA======-->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
  </dependencies>


  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
          <configuration>
              <source>11</source>
              <target>11</target>
              <fork>true</fork>
              <executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

記得配置輸入參數inputoutput代表輸入目錄和輸出目錄(在VSCode中在launch.json文件中配置)。編譯運行後可在output目錄下查看result.txt

Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1

可見成功完成了單詞計數功能。

4. Word Count的Python實現

先使用pip按照pyspark==3.8.2

pip install pyspark==3.8.2

註意PySpark只支持Java 8/11,請勿使用更高級的版本。這裡我使用的是Java 11。運行java -version可查看本機Java版本。

(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)

項目架構如下:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ src
│  └─ word_count.py

word_count.py編寫如下:

from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 3:
    print("Usage: WordCount <intput directory> <output directory>", file=sys.stderr)
    exit(1)
     
input_path, output_path = sys.argv[1], sys.argv[2]

spark = SparkSession.builder.appName("WordCount").master("local").getOrCreate()

lines = spark.read.text(input_path).rdd.map(lambda r: r[0])

counts = lines.flatMap(lambda s: s.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

output = counts.collect()

with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) +": " + str(count) + "\n")

spark.stop()

使用python word_count.py input output運行後,可在output中查看對應的輸出文件result.txt

Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1

可見成功完成了單詞計數功能。

參考

數學是符號的藝術,音樂是上界的語言。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.Docker基本介紹 Docker就是虛擬化的一種輕量級替代技術,基於Go語言的開源應用容器引擎。Docker的容器技術不依賴任何語言、框架或系統,可以將應用程式變成一種標準化的、可移植的、自管理的組件,並脫離伺服器硬體在任何主流系統中開發、調試和運行。 光看這個介紹還不足以知道Docker是什 ...
  • Linux查看系統硬體信息(2021.06.22) 1. CPU # 查看 cpu 的統計信息 $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 64 On-li ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 在VMware上搭建docker的時候報了Failed to start docker.service: Unit not found。查看了好多 博主的分享,但是因為圖片有限,不能確定是否問題一樣,查到這位博主的時候眼前一亮,一毛一樣啊!並且博 ...
  • 本篇關鍵詞:內核重定位、MMU、SVC棧、熱啟動、內核映射表 內核彙編相關篇為: v74.01 鴻蒙內核源碼分析(編碼方式) | 機器指令是如何編碼的 v75.03 鴻蒙內核源碼分析(彙編基礎) | CPU上班也要打卡 v76.04 鴻蒙內核源碼分析(彙編傳參) | 如何傳遞複雜的參數 v77.01 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 部署rocketmq和可視化客戶端 一、 伺服器資源 服務名稱:Linux伺服器 IP:[請查看資源分配文檔] 操作系統:CentOS 7.8 x64 二、rocketmq安裝 2.1下載 下載地址:rocketmq.apache.org/dow ...
  • tree Linux tree命令用於以樹狀圖列出目錄的內容。 執行tree指令,它會列出指定目錄下的所有文件,包括子目錄里的文件。 語法 tree [-aACdDfFgilnNpqstux][-I <範本樣式>][-P <範本樣式>][目錄...] 參數說明: - -a 顯示所有文件和目錄。 - ...
  • 一、ZooKeeper概述 Apache ZooKeeper 是一個集中式服務,用於維護配置信息、命名、提供分散式同步和提供組服務,ZooKeeper 致力於開發和維護一個開源伺服器,以實現高度可靠的分散式協調,其實也可以認為就是一個分散式資料庫,只是結構比較特殊,是樹狀結構。官網文檔:https: ...
  • Explain簡介 MySQL優化器在基於成本的計算和基於規則的SQL優化會生成一個所謂的執行計劃,我們就可以使用執行計劃查看MySQL對該語句具體的執行方式。 介紹這個好啰嗦就是了,我們可以通過這個優化器展示的執行計劃,查看優化器對我們的SQL進行優化的步驟,連接轉換成單表訪問時的優化。以及對於之 ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...