[TOC]
# Flume簡介及安裝
## 1 Flume介紹
### 1.1 概述
1. Flume是Cloudera提供的一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。
2. Flume可以采集文件,socket數據包、文件夾等各種形式源數據,又可以將采集到的數據輸出到HDFS、hbase、hive、kafka等眾多外部存儲系統中
3. 一般的采集需求,通過對flume的簡單配置即可實現
4. Flume針對特殊場景也具備良好的自定義擴展能力,因此,flume可以適用于大部分的日常數據采集場景
> 當前Flume有兩個版本:
~~~
Flume 0.9X版本的統稱Flume-og,
Flume1.X版本的統稱Flume-ng。
~~~
> 由于Flume-ng經過重大重構,與Flume-og有很大不同,使用時請注意區分。
### 1.2 運行機制
1) Flume分布式系統中最核心的角色是agent,flume采集系統就是由一個個agent所連接起來形成
2) 每一個agent相當于一個數據傳遞員,內部有三個組件:
a) Source:采集源,用于跟數據源對接,以獲取數據
b) Sink:下沉地,采集數據的傳送目的,用于往下一級agent傳遞數據或者往最終存儲系統傳遞數據
c) Channel:angent內部的數據傳輸通道,用于從source將數據傳遞到sink

### 1.3 Flume采集系統結構圖
#### 1.3.1. 簡單結構
> 單個agent采集數據

#### 1.3.2. 復雜結構
> 多級agent之間串聯
1) 第一種:2個agent串聯

2) 第二種:多個agent的采集的數據進行匯總

3) 第三種:采集的數據可以下層到不同的系統中

## 2 Flume實戰案例
### 2.1 Flume的安裝部署
1) Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境
~~~
上傳安裝包到數據源所在節點上
然后解壓 tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后進入flume的目錄,修改conf下的flume-env.sh,在里面配置JAVA_HOME
~~~
2) 根據數據采集的需求配置采集方案,描述在配置文件中(文件名可任意自定義)
3) 指定采集方案配置文件,在相應的節點上啟動flume agent
> 先用一個最簡單的例子來測試一下程序環境是否正常

1) 先在flume的conf目錄下新建一個文件
~~~
vi netcat-logger.conf
~~~
~~~
# 定義這個agent中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source組件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = itcast01
a1.sources.r1.port = 44444
# 描述和配置sink組件:k1
a1.sinks.k1.type = logger
# 描述和配置channel組件,此處使用是內存緩存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的連接關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
~~~
2) 啟動agent去采集數據
~~~
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定flume自身的配置文件所在目錄
-f conf/netcat-logger.con 指定我們所描述的采集方案
-n a1 指定我們這個agent的名字
~~~
3) 測試
> NaNundefined先要往agent采集監聽的端口上發送數據,讓agent有數據可采
> NaNundefined隨便在一個能跟agent節點聯網的機器上
> NaNundefinedtelnet anget-hostname port (telnet itcast01 44444)

