FastDFS併發問題的排查經歷

来源:https://www.cnblogs.com/heavenTang/archive/2023/03/28/17266440.html
-Advertisement-
Play Games

附件用的fastdf上傳和下載的, 本地開發時就沒考慮過多文件上傳就會有併發的問題,比如多個只上傳成功了一個或者上傳了但是文檔內容缺失了,變成0位元組。 呵。。都是一次難忘的經歷。 經過本地模擬大批量的上傳下載, 發現fastdf是在啟動時就初始化了tracker和stroge, 每次調用過他的介面後 ...


附件用的fastdf上傳和下載的, 本地開發時就沒考慮過多文件上傳就會有併發的問題,比如多個只上傳成功了一個或者上傳了但是文檔內容缺失了,變成0位元組。

呵。。都是一次難忘的經歷。

經過本地模擬大批量的上傳下載, 發現fastdf是在啟動時就初始化了tracker和stroge, 每次調用過他的介面後都會關閉連接, 這樣就導致上傳的不完整或者不成功。也是後面找的博客看到的,非常感謝這篇文章。https://blog.csdn.net/AFSGEFEGH/article/details/109034532?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&utm_relevant_index=6

記得方法上面加上synchronized

點擊查看代碼

	@RequestMapping(value = "/batchDownloadForThesisCheck2", method = RequestMethod.POST)
	public synchronized void batchDownloadForThesisCheck2(@RequestBody List<FileInfoDto> fileInfoList)  {
		if(CollectionUtils.isEmpty(fileInfoList)){
			throw new EducationException("下載附件失敗");
		}
		List<FileInfoDto> fileInfoDtos = differentFileName(fileInfoList);
//		List<FileInfoDto> fileInfoDtos = new ArrayList<>();

//		for(int fi=0;fi<300;fi++){
//			FileInfoDto it = new FileInfoDto();
//			it.setFileName("20210115_小魚兒"+fi+"_jjlw.doc");
//			fileInfoDtos.add(it);
//		}
		String zipName = request.getParameter("zipName");
		if(StringUtils.isEmpty(zipName)) zipName = "批量下載";
		ZipOutputStream zipOS = null ;
		InputStream is = null;

		OutputStream os = null;
		// 計算百分值
		int index =1;
		int totalSize =CollectionUtils.isNotEmpty(fileInfoDtos) ? fileInfoDtos.size():1;


		try {
			response.setContentType("application/octet-stream; charset=UTF-8");
			response.setHeader("Access-Control-Expose-Headers", "fileName");
			response.setHeader("fileName", URLEncoder.encode(zipName, "UTF-8"));
			os = response.getOutputStream();
			zipOS = new ZipOutputStream(os);
			for (FileInfoDto info : fileInfoDtos) {
				// 機檢論文換名字,學號_姓名_jjlw命名
				String itemFileName= info.getFileName();
//				itemFileName = "S20020804005_陳明鑫_jjlw .docx";
				int secondShowIndex = Common.findNumber(itemFileName,"_",2);
				int firstShowIndex = Common.findNumber(itemFileName,"_",1);
				if("1".equals(info.getPaperToName())){
					// 文件格式1:學校代碼_學號_LW.doc 2:學號_姓名_jjlw
					itemFileName = "10356_"+itemFileName.substring(0,firstShowIndex)+"_LW"+itemFileName.substring(itemFileName.lastIndexOf("."),itemFileName.length());
				}else{
					itemFileName = itemFileName.substring(0,secondShowIndex)+"_jjlw"+itemFileName.substring(itemFileName.lastIndexOf("."),itemFileName.length());
				}
				logger.error("已下載學生:{} "  ,itemFileName);
				zipOS.putNextEntry(new ZipEntry(itemFileName));
				try{
					is = fastDFS.downloadFile(info.getFileId());
//					is = fastDFS.downloadFile("group1/M00/00/C0/wKgjdWQYEJ-AbefsAdOyZXrKanw028.doc");
					int len = 0;
					byte[] buffer = new byte[1024*8];
					while ((len = is.read(buffer)) != -1) {
						zipOS.write(buffer, 0, len);
					}
//					is.close();

					// 計算進度,向下取整
					double nowProcess = Math.floor((index*100)/totalSize);
					logger.error("已下載”{}",index);
//					createProcessDownFile(nowProcess,"已下載"+nowProcess+"%",info.getTimeId(),info.getUserId());
					index ++;
				}catch(Exception ignored){

				}
				zipOS.flush();
//				zipOS.closeEntry();
			}
		} catch (IOException e) {
//			createProcessDownFile(100d,"批量下載發生錯誤",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
			logger.error("批量下載發生錯誤: " + e.getMessage(), e);
		} finally {
			try {
				if (zipOS != null) {
					zipOS.closeEntry();
					zipOS.close();
				}
				if (os != null) os.close();
//				createProcessDownFile(100d,"已下載100%",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
				logger.warn("關閉機檢下載 :"  );
			} catch (IOException e) {
//				createProcessDownFile(100d,"批量下載發生錯誤,關閉文件流失敗",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
				logger.warn("關閉文件流失敗, cause by :" + e.getMessage());

			}
//			finally {
//				createProcessDownFile(100d,"已下載100%",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
//				logger.warn("關閉機檢下載 :"  );
//			}

		}
	}

