<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [TOC] # flume 攔截器(interceptor) ## 1、flume攔截器介紹 > 攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件event,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。 ## 2、flume內置的攔截器 ### 2.1 時間戳攔截器 flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置: | 參數| 默認值 | 描述| | --- | --- | --- | | type timestamp | 類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder | preserveExisting | false | 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值 > source連接到時間戳攔截器的配置: ~~~ a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false ~~~ ### 2.2 主機攔截器 > 主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。事件報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置: | 參數 | 默認值| 描述| | --- | --- | --- | | type| host | 類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder | hostHeader| host| 事件頭的key| | useIP | true | 如果設置為false,host鍵插入主機名| | preserveExisting| false| 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值| > source連接到主機攔截器的配置: ~~~ a1.sources.r1.interceptors=i2 a1.sources.r1.interceptors.i2.type=host a1.sources.r1.interceptors.i2.useIP=false a1.sources.r1.interceptors.i2.preserveExisting=false ~~~ ### 2.3 靜態攔截器 > 靜態攔截器的作用是將k/v插入到事件的報頭中。配置如下 | 參數| 默認值 | 描述| | --- | --- | --- | | type | static | 類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder| | key | key | 事件頭的key| | value| value | key對應的value值| | preserveExisting| true | 如果設置為true,若事件中報頭已經存在該key,不會替換value的值| > source連接到靜態攔截器的配置: ~~~ a1.sources.r1.interceptors= i3 a1.sources.r1.interceptors.static.type=static a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false ~~~ ### 2.4 正則過濾攔截器 > 在日志采集的時候,可能有一些數據是我們不需要的,這樣添加過濾攔截器,可以過濾掉不需要的日志,也可以根據需要收集滿足正則條件的日志。配置如下 | 參數 | 默認值| 描述| | --- | --- | --- | | type | REGEX_FILTER| 類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder| | regex | .* | 匹配除“\n”之外的任何個字符| | excludeEvents| false| 默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的| > source連接到正則過濾攔截器的配置: ~~~ a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false ~~~ > 這樣配置的攔截器就只會接收日志消息中帶有rm 或者kill的日志。 > 測試案例: ~~~ test_regex.conf # 定義這個agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source組件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = itcast01 a1.sources.r1.port = 44444 a1.sources.r1. a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER #保留內容中出現hadoop或者是spark的字符串的記錄 a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark) a1.sources.r1.interceptors.i4.excludeEvents=false # 描述和配置sink組件:k1 a1.sinks.k1.type = logger # 描述和配置channel組件,此處使用是內存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之間的連接關系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~ > 發送數據測試: ![](https://box.kancloud.cn/8ee4d1e179143ba54797796caeade277_576x243.png) > 打印到控制臺信息: ![](https://box.kancloud.cn/358febde9a38b69ce955fbb5252017bf_564x73.png) > 只接受到存在hadoop或者spark的記錄,驗證成功! ## 3 自定義攔截器 ### 1. 背景介紹 > Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理。 ### 2. 自定義攔截器 > 根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷。 ### 3. 實現 > 本技術方案核心包括二部分: 1)編寫java代碼,自定義攔截器; > 內容包括: 1. 定義一個類CustomParameterInterceptor實現Interceptor接口。 2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。 3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。 4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。 5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。 6. 定義一個靜態類,類中封裝MD5加密方法 7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中 ![](https://box.kancloud.cn/112e1b76a928dea47e0c39ac529f0b99_580x512.png) 2) 修改Flume的配置信息 > 新增配置文件spool-interceptor-hdfs.conf,內容為: ~~~ a1.channels = c1 a1.sources = r1 a1.sinks = s1 #channel a1.channels.c1.type = memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=50000 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data/ a1.sources.r1.batchSize= 50 a1.sources.r1.inputCharset = UTF-8 a1.sources.r1.interceptors =i1 i2 a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder a1.sources.r1.interceptors.i1.fields_separator=\\u0009 a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6 a1.sources.r1.interceptors.i1.indexs_separator =\\u002c a1.sources.r1.interceptors.i1.encrypted_field_index =0 a1.sources.r1.interceptors.i2.type = timestamp #sink a1.sinks.s1.channel = c1 a1.sinks.s1.type = hdfs a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d a1.sinks.s1.hdfs.filePrefix = event a1.sinks.s1.hdfs.fileSuffix = .log a1.sinks.s1.hdfs.rollSize = 10485760 a1.sinks.s1.hdfs.rollInterval =20 a1.sinks.s1.hdfs.rollCount = 0 a1.sinks.s1.hdfs.batchSize = 1500 a1.sinks.s1.hdfs.round = true a1.sinks.s1.hdfs.roundUnit = minute a1.sinks.s1.hdfs.threadsPoolSize = 25 a1.sinks.s1.hdfs.useLocalTimeStamp = true a1.sinks.s1.hdfs.minBlockReplicas = 1 a1.sinks.s1.hdfs.fileType =DataStream a1.sinks.s1.hdfs.writeFormat = Text a1.sinks.s1.hdfs.callTimeout = 60000 a1.sinks.s1.hdfs.idleTimeout =60 ~~~ > 啟動: ~~~ bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console ~~~ ### 5.項目實現截圖: ![](https://box.kancloud.cn/206c82319f672e8fe4d9fb185b1d1654_570x245.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看