### 2.2 Flume中常用的source、channel、sink組件
#### 2.2.1 source組件
| Source類型 | 說明 |
| --- | --- |
| Avro Source | 支持Avro協議(實際上是Avro RPC),內置支持 |
| Thrift Source | 支持Thrift協議,內置支持 |
| Exec Source | 基于Unix的command在標準輸出上生產數據 |
| JMS Source | 從JMS系統(消息、主題)中讀取數據,ActiveMQ已經測試過 |
| Spooling Directory Source | 監控指定目錄內數據變更 |
| Twitter 1% firehose Source | 通過API持續下載Twitter數據,試驗性質 |
| Netcat Source | 監控某個端口,將流經端口的每一個文本行數據作為Event輸入 |
| Sequence Generator Source | 序列生成器數據源,生產序列數據 |
| Syslog Sources| 讀取syslog數據,產生Event,支持UDP和TCP兩種協議 |
| HTTP Source | 基于HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式 |
| Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
#### 2.2.2 Channel組件
| Channel類型 | 說明 |
| ---------- | --- |
| Memory Channel | Event數據存儲在內存中 |
| JDBC Channel | Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby |
| File Channel | Event數據存儲在磁盤文件中 |
| Spillable Memory Channel | Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用) |
| Pseudo Transaction Channel | 測試用途 |
| Custom Channel | 自定義Channel實現 |
#### 2.2.3 sink組件
| Sink類型 | 說明 |
| --- | --- |
| HDFS Sink | 數據寫入HDFS |
| Logger Sink | 數據寫入日志文件 |
| Avro Sink | 數據被轉換成Avro Event,然后發送到配置的RPC端口上 |
| Thrift Sink | 數據被轉換成Thrift Event,然后發送到配置的RPC端口上 |
| IRC Sink | 數據在IRC上進行回放 |
| File Roll Sink | | 存儲數據到本地文件系統 |
| Null Sink | 丟棄到所有數據 |
| HBase Sink | 數據寫入HBase數據庫 |
| Morphline Solr Sink | 數據發送到Solr搜索服務器(集群) |
| ElasticSearch Sink | 數據發送到Elastic Search搜索服務器(集群) |
| Kite Dataset Sink | 寫數據到Kite Dataset,試驗性質的 |
| Custom Sink | 自定義Sink實現 |
> Flume支持眾多的source、channel、sink類型,詳細手冊可參考官方文檔
> http://flume.apache.org/FlumeUserGuide.html
### 2.3 采集案例
#### 2.3.1、采集目錄到HDFS
> 采集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就需要把文件采集到HDFS中去
> 根據需求,首先定義以下3大要素
1. 采集源,即source——監控文件目錄 : spooldir
2. 下沉目標,即sink——HDFS文件系統 : hdfs sink
3. source和sink之間的傳遞通道——channel,可用file channel 也可以用內存memory channel
> 配置文件編寫:
~~~
#定義三大組件的名稱
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 配置source組件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/data/
agent1.sources.source1.fileHeader = false
#配置攔截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
# 配置sink組件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#滾動生成的文件按大小生成
agent1.sinks.sink1.hdfs.rollSize = 102400
#滾動生成的文件按行數生成
agent1.sinks.sink1.hdfs.rollCount = 1000000
#滾動生成的文件按時間生成
agent1.sinks.sink1.hdfs.rollInterval = 60
#開啟滾動生成目錄
agent1.sinks.sink1.hdfs.round = true
#以10為一梯度滾動生成
agent1.sinks.sink1.hdfs.roundValue = 10
#單位為分鐘
agent1.sinks.sink1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
agent1.channels.channel1.keep-alive = 120
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
~~~
> flume的source采用spoodir時! 目錄下面不允許存放同名的文件,否則報錯!
> Channel參數解釋:
| capacity| 默認該通道中最大的可以存儲的event數量|
| --- | --- |
| trasactionCapacity| 每次最大可以從source中拿到或者送到sink中的event數量|
| keep-alive| event添加到通道中或者移出的允許時間|
> 其他組件:Interceptor(攔截器)
> 用于Source的一組Interceptor,按照預設的順序在必要地方裝飾和過濾events。
> 內建的Interceptors允許增加event的headers比如:時間戳、主機名、靜態標記等等
> 定制的interceptors可以通過內省event payload(讀取原始日志),實現自己的業務邏輯(很強大)
#### 2.3.2、采集文件到HDFS
> 采集需求:比如業務系統使用log4j生成的日志,日志內容不斷增加,需要把追加到日志文件中的數據實時采集到hdfs

> 根據需求,首先定義以下3大要素
1. 采集源,即source——監控文件內容更新 : exec ‘tail -F file’
2. 下沉目標,即sink——HDFS文件系統 : hdfs sink
3. Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel
> 配置文件編寫:
~~~
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1 i2
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path=hdfs://itcast01:9000/file/%{hostname}/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 10240
agent1.sinks.sink1.hdfs.rollCount = 1000
agent1.sinks.sink1.hdfs.rollInterval = 10
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
~~~
#### 2.3.3、多個agent串聯
> 采集需求:比如業務系統使用log4j生成的日志,日志內容不斷增加,需要把追加到日志文件中的數據實時采集到hdfs,使用agent串聯