下麵是封裝的上傳FastDFS

點擊查看代碼
package fastdfs.config;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.ProtoCommon;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerGroup;
import org.csource.fastdfs.TrackerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import com.xx.commons.exception.EducationException;

import lombok.extern.slf4j.Slf4j;



/**
 * FastDfs文件系統工具類
 * 連接Fast
 * 上傳圖片 
 * 返回上傳之後的路徑 用此路徑就能訪問此圖片
 *    group1/M00/00/01/wKjIgFWOYc6APpjAAAD-qk29i78248.jpg
 *
 */
@Slf4j
public class FastDFS {

    @Autowired
    private  FastDFSProperty fastDFSProperty;

    /**
     * 跟蹤器
     */
    private TrackerServer trackerServer;
    
    /**
     * 存儲器
     */
    private StorageServer storageServer;
    
    /**
     * 預設編碼
     */
    private static final String DEFAULT_ENCODING = "UTF-8";

    @PostConstruct
    public void init(){
        try {
            ClientGlobal.setG_charset(DEFAULT_ENCODING);
            ClientGlobal.setG_connect_timeout(fastDFSProperty.getConnect_timeout());
            ClientGlobal.setG_network_timeout(fastDFSProperty.getNetwork_timeout());
            ClientGlobal.setG_secret_key(fastDFSProperty.getSecret_key());
            ClientGlobal.setG_tracker_http_port(fastDFSProperty.getTracker_http_port());

            String tracker_server = fastDFSProperty.getTracker_server();
            InetSocketAddress isadd = new InetSocketAddress(
                    tracker_server.substring(0, tracker_server.indexOf(':')), 
                    Integer.parseInt(tracker_server.substring(tracker_server.indexOf(':') + 1, tracker_server.length())));
            InetSocketAddress[] tracker_servers  = {isadd};
            ClientGlobal.setG_tracker_group(new TrackerGroup(tracker_servers));

            TrackerClient trackerClient = new TrackerClient(ClientGlobal.g_tracker_group);
            trackerServer = trackerClient.getConnection();
            if (trackerServer == null) {
                throw new EducationException("getConnection return null");
            }
            storageServer = trackerClient.getStoreStorage(trackerServer);
            if (storageServer == null) {
                throw new EducationException("getStoreStorage return null");
            }
            ProtoCommon.activeTest(storageServer.getSocket());
        } catch (Exception e) {
            throw new EducationException("初始化 fastdfs 配置失敗", e);
        }
    }

    /**
     * 
     * @param file
     *            文件
     * @param fileName
     *            文件名
     * @return 返回Null則為失敗
     */
    public String uploadFile(File file, String fileName) {
        InputStream fis = null;
        try {
            NameValuePair[] meta_list = null; 
            fis = Files.newInputStream(file.toPath());
            byte[] file_buff = new byte[1024];
            int len = fis.available();
            file_buff = new byte[len];
            while (fis.read(file_buff) > 0) {
                break;
            }
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            String fileid = storageClient1.upload_file1(file_buff, getFileExt(fileName), meta_list);
            return fileid;
        } catch (Exception ex) {
            throw new EducationException("上傳文件錯誤",ex);
        } finally{
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    log.error("Close {} InputStream failed", fis);
                }
            }
        }
    }

    /**
     * 上傳文件
     * @param bytes
     * @param name
     * @param size
     * @return
     */
    public String uploadFile(byte[] bytes, String name, Long size) {
        try {
            //擴展名
            String ext = name.substring(name.lastIndexOf('.')+1);
            NameValuePair[] meta_list = new NameValuePair[3];
            meta_list[0] = new NameValuePair("filename",name);
            meta_list[1] = new NameValuePair("fileext",ext);
            meta_list[2] = new NameValuePair("filesize",String.valueOf(size));
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.upload_file1(bytes, ext, meta_list);
        } catch (Exception ex) {
            throw new EducationException("上傳文件錯誤",ex);
        }
    }

    /**
     * 根據組名和遠程文件名來刪除一個文件
     * 
     * @param groupName
     *            例如 "group1" 如果不指定該值,預設為group1
     * @param fileName
     *            例如"M00/00/00/wKgxgk5HbLvfP86RAAAAChd9X1Y736.jpg"
     * @return 0為成功,非0為失敗,具體為錯誤代碼
     */
    public int deleteFile(String groupName, String fileName) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.delete_file(StringUtils.isBlank(groupName)? "group1" : groupName, fileName);
        } catch (Exception ex) {
            throw new EducationException("return null",ex);
        }
    }

    /**
     * 根據fileId來刪除一個文件(我們現在用的就是這樣的方式,上傳文件時直接將fileId保存在了資料庫中)
     * 
     * @param fileId
     *            file_id源碼中的解釋file_id the file id(including group name and filename);例如 group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @return 0為成功,非0為失敗,具體為錯誤代碼
     */
    public int deleteFile(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.delete_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException("刪除文件錯誤",ex);
        }
    }

    /**
     * 修改一個已經存在的文件
     * 
     * @param oldFileId
     *            原來舊文件的fileId, file_id源碼中的解釋file_id the file id(including group name and filename);例如 group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @param file
     *            新文件
     * @param filePath
     *            新文件路徑
     * @return 返回空則為失敗
     */
    public String modifyFile(String oldFileId, File file, String filePath) {
        String fileid = null;
        try {
            // 先上傳
            fileid = uploadFile(file, filePath);
            if (fileid == null) {
                return null;
            }
            // 再刪除
            int delResult = deleteFile(oldFileId);
            if (delResult != 0) {
                return null;
            }
        } catch (Exception ex) {
            throw new EducationException("修改一個已經存在的文件錯誤",ex);
        }
        return fileid;
    }

    /**
     * 文件下載
     * 
     * @param fileId
     * @return 返回一個流
     */
    public InputStream downloadFile(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            byte[] bytes = storageClient1.download_file1(fileId);
            return new ByteArrayInputStream(bytes);
        } catch (Exception ex) {
            throw new EducationException("文件下載錯誤",ex);
        }
    }

    /**
     * 
     * @param fileId
     * @return 返回一個位元組數組
     */
    public byte[] downloadFileToByte(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.download_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException("文件下載錯誤",ex);
        }
    }

    /**  
     * 批量文件下載,在map中給出文件名:fileName, 文件對應路徑:filePath;   
     * 方法返回  由所有文件生成的壓縮包ZIP   
     * @param fileList 在map中給出文件名:fileName, 文件對應路徑:filePath;  
     * @return 返回一個流  
     */  
    public InputStream downloadFile(List<Map<String, String>> fileList){  
        if(CollectionUtils.isEmpty(fileList)){  
            throw new EducationException("文件下載錯誤,fileList為空!");  
        }  
        Date date = new Date();  
        long timeStr = date.getTime();  
        
        StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
        
        try(ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(Paths.get(".", timeStr+".zip")));) {  
              
            ZipEntry entry;  
            int count, bufferLen = 1024;    
            byte data[] = new byte[bufferLen];  
            for(Map<String, String> map : fileList){  
                entry = new ZipEntry(map.get("fileName"));  
                  
                zos.putNextEntry(entry);  
                byte[] bytes = storageClient1.download_file1(map.get("filePath"));  
                try(BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(bytes));){  
                    while ((count = bis.read(data, 0, bufferLen)) != -1) {    
                        zos.write(data, 0, count);    
                    }    
                    zos.closeEntry();  
                }  
            }  
            return Files.newInputStream(Paths.get(".", timeStr+".zip"));  
        } catch (Exception ex) {  
            throw new EducationException("文件下載錯誤", ex);  
        }  
    } 

    /**
     * 獲取文件尾碼名(不帶點).
     * 
     * @return 如:"jpg" or "".
     */
    private  String getFileExt(String fileName) {
        if (StringUtils.isBlank(fileName) || !fileName.contains(".")) {
            return "";
        } else {
            return fileName.substring(fileName.lastIndexOf('.') + 1); // 不帶最後的點
        }
    }
}

fastdf源碼中storageServer每次用完都會關閉

image


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題1 問題原因:在數據源配置類中沒有創建事務管理 在數據源配置類中添加好事務管理器的Bean即可 問題2 其實出現這個問題實質就是mapper介面和mapper.xml文件沒有映射起來。 常見的錯誤如下: 1.mapper.xml中的namespace和實際的mapper文件不一致 這個問題其實很 ...
  • order by是怎麼工作的? 在你開發應用的時候,一定會經常碰到需要根據指定的欄位排序來顯示結果的需求。還是以我們前面舉例用過的市民表為例,假設你要查詢城市是“杭州”的所有人名字,並且按照姓名排序返回前 1000 個人的姓名、年齡。 假設這個表的部分定義是這樣的: CREATE TABLE `t` ...
  • Spring Boot 應用,在啟動的時候,如果想做一些事情,比如預先載入並緩存某些數據,讀取某些配置等等。總而言之,做一些初始化的操作時,那麼 Spring Boot 就提供了兩個介面幫助我們實現。 ...
  • L2-001 緊急救援 分數 25 作為一個城市的應急救援隊伍的負責人,你有一張特殊的全國地圖。在地圖上顯示有多個分散的城市和一些連接城市的快速道路。每個城市的救援隊數量和每一條連接兩個城市的快速道路長度都標在地圖上。當其他城市有緊急求助電話給你的時候,你的任務是帶領你的救援隊儘快趕往事發地,同時, ...
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹 VLD 配置文件中配置項 ReportFile 的使用方法。 ...
  • 項目練習01 1.項目介紹 這是一個簡單的項目練習,用於掌握新學習的SpringBoot技術。 項目操作界面 ● 技術棧 Vue3+ElementPlus+Axios+MyBatisPlus+SpringBoot 前後端分離 前後端分離開發,前端主體框架 Vue3 + 後端基礎框架 SpringBo ...
  • 什麼是Base64 Base64編碼是將字元串以每3個8比特(bit)的位元組子序列拆分成4個6比特(bit)的位元組(6比特有效位元組,最左邊兩個永遠為0,其實也是8比特的位元組)子序列,再將得到的子序列查找Base64的編碼索引表,得到對應的字元拼接成新的字元串的一種編碼方式。 每個3位8比特數據拆分成 ...
  • 藍橋杯【答疑】 題目描述 分析 這是一個貪心演算法,要所得的時刻之和最小,而且下一個同學需要等上一個同學結束以後才能進行,因此需要對所耗總時間進行有小到大的排序,總時間相同的同學則對前兩步時間之和有小到大進行排序,最後算出時間之和即可。 代碼 import java.util.Arrays; impo ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...