<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 簡介 Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor(時間戳)、HostInterceptor(主機)、RegexExtractorInterceptor(正則)等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理 # 自定義攔截器 根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷 # 需求 ~~~ 13601249301 100 200 300 400 500 600 700 13601249302 100 200 300 400 500 600 700 13601249303 100 200 300 400 500 600 700 13601249304 100 200 300 400 500 600 700 13601249305 100 200 300 400 500 600 700 13601249306 100 200 300 400 500 600 700 13601249307 100 200 300 400 500 600 700 13601249308 100 200 300 400 500 600 700 13601249309 100 200 300 400 500 600 700 13601249310 100 200 300 400 500 600 700 13601249311 100 200 300 400 500 600 700 13601249312 100 200 300 400 500 600 700 ~~~ 把這個變成這個樣子 第一行加密,中間有幾行舍棄 ![](https://box.kancloud.cn/9a73270b2419e629800c1ed8bf4dae99_1436x190.png) # 實現 二部分 ## 編寫java代碼,自定義攔截器; 內容包括: 1. 定義一個類CustomParameterInterceptor實現Interceptor接口。 2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。 3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。 4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。 5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。 6. 定義一個靜態類,類中封裝MD5加密方法 7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中 ![](https://box.kancloud.cn/4f2e68edbe218f81cdd341ee91d57df7_522x604.png) ## 修改Flume的配置信息 新增配置文件spool-interceptor-hdfs.conf,內容為: ~~~ a1.channels = c1 a1.sources = r1 a1.sinks = s1 #channel a1.channels.c1.type = memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=50000 #source a1.sources.r1.channels = c1 # 監控/root/data/下的文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data/ a1.sources.r1.batchSize= 50 a1.sources.r1.inputCharset = UTF-8 # 攔截器 a1.sources.r1.interceptors =i1 i2 # 自己定義的java類, $表示內部類 a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder # 下面的定義屬性會傳遞到自定義的類中 # 自定義攔截器的屬性,這個是代表分隔符,unicode編碼的空格 a1.sources.r1.interceptors.i1.fields_separator=\\u0009 # 當前列我需要取哪些列的下標 a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6 # 當前索引用的什么分隔符,用的是逗號,寫unicode編碼 a1.sources.r1.interceptors.i1.indexs_separator =\\u002c # 具體加密字段的參數,第一列加密 a1.sources.r1.interceptors.i1.encrypted_field_index =0 # 這個攔截器類型是時間戳 a1.sources.r1.interceptors.i2.type = timestamp #sink a1.sinks.s1.channel = c1 a1.sinks.s1.type = hdfs a1.sinks.s1.hdfs.path =hdfs://master:9000/flume/%Y%m%d # 當前類hdfs里面的屬性值 a1.sinks.s1.hdfs.filePrefix = event a1.sinks.s1.hdfs.fileSuffix = .log a1.sinks.s1.hdfs.rollSize = 10485760 a1.sinks.s1.hdfs.rollInterval =20 a1.sinks.s1.hdfs.rollCount = 0 a1.sinks.s1.hdfs.batchSize = 1500 a1.sinks.s1.hdfs.round = true a1.sinks.s1.hdfs.roundUnit = minute a1.sinks.s1.hdfs.threadsPoolSize = 25 a1.sinks.s1.hdfs.useLocalTimeStamp = true a1.sinks.s1.hdfs.minBlockReplicas = 1 a1.sinks.s1.hdfs.fileType =DataStream a1.sinks.s1.hdfs.writeFormat = Text a1.sinks.s1.hdfs.callTimeout = 60000 a1.sinks.s1.hdfs.idleTimeout =60 ~~~ # 代碼 先看內部Builder類,里面有configure方法 字段的默認值有CustomParameterInterceptor.Constants這個內部類提供 Builder類里面的builder是構造攔截器,用里面的類來構建 先調用這個類的構造函數 然后`List<Event> intercept`會調用當個intercept ~~~ package com.hive; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import static com.hive.CustomParameterInterceptor.Constants.*; public class CustomParameterInterceptor implements Interceptor { //指明每一行字段的分隔符 private final String fields_separator; //通過分割符分割后,指明需要那列的字段,下標 private final String indexs; //多個下標的分割符 private final String indexs_separator; //需要加密的字段下標 private final String encrypted_field_index; public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) { //每一行字段的分隔符 String f = fields_separator.trim(); //多個下標的分割符 String i = indexs_separator.trim(); //通過分割符分割后,指明需要那列的字段,下標 this.indexs = indexs; //需要加密的字段下標 this.encrypted_field_index = encrypted_field_index.trim(); if (!f.equals("")) { f = UnicodeToString(f); } //指明每一行字段的分隔符 this.fields_separator = f; if (!i.equals("")) { i = UnicodeToString(i); } //多個下標的分割符 this.indexs_separator = i; } /** * unicode轉換為string * \t 制表符 ('\u0009') \n 新行(換行)符 (' ') \r 回車符 (' ') \f 換頁符 ('\u000C') \a 報警 * (bell) 符 ('\u0007') \e 轉義符 ('\u001B') \cx 空格(\u0020)對應于 x 的控制符 */ private String UnicodeToString(String str) { Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))"); Matcher matcher = pattern.matcher(str); char ch; while (matcher.find()) { ch = (char) Integer.parseInt(matcher.group(2), 16); str = str.replace(matcher.group(1), ch + ""); } return str; } //初始化方法 @Override public void initialize() {} //處理單條事件 @Override public Event intercept(Event event) { if (event == null) { return null; } try { //getBody是具體存放數據的內容,獲取每一行數據 String line = new String(event.getBody(), Charsets.UTF_8); //分隔符切分 String[] fields_spilts = line.split(fields_separator); //對應所需要的列的下標 String[] indexs_split = indexs.split(indexs_separator); String newLine = ""; //循環下標數組 for (int i = 0; i < indexs_split.length; i++) { int parseInt = Integer.parseInt(indexs_split[i]); //對字段進行加密 if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) { //將數據最終加密為md5 newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]); } else { newLine += fields_spilts[parseInt]; } //拼接字段分割符 if (i != indexs_split.length - 1) { newLine += fields_separator; } } //把這個新的一行數據設置進去 event.setBody(newLine.getBytes()); return event; } catch (Exception e) { return event; } } //處理很多事件 @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> out = new ArrayList<Event>(); //循環這個時間列表 for (Event event : events) { //調用單個事件 Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } //這個Interceptor.Builder可以讀取flume的配置文件,因為Builder中繼承的Configurable可以讀取配置文件 public static class Builder implements Interceptor.Builder { /** * The fields_separator.指明每一行字段的分隔符 */ private String fields_separator; /** * The indexs.通過分隔符分割后,指明需要那列的字段 下標 */ private String indexs; /** * The indexs_separator. 多個下標的分隔符 */ private String indexs_separator; /** * The encrypted_field. 需要加密的字段下標 */ private String encrypted_field_index; //構建對應的攔截器 @Override public Interceptor build() { //用上面一個類來構建,返回外面的那個類,把值傳遞給他的構造器 return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index); } //能夠幫我們獲取配置文件定義的參數 @Override public void configure(Context context) { //后面的值是默認值,context可以獲取到配置文件的值,在這里面用大寫常量代替,具體的值可參考下面的類Constants fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR); indexs = context.getString(INDEXS, DEFAULT_INDEXS); indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR); encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX); } } public static class Constants { /** * The Constant FIELD_SEPARATOR. */ public static final String FIELD_SEPARATOR = "fields_separator"; /** * The Constant DEFAULT_FIELD_SEPARATOR. */ public static final String DEFAULT_FIELD_SEPARATOR = " "; /** * The Constant INDEXS. */ public static final String INDEXS = "indexs"; /** * The Constant DEFAULT_INDEXS. */ public static final String DEFAULT_INDEXS = "0"; /** * The Constant INDEXS_SEPARATOR. */ public static final String INDEXS_SEPARATOR = "indexs_separator"; /** * The Constant DEFAULT_INDEXS_SEPARATOR. */ public static final String DEFAULT_INDEXS_SEPARATOR = ","; /** * The Constant ENCRYPTED_FIELD_INDEX. */ public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index"; /** * The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */ public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = ""; /** * The Constant PROCESSTIME. */ public static final String PROCESSTIME = "processTime"; /** * The Constant PROCESSTIME. */ public static final String DEFAULT_PROCESSTIME = "a"; } /** * 字符串md5加密 */ public static class StringUtils { // 全局數組 private final static String[] strDigits = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}; // 返回形式為數字跟字符串 private static String byteToArrayString(byte bByte) { int iRet = bByte; // System.out.println("iRet="+iRet); if (iRet < 0) { iRet += 256; } int iD1 = iRet / 16; int iD2 = iRet % 16; return strDigits[iD1] + strDigits[iD2]; } // 返回形式只為數字 private static String byteToNum(byte bByte) { int iRet = bByte; System.out.println("iRet1=" + iRet); if (iRet < 0) { iRet += 256; } return String.valueOf(iRet); } // 轉換字節數組為16進制字串 private static String byteToString(byte[] bByte) { StringBuffer sBuffer = new StringBuffer(); for (int i = 0; i < bByte.length; i++) { sBuffer.append(byteToArrayString(bByte[i])); } return sBuffer.toString(); } public static String GetMD5Code(String strObj) { String resultString = null; try { resultString = new String(strObj); MessageDigest md = MessageDigest.getInstance("MD5"); // md.digest() 該函數返回值為存放哈希值結果的byte數組 resultString = byteToString(md.digest(strObj.getBytes())); } catch (NoSuchAlgorithmException ex) { ex.printStackTrace(); } return resultString; } } } ~~~ 然后把這個jar包上傳到hive的lib目錄 啟動: ~~~ flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看