<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # MapReduce重要配置參數 ## 資源相關參數 **以下參數是在用戶自己的mr應用程序中配置就可以生效** 1. **mapreduce.map.memory.mb**: 一個Map Task可使用的資源上限(單位:MB),默認為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。 2. **mapreduce.reduce.memory.mb**: 一個Reduce Task可使用的資源上限(單位:MB),默認為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。 3. **mapreduce.map.cpu.vcores**: 每個Map task可使用的最多cpu core數目, 默認值: 1 4. **mapreduce.reduce.cpu.vcores**: 每個Reduce task可使用的最多cpu core數目, 默認值: 1 5. `mapreduce.map.java.opts`: Map Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g. `“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” `(@taskid@會被Hadoop框架自動換為相應的taskid), 默認值: “” 6. `mapreduce.reduce.java.opts`: Reduce Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g. `“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”`, 默認值: “” **應該在yarn啟動之前就配置在服務器的配置文件中才能生效** 7. `yarn.scheduler.minimum-allocation-mb` 1024 給應用程序container分配的最小內存 8. `yarn.scheduler.maximum-allocation-mb` 8192 給應用程序container分配的最大內存 9. `yarn.scheduler.minimum-allocation-vcores` 1 10. `yarn.scheduler.maximum-allocation-vcores` 32 11. `yarn.nodemanager.resource.memory-mb` 8192 **shuffle性能優化的關鍵參數,應在yarn啟動之前就配置好** 12. mapreduce.task.io.sort.mb 100 ` //shuffle的環形緩沖區大小,默認100m` 14. mapreduce.map.sort.spill.percent 0.8 `//環形緩沖區溢出的閾值,默認80%` ## 容錯相關參數 1. `mapreduce.map.maxattempts`: 每個Map Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。 2. `mapreduce.reduce.maxattempts`: 每個Reduce Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。 3. `mapreduce.map.failures.maxpercent`: 當失敗的Map Task失敗比例超過該值為,整個作業則失敗,默認值為0. 如果你的應用程序允許丟棄部分輸入數據,則該該值設為一個大于0的值,比如5,表示如果有低于5%的Map Task失敗(如果一個Map Task重試次數超過mapreduce.map.maxattempts,則認為這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個作業扔認為成功。 4. `mapreduce.reduce.failures.maxpercent`: 當失敗的Reduce Task失敗比例超過該值為,整個作業則失敗,默認值為0. 5. `mapreduce.task.timeout`: Task超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該task處于block狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是`“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”` ## 本地運行mapreduce 作業 設置以下幾個參數: ~~~ mapreduce.framework.name=local mapreduce.jobtracker.address=local fs.defaultFS=local ~~~ ## 效率和穩定性相關參數 1. mapreduce.map.speculative: 是否為Map Task打開推測執行機制,默認為false 2. mapreduce.reduce.speculative: 是否為Reduce Task打開推測執行機制,默認為false 3. mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出現在用戶jar包和hadoop jar中時,優先使用哪個jar包中的class,默認為false,表示優先使用hadoop jar中的class。 4. mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片時的最小切片大小, 5. mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片時的最大切片大小 (切片的默認大小就等于blocksize,即 134217728) # 全局計數器 在實際生產代碼中,常常需要將數據處理過程中遇到的不合規數據行進行全局計數,類似這種需求可以借助mapreduce框架中提供的全局計數器來實現 ![](https://box.kancloud.cn/785f99ed03a9fa67c23ae1e8645a1d07_1051x899.png) 這個統計是全局的 # 多job串聯 一個稍復雜點的處理邏輯往往需要多個mapreduce程序串聯處理,多job的串聯可以借助mapreduce框架的JobControl實現 1. 我們可以用shell腳本,根據狀態返回,來決定下一步的shell執行還是不執行 2. 可以設置多個job他們的依賴關系 ~~~ ControlledJob cJob1 = new ControlledJob(job1.getConfiguration()); ControlledJob cJob2 = new ControlledJob(job2.getConfiguration()); ControlledJob cJob3 = new ControlledJob(job3.getConfiguration()); cJob1.setJob(job1); cJob2.setJob(job2); cJob3.setJob(job3); // 設置作業依賴關系,job2執行依賴job1,job3依賴job2 cJob2.addDependingJob(cJob1); cJob3.addDependingJob(cJob2); //設置JobControl,里面放一個組名 JobControl jobControl = new JobControl("RecommendationJob"); jobControl.addJob(cJob1); jobControl.addJob(cJob2); jobControl.addJob(cJob3); // 新建一個線程來運行已加入JobControl中的作業,開始進程并等待結束 Thread jobControlThread = new Thread(jobControl); jobControlThread.start(); //判斷是不是已經finish了,沒有finish就繼續執行 while (!jobControl.allFinished()) { Thread.sleep(500); } jobControl.stop(); return 0; ~~~ # 數據壓縮 ## 概述 這是mapreduce的一種優化策略:通過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減少磁盤IO,提高MR程序運行速度(但相應增加了cpu運算負擔) 1. Mapreduce支持將map輸出的結果或者reduce輸出的結果進行壓縮,以減少網絡IO或最終輸出數據的體積 2. 壓縮特性運用得當能提高性能,但運用不當也可能降低性能 3. 基本原則: 運算密集型的job,少用壓縮 IO密集型的job,多用壓縮 ## MR支持的壓縮編碼 ![](https://box.kancloud.cn/438a3c2e9ecc41a083c8bbcfc05ffa66_657x202.png) ## Reducer輸出壓縮 在配置參數或在代碼中都可以設置reduce的輸出壓縮 1. 在配置參數中設置 ~~~ mapreduce.output.fileoutputformat.compress=false mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec mapreduce.output.fileoutputformat.compress.type=RECORD ~~~ 2. 在代碼中設置 ~~~ Job job = Job.getInstance(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName("")); ~~~ ## Mapper輸出壓縮 在配置參數或在代碼中都可以設置reduce的輸出壓縮 1. 在配置參數中設置 ~~~ mapreduce.map.output.compress=false mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec ~~~ 2. 在代碼中設置: ~~~ conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true); conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class); ~~~ ## 壓縮文件的讀取(源碼) Hadoop自帶的InputFormat類內置支持壓縮文件的讀取,比如TextInputformat類,在其initialize方法中: ~~~ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); //根據文件后綴名創建相應壓縮編碼的codec CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); //判斷是否屬于可切片壓縮編碼類型 if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); //如果是可切片壓縮編碼,則創建一個CompressedSplitLineReader讀取壓縮數據 in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { //如果是不可切片壓縮編碼,則創建一個SplitLineReader讀取壓縮數據,并將文件輸入流轉換成解壓數據流傳遞給普通SplitLineReader讀取 in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); //如果不是壓縮文件,則創建普通SplitLineReader讀取數據 in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看