<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 簡介 Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor(時間戳)、HostInterceptor(主機)、RegexExtractorInterceptor(正則)等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理 # 1自定義攔截器 根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷 # 實現 二部分 ## 編寫java代碼,自定義攔截器; 內容包括: 1. 定義一個類CustomParameterInterceptor實現Interceptor接口。 2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。 3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。 4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。 5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。 6. 定義一個靜態類,類中封裝MD5加密方法 7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中 ![](https://box.kancloud.cn/a670ca860c6fc66fd3e802c185d8a0d5_531x617.png) ## 修改Flume的配置信息 新增配置文件spool-interceptor-hdfs.conf,內容為: ~~~ a1.channels = c1 a1.sources = r1 a1.sinks = s1 #channel a1.channels.c1.type = memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=50000 #source a1.sources.r1.channels = c1 # 監控/root/data/下的文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data/ a1.sources.r1.batchSize= 50 a1.sources.r1.inputCharset = UTF-8 # 攔截器 a1.sources.r1.interceptors =i1 i2 # 自己定義的java類, $表示內部類 a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder # 自定義攔截器的屬性,這個是代表分隔符,ulecode編碼的 a1.sources.r1.interceptors.i1.fields_separator=\\u0009 # 當前列我需要取哪些列的下標 a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6 # 當前索引用的什么分隔符,用的是逗號,寫urlencode編碼 a1.sources.r1.interceptors.i1.indexs_separator =\\u002c # 具體加密字段的參數 a1.sources.r1.interceptors.i1.encrypted_field_index =0 # 這個攔截器類型是時間戳 a1.sources.r1.interceptors.i2.type = timestamp #sink a1.sinks.s1.channel = c1 a1.sinks.s1.type = hdfs a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d # 當前類hdfs里面的屬性值 a1.sinks.s1.hdfs.filePrefix = event a1.sinks.s1.hdfs.fileSuffix = .log a1.sinks.s1.hdfs.rollSize = 10485760 a1.sinks.s1.hdfs.rollInterval =20 a1.sinks.s1.hdfs.rollCount = 0 a1.sinks.s1.hdfs.batchSize = 1500 a1.sinks.s1.hdfs.round = true a1.sinks.s1.hdfs.roundUnit = minute a1.sinks.s1.hdfs.threadsPoolSize = 25 a1.sinks.s1.hdfs.useLocalTimeStamp = true a1.sinks.s1.hdfs.minBlockReplicas = 1 a1.sinks.s1.hdfs.fileType =DataStream a1.sinks.s1.hdfs.writeFormat = Text a1.sinks.s1.hdfs.callTimeout = 60000 a1.sinks.s1.hdfs.idleTimeout =60 ~~~ # 代碼 先看內部Builder類,里面有configure方法 字段的默認值有CustomParameterInterceptor.Constants這個內部類提供 Builder類里面的builder是構造攔截器,用里面的類來構建 先調用這個類的構造函數 然后`List<Event> intercept`會調用當個intercept ~~~ package com.hive; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import static com.hive.CustomParameterInterceptor.Constants.*; public class CustomParameterInterceptor implements Interceptor { //指明每一行字段的分隔符 private final String fields_separator; //通過分割符分割后,指明需要那列的字段,下標 private final String indexs; //多個下標的分割符 private final String indexs_separator; //需要加密的字段下標 private final String encrypted_field_index; public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) { //每一行字段的分隔符 String f = fields_separator.trim(); //多個下標的分割符 String i = indexs_separator.trim(); //通過分割符分割后,指明需要那列的字段,下標 this.indexs = indexs; //需要加密的字段下標 this.encrypted_field_index = encrypted_field_index.trim(); if (!f.equals("")) { f = UnicodeToString(f); } //指明每一行字段的分隔符 this.fields_separator = f; if (!i.equals("")) { i = UnicodeToString(i); } //多個下標的分割符 this.indexs_separator = i; } /** * unicode轉換為string * \t 制表符 ('\u0009') \n 新行(換行)符 (' ') \r 回車符 (' ') \f 換頁符 ('\u000C') \a 報警 * (bell) 符 ('\u0007') \e 轉義符 ('\u001B') \cx 空格(\u0020)對應于 x 的控制符 */ private String UnicodeToString(String str) { Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))"); Matcher matcher = pattern.matcher(str); char ch; while (matcher.find()) { ch = (char) Integer.parseInt(matcher.group(2), 16); str = str.replace(matcher.group(1), ch + ""); } return str; } @Override public void initialize() { } @Override public Event intercept(Event event) { if (event == null) { return null; } try { //Body是具體存放數據的內容,獲取每一行數據 String line = new String(event.getBody(), Charsets.UTF_8); //分隔符切分 String[] fields_spilts = line.split(fields_separator); //對應所需要的列的下標 String[] indexs_split = indexs.split(indexs_separator); String newLine = ""; //循環下標數組 for (int i = 0; i < indexs_split.length; i++) { int parseInt = Integer.parseInt(indexs_split[i]); //對字段進行加密 if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) { //將數據最終加密為md5 newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]); } else { newLine += fields_spilts[parseInt]; } //拼接字段分割符 if (i != indexs_split.length - 1) { newLine += fields_separator; } } //把這個新的一行數據設置進去 event.setBody(newLine.getBytes()); return event; } catch (Exception e) { return event; } } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> out = new ArrayList<Event>(); for (Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } public static class Builder implements Interceptor.Builder { /** * The fields_separator.指明每一行字段的分隔符 */ private String fields_separator; /** * The indexs.通過分隔符分割后,指明需要那列的字段 下標 */ private String indexs; /** * The indexs_separator. 多個下標下標的分隔符 */ private String indexs_separator; /** * The encrypted_field. 需要加密的字段下標 */ private String encrypted_field_index; //構建對應的攔截器 @Override public Interceptor build() { //用上面一個類來構建 return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index); } //能夠幫我們獲取配置文件定義的參數 @Override public void configure(Context context) { //后面的值是默認值 fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR); indexs = context.getString(INDEXS, DEFAULT_INDEXS); indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR); encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX); } } public static class Constants { /** * The Constant FIELD_SEPARATOR. */ public static final String FIELD_SEPARATOR = "fields_separator"; /** * The Constant DEFAULT_FIELD_SEPARATOR. */ public static final String DEFAULT_FIELD_SEPARATOR = " "; /** * The Constant INDEXS. */ public static final String INDEXS = "indexs"; /** * The Constant DEFAULT_INDEXS. */ public static final String DEFAULT_INDEXS = "0"; /** * The Constant INDEXS_SEPARATOR. */ public static final String INDEXS_SEPARATOR = "indexs_separator"; /** * The Constant DEFAULT_INDEXS_SEPARATOR. */ public static final String DEFAULT_INDEXS_SEPARATOR = ","; /** * The Constant ENCRYPTED_FIELD_INDEX. */ public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index"; /** * The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */ public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = ""; /** * The Constant PROCESSTIME. */ public static final String PROCESSTIME = "processTime"; /** * The Constant PROCESSTIME. */ public static final String DEFAULT_PROCESSTIME = "a"; } /** * 字符串md5加密 */ public static class StringUtils { // 全局數組 private final static String[] strDigits = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}; // 返回形式為數字跟字符串 private static String byteToArrayString(byte bByte) { int iRet = bByte; // System.out.println("iRet="+iRet); if (iRet < 0) { iRet += 256; } int iD1 = iRet / 16; int iD2 = iRet % 16; return strDigits[iD1] + strDigits[iD2]; } // 返回形式只為數字 private static String byteToNum(byte bByte) { int iRet = bByte; System.out.println("iRet1=" + iRet); if (iRet < 0) { iRet += 256; } return String.valueOf(iRet); } // 轉換字節數組為16進制字串 private static String byteToString(byte[] bByte) { StringBuffer sBuffer = new StringBuffer(); for (int i = 0; i < bByte.length; i++) { sBuffer.append(byteToArrayString(bByte[i])); } return sBuffer.toString(); } public static String GetMD5Code(String strObj) { String resultString = null; try { resultString = new String(strObj); MessageDigest md = MessageDigest.getInstance("MD5"); // md.digest() 該函數返回值為存放哈希值結果的byte數組 resultString = byteToString(md.digest(strObj.getBytes())); } catch (NoSuchAlgorithmException ex) { ex.printStackTrace(); } return resultString; } } } ~~~ 然后把這個jar包上傳到hive的lib目錄 啟動: ~~~ flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看