> 根據需求,首先定義以下3大要素
> 第一臺flume agent
1. 采集源,即source——監控文件內容更新 : exec ‘tail -F file’
2. 下沉目標,即sink——數據的發送者,實現序列化 : avro sink
3. Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel
> 第二臺flume agent
1. 采集源,即source——接受數據。并實現反序列化 : avro source
2. 下沉目標,即sink——HDFS文件系統 : HDFS sink
3. Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel
> 配置文件編寫:
~~~
Flume-agent1
#tail-avro-avro-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1
# Describe the sink
##sink端的avro是一個數據發送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = itcast02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 10
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume-agent2: avro-hdfs.conf
a1.sources = r1
a1.sinks =s1
a1.channels = c1
##source中的avro組件是一個接收者服務
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://itcast01:9000/flumedata
a1.sinks.s1.hdfs.filePrefix = access_log
a1.sinks.s1.hdfs.batchSize= 100
a1.sinks.s1.hdfs.fileType = DataStream
a1.sinks.s1.hdfs.writeFormat =Text
a1.sinks.s1.hdfs.rollSize = 10240
a1.sinks.s1.hdfs.rollCount = 1000
a1.sinks.s1.hdfs.rollInterval = 10
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
~~~
#### 2.3.4、高可用配置案例
(一)、failover故障轉移
> 在完成單點的Flume NG搭建后,下面我們搭建一個高可用的Flume NG集群,架構圖如下所示:

(1)節點分配
> Flume的Agent和Collector分布如下表所示:
| 名稱 | Ip地址 | Host| 角色|
| --- | --- |--- |--- |
| Agent1 | 192.168.200.101| Itcast01 | WebServer|
| Collector1 | 192.168.200.102 | Itcast02| AgentMstr1|
| Collector2 | 192.168.200.103| Itcast03 | AgentMstr2|
> Agent1數據分別流入到Collector1和Collector2,Flume NG本身提供了Failover機制,可以自動切換和恢復。下面我們開發配置Flume NG集群。
(2)配置
> 在下面單點Flume中,基本配置都完成了,我們只需要新添加兩個配置文件,它們是flume-client.conf和flume-server.conf,其配置內容如下所示:
1) itcast01上的flume-client.conf配置
~~~
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/log/test.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = itcast02
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = itcast03
agent1.sinks.k2.port = 52020
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 5
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#這里首先要申明一個sinkgroups,然后再設置2個sink ,k1與k2,其中2個優先級是10和5,#而processor的maxpenalty被設置為10秒,默認是30秒。‘
~~~
> 啟動命令:
~~~
bin/flume-ng agent -n agent1 -c conf -f conf/flume-client.conf
-Dflume.root.logger=DEBUG,console
~~~
2) Itcast02和itcast03上的flume-server.conf配置
~~~
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader=hostname
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/flume/logs/%{hostname}
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
~~~
> 啟動命令:
~~~
bin/flume-ng agent -n agent1 -c conf -f conf/flume-server.conf
-Dflume.root.logger=DEBUG,console
~~~
(3)測試failover
1) 先在itcast02和itcast03上啟動腳本
~~~
bin/flume-ng agent -n a1 -c conf -f conf/flume-server.conf
-Dflume.root.logger=DEBUG,consoln
~~~
2) 然后啟動itcast01上的腳本
~~~
bin/flume-ng agent -n agent1 -c conf -f conf/flume-client.conf
-Dflume.root.logger=DEBUG,console
~~~
3) Shell腳本生成數據
~~~
while true;do date >> test.log; sleep 1s ;done
~~~
4) 觀察HDFS上生成的數據目錄。只觀察到itcast02在接受數據

5) Itcast02上的agent被干掉之后,繼續觀察HDFS上生成的數據目錄,itcast03對應的ip目錄出現,此時數據收集切換到itcast03上

6) Itcast02上的agent重啟后,繼續觀察HDFS上生成的數據目錄。此時數據收集切換到itcast02上,又開始繼續工作

(二)、load balance負載均衡
(1)節點分配
> 如failover故障轉移的節點分配
(2)配置
> 在failover故障轉移的配置上稍作修改
> itcast01上的flume-client-loadbalance.conf配置
~~~
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/log/test.log
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = itcast02
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = itcast03
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set load-balance
agent1.sinkgroups.g1.processor.type = load_balance
# 默認是round_robin,還可以選擇random
agent1.sinkgroups.g1.processor.selector = round_robin
#如果backoff被開啟,則 sink processor會屏蔽故障的sink
agent1.sinkgroups.g1.processor.backoff = true
~~~
> Itcast02和itcast03上的flume-server-loadbalance.conf配置
~~~
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i2.useIP=false
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/flume/loadbalance/%{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
~~~
(3)測試load balance
1) 先在itcast02和itcast03上啟動腳本
bin/flume-ng agent -n a1 -c conf -f conf/flume-server-loadbalance.conf
-Dflume.root.logger=DEBUG,console
2) 然后啟動itcast01上的腳本
~~~
bin/flume-ng agent -n agent1 -c conf -f conf/flume-client-loadbalance.conf
-Dflume.root.logger=DEBUG,console
~~~
3) Shell腳本生成數據
~~~
while true;do date >> test.log; sleep 1s ;done
~~~
4) 觀察HDFS上生成的數據目錄,由于輪訓機制都會收集到數據

5) Itcast02上的agent被干掉之后,itcast02上不在產生數據

6) Itcast02上的agent重新啟動后,兩者都可以接受到數據

#### 2.3.5、Flume日志分類采集匯總
> <見案例資料>
#### 2.3.6、Flume自定義攔截器
> <見案例資料>
## 3、Flume實際使用時需要注意的事項
1) 注意啟動腳本命令的書寫
> agent 的名稱別寫錯了,后臺執行加上nohup ...
2) channel參數
| capacity:| 默認該通道中最大的可以存儲的event數量|
| --- | --- |
| trasactionCapacity:| 每次最大可以從source中拿到或者送到sink中的event數量|
| keep-alive:| event添加到通道中或者移出的允許時間|
> 注意:capacity > trasactionCapacity
3) 日志采集到HDFS配置說明1(sink端)
> #定義sink
~~~
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#時間類型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按條數生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按時間生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量寫入hdfs的個數
a1.sinks.k1.hdfs.batchSize = 10000
flume操作hdfs的線程數(包括新建,寫入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超時時間
a1.sinks.k1.hdfs.callTimeout=30000
~~~
4) 日志采集到HDFS配置說明2(sink端)
> hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
> hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured usinghdfs.roundUnit), less than current time.
> hdfs.roundUnit second The unit of the round down value -
> second, minute or hour.
| round | 默認值:false 是否啟用時間上的”舍棄”,這里的”舍棄”,類似于”四舍五入”|
| --- | --- |
| roundValue| 默認值:1 時間上進行“舍棄”的值;
| roundUnit | 默認值:seconds時間上進行”舍棄”的單位,包含:second,minute,hour|
> 案例(1):
> a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
> a1.sinks.k1.hdfs.round = true
> a1.sinks.k1.hdfs.roundValue = 10
> a1.sinks.k1.hdfs.roundUnit = minute
> 當時間為2015-10-16 17:38:59時候,hdfs.path依然會被解析為:
~~~
/flume/events/2015-10-16/17:30/00
/flume/events/2015-10-16/17:40/00
/flume/events/2015-10-16/17:50/00
~~~
> 因為設置的是舍棄10分鐘內的時間,因此,該目錄每10分鐘新生成一個。
> 案例(2):
~~~
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
~~~
> 現象:10秒為時間梯度生成對應的目錄,目錄下面包括很多小文件!!!
> HDFS產生的數據目錄格式如下:
~~~
/flume/events/2016-07-28/18:45/10
/flume/events/2016-07-28/18:45/20
/flume/events/2016-07-28/18:45/30
/flume/events/2016-07-28/18:45/40
/flume/events/2016-07-28/18:45/50
/flume/events/2016-07-28/18:46/10
/flume/events/2016-07-28/18:46/20
/flume/events/2016-07-28/18:46/30
/flume/events/2016-07-28/18:46/40
/flume/events/2016-07-28/18:46/50
~~~
5) 日志采集使用tail -F 監控一個文件新增的內容(斷點續傳)
> (詳細見案例:flume的第6個配置案例-分類收集數據-使用static攔截器)
> Source端的代碼:
~~~
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/data/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
~~~
> 這里會出現這樣一個情況,當你的這個flume agent程序掛了或者是服務器宕機了,那么隨著文件內容的增加,下次重啟時,會消費到重復的數據, 怎么辦呢?
> 解決方案:使用改進版的配置信息,修改信息
~~~
a1.sources.r2.command= tail -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}' /root/log-
~~~
> 意思就是說:Source每次讀取一條信息,就往/root/log文件記住當前消息的行數。這樣的話當你的程序掛了之后,重啟時先獲取上次讀取所在的行數,依次從下讀,這樣避免了數據重復。
> 而在flume1.7已經集成了該功能
> 配置文件:
> 配置案例:
~~~
a1.channels = ch1
a1.sources = s1
a1.sinks = hdfs-sink1
#channel
a1.channels.ch1.type = memory
a1.channels.ch1.capacity=100000
a1.channels.ch1.transactionCapacity=50000
#source
a1.sources.s1.channels = ch1
#監控一個目錄下的多個文件新增的內容
a1.sources.s1.type = taildir
#通過 json 格式存下每個文件消費的偏移量,避免從頭消費
a1.sources.s1.positionFile = /var/local/apache-flume-1.7.0-bin/taildir_position.json
a1.sources.s1.filegroups = f1 f2 f3
a1.sources.s1.filegroups.f1 = /root/data/access.log
a1.sources.s1.filegroups.f2 = /root/data/nginx.log
a1.sources.s1.filegroups.f3 = /root/data/web.log
a1.sources.s1.headers.f1.headerKey = access
a1.sources.s1.headers.f2.headerKey = nginx
a1.sources.s1.headers.f3.headerKey = web
a1.sources.s1.fileHeader = true
##sink
a1.sinks.hdfs-sink1.channel = ch1
a1.sinks.hdfs-sink1.type = hdfs
a1.sinks.hdfs-sink1.hdfs.path =hdfs://master:9000/demo/data
a1.sinks.hdfs-sink1.hdfs.filePrefix = event_data
a1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
a1.sinks.hdfs-sink1.hdfs.rollSize = 10485760
a1.sinks.hdfs-sink1.hdfs.rollInterval =20
a1.sinks.hdfs-sink1.hdfs.rollCount = 0
a1.sinks.hdfs-sink1.hdfs.batchSize = 1500
a1.sinks.hdfs-sink1.hdfs.round = true
a1.sinks.hdfs-sink1.hdfs.roundUnit = minute
a1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
a1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
a1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
a1.sinks.hdfs-sink1.hdfs.fileType =DataStream
a1.sinks.hdfs-sink1.hdfs.writeFormat = Text
a1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
6)flume的header參數配置講解
#配置信息test-header.conf
a1.channels = c1
a1.sources = r1
a1.sinks = k1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/tmp
a1.sources.r1.batchSize= 100
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = mmm
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = nnn
#sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
~~~
執行腳本:
~~~
bin/flume-ng agent -c conf -f conf/test-header.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
> 看到內容控制臺打印的信息:
~~~
Event: { headers:{mmm=/var/tmp/bbb, nnn=bbb} body: 30 30 30 000 }
Event: { headers:{mmm=/var/tmp/aaa, nnn=aaa} body: 31 31 31 111 }
~~~
> 其中aaa bbb 為目錄/var/tmp 下面的2個文件名稱
> 官網描述:

- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