<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 如何單機排序一個大數據文件? ## **問題來源:** 針對一個大文件,如何對里面的元素進行排序 ## **問題描述:** 24GTxt文件,每行1個大整數,1-100位不等 純JDK排序。 ## **解決方案:** > 程序流程 1. 源文件采用單線程NIO按行讀 2. 讀到的每一行入到隊列A 3. 開啟16個線程(根據CPU核數),去消費這個隊列 4. 消費之后,把數據寫入相關的文件待排序 5. 開啟8個線程并發排序每個待排序文件(讀進來,排序,寫) 6. 按文件名做合并 ***** > 經驗總結 1. 文件的讀取先要看清楚是按行還是按字節。 如果按行讀,不能用多線程,方法是讀1個BUFFERED,判斷結束是否是換行,如果不是,就按字節讀,一直讀到是換行為止,或者按BUFFERED讀,然后按換行截取,剩下的就拼在下一個BUFFERED的頭部。如果按字節讀,可以用多線程(RandomAccessFile 2. 讀和寫,最好設置緩存大小。16M剛好 3. Eclipse運行的java程序是獨立的JVM,如果內存不夠,可以加參數-Xms3072m -Xmx6072m 4. 遇到高并發自增,可以采用AtomicInteger 5. ByteBuffer.array() 返回的 array 長度為 ByteBuffer allocate的長度,并不是里面所含的內容的長度 ``` //這樣會導致,最后讀取的肯定不是allocate的長度,但是array返回的帶有上一次的冗余數據 //解決辦法如下,重新按照剩余容量來制作一個新的byte byte[] data; if(buffer.remaining() != buffer.capacity()){ data = new byte[buffer.remaining()]; buffer.get(data, 0, data.length); }else{ data = buffer.array(); } String content = new String(data); ``` 6.如果中斷線程池里面的線程 可以使用Pool.shutdown. 但是前提是線程里面有阻斷方法。如Sleep或者阻塞隊列等等。 7.對于阻塞隊列,入隊和出隊所占用的時間比較長,做實時性的性能差,因為阻塞涉及到加鎖 8.線程池不能設置setDaemon。如果線程池里面的線程讀守候,那線程就無法回收了。矛盾 9.同1時刻,1個CPU運行1個或者多個線程,如8核兩線程,那就是一共16個線程 ***** > 測試報告 * 運行結果 1. SSD 10分鐘跑完24G 2. 機械硬盤 80分鐘跑完24G * 程序啟動使用內存 | | 32位JDK啟動程序使用內存 | 64位JDK啟動程序使用 | |---|---|---| | -Xms1g | 11M | 5M | | -Xms1.1g | 12M | | | -Xms1.2g | 報錯 | | | -Xms2g | 報錯 | 10M | | -Xms3g | 報錯 | 15M | | -Xms5g | 報錯 | 25M | | -Xms6g | 報錯 | 30M | * BufferedWriter占用內存數(基于64位JDK,-Xms5g) ``` BufferedWriter bw = new BufferedWriter (new FileWriter(new File("D:\\temp\\bigdata\\des3g\\"+i+".txt")),內存大小); ``` * BufferedWriter 緩存 5M 每個對象大概占用10M | 創建對象數量 | 占用內存| |---|---| | 2 | 25M | | 3 | 35M | | 4 | 45M | | 500 | 1265M(GC) | * BufferedWriter 緩存 3M 每個對象大概占用6M | 創建對象數量 | 占用內存| |---|---| | 4 | 25M | | 5 | 31M | | 6 | 37M | | 500 | 507M 1265M (GC) | * BufferedWriter 緩存 1M 每個對象大概占用2M | 創建對象數量 | 占用內存| |---|---| | 12 | 25M | | 13 | 27M | | 14 | 29M | | 500 | 1006M(GC) | ***** > 程序代碼 * 排序代碼 ``` package com.bingo4; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class BigSort { /**************************************** 可配置項 ***********************************/ // 是否開啟內存監控,每2秒打印內存情況 public boolean isRamMonitor = false; // 待排序文件 public String SRC_DATA = "d://temp//bigdata/src/100m.txt"; // 排序完畢生成的文件地址 public String DES_DATA_PATH = "d://temp//bigdata//des//"; // 默認開啟,每1位數,就分發成10個待排序文件,如果待排序文件里面最大是60位數,就分發成600個待排序文件.源文件如果超過8G左右,必須開啟,否則后面單個文件做排序會導致內存溢出 // 如果關閉,每1位數,就分發成1個待排序文件,這個對于源文件不大的情況下,速度極快。 public boolean isDeliverTen = true; // 讀入待排序文件緩存 final static int BSIZE = 1024 * 1024 * 1; // 3M // 寫入數據區間文件緩存 final static int WRITE_SORT_BSIZE = 1024 * 1024 * 3; // 3M // 排序讀寫緩存 final static int SORT_READER_BSIZE = 1024 * 1024 * 1; // 5M final static int SORT_WRITE_BSIZE = 1024 * 1024 * 1; // 5M // 合并讀寫緩存 final static int MERGE_BSIZE = 1024 * 1024 * 2; // 5M // 分發數據線程大小 public static int DELIVER_DATA_QUEUE_SIZE = 16; // 每個數據區間監聽隊列的線程數, 這里設置為1,效率最高 public static int RANG_QUEUE_SIZE = 1; // 并發排序線程數 public static int SORT_THREAD_SIZE = 8; /**************************************** 可配置項 ***********************************/ public String DES_SORT_DATA_PATH = DES_DATA_PATH + "sort//"; public String MERGE_FILE = DES_DATA_PATH + "merge//merge.txt"; public String MERGE_FILE_PATH = DES_DATA_PATH + "merge//"; int cpuNums = Runtime.getRuntime().availableProcessors(); // 分發數據隊列 public ConcurrentLinkedQueue<String> deliverDataQueue = new ConcurrentLinkedQueue<String>(); // 分發數據線程的執行線程池 public ExecutorService deliverDataThreadES = Executors.newFixedThreadPool(DELIVER_DATA_QUEUE_SIZE); // 數據分布范圍集合 public Map<Integer, ConcurrentLinkedQueue<String>> dataRangMap = new HashMap<Integer, ConcurrentLinkedQueue<String>>(); // 數據分布寫入對象 public Map<Integer, BufferedWriter> dataWriteMap = new ConcurrentHashMap<Integer, BufferedWriter>(); // 數據區間線程池 public ExecutorService dataRangeThreadES = Executors.newFixedThreadPool(1); // CAS:將這個變量更新為新值,但是如果從我上次看到這個變量之后其他線程修改了它的值,那么更新就失敗” // 已經讀取完畢的數據行數 public AtomicInteger hasReaderDataLine = new AtomicInteger(0); // 通過多線程,已經按數據區間處理好的數據行數 public AtomicInteger hasDataRangeWriteLine = new AtomicInteger(0); // 已排序的總行數 public AtomicInteger hasSortedDataLine = new AtomicInteger(0); // 已經讀到內存等待排序的總行數 public AtomicInteger hasWaitSortedDataLine = new AtomicInteger(0); // 已排序的文件數 public AtomicInteger hasSortedFile = new AtomicInteger(0); // 已合并好的文件數 public AtomicInteger hasCombineFile = new AtomicInteger(0); // 程序啟動時間 public long startTime = 0l; // 讀取文件完成時間 public long finishReadFileTime = 0l; // 等待分發完畢時間 public long finishDeliverFileTime = 0l; // 排序完成時間 public long finishSortFileTime = 0l; // 合并完成時間 public long finishCombineFileTime = 0l; // 內存監控線程 public Thread ramMonitorT = new Thread(new Runnable() { @Override public void run() { try { while (true) { Memory.print(); Thread.sleep(2000); } } catch (Exception e) { } } }); public static void main(String[] args) throws Exception { BigSort sort = new BigSort(); // 待排序文件 if ((args.length > 0) && !args[0].equals("")) { sort.SRC_DATA = args[0]; } // 目的文件 if ((args.length > 1) && !args[1].equals("")) { if (!args[1].endsWith("\\")) { sort.DES_DATA_PATH = args[1] + "\\"; } else { sort.DES_DATA_PATH = args[1]; } sort.DES_SORT_DATA_PATH = sort.DES_DATA_PATH + "sort//"; sort.MERGE_FILE = sort.DES_DATA_PATH + "merge//merge.txt"; sort.MERGE_FILE_PATH = sort.DES_DATA_PATH + "merge//"; } sort.start(); } /** * 程序啟動入口 * * @throws Exception * */ public void start() throws Exception { System.out.println(String.format("CPU核心數[%s] 最大可用內存:[%sM] 初始化內存:[%sM]", cpuNums, Memory.getMaxHeapMemory() / 1024 / 1024, Memory.getInitHeapMemory() / 1024 / 1024)); Memory.print(); // 是否開啟內存監控 if (isRamMonitor) { ramMonitorT.setDaemon(true); ramMonitorT.start(); } // 1.準備階段 if (!prepare()) { return; } // 2.對源文件進行讀取入隊處理 readFile(new File(SRC_DATA)); // 3.等待分發數據線程把數據分發完畢,然后把線程池里面的線程全部終止 waitForFinishWriteDataRange(); System.gc(); // 4.對每個文件單獨排序 sort(); // 5.合并 combine(); System.out.println(String.format("[程序已全部完成][一共用時:%s秒][讀:%s秒,割:%s秒,排:%s秒,合:%s秒]", ((System.currentTimeMillis() - startTime) / 1000), finishReadFileTime, finishDeliverFileTime, finishSortFileTime, finishCombineFileTime)); System.out.println(String.format("[已排序完的文件在:%s]", MERGE_FILE)); } // 1.準備階段,文件準備 public boolean prepare() { try { System.out.println("[文件及目錄檢查][開始]"); File srcFile = new File(SRC_DATA); if (!srcFile.exists()) { System.out.println("[文件及目錄檢查][失敗][待排序文件不存在,程序結束]" + SRC_DATA); return false; } // 刪掉已存在的臨時文件 File desDataPath = new File(DES_DATA_PATH); // if(desDataPath.exists()){ // if(deleteDir(desDataPath)); // } // 創建目錄 if (!desDataPath.exists()) { desDataPath.mkdir(); } // 創建目錄 File desSortDataPath = new File(DES_SORT_DATA_PATH); if (!desSortDataPath.exists()) { desSortDataPath.mkdir(); } // 創建目錄 File mergeFilePath = new File(MERGE_FILE_PATH); if (!mergeFilePath.exists()) { mergeFilePath.mkdir(); } File mergeFile = new File(MERGE_FILE); if (mergeFile.exists()) { mergeFile.delete(); } System.out.println(String.format("[文件及目錄檢查][待排序文件路徑:%s]", SRC_DATA)); System.out.println(String.format("[文件及目錄檢查][排序完畢生成的文件地址:%s]", DES_DATA_PATH)); System.out.println("[文件及目錄檢查][完畢]"); } catch (Exception e) { System.out.println("[文件及目錄檢查][失敗,程序結束][原因]" + e.getMessage()); return false; } System.out.println("[啟動分發數據監聽線程][開始]"); startTime = System.currentTimeMillis(); for (int i = 0; i < DELIVER_DATA_QUEUE_SIZE; i++) { DeliverDataThread ddt = new DeliverDataThread(deliverDataQueue); deliverDataThreadES.execute(ddt); } System.out.println(String.format("[啟動分發數據監聽線程][完畢][共啟動:%s個監聽線程]", DELIVER_DATA_QUEUE_SIZE)); return true; } // 2.對源文件進行讀取入隊處理 public void readFile(File file) throws Exception { System.out.println(String.format("[讀取待排序文件][開始][大小:%sM]", file.length() / 1000 / 1000)); // 讀監控線程 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[讀取待排序文件][已讀:%s行]", hasReaderDataLine.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); long startTime = System.currentTimeMillis(); FileUtil util = new FileUtil(new FileUtilImpl() { // 每讀到一行,應該怎么處理 public void handlerLin(String line) { hasReaderDataLine.incrementAndGet(); // 獲取到每一行的數據然后入隊! deliverDataQueue.offer(line.trim()); // 這里必須得去換行 } }); util.nioReadFile(file, BSIZE); monitor.interrupt(); finishReadFileTime = (System.currentTimeMillis() - startTime) / 1000; System.out.println(String.format("[讀取待排序文件][完畢][一共讀取:%S行][用時:%s秒]", hasReaderDataLine.get(), finishReadFileTime, hasReaderDataLine.get())); } // 3.等待分發數據線程把數據分發完畢,然后把線程池里面的線程全部終止 public void waitForFinishWriteDataRange() throws IOException { System.out.println("[數據分發][正在處理中]"); long cleanDeliverDataThreadStartTime = System.currentTimeMillis(); while (true) { if (hasReaderDataLine.get() == hasDataRangeWriteLine.get()) { // 對BW做結束,把內存中殘余的數據寫到文件 for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) { BufferedWriter bw = entry.getValue(); bw.close(); } break; } } deliverDataThreadES.shutdownNow(); dataRangeThreadES.shutdownNow(); finishDeliverFileTime = (System.currentTimeMillis() - cleanDeliverDataThreadStartTime) / 1000; System.out.println( String.format("[數據分發][完畢][已切割成:%s個待排序文件][用時:%s秒]", dataWriteMap.size(), finishDeliverFileTime)); } // 4.排序 public void sort() throws IOException { System.out.println(String.format("[排序][開始][待排序文件數量:%s個][并發排序數量:%s個]", dataWriteMap.size(), SORT_THREAD_SIZE)); long startTime = System.currentTimeMillis(); ExecutorService sortEs = Executors.newFixedThreadPool(SORT_THREAD_SIZE);// 排序線程池 for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) { int dataRange = entry.getKey(); SortThread st = new SortThread(dataRange); sortEs.execute(st); } // 監聽排序情況 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[排序][已排好文件:%s個]", hasSortedFile.get())); System.out.println(String.format("總共:[%s] 已讀[%s] 已排:[%s]", hasReaderDataLine.get(),hasWaitSortedDataLine.get(),hasSortedDataLine.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); sortEs.shutdown(); while (true) { if (sortEs.isTerminated()) { finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000; System.out.println(String.format("[排序][完畢][已排好文件:%s個][已排好:%s行][用時:%s秒]", hasSortedFile.get(), hasSortedDataLine.get(), finishSortFileTime)); break; } try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // while(true){ // if(hasReaderDataLine.get() == hasSortedDataLine.get()){ // finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000; // System.out.println(String.format("[排序][完畢][已排好文件:%s個][已排好:%s行][用時:%s秒]",hasSortedFile.get(),hasSortedDataLine.get(),finishSortFileTime)); // break; // } // try { // Thread.sleep(500); // } catch (InterruptedException e) // { // e.printStackTrace(); // } // } // sortEs.shutdown(); monitor.interrupt(); } // 5.合并 public void combine() throws IOException, InterruptedException { System.out.println(String.format("[合并文件][開始][待合并文件數量:%s個]", dataWriteMap.size())); // 監聽合并情況 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[合并文件][已合并文件:%s個]", hasCombineFile.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); File f = new File(DES_SORT_DATA_PATH); String[] files = f.list(); // 對文件名稱列表做排序,按順序合并 List<Integer> fileList = new ArrayList<Integer>(); for (String s : files) { fileList.add(Integer.valueOf(s.replaceAll(".txt", ""))); } Collections.sort(fileList); String[] mergeFiles = new String[fileList.size()]; for (int i = 0; i < fileList.size(); i++) { mergeFiles[i] = DES_SORT_DATA_PATH + String.valueOf(fileList.get(i)) + ".txt"; // mergeFiles[i] = String.valueOf(fileList.get(i))+".txt"; } long mergeStartTime = System.currentTimeMillis(); // 用java讀寫合并文件 combineFile(MERGE_FILE, mergeFiles); // 用系統命令合并文件 // combineFileUseSysCom(MERGE_FILE,mergeFiles); monitor.interrupt(); finishCombineFileTime = (System.currentTimeMillis() - mergeStartTime) / 1000; System.out.println(String.format("[合并文件][完畢][待排序文件大小:%s][合并完成文件大小:%s][用時:%s秒]", new File(SRC_DATA).length(), new File(MERGE_FILE).length(), finishCombineFileTime)); } // 分配隊列區間 public int getDataRange(String data) { int dataRange = data.length(); if (isDeliverTen) { if (dataRange != 1) { String dr = data.substring(0, 1); dataRange = Integer.valueOf(dataRange + "" + dr); } } return dataRange; } /** * * @author 838745 * * 分發數據線程 1. 從分發數據隊列中取數據 2. 獲取該數據的位數 3. 根據位數,把該數據放到相應的數據區間隊列中等待處理 * */ final static Object lock = new Object(); class DeliverDataThread extends Thread { ConcurrentLinkedQueue<String> deliverDataQueue; public DeliverDataThread(ConcurrentLinkedQueue<String> deliverDataQueue) { this.deliverDataQueue = deliverDataQueue; } @Override public void run() { try { while (true) { String data = deliverDataQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,當前線程會不停的循環,CPU都在當前線程上面,無法調度另外的線程. Thread.sleep(0); continue; } // 按照長度范圍,把數據放入相關的區間隊列 final int dataRange = getDataRange(data); // 數據區間隊列 // 對于2位數,分成10個隊列 // 10-19 為1個隊列,隊列名稱是21,20-29為1個隊列,隊列名稱是22 // 對于3位數,分鐘10個隊列 // 100-199 為1個隊列,隊列名稱是31,200-299為1個隊列,隊列名稱是32以此類推 BufferedWriter bw = dataWriteMap.get(dataRange); if (bw == null) { synchronized (lock) { bw = dataWriteMap.get(dataRange); if (bw == null) { // 產生相應的寫入對象 bw = new BufferedWriter(new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE); dataWriteMap.put(dataRange, bw); } } } synchronized (bw) { bw.write(data); bw.newLine(); // 增加已經處理的行數 hasDataRangeWriteLine.incrementAndGet(); } } } catch (InterruptedException e1) { // System.out.println("結束分發線程:"+Thread.currentThread().getName() // + "用時" + (System.currentTimeMillis() - startTime)/1000 + // "S"); } catch (Exception e) { e.printStackTrace(); } } } class DeliverDataThread_bak extends Thread { ConcurrentLinkedQueue<String> deliverDataQueue; public DeliverDataThread_bak(ConcurrentLinkedQueue<String> deliverDataQueue) { this.deliverDataQueue = deliverDataQueue; } @Override public void run() { long startTime = System.currentTimeMillis(); try { while (true) { String data = deliverDataQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,當前線程會不停的循環,CPU都在當前線程上面,無法調度另外的線程. Thread.sleep(0); continue; } // 按照長度范圍,把數據放入相關的區間隊列 int dataRange = getDataRange(data); // 數據區間隊列 // 對于2位數,分成10個隊列 // 10-19 為1個隊列,隊列名稱是21,20-29為1個隊列,隊列名稱是22 // 對于3位數,分鐘10個隊列 // 100-199 為1個隊列,隊列名稱是31,200-299為1個隊列,隊列名稱是32以此類推 ConcurrentLinkedQueue<String> dataRangQueue = dataRangMap.get(dataRange); if (dataRangQueue == null) { // 創建隊列 dataRangQueue = new ConcurrentLinkedQueue<String>(); // 把當前隊列放到MAP中,就可以根據數據位數直接拿到隊列 dataRangMap.put(dataRange, dataRangQueue); // 產生相應的寫入對象 BufferedWriter bw = new BufferedWriter( new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE); dataWriteMap.put(dataRange, bw); // 啟動數據區間隊列的監聽線程 DataRangeThread rq = new DataRangeThread(dataRange, dataRangQueue); for (int j = 0; j < RANG_QUEUE_SIZE; j++) { dataRangeThreadES.execute(rq); } } // 按數據位數,把數據放到相應的隊列中去 dataRangQueue.offer(data); } } catch (InterruptedException e1) { // System.out.println("結束分發線程:"+Thread.currentThread().getName() // + "用時" + (System.currentTimeMillis() - startTime)/1000 + // "S"); } catch (Exception e) { e.printStackTrace(); } } } /** * 數據區間寫入線程 * * @author 838745 * * 1. 從隊列中獲取相應的數據 2. 把該數據寫入到相應的數據區間文件中去 * */ class DataRangeThread extends Thread { ConcurrentLinkedQueue<String> dataRangQueue; int rang; public DataRangeThread(int rang, ConcurrentLinkedQueue<String> dataRangQueue) { this.dataRangQueue = dataRangQueue; this.rang = rang; } @Override public void run() { long startTime = System.currentTimeMillis(); try { while (true) { String data = dataRangQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,當前線程會不停的循環,CPU都耗在當前線程上面,無法調度另外的線程. Thread.sleep(0); continue; } // 按照長度范圍,把數據放入相關的區間隊列 BufferedWriter bw = dataWriteMap.get(rang); bw.write(data); bw.newLine(); // 增加已經處理的行數 hasDataRangeWriteLine.incrementAndGet(); } } catch (InterruptedException e1) { // System.out.println("結束數據區間線程:"+rang+" " + "用時" + // (System.currentTimeMillis() - startTime)/1000 + "S"); } catch (Exception e) { e.printStackTrace(); } } } /** * 排序線程 * * @author 838745 * */ class SortThread extends Thread { int dataRange; public SortThread(int dataRange) { this.dataRange = dataRange; } public void run() { StringBuilder newlinesBui = null; String lastLine = null; try { int lineCount = 0; long startTime = System.currentTimeMillis(); long startTime2 = System.currentTimeMillis(); final List<BigInteger> data = new ArrayList<BigInteger>(); File dataFile = new File(DES_DATA_PATH + dataRange + ".txt"); if (!dataFile.exists()) { return; } // 讀入文件 FileUtil util = new FileUtil(new FileUtilImpl() { // 每讀到一行,應該怎么處理 public void handlerLin(String line) { hasWaitSortedDataLine.incrementAndGet(); // 獲取到每一行的數據放入集合等待排序 data.add(new BigInteger(line)); } }); util.nioReadFile(dataFile, SORT_READER_BSIZE); // util.randomReadFile(dataFile, SORT_READER_BSIZE); String readEndTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; // 排序 startTime = System.currentTimeMillis(); Collections.sort(data); String sortTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; // 寫到文件 startTime = System.currentTimeMillis(); BufferedWriter bw = new BufferedWriter( new FileWriter(new File(DES_SORT_DATA_PATH + dataRange + ".txt")), SORT_WRITE_BSIZE); int i = 0; for (BigInteger b : data) { i = i++; bw.write(b.toString()); bw.newLine(); hasSortedDataLine.incrementAndGet(); // lineCount++; } bw.close(); String writeTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; hasSortedFile.incrementAndGet(); // System.out.println(String.format("數據區間[%s] [文件大小:%sM] 排序[%s]行 // 完成時間[%s] 讀[%s] 排[%s] 寫[%s]", // dataRange, // dataFile.length()/1000/1000, // lineCount, // (System.currentTimeMillis() - startTime2) / 1000 +"S" // ,readEndTime, // sortTime, // writeTime)); } catch (Exception e) { e.printStackTrace(); } } } // Windos系統COPY合并程序 public void combineFileUseSysCom(String outFile, String[] files) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (String f : files) { sb.append(f + "+"); } String cmd = sb.substring(0, sb.length() - 1); System.out.println(cmd); String[] cmds = { "cmd", "/C", "copy", "/Y", cmd, MERGE_FILE.replaceAll("//", "\\\\") }; Process p = Runtime.getRuntime().exec(cmds, null, new File(DES_SORT_DATA_PATH.replaceAll("//", "\\\\"))); BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream())); String line = reader.readLine(); while (line != null) { line = reader.readLine(); System.out.println(line); hasCombineFile.incrementAndGet(); } p.waitFor(); } // JAVA合并程序 public void combineFile(String outFile, String[] files) { FileChannel outChannel = null; try { outChannel = new FileOutputStream(outFile).getChannel(); for (String f : files) { FileChannel fc = new FileInputStream(f).getChannel(); ByteBuffer bb = ByteBuffer.allocate(MERGE_BSIZE); while (fc.read(bb) != -1) { bb.flip(); // 回繞緩沖區,索引重置為開頭 outChannel.write(bb); bb.clear(); } fc.close(); hasCombineFile.incrementAndGet(); } } catch (IOException ioe) { ioe.printStackTrace(); } finally { try { if (outChannel != null) { outChannel.close(); } } catch (IOException ignore) { } } } /** * 遞歸刪除目錄下的所有文件及子目錄下所有文件 * * @param dir * 將要刪除的文件目錄 * @return boolean Returns "true" if all deletions were successful. If a * deletion fails, the method stops attempting to delete and returns * "false". */ private static boolean deleteDir(File dir) { if (dir.isDirectory()) { String[] children = dir.list(); for (int i = 0; i < children.length; i++) { boolean success = deleteDir(new File(dir, children[i])); if (!success) { return false; } } } // 目錄此時為空,可以刪除 return dir.delete(); } /************************************************* 讀文件工具類 ***************************************/ interface FileUtilImpl { public void handlerLin(String line); } class FileUtil implements FileUtilImpl { FileUtilImpl impl; public FileUtil(FileUtilImpl impl) { this.impl = impl; } // 讀到的行應該怎么處理 public void handlerLin(String line) { impl.handlerLin(line); } // nio讀文件 public void nioReadFile(File file, int SIZE) throws IOException { String enterStr = "\n"; FileChannel inChannel = new FileInputStream(file).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(SIZE); StringBuilder newlinesBui = new StringBuilder(); while (inChannel.read(buffer) != -1) { buffer.flip(); // ByteBuffer.array() 返回的 array 長度為 ByteBuffer // allocate的長度,并不是里面所含的內容的長度 // 這樣會導致,最后讀取的肯定不是allocate的長度,但是array返回的帶有上一次的冗余數據 // 解決辦法如下,重新按照剩余容量來制作一個新的byte byte[] contentBytes; if (buffer.remaining() != buffer.capacity()) { contentBytes = new byte[buffer.remaining()]; buffer.get(contentBytes, 0, contentBytes.length); } else { contentBytes = buffer.array(); } String content = new String(contentBytes); newlinesBui.append(content); int fromIndex = 0; int endIndex = -1; // 循環找到 \n String line; while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) { // 得到一行 line = newlinesBui.substring(fromIndex, endIndex).trim(); if (line != null && !line.trim().equals("")) { impl.handlerLin(line); } fromIndex = endIndex + 1; } newlinesBui.delete(0, fromIndex); buffer.clear(); } // 最后一行 String lastLine = newlinesBui.substring(0, newlinesBui.length()).trim(); if (lastLine != null && !lastLine.equals("")) { impl.handlerLin(lastLine); } inChannel.close(); } } /************************************************* 讀文件工具類 ***************************************/ /************************************************* 內存監控工具類 ***************************************/ static class Memory { public static long getMaxHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getMax(); } public static long getInitHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getInit(); } public static long getUsedHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getUsed(); } public static void print() { System.out.println(String.format("已經使用內存:[%sM] 剩余可用內存:[%sM]", Memory.getUsedHeapMemory() / 1024 / 1024, ((Memory.getMaxHeapMemory() / 1024 / 1024) - (Memory.getUsedHeapMemory() / 1024 / 1024)))); } } /************************************************* 內存監控工具類 ***************************************/ } ``` * 測試代碼 ``` package com.bingo; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; import java.util.Random; public class CreateFile { public static Thread ramMonitorT = new Thread(new Runnable() { @Override public void run() { try { while(true){ System.out.println(String.format("已生成文件大小:[%sM]", line/30000)); Thread.sleep(2000); } } catch (Exception e) { } } }); public static String SRC_DATA = "d://temp//bigdata/src/100m.txt"; public static int line = 0; public static void main(String[] args) throws InterruptedException, IOException { ramMonitorT.setDaemon(true); ramMonitorT.start(); //待排序文件 if( (args.length > 0) && !args[0].equals("")){ SRC_DATA = args[0]; } System.out.println("生成文件路徑:"+SRC_DATA); //文件大小 int m = 0; if( (args.length > 1) && !args[1].equals("")){ m = Integer.valueOf(args[1]); } System.out.println("生成文件大小:"+m+"M"); BufferedWriter bw = new BufferedWriter(new FileWriter(SRC_DATA)); //文件大小,1M=30000行,100M = 300W行,1G=3000W行,24G=3000W*24 int fileSize = 30000*m; for(int j = 0;j < fileSize;j++){ int rang = (int)(Math.random()*60)+1; StringBuffer num = new StringBuffer(); for(int i = 0; i< rang ; i++){ if(i != 0){ num.append((int)(Math.random()*10)); }else{ num.append((int)(Math.random()*9)+1); } } bw.write(num.toString()); bw.newLine(); line ++; if(j % 10000 == 0){ bw.flush(); } } bw.close(); System.out.println("完!"); } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看