<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC]

# 需求

過濾輸入的log日志中是否包含java

1. 包含java的網站輸出到`e:/java.log`中
2. 不包含java的網站輸出到`e:/other.log`中

輸入數據: log.txt

~~~
java.org
jdxia
java.com
x.com
java
~~~

輸出預期: `java.log` `other.log`

# OutputFormat接口實現類

OutputFormat是MapReduce輸出的基類,所有實現MapReduce輸出都實現了OutputFormat接口. 常見是OutputFormat實現類

1. 文本輸出TextOutputFormat
   默認的輸出格式是TextOutputFormat,它把每條記錄寫為文本行.他的鍵和值可以是任意類型,因為TextOutputFormat調用toString()方法把他們轉換為字符串
2. SequenceFileOutputFormat
   SequenceFileOutputFormat將它的輸出寫為一個順序文件.如果輸出需要作為后續MapReduce任務的輸入,這便是一種很好的輸出格式,因為他的格式緊湊,很容易被壓縮
3. 自定義OutputFormat
   根據用戶需求,自定義實現輸出

# 代碼

自定義OutputFormat步驟

1. 自定義一個類繼承FileOutputFormat
2. 改寫recordwrite,具體改寫輸出數據的方法write()

**自定義一個OutputFormat**

~~~
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom OutputFormat that routes each record to java.log or other.log
 * depending on whether the record text contains "java".
 */
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Create the RecordWriter that performs the actual routing.
        return new FilterRecordWriter(job);
    }
}
~~~

~~~
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * RecordWriter that writes records containing "java" to java.log and all
 * other records to other.log.
 */
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream javaOut;
    private FSDataOutputStream otherOut;

    /**
     * Opens the two output streams.
     *
     * BUGFIX: the original version caught IOException here and only called
     * printStackTrace(), which left both streams null and caused a
     * NullPointerException later in write(). Propagating the exception lets
     * the framework fail the task cleanly (getRecordWriter already declares
     * throws IOException).
     */
    public FilterRecordWriter(TaskAttemptContext job) throws IOException {
        // 1. Get the file system
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // 2. Create the output file paths
        Path javaPath = new Path("/Users/jdxia/Desktop/website/data/java.log");
        Path otherPath = new Path("/Users/jdxia/Desktop/website/data/other.log");
        // 3. Create the output streams
        javaOut = fs.create(javaPath);
        otherOut = fs.create(otherPath);
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Route the record to a different file depending on whether it
        // contains "java". Use an explicit charset — the no-arg getBytes()
        // depends on the platform default encoding.
        byte[] bytes = key.toString().getBytes(StandardCharsets.UTF_8);
        if (key.toString().contains("java")) {
            javaOut.write(bytes);
        } else {
            otherOut.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Close the streams — if they are not closed the output files are empty.
        if (javaOut != null) {
            javaOut.close();
        }
        if (otherOut != null) {
            otherOut.close();
        }
    }
}
~~~

**Mapper類**

~~~
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Identity-style mapper: emits each input line as the key with a null value.
 */
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Take one line of input
        String line = value.toString();
        k.set(line);
        // Emit it
        context.write(k, NullWritable.get());
    }
}
~~~

**Reducer類**

~~~
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that appends a line terminator to each distinct key before it is
 * handed to the custom RecordWriter.
 */
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();
        k += "\r\n";
        context.write(new Text(k), NullWritable.get());
    }
}
~~~

**驅動類**

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver: wires the mapper, reducer and the custom OutputFormat together.
 */
public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        // Input/output components
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(FilterOutputFormat.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Tell the framework where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));

        // Delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/data/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // Tell the framework where to write the result. Although the custom
        // OutputFormat writes java.log/other.log itself, it extends
        // FileOutputFormat, which still emits a _SUCCESS marker file, so an
        // output directory must be configured.
        // BUGFIX: the original code called setOutputPath twice — the second
        // call used a path with a trailing space ("/output/ ") and silently
        // overrode the first, correct one. The duplicate call was removed.
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
~~~

# 注意

自定義OutputFormat時,注意recordWriter中的close方法必須關閉流資源.否則輸出的文件內容中數據為空
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看