[TOC]
# flume 攔截器(interceptor)
## 1、flume攔截器介紹
> 攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件event,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。
## 2、flume內置的攔截器
### 2.1 時間戳攔截器
flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置:
| 參數| 默認值 | 描述|
| --- | --- | --- |
| type timestamp | 類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder
| preserveExisting | false | 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值
> source連接到時間戳攔截器的配置:
~~~
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false
~~~
### 2.2 主機攔截器
> 主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。事件報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置:
| 參數 | 默認值| 描述|
| --- | --- | --- |
| type| host | 類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder
| hostHeader| host| 事件頭的key|
| useIP | true | 如果設置為false,host鍵插入主機名|
| preserveExisting| false| 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值|
> source連接到主機攔截器的配置:
~~~
a1.sources.r1.interceptors=i2
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.useIP=false
a1.sources.r1.interceptors.i2.preserveExisting=false
~~~
### 2.3 靜態攔截器
> 靜態攔截器的作用是將k/v插入到事件的報頭中。配置如下
| 參數| 默認值 | 描述|
| --- | --- | --- |
| type | static | 類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder|
| key | key | 事件頭的key|
| value| value | key對應的value值|
| preserveExisting| true | 如果設置為true,若事件中報頭已經存在該key,不會替換value的值|
> source連接到靜態攔截器的配置:
~~~
a1.sources.r1.interceptors= i3
a1.sources.r1.interceptors.static.type=static a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false
~~~
### 2.4 正則過濾攔截器
> 在日志采集的時候,可能有一些數據是我們不需要的,這樣添加過濾攔截器,可以過濾掉不需要的日志,也可以根據需要收集滿足正則條件的日志。配置如下
| 參數 | 默認值| 描述|
| --- | --- | --- |
| type | REGEX_FILTER| 類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder|
| regex | .* | 匹配除“\n”之外的任何個字符|
| excludeEvents| false| 默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的|
> source連接到正則過濾攔截器的配置:
~~~
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false
~~~
> 這樣配置的攔截器就只會接收日志消息中帶有rm 或者kill的日志。
> 測試案例:
~~~
test_regex.conf
# 定義這個agent中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source組件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = itcast01
a1.sources.r1.port = 44444
a1.sources.r1.
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER
#保留內容中出現hadoop或者是spark的字符串的記錄
a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark)
a1.sources.r1.interceptors.i4.excludeEvents=false
# 描述和配置sink組件:k1
a1.sinks.k1.type = logger
# 描述和配置channel組件,此處使用是內存緩存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的連接關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
~~~
> 發送數據測試:

> 打印到控制臺信息:

> 只接受到存在hadoop或者spark的記錄,驗證成功!
## 3 自定義攔截器
### 1. 背景介紹
> Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理。
### 2. 自定義攔截器
> 根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷。
### 3. 實現
> 本技術方案核心包括二部分:
1)編寫java代碼,自定義攔截器;
> 內容包括:
1. 定義一個類CustomParameterInterceptor實現Interceptor接口。
2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。
3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。
4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。
5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。
6. 定義一個靜態類,類中封裝MD5加密方法
7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中

2) 修改Flume的配置信息
> 新增配置文件spool-interceptor-hdfs.conf,內容為:
~~~
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = timestamp
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
~~~
> 啟動:
~~~
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
### 5.項目實現截圖:

- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