大數據Hadoop之——數據採集存儲到HDFS實戰(Python版本)

来源:https://www.cnblogs.com/liugp/archive/2022/05/24/16307510.html
-Advertisement-
Play Games

要實現這個示例,必須先安裝好hadoop和hive環境,環境部署可以參考我之前的文章: 大數據Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce) 大數據Hadoop之——數據倉庫Hive 【流程圖如下】 【示例代碼如下】 #!/usr/bin/env python # - ...


要實現這個示例,必須先安裝好hadoop和hive環境,環境部署可以參考我之前的文章:
大數據Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce)
大數據Hadoop之——數據倉庫Hive

【流程圖如下】

【示例代碼如下】

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author   : liugp
# @File     : Data2HDFS.py

"""
# pip install sasl可能安裝不成功
pip install sasl
# 可以選擇離線安裝
https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl
pip install sasl-0.3.1-cp37-cp37m-win_amd64.whl

pip install thrift
pip install thrift-sasl
pip install pyhive
pip install hdfs
"""
from selenium import webdriver
from pyhive import hive
from hdfs import InsecureClient

class Data2HDFS:
    def __init__(self):
        # 第一個步,連接到hive
        conn = hive.connect(host='192.168.0.113', port=11000, username='root', database='default')
        # 第二步,建立一個游標
        self.cursor = conn.cursor()

        self.fs = InsecureClient(url='http://192.168.0.113:9870/', user='root', root='/')

    """
    採集數據
    """
    def collectData(self):
        try:
            driver = webdriver.Edge("../drivers/msedgedriver.exe")
            # 爬取1-3頁數據,可自行擴展
            id = 1
            local_path = './data.txt'
            with open(local_path, 'w', encoding='utf-8') as f:
                for i in range(1, 2):
                    url = "https://ac.qq.com/Comic/index/page/" + str(i)
                    driver.get(url)
                    # 模擬滾動
                    js = "return action=document.body.scrollHeight"
                    new_height = driver.execute_script(js)
                    for i in range(0, new_height, 10):
                        driver.execute_script('window.scrollTo(0, %s)' % (i))
                    list = driver.find_element_by_class_name('ret-search-list').find_elements_by_tag_name('li')
                    data = []
                    for item in list:
                        imgsrc = item.find_element_by_tag_name('img').get_attribute('src')
                        author = item.find_element_by_class_name("ret-works-author").text
                        leixing_spanlist = item.find_element_by_class_name("ret-works-tags").find_elements_by_tag_name(
                            'span')
                        leixing = leixing_spanlist[0].text + "," + leixing_spanlist[1].text
                        neirong = item.find_element_by_class_name("ret-works-decs").text
                        gengxin = item.find_element_by_class_name("mod-cover-list-mask").text

                        itemdata = {"id": str(id), 'imgsrc': imgsrc, 'author': author, 'leixing': leixing, 'neirong': neirong,
                                    'gengxin': gengxin}
                        print(itemdata)
                        line = itemdata['id'] +"," + itemdata['imgsrc'] +"," + itemdata['author'] + "," + itemdata['leixing'] + "," + itemdata['neirong'] + itemdata['gengxin'] + "\n"
                        f.write(line)
                        id+=1
                    data.append(itemdata)
            # 上傳文件,
            d2f.uplodatLocalFile2HDFS(local_path)

        except Exception as e:
            print(e)

    """創建hive表"""
    def createTable(self):
        # 解決hive表中文亂碼問題
        """
        mysql -uroot -p
        use hive資料庫

        alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
        alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
        alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        commit;
        :return:
        """
        self.cursor.execute("CREATE TABLE  IF NOT EXISTS default.datatable (\
        id INT COMMENT 'ID',\
        imgsrc STRING COMMENT 'img src',\
        author STRING COMMENT 'author',\
        leixing STRING COMMENT '類型',\
        neirong STRING COMMENT '內容',\
        gengxin STRING COMMENT '更新'\
        )\
        ROW FORMAT DELIMITED\
        FIELDS TERMINATED BY ','\
        COLLECTION ITEMS TERMINATED BY '-'\
        MAP KEYS TERMINATED BY ':'\
        LINES TERMINATED BY '\n'")

    """
    將本地文件推送到HDFS上
    """
    def uplodatLocalFile2HDFS(self, local_path):
        hdfs_path = '/tmp/test0508/'
        self.fs.makedirs(hdfs_path)
        # 如果文件存在就必須先刪掉
        self.fs.delete(hdfs_path + '/' + local_path)
        print(hdfs_path, local_path)
        self.fs.upload(hdfs_path, local_path)

    """
    將HDFS上的文件load到hive表
    """
    def data2Hive(self):
        # 先清空表
        self.cursor.execute("truncate table datatable")
        # 載入數據,這裡的路徑就是HDFS上的文件路徑
        self.cursor.execute("load data inpath '/tmp/test0508/data.txt' into table datatable")
        self.cursor.execute("select * from default.datatable")
        print(self.cursor.fetchall())

if __name__ == "__main__":
    d2f = Data2HDFS()
    # 收集數據
    d2f.collectData()
    # 創建hive表
    # d2f.createTable()
    # 將數據存儲到HDFS
    d2f.data2Hive()

【溫馨提示】hiveserver2的預設埠是10000,我是上面寫的11000埠,是因為我配置文件里修改了,如果使用的是預設埠,記得修改成10000埠,還有就是修改成自己的host地址。這個只是一種實現方式,還有其它方式。

如果小伙伴有疑問的話,歡迎給我留言,後續會更新更多關於大數據的文章,請耐心等待~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我們平時在 Linux 中使用 cp 命令時,當把文件從一個目錄複製到另一個目錄,且目錄中具有同名文件時,系統會提示輸入 y 來確認是否覆蓋同名文件。 如果文件少的話,也無關緊要,但文件多的話,要一個一個確認簡直太累了。更要命的是,即使我們加了 -rf 參數,還是會提示。 為什麼會這樣呢? 原因 原 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 群暉NAS設置IPV6公網訪問 最近入手了一個群暉nas,記錄下設置公網訪問的過程。 NAS:群暉NAS220+ 路由器:小米AX3600 1、打開路由器上的IPV6功能。 現在路由器預設的還是使用IPV4,IPV6還是需要手動打開的,再去nas ...
  • 解決 Win10 Wsl2 IP 變化問題(2021.2.10) Win10 Wsl2 的 IP 地址每次重啟後都會變化,如果經常需要在 Win10 訪問 Wsl2 內的服務的話會比較麻煩,因此筆者尋找一種解決方案併在此記錄。 1. 產生環境 WSL2; Ubuntu 20.04 focal(on ...
  • echo echo 命令是 Linux bash 和 C shell中最常用的內置命令之一,通常用於腳本語言和批處理文件,用於標準輸出以及顯示文本內容等。echo命令在生產環境腳本中還是使用的非常多的,很多時候都要查看腳本執行是否正常,以及腳本執行到哪裡,都是通過echo命令來列印來定位 。 在寫腳 ...
  • 1、作業控制技巧 Bash環境中通過命令運行一個進程的時候,使【&】 符可以使改進程進入後臺 (base) [root@localhost ~]# sh test.sh & [1] 46963 (base) [root@localhost ~]# 將該進程放入後臺並暫停執行 Ctrl+z (base ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 準備工作:兩個U盤,一個大的作為系統盤,一個小的作為引導盤。 U盤分區 為什麼分盤 我們將u盤作為啟動盤之後,u盤文件不易區分整理,萬一不小心刪除了啟動盤的文件就不好了,所以我們可以將u盤一分為二,一部分作為啟動盤,另一部分作為讀寫盤,這樣就很合 ...
  • chmod怎麼用,Linux文件許可權管理 本文翻譯自Linux官網的Linux入門文章《File Permissions - chmod》,其中一些部分自作主張做了些修改 原文鏈接:File Permissions - chmod 原文 導言 Linux從UNIX繼承了文件所有權和許可權的觀念。這是因 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 1. 問題描述 電腦上成功安裝VMware虛擬機後,安裝Ubuntu系統。Ubuntu系統無法聯網,多方檢查發現問題:宿主機的網路連接中沒有VMware Network Adapter VMnet1和VMware Network Adapter ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...