<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 數據壓縮 ## 概述 這是mapreduce的一種優化策略:通過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減少磁盤IO,提高MR程序運行速度(但相應增加了cpu運算負擔) 1. Mapreduce支持將map輸出的結果或者reduce輸出的結果進行壓縮,以減少網絡IO或最終輸出數據的體積 2. 壓縮特性運用得當能提高性能,但運用不當也可能降低性能 3. 基本原則: 運算密集型的job,少用壓縮 IO密集型的job,多用壓縮 ## MR支持的壓縮編碼 | 壓縮格式 | hadoop自帶? | 算法 | 文件擴展名 | 是否可切分 | 換壓縮格式后,原來的程序是否需要修改 | | --- | --- | --- | --- | --- | --- | | DEFAULT | 是 | DEFAULT | .deflate | 否 | 和文本處理一樣,不需要修改 | | Gzip | 是 | DEFAULT | .gz | 否 | 和文本處理一樣,不需要修改 | | bzip2 | 是 | bzip2 | .bz2 | 是 | 和文本處理一樣,不需要修改 | | LZO | 否 | LZO | .lzo | 是 | 需要建索引,還需要指定輸入格式 | | Snappy | 否 | Snappy | .snappy | 否 | 和文本處理一樣,不需要修改 | 為了支持多種壓縮/解壓縮算法,hadoop引入了編碼/解碼器,如下 | 壓縮格式 | 對應的編碼/解碼器 | | --- | --- | | DEFLATE | org.apache.hadoop.io.compress.DefaultCodec | | gzip | org.apache.hadoop.io.compress.GzipCodec | | bzip2 | org.apache.hadoop.io.compress.BZip2Codec | | LZO | com.hadoop.compression.lzo.LzopCodec | | Snappy | org.apache.hadoop.io.compress.SnappyCodec | 壓縮性能比較 | 壓縮算法 | 原始文件大小 | 壓縮文件大小 | 壓縮速度 | 解壓速度 | | --- | --- | --- | --- | --- | | gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s | | bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s | | LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.9MB/s | # 壓縮方式選擇 ## Gzip壓縮 優點: 壓縮率比較高,而且壓縮/解壓速度也比較快,hadoop本身支持,在應用處理gzip格式的文件就和直接處理文本一樣,大部分linux系統都自帶gzip命令,使用方便 缺點: 不支持split 應用場景: 當每個文件壓縮之后在130M以內的(1個塊大小內),都可以考慮用gzip壓縮格式,比如一天或者一小時的日志壓縮成一個gzip文件,運行mapreduce程序的時候通過多個gzip文件達到并發.hive程序和java寫的mapreduce程序完全和文本處理一樣,壓縮之后原來的程序不需要做任何修改 ## Bzip2壓縮 優點: 支持split,具有很高的壓縮率,比gzip壓縮率都高,hadoop本身支持,但不支持native,在linux系統下自帶bzip2命令使用方便 缺點: 壓縮/解壓速度慢,不支持native 應用場景: 適用對速度要求不高,但需要較高壓縮率的時候,可以作為mapreduce作業的輸出格式.或者輸出之后的數據比較大,處理之后的數據需要壓縮存檔減少磁盤空間并且以后數據用得比較少的情況,或者對單個很大的文本文件想壓縮減少存儲空間,同時又需要支持split,而且兼容之前的應用程序(應用程序不需要修改)的情況 ## Lzo壓縮 優點: 壓縮/解壓速度也比較快,合理的壓縮率.支持split,是hadoop中最流行的壓縮格式.可以在linux系統下安裝lzop命令,使用方便 缺點: 壓縮率比gzip要低一些,hadoop本身不支持,需要安裝.在應用中對lzo格式的文件需要做一些特殊處理(為了支持split需要建索引,還需要指定inoutformat為lzo格式) 應用場景: 一個很大的文本文件,壓縮之后還大于200M以上的可以考慮,而且單個文件越大,lzo優點越明顯 ## Snappy壓縮 優點: 高速壓縮速度和合理的壓縮率 缺點: 不支持split,壓縮率比gzip要低,hadoop本身不支持,需要安裝 應用場景: 當mapreduce作業的map輸出的數據比較大的時候,作為map到reduce的中間數據的壓縮格式,或者作為一個mapreduce作業的輸出和另外一個mapreduce作業的輸入 ## 壓縮位置選擇 壓縮可以在mapreduce作用的任意階段啟用 **輸入端采用壓縮** 在有大量數據并計劃重復處理的情況下,應該考慮對輸入進行壓縮.然而,你無須顯示指定使用的編解碼方式. hadoop自動檢查文件擴展名.如果擴展名能夠匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓,否則hadoop不會使用任何編解碼器 **輸出采用壓縮** 當map任務輸出的中間數據量很大時,應考慮在此階段采用壓縮技術.這能顯著改善內部數據shuffle過程.shuffle是消耗資源最多的環節.可用于壓縮mapper輸出的快速編解碼器包括LZO或者snapper **reducer輸出采用壓縮** 在此階段啟用壓縮技術能夠減少要存儲的數據量,因此降低所需的磁盤空間. 當mapreduce作業形成作業鏈條時,所以啟用壓縮同樣有效 # 壓縮配置參數 要在hadoop中啟用壓縮,可以配置如下參數(mapred-site.xml中) ![](https://box.kancloud.cn/0ce97c148bfc30d8608f3f6f783a2e45_835x293.png) ![](https://box.kancloud.cn/add5e45d615c7a7cbd7f5b249c5276a6_834x698.png) ![](https://box.kancloud.cn/504049c57117ec6adc5ee57c1a2c7f67_836x325.png) # 代碼 ## 測試輸出壓縮 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.util.ReflectionUtils; import java.io.*; public class TestCompress { public static void main(String[] args) throws IOException, ClassNotFoundException { //測試壓縮 compress("/Users/jdxia/Desktop/website/data/input/order.txt", "org.apache.hadoop.io.compress.BZip2Codec"); } //測試壓縮 @SuppressWarnings({"resource", "unchecked"}) private static void compress(String filename, String method) throws IOException, ClassNotFoundException { //獲取輸入流 FileInputStream fis = new FileInputStream(new File(filename)); Class className = Class.forName(method); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(className, new Configuration()); //獲取輸出流,輸出的文件是文件名加后綴 FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension())); CompressionOutputStream cos = codec.createOutputStream(fos); //流的對拷,暫時不關閉流,最后在filename所在文件夾中會有個壓縮文件 IOUtils.copyBytes(fis, cos, 1024*1024*5, false); //關閉資源 fis.close(); cos.close(); fos.close(); } } ~~~ ## 測試輸入壓縮 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; public class TestCompress { public static void main(String[] args) throws IOException, ClassNotFoundException { //測試壓縮 decompress("/Users/jdxia/Desktop/website/data/input/order.txt.bz2"); } private static void decompress(String filename) throws IOException { //校驗是否能解壓縮 CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration()); CompressionCodec codec = factory.getCodec(new Path(filename)); //如果不支持直接返回 if (codec == null) { System.out.println("cannot find codec for file " + filename); return; } //獲取輸入流 CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename))); //獲取輸出流 FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded")); //流的對拷 IOUtils.copyBytes(cis, fos, 1024*1024*5, false); //關閉資源 cis.close(); fos.close(); } } ~~~ ## map輸出采用壓縮 即使你的MapReduce的輸入輸出文件都是未壓縮的文件,你仍然可以對map任務的中間結果輸出做壓縮,因為他要寫在硬盤并且通過網絡傳輸到reduce節點,對其壓縮可以提高很多性能,這些工作只要設置兩個屬性即可 這些工作只要設置兩個屬性就可以 **map端輸入可以根據擴展名來** ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //開啟map端輸出壓縮 conf.setBoolean("mapreduce.map.output.compress", true); conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ ## reduce輸出采用壓縮 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output")); //設置reduce端輸出壓縮開啟 FileOutputFormat.setCompressOutput(job, true); //設置壓縮的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ ## 數據流的壓縮和解壓縮 CompressionCodec有兩個方法可以用于輕松的壓縮或解壓縮數據.要想對正在被寫入一個輸出流的數據進行壓縮,我們可以使用createOutputStream(OutputStreamout)方法創建一個CompressionOutputStream,將其以壓縮格式寫入底層的流. 相反,想要對從輸入流讀取而來的數據進行解壓縮,則調用createInputStream(InputStream)函數,從而獲得一個CompressionInputStream,從而從底層的流讀取未壓縮的數據 ## Reducer輸出壓縮 在配置參數或在代碼中都可以設置reduce的輸出壓縮 1. 在配置參數中設置 ~~~ mapreduce.output.fileoutputformat.compress=false mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec mapreduce.output.fileoutputformat.compress.type=RECORD ~~~ 2. 在代碼中設置 ~~~ Job job = Job.getInstance(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName("")); ~~~ ## Mapper輸出壓縮 在配置參數或在代碼中都可以設置reduce的輸出壓縮 1. 在配置參數中設置 ~~~ mapreduce.map.output.compress=false mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec ~~~ 2. 在代碼中設置: ~~~ conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true); conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class); ~~~ ## 壓縮文件的讀取(源碼) Hadoop自帶的InputFormat類內置支持壓縮文件的讀取,比如TextInputformat類,在其initialize方法中: ~~~ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); //根據文件后綴名創建相應壓縮編碼的codec CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); //判斷是否屬于可切片壓縮編碼類型 if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); //如果是可切片壓縮編碼,則創建一個CompressedSplitLineReader讀取壓縮數據 in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { //如果是不可切片壓縮編碼,則創建一個SplitLineReader讀取壓縮數據,并將文件輸入流轉換成解壓數據流傳遞給普通SplitLineReader讀取 in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); //如果不是壓縮文件,則創建普通SplitLineReader讀取數據 in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看