<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 需求 現有一些原始日志需要做增強解析處理,流程: 1. 從原始日志文件中讀取數據 2. 根據日志中的一個URL字段到外部知識庫中獲取信息增強到原始日志 3. 如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始數據中URL字段輸出到待爬清單目錄 # 分析 程序的關鍵點是要在一個mapreduce程序中根據數據的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義outputformat來實現 # 實現 實現要點: 1. **在mapreduce中訪問外部資源** 2. **自定義outputformat,改寫其中的recordwriter,改寫具體輸出數據的方法write()** # 代碼實現 **數據庫獲取數據的工具** 給數據打上標簽 ~~~ public class DBLoader { public static void dbLoader(HashMap<String, String> ruleMap) { Connection conn = null; Statement st = null; ResultSet res = null; try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root"); st = conn.createStatement(); res = st.executeQuery("select url,content from urlcontent"); while (res.next()) { ruleMap.put(res.getString(1), res.getString(2)); } } catch (Exception e) { e.printStackTrace(); } finally { try{ if(res!=null){ res.close(); } if(st!=null){ st.close(); } if(conn!=null){ conn.close(); } }catch(Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { DBLoader db = new DBLoader(); HashMap<String, String> map = new HashMap<String,String>(); db.dbLoader(map); System.out.println(map.size()); } } ~~~ **自定義一個outputformat** 根據不同的放到不同的文件中 ~~~ public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //參數context代表上下文 //獲取配置信息 FileSystem fs = FileSystem.get(context.getConfiguration()); Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log"); Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log"); FSDataOutputStream enhanceOut = fs.create(enhancePath); FSDataOutputStream toCrawlOut = fs.create(toCrawlPath); //返回的類和下面定義的內部類對應起來 return new MyRecordWriter(enhanceOut,toCrawlOut); } public static class MyRecordWriter extends RecordWriter<Text, NullWritable>{ FSDataOutputStream enhanceOut = null; FSDataOutputStream toCrawlOut = null; public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) { this.enhanceOut = enhanceOut; this.toCrawlOut = toCrawlOut; } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { //有了數據,你來負責寫到目的地 —— hdfs //判斷,進來內容如果是帶tocrawl的,就往待爬清單輸出流中寫 toCrawlOut //然后就造成了寫到不同文件的情況 if(key.toString().contains("tocrawl")){ toCrawlOut.write(key.toString().getBytes()); }else{ enhanceOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(toCrawlOut!=null){ toCrawlOut.close(); } if(enhanceOut!=null){ enhanceOut.close(); } } } } ~~~ **開發mapreduce處理流程** ~~~ /** * 這個程序是對每個小時不斷產生的用戶上網記錄日志進行增強(將日志中的url所指向的網頁內容分析結果信息追加到每一行原始日志后面) */ public class LogEnhancer { static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> { HashMap<String, String> knowledgeMap = new HashMap<String, String>(); /** * maptask在初始化時會先調用setup方法一次 利用這個機制,將外部的知識庫加載到maptask執行的機器內存中 */ @Override protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { DBLoader.dbLoader(knowledgeMap); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { String url = fields[26]; // 對這一行日志中的url去知識庫中查找內容分析信息 String content = knowledgeMap.get(url); // 根據內容信息匹配的結果,來構造兩種輸出結果 String result = ""; if (null == content) { // 輸往待爬清單的內容 result = url + "\t" + "tocrawl\n"; } else { // 輸往增強日志的內容 result = line + "\t" + content + "\n"; } context.write(new Text(result), NullWritable.get()); } catch (Exception e) { } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogEnhancer.class); job.setMapperClass(LogEnhancerMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要將自定義的輸出格式組件設置到job中 job.setOutputFormatClass(LogEnhancerOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 雖然我們自定義了outputformat,但是因為我們的outputformat繼承自fileoutputformat // 而fileoutputformat要輸出一個_SUCCESS文件,所以,在這還得指定一個輸出目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); System.exit(0); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看