<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                通過InputFormat決定讀取的數據的類型,然后拆分成一個個InputSplit,每個InputSplit對應一個Map處理,RecordReader讀取InputSplit的內容給Map ## InputFormat 決定讀取數據的格式,可以是文件或數據庫等 ### 功能 1. 驗證作業輸入的正確性,如格式等 1. 將輸入文件切割成邏輯分片(InputSplit),一個InputSplit將會被分配給一個獨立的Map任務 1. 提供RecordReader實現,讀取InputSplit中的"K-V對"供Mapper使用 ### 方法 **List getSplits():** 獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題 **RecordReader <k,v>createRecordReader():</k,v>** 創建#x5EFA;RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題 ### 類結構 ![](https://box.kancloud.cn/2015-07-23_55b03fb6552eb.png) **TextInputFormat:** 輸入文件中的每一行就是一個記錄,Key是這一行的byte offset,而value是這一行的內容 **KeyValueTextInputFormat:** 輸入文件中每一行就是一個記錄,第一個分隔符字符切分每行。在分隔符字符之前的內容為Key,在之后的為Value。分隔符變量通過key.value.separator.in.input.line變量設置,默認為(\t)字符。 **NLineInputFormat:** 與TextInputFormat一樣,但每個數據塊必須保證有且只有N行,mapred.line.input.format.linespermap屬性,默認為1 **SequenceFileInputFormat:** 一個用來讀取字符流數據的InputFormat,<key,value>為用戶自定義的。字符流數據是Hadoop自定義的壓縮的二進制數據格式。它用來優化從一個MapReduce任務的輸出到另一個MapReduce任務的輸入之間的數據傳輸過程。</key,value> ## InputSplit 代表一個個邏輯分片,并沒有真正存儲數據,只是提供了一個如何將數據分片的方法 Split內有Location信息,利于數據局部化 一個InputSplit給一個單獨的Map處理 ~~~ public abstract class InputSplit { /** * 獲取Split的大小,支持根據size對InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 獲取存儲該分片的數據所在的節點位置. */ public abstract String[] getLocations() throws IOException, InterruptedException; } ~~~ ## RecordReader 將InputSplit拆分成一個個<key,value>對給Map處理,也是實際的文件讀取分隔對象</key,value> ## 問題 ### 大量小文件如何處理 CombineFileInputFormat可以將若干個Split打包成一個,目的是避免過多的Map任務(因為Split的數目決定了Map的數目,大量的Mapper Task創建銷毀開銷將是巨大的) ### 怎么計算split的 通常一個split就是一個block(FileInputFormat僅僅拆分比block大的文件),這樣做的好處是使得Map可以在存儲有當前數據的節點上運行本地的任務,而不需要通過網絡進行跨節點的任務調度 通過mapred.min.split.size, mapred.max.split.size, block.size來控制拆分的大小 如果mapred.min.split.size大于block size,則會將兩個block合成到一個split,這樣有部分block數據需要通過網絡讀取 如果mapred.max.split.size小于block size,則會將一個block拆成多個split,增加了Map任務數(Map對split進行計算?#x5E76;且上報結果,關閉當前計算打開新的split均需要耗費資源) 先獲取文件在HDFS上的路徑和Block信息,然后根據splitSize對文件進行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默認splitSize 就等于blockSize的默認值(64m) ~~~ public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 獲取該文件所有的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 計算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循環分片。 // 當剩余數據與分片大小比值大于Split_Slop時,繼續分片, 小于等于時,停止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 處理余下的數據 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整塊返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 對于長度為0的文件,創建空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 設置輸入文件數量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; } ~~~ ### 分片間的數據如何處理 split是根據文件大小分割的,而一般處理是根據分隔符進行分割的,這樣勢必存在一條記錄橫跨兩個split ![](https://box.kancloud.cn/2015-07-23_55b03fb66abcf.png) 解決辦法是只要不是第一個split,都會遠程讀取一條記錄。不是第一個split的都忽略到第一條記錄 ~~~ public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函數即對LineRecordReader的一個初始化 // 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經過壓縮的情況等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打開文件,并定位到分片讀取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是壓縮文件的話,直接打開文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一個split,則忽略本split的第一行數據 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下&#x#x6B21;讀取就會從偏移位置開始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行數據,重新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即為偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 讀取的數據長度為0,則說明已讀完 if (newSize == 0) { break; } pos += newSize; // 讀取的數據長度小于最大行長度,也說明已讀取完畢 if (newSize < maxLineLength) { break; } // 執行到此處,說明該行數據沒讀完,繼續讀入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看