# 如何單機排序一個大數據文件?
## **問題來源:**
針對一個大文件,如何對里面的元素進行排序
## **問題描述:**
24GTxt文件,每行1個大整數,1-100位不等
純JDK排序。
## **解決方案:**
> 程序流程
1. 源文件采用單線程NIO按行讀
2. 讀到的每一行入到隊列A
3. 開啟16個線程(根據CPU核數),去消費這個隊列
4. 消費之后,把數據寫入相關的文件待排序
5. 開啟8個線程并發排序每個待排序文件(讀進來,排序,寫)
6. 按文件名做合并
*****
> 經驗總結
1. 文件的讀取先要看清楚是按行還是按字節。 如果按行讀,不能用多線程,方法是讀1個BUFFERED,判斷結束是否是換行,如果不是,就按字節讀,一直讀到是換行為止,或者按BUFFERED讀,然后按換行截取,剩下的就拼在下一個BUFFERED的頭部。如果按字節讀,可以用多線程(RandomAccessFile
2. 讀和寫,最好設置緩存大小。16M剛好
3. Eclipse運行的java程序是獨立的JVM,如果內存不夠,可以加參數-Xms3072m -Xmx6072m
4. 遇到高并發自增,可以采用AtomicInteger
5. ByteBuffer.array() 返回的 array 長度為 ByteBuffer allocate的長度,并不是里面所含的內容的長度
```
//這樣會導致,最后讀取的肯定不是allocate的長度,但是array返回的帶有上一次的冗余數據
//解決辦法如下,重新按照剩余容量來制作一個新的byte
byte[] data;
if(buffer.remaining() != buffer.capacity()){
data = new byte[buffer.remaining()];
buffer.get(data, 0, data.length);
}else{
data = buffer.array();
}
String content = new String(data);
```
6.如果中斷線程池里面的線程
可以使用Pool.shutdown. 但是前提是線程里面有阻斷方法。如Sleep或者阻塞隊列等等。
7.對于阻塞隊列,入隊和出隊所占用的時間比較長,做實時性的性能差,因為阻塞涉及到加鎖
8.線程池不能設置setDaemon。如果線程池里面的線程讀守候,那線程就無法回收了。矛盾
9.同1時刻,1個CPU運行1個或者多個線程,如8核兩線程,那就是一共16個線程
*****
> 測試報告
* 運行結果
1. SSD 10分鐘跑完24G
2. 機械硬盤 80分鐘跑完24G
* 程序啟動使用內存
| | 32位JDK啟動程序使用內存 | 64位JDK啟動程序使用 |
|---|---|---|
| -Xms1g | 11M | 5M |
| -Xms1.1g | 12M | |
| -Xms1.2g | 報錯 | |
| -Xms2g | 報錯 | 10M |
| -Xms3g | 報錯 | 15M |
| -Xms5g | 報錯 | 25M |
| -Xms6g | 報錯 | 30M |
* BufferedWriter占用內存數(基于64位JDK,-Xms5g)
```
BufferedWriter bw = new BufferedWriter
(new FileWriter(new File("D:\\temp\\bigdata\\des3g\\"+i+".txt")),內存大小);
```
* BufferedWriter 緩存 5M 每個對象大概占用10M
| 創建對象數量 | 占用內存|
|---|---|
| 2 | 25M |
| 3 | 35M |
| 4 | 45M |
| 500 | 1265M(GC) |
* BufferedWriter 緩存 3M 每個對象大概占用6M
| 創建對象數量 | 占用內存|
|---|---|
| 4 | 25M |
| 5 | 31M |
| 6 | 37M |
| 500 | 507M 1265M (GC) |
* BufferedWriter 緩存 1M 每個對象大概占用2M
| 創建對象數量 | 占用內存|
|---|---|
| 12 | 25M |
| 13 | 27M |
| 14 | 29M |
| 500 | 1006M(GC) |
*****
> 程序代碼
* 排序代碼
```
package com.bingo4;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class BigSort {
/**************************************** 可配置項 ***********************************/
// 是否開啟內存監控,每2秒打印內存情況
public boolean isRamMonitor = false;
// 待排序文件
public String SRC_DATA = "d://temp//bigdata/src/100m.txt";
// 排序完畢生成的文件地址
public String DES_DATA_PATH = "d://temp//bigdata//des//";
// 默認開啟,每1位數,就分發成10個待排序文件,如果待排序文件里面最大是60位數,就分發成600個待排序文件.源文件如果超過8G左右,必須開啟,否則后面單個文件做排序會導致內存溢出
// 如果關閉,每1位數,就分發成1個待排序文件,這個對于源文件不大的情況下,速度極快。
public boolean isDeliverTen = true;
// 讀入待排序文件緩存
final static int BSIZE = 1024 * 1024 * 1; // 3M
// 寫入數據區間文件緩存
final static int WRITE_SORT_BSIZE = 1024 * 1024 * 3; // 3M
// 排序讀寫緩存
final static int SORT_READER_BSIZE = 1024 * 1024 * 1; // 5M
final static int SORT_WRITE_BSIZE = 1024 * 1024 * 1; // 5M
// 合并讀寫緩存
final static int MERGE_BSIZE = 1024 * 1024 * 2; // 5M
// 分發數據線程大小
public static int DELIVER_DATA_QUEUE_SIZE = 16;
// 每個數據區間監聽隊列的線程數, 這里設置為1,效率最高
public static int RANG_QUEUE_SIZE = 1;
// 并發排序線程數
public static int SORT_THREAD_SIZE = 8;
/**************************************** 可配置項 ***********************************/
public String DES_SORT_DATA_PATH = DES_DATA_PATH + "sort//";
public String MERGE_FILE = DES_DATA_PATH + "merge//merge.txt";
public String MERGE_FILE_PATH = DES_DATA_PATH + "merge//";
int cpuNums = Runtime.getRuntime().availableProcessors();
// 分發數據隊列
public ConcurrentLinkedQueue<String> deliverDataQueue = new ConcurrentLinkedQueue<String>();
// 分發數據線程的執行線程池
public ExecutorService deliverDataThreadES = Executors.newFixedThreadPool(DELIVER_DATA_QUEUE_SIZE);
// 數據分布范圍集合
public Map<Integer, ConcurrentLinkedQueue<String>> dataRangMap = new HashMap<Integer, ConcurrentLinkedQueue<String>>();
// 數據分布寫入對象
public Map<Integer, BufferedWriter> dataWriteMap = new ConcurrentHashMap<Integer, BufferedWriter>();
// 數據區間線程池
public ExecutorService dataRangeThreadES = Executors.newFixedThreadPool(1);
// CAS:將這個變量更新為新值,但是如果從我上次看到這個變量之后其他線程修改了它的值,那么更新就失敗”
// 已經讀取完畢的數據行數
public AtomicInteger hasReaderDataLine = new AtomicInteger(0);
// 通過多線程,已經按數據區間處理好的數據行數
public AtomicInteger hasDataRangeWriteLine = new AtomicInteger(0);
// 已排序的總行數
public AtomicInteger hasSortedDataLine = new AtomicInteger(0);
// 已經讀到內存等待排序的總行數
public AtomicInteger hasWaitSortedDataLine = new AtomicInteger(0);
// 已排序的文件數
public AtomicInteger hasSortedFile = new AtomicInteger(0);
// 已合并好的文件數
public AtomicInteger hasCombineFile = new AtomicInteger(0);
// 程序啟動時間
public long startTime = 0l;
// 讀取文件完成時間
public long finishReadFileTime = 0l;
// 等待分發完畢時間
public long finishDeliverFileTime = 0l;
// 排序完成時間
public long finishSortFileTime = 0l;
// 合并完成時間
public long finishCombineFileTime = 0l;
// 內存監控線程
public Thread ramMonitorT = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
Memory.print();
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
});
public static void main(String[] args) throws Exception {
BigSort sort = new BigSort();
// 待排序文件
if ((args.length > 0) && !args[0].equals("")) {
sort.SRC_DATA = args[0];
}
// 目的文件
if ((args.length > 1) && !args[1].equals("")) {
if (!args[1].endsWith("\\")) {
sort.DES_DATA_PATH = args[1] + "\\";
} else {
sort.DES_DATA_PATH = args[1];
}
sort.DES_SORT_DATA_PATH = sort.DES_DATA_PATH + "sort//";
sort.MERGE_FILE = sort.DES_DATA_PATH + "merge//merge.txt";
sort.MERGE_FILE_PATH = sort.DES_DATA_PATH + "merge//";
}
sort.start();
}
/**
* 程序啟動入口
*
* @throws Exception
*
*/
public void start() throws Exception {
System.out.println(String.format("CPU核心數[%s] 最大可用內存:[%sM] 初始化內存:[%sM]", cpuNums,
Memory.getMaxHeapMemory() / 1024 / 1024, Memory.getInitHeapMemory() / 1024 / 1024));
Memory.print();
// 是否開啟內存監控
if (isRamMonitor) {
ramMonitorT.setDaemon(true);
ramMonitorT.start();
}
// 1.準備階段
if (!prepare()) {
return;
}
// 2.對源文件進行讀取入隊處理
readFile(new File(SRC_DATA));
// 3.等待分發數據線程把數據分發完畢,然后把線程池里面的線程全部終止
waitForFinishWriteDataRange();
System.gc();
// 4.對每個文件單獨排序
sort();
// 5.合并
combine();
System.out.println(String.format("[程序已全部完成][一共用時:%s秒][讀:%s秒,割:%s秒,排:%s秒,合:%s秒]",
((System.currentTimeMillis() - startTime) / 1000), finishReadFileTime, finishDeliverFileTime,
finishSortFileTime, finishCombineFileTime));
System.out.println(String.format("[已排序完的文件在:%s]", MERGE_FILE));
}
// 1.準備階段,文件準備
public boolean prepare() {
try {
System.out.println("[文件及目錄檢查][開始]");
File srcFile = new File(SRC_DATA);
if (!srcFile.exists()) {
System.out.println("[文件及目錄檢查][失敗][待排序文件不存在,程序結束]" + SRC_DATA);
return false;
}
// 刪掉已存在的臨時文件
File desDataPath = new File(DES_DATA_PATH);
// if(desDataPath.exists()){
// if(deleteDir(desDataPath));
// }
// 創建目錄
if (!desDataPath.exists()) {
desDataPath.mkdir();
}
// 創建目錄
File desSortDataPath = new File(DES_SORT_DATA_PATH);
if (!desSortDataPath.exists()) {
desSortDataPath.mkdir();
}
// 創建目錄
File mergeFilePath = new File(MERGE_FILE_PATH);
if (!mergeFilePath.exists()) {
mergeFilePath.mkdir();
}
File mergeFile = new File(MERGE_FILE);
if (mergeFile.exists()) {
mergeFile.delete();
}
System.out.println(String.format("[文件及目錄檢查][待排序文件路徑:%s]", SRC_DATA));
System.out.println(String.format("[文件及目錄檢查][排序完畢生成的文件地址:%s]", DES_DATA_PATH));
System.out.println("[文件及目錄檢查][完畢]");
} catch (Exception e) {
System.out.println("[文件及目錄檢查][失敗,程序結束][原因]" + e.getMessage());
return false;
}
System.out.println("[啟動分發數據監聽線程][開始]");
startTime = System.currentTimeMillis();
for (int i = 0; i < DELIVER_DATA_QUEUE_SIZE; i++) {
DeliverDataThread ddt = new DeliverDataThread(deliverDataQueue);
deliverDataThreadES.execute(ddt);
}
System.out.println(String.format("[啟動分發數據監聽線程][完畢][共啟動:%s個監聽線程]", DELIVER_DATA_QUEUE_SIZE));
return true;
}
// 2.對源文件進行讀取入隊處理
public void readFile(File file) throws Exception {
System.out.println(String.format("[讀取待排序文件][開始][大小:%sM]", file.length() / 1000 / 1000));
// 讀監控線程
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[讀取待排序文件][已讀:%s行]", hasReaderDataLine.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
long startTime = System.currentTimeMillis();
FileUtil util = new FileUtil(new FileUtilImpl() {
// 每讀到一行,應該怎么處理
public void handlerLin(String line) {
hasReaderDataLine.incrementAndGet();
// 獲取到每一行的數據然后入隊!
deliverDataQueue.offer(line.trim()); // 這里必須得去換行
}
});
util.nioReadFile(file, BSIZE);
monitor.interrupt();
finishReadFileTime = (System.currentTimeMillis() - startTime) / 1000;
System.out.println(String.format("[讀取待排序文件][完畢][一共讀取:%S行][用時:%s秒]", hasReaderDataLine.get(), finishReadFileTime,
hasReaderDataLine.get()));
}
// 3.等待分發數據線程把數據分發完畢,然后把線程池里面的線程全部終止
public void waitForFinishWriteDataRange() throws IOException {
System.out.println("[數據分發][正在處理中]");
long cleanDeliverDataThreadStartTime = System.currentTimeMillis();
while (true) {
if (hasReaderDataLine.get() == hasDataRangeWriteLine.get()) {
// 對BW做結束,把內存中殘余的數據寫到文件
for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) {
BufferedWriter bw = entry.getValue();
bw.close();
}
break;
}
}
deliverDataThreadES.shutdownNow();
dataRangeThreadES.shutdownNow();
finishDeliverFileTime = (System.currentTimeMillis() - cleanDeliverDataThreadStartTime) / 1000;
System.out.println(
String.format("[數據分發][完畢][已切割成:%s個待排序文件][用時:%s秒]", dataWriteMap.size(), finishDeliverFileTime));
}
// 4.排序
public void sort() throws IOException {
System.out.println(String.format("[排序][開始][待排序文件數量:%s個][并發排序數量:%s個]", dataWriteMap.size(), SORT_THREAD_SIZE));
long startTime = System.currentTimeMillis();
ExecutorService sortEs = Executors.newFixedThreadPool(SORT_THREAD_SIZE);// 排序線程池
for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) {
int dataRange = entry.getKey();
SortThread st = new SortThread(dataRange);
sortEs.execute(st);
}
// 監聽排序情況
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[排序][已排好文件:%s個]", hasSortedFile.get()));
System.out.println(String.format("總共:[%s] 已讀[%s] 已排:[%s]", hasReaderDataLine.get(),hasWaitSortedDataLine.get(),hasSortedDataLine.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
sortEs.shutdown();
while (true) {
if (sortEs.isTerminated()) {
finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000;
System.out.println(String.format("[排序][完畢][已排好文件:%s個][已排好:%s行][用時:%s秒]", hasSortedFile.get(),
hasSortedDataLine.get(), finishSortFileTime));
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// while(true){
// if(hasReaderDataLine.get() == hasSortedDataLine.get()){
// finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000;
// System.out.println(String.format("[排序][完畢][已排好文件:%s個][已排好:%s行][用時:%s秒]",hasSortedFile.get(),hasSortedDataLine.get(),finishSortFileTime));
// break;
// }
// try {
// Thread.sleep(500);
// } catch (InterruptedException e)
// {
// e.printStackTrace();
// }
// }
// sortEs.shutdown();
monitor.interrupt();
}
// 5.合并
public void combine() throws IOException, InterruptedException {
System.out.println(String.format("[合并文件][開始][待合并文件數量:%s個]", dataWriteMap.size()));
// 監聽合并情況
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[合并文件][已合并文件:%s個]", hasCombineFile.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
File f = new File(DES_SORT_DATA_PATH);
String[] files = f.list();
// 對文件名稱列表做排序,按順序合并
List<Integer> fileList = new ArrayList<Integer>();
for (String s : files) {
fileList.add(Integer.valueOf(s.replaceAll(".txt", "")));
}
Collections.sort(fileList);
String[] mergeFiles = new String[fileList.size()];
for (int i = 0; i < fileList.size(); i++) {
mergeFiles[i] = DES_SORT_DATA_PATH + String.valueOf(fileList.get(i)) + ".txt";
// mergeFiles[i] = String.valueOf(fileList.get(i))+".txt";
}
long mergeStartTime = System.currentTimeMillis();
// 用java讀寫合并文件
combineFile(MERGE_FILE, mergeFiles);
// 用系統命令合并文件
// combineFileUseSysCom(MERGE_FILE,mergeFiles);
monitor.interrupt();
finishCombineFileTime = (System.currentTimeMillis() - mergeStartTime) / 1000;
System.out.println(String.format("[合并文件][完畢][待排序文件大小:%s][合并完成文件大小:%s][用時:%s秒]", new File(SRC_DATA).length(),
new File(MERGE_FILE).length(), finishCombineFileTime));
}
// 分配隊列區間
public int getDataRange(String data) {
int dataRange = data.length();
if (isDeliverTen) {
if (dataRange != 1) {
String dr = data.substring(0, 1);
dataRange = Integer.valueOf(dataRange + "" + dr);
}
}
return dataRange;
}
/**
*
* @author 838745
*
* 分發數據線程 1. 從分發數據隊列中取數據 2. 獲取該數據的位數 3. 根據位數,把該數據放到相應的數據區間隊列中等待處理
*
*/
final static Object lock = new Object();
class DeliverDataThread extends Thread {
ConcurrentLinkedQueue<String> deliverDataQueue;
public DeliverDataThread(ConcurrentLinkedQueue<String> deliverDataQueue) {
this.deliverDataQueue = deliverDataQueue;
}
@Override
public void run() {
try {
while (true) {
String data = deliverDataQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,當前線程會不停的循環,CPU都在當前線程上面,無法調度另外的線程.
Thread.sleep(0);
continue;
}
// 按照長度范圍,把數據放入相關的區間隊列
final int dataRange = getDataRange(data);
// 數據區間隊列
// 對于2位數,分成10個隊列
// 10-19 為1個隊列,隊列名稱是21,20-29為1個隊列,隊列名稱是22
// 對于3位數,分鐘10個隊列
// 100-199 為1個隊列,隊列名稱是31,200-299為1個隊列,隊列名稱是32以此類推
BufferedWriter bw = dataWriteMap.get(dataRange);
if (bw == null) {
synchronized (lock) {
bw = dataWriteMap.get(dataRange);
if (bw == null) {
// 產生相應的寫入對象
bw = new BufferedWriter(new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")),
WRITE_SORT_BSIZE);
dataWriteMap.put(dataRange, bw);
}
}
}
synchronized (bw) {
bw.write(data);
bw.newLine();
// 增加已經處理的行數
hasDataRangeWriteLine.incrementAndGet();
}
}
} catch (InterruptedException e1) {
// System.out.println("結束分發線程:"+Thread.currentThread().getName()
// + "用時" + (System.currentTimeMillis() - startTime)/1000 +
// "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class DeliverDataThread_bak extends Thread {
ConcurrentLinkedQueue<String> deliverDataQueue;
public DeliverDataThread_bak(ConcurrentLinkedQueue<String> deliverDataQueue) {
this.deliverDataQueue = deliverDataQueue;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
try {
while (true) {
String data = deliverDataQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,當前線程會不停的循環,CPU都在當前線程上面,無法調度另外的線程.
Thread.sleep(0);
continue;
}
// 按照長度范圍,把數據放入相關的區間隊列
int dataRange = getDataRange(data);
// 數據區間隊列
// 對于2位數,分成10個隊列
// 10-19 為1個隊列,隊列名稱是21,20-29為1個隊列,隊列名稱是22
// 對于3位數,分鐘10個隊列
// 100-199 為1個隊列,隊列名稱是31,200-299為1個隊列,隊列名稱是32以此類推
ConcurrentLinkedQueue<String> dataRangQueue = dataRangMap.get(dataRange);
if (dataRangQueue == null) {
// 創建隊列
dataRangQueue = new ConcurrentLinkedQueue<String>();
// 把當前隊列放到MAP中,就可以根據數據位數直接拿到隊列
dataRangMap.put(dataRange, dataRangQueue);
// 產生相應的寫入對象
BufferedWriter bw = new BufferedWriter(
new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE);
dataWriteMap.put(dataRange, bw);
// 啟動數據區間隊列的監聽線程
DataRangeThread rq = new DataRangeThread(dataRange, dataRangQueue);
for (int j = 0; j < RANG_QUEUE_SIZE; j++) {
dataRangeThreadES.execute(rq);
}
}
// 按數據位數,把數據放到相應的隊列中去
dataRangQueue.offer(data);
}
} catch (InterruptedException e1) {
// System.out.println("結束分發線程:"+Thread.currentThread().getName()
// + "用時" + (System.currentTimeMillis() - startTime)/1000 +
// "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 數據區間寫入線程
*
* @author 838745
*
* 1. 從隊列中獲取相應的數據 2. 把該數據寫入到相應的數據區間文件中去
*
*/
class DataRangeThread extends Thread {
ConcurrentLinkedQueue<String> dataRangQueue;
int rang;
public DataRangeThread(int rang, ConcurrentLinkedQueue<String> dataRangQueue) {
this.dataRangQueue = dataRangQueue;
this.rang = rang;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
try {
while (true) {
String data = dataRangQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,當前線程會不停的循環,CPU都耗在當前線程上面,無法調度另外的線程.
Thread.sleep(0);
continue;
}
// 按照長度范圍,把數據放入相關的區間隊列
BufferedWriter bw = dataWriteMap.get(rang);
bw.write(data);
bw.newLine();
// 增加已經處理的行數
hasDataRangeWriteLine.incrementAndGet();
}
} catch (InterruptedException e1) {
// System.out.println("結束數據區間線程:"+rang+" " + "用時" +
// (System.currentTimeMillis() - startTime)/1000 + "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 排序線程
*
* @author 838745
*
*/
class SortThread extends Thread {
int dataRange;
public SortThread(int dataRange) {
this.dataRange = dataRange;
}
public void run() {
StringBuilder newlinesBui = null;
String lastLine = null;
try {
int lineCount = 0;
long startTime = System.currentTimeMillis();
long startTime2 = System.currentTimeMillis();
final List<BigInteger> data = new ArrayList<BigInteger>();
File dataFile = new File(DES_DATA_PATH + dataRange + ".txt");
if (!dataFile.exists()) {
return;
}
// 讀入文件
FileUtil util = new FileUtil(new FileUtilImpl() {
// 每讀到一行,應該怎么處理
public void handlerLin(String line) {
hasWaitSortedDataLine.incrementAndGet();
// 獲取到每一行的數據放入集合等待排序
data.add(new BigInteger(line));
}
});
util.nioReadFile(dataFile, SORT_READER_BSIZE);
// util.randomReadFile(dataFile, SORT_READER_BSIZE);
String readEndTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
// 排序
startTime = System.currentTimeMillis();
Collections.sort(data);
String sortTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
// 寫到文件
startTime = System.currentTimeMillis();
BufferedWriter bw = new BufferedWriter(
new FileWriter(new File(DES_SORT_DATA_PATH + dataRange + ".txt")), SORT_WRITE_BSIZE);
int i = 0;
for (BigInteger b : data) {
i = i++;
bw.write(b.toString());
bw.newLine();
hasSortedDataLine.incrementAndGet();
// lineCount++;
}
bw.close();
String writeTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
hasSortedFile.incrementAndGet();
// System.out.println(String.format("數據區間[%s] [文件大小:%sM] 排序[%s]行
// 完成時間[%s] 讀[%s] 排[%s] 寫[%s]",
// dataRange,
// dataFile.length()/1000/1000,
// lineCount,
// (System.currentTimeMillis() - startTime2) / 1000 +"S"
// ,readEndTime,
// sortTime,
// writeTime));
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Windos系統COPY合并程序
public void combineFileUseSysCom(String outFile, String[] files) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (String f : files) {
sb.append(f + "+");
}
String cmd = sb.substring(0, sb.length() - 1);
System.out.println(cmd);
String[] cmds = { "cmd", "/C", "copy", "/Y", cmd, MERGE_FILE.replaceAll("//", "\\\\") };
Process p = Runtime.getRuntime().exec(cmds, null, new File(DES_SORT_DATA_PATH.replaceAll("//", "\\\\")));
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = reader.readLine();
while (line != null) {
line = reader.readLine();
System.out.println(line);
hasCombineFile.incrementAndGet();
}
p.waitFor();
}
// JAVA合并程序
public void combineFile(String outFile, String[] files) {
FileChannel outChannel = null;
try {
outChannel = new FileOutputStream(outFile).getChannel();
for (String f : files) {
FileChannel fc = new FileInputStream(f).getChannel();
ByteBuffer bb = ByteBuffer.allocate(MERGE_BSIZE);
while (fc.read(bb) != -1) {
bb.flip(); // 回繞緩沖區,索引重置為開頭
outChannel.write(bb);
bb.clear();
}
fc.close();
hasCombineFile.incrementAndGet();
}
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
try {
if (outChannel != null) {
outChannel.close();
}
} catch (IOException ignore) {
}
}
}
/**
* 遞歸刪除目錄下的所有文件及子目錄下所有文件
*
* @param dir
* 將要刪除的文件目錄
* @return boolean Returns "true" if all deletions were successful. If a
* deletion fails, the method stops attempting to delete and returns
* "false".
*/
private static boolean deleteDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
for (int i = 0; i < children.length; i++) {
boolean success = deleteDir(new File(dir, children[i]));
if (!success) {
return false;
}
}
}
// 目錄此時為空,可以刪除
return dir.delete();
}
/************************************************* 讀文件工具類 ***************************************/
interface FileUtilImpl {
public void handlerLin(String line);
}
class FileUtil implements FileUtilImpl {
FileUtilImpl impl;
public FileUtil(FileUtilImpl impl) {
this.impl = impl;
}
// 讀到的行應該怎么處理
public void handlerLin(String line) {
impl.handlerLin(line);
}
// nio讀文件
public void nioReadFile(File file, int SIZE) throws IOException {
String enterStr = "\n";
FileChannel inChannel = new FileInputStream(file).getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
StringBuilder newlinesBui = new StringBuilder();
while (inChannel.read(buffer) != -1) {
buffer.flip();
// ByteBuffer.array() 返回的 array 長度為 ByteBuffer
// allocate的長度,并不是里面所含的內容的長度
// 這樣會導致,最后讀取的肯定不是allocate的長度,但是array返回的帶有上一次的冗余數據
// 解決辦法如下,重新按照剩余容量來制作一個新的byte
byte[] contentBytes;
if (buffer.remaining() != buffer.capacity()) {
contentBytes = new byte[buffer.remaining()];
buffer.get(contentBytes, 0, contentBytes.length);
} else {
contentBytes = buffer.array();
}
String content = new String(contentBytes);
newlinesBui.append(content);
int fromIndex = 0;
int endIndex = -1;
// 循環找到 \n
String line;
while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) {
// 得到一行
line = newlinesBui.substring(fromIndex, endIndex).trim();
if (line != null && !line.trim().equals("")) {
impl.handlerLin(line);
}
fromIndex = endIndex + 1;
}
newlinesBui.delete(0, fromIndex);
buffer.clear();
}
// 最后一行
String lastLine = newlinesBui.substring(0, newlinesBui.length()).trim();
if (lastLine != null && !lastLine.equals("")) {
impl.handlerLin(lastLine);
}
inChannel.close();
}
}
/************************************************* 讀文件工具類 ***************************************/
/************************************************* 內存監控工具類 ***************************************/
static class Memory {
public static long getMaxHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getMax();
}
public static long getInitHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getInit();
}
public static long getUsedHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getUsed();
}
public static void print() {
System.out.println(String.format("已經使用內存:[%sM] 剩余可用內存:[%sM]", Memory.getUsedHeapMemory() / 1024 / 1024,
((Memory.getMaxHeapMemory() / 1024 / 1024) - (Memory.getUsedHeapMemory() / 1024 / 1024))));
}
}
/************************************************* 內存監控工具類 ***************************************/
}
```
* 測試代碼
```
package com.bingo;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Random;
public class CreateFile {
public static Thread ramMonitorT = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
System.out.println(String.format("已生成文件大小:[%sM]", line/30000));
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
});
public static String SRC_DATA = "d://temp//bigdata/src/100m.txt";
public static int line = 0;
public static void main(String[] args) throws InterruptedException, IOException {
ramMonitorT.setDaemon(true);
ramMonitorT.start();
//待排序文件
if( (args.length > 0) && !args[0].equals("")){
SRC_DATA = args[0];
}
System.out.println("生成文件路徑:"+SRC_DATA);
//文件大小
int m = 0;
if( (args.length > 1) && !args[1].equals("")){
m = Integer.valueOf(args[1]);
}
System.out.println("生成文件大小:"+m+"M");
BufferedWriter bw = new BufferedWriter(new FileWriter(SRC_DATA));
//文件大小,1M=30000行,100M = 300W行,1G=3000W行,24G=3000W*24
int fileSize = 30000*m;
for(int j = 0;j < fileSize;j++){
int rang = (int)(Math.random()*60)+1;
StringBuffer num = new StringBuffer();
for(int i = 0; i< rang ; i++){
if(i != 0){
num.append((int)(Math.random()*10));
}else{
num.append((int)(Math.random()*9)+1);
}
}
bw.write(num.toString());
bw.newLine();
line ++;
if(j % 10000 == 0){
bw.flush();
}
}
bw.close();
System.out.println("完!");
}
}
```