[TOC]
# 注意啟動腳本命令的書寫
agent 的名稱別寫錯了,后臺執行加上`nohup ... &`
# channel參數
~~~
capacity:默認該通道中最大的可以存儲的event數量
trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數量
keep-alive:event添加到通道中或者移出的允許時間
注意:capacity > trasactionCapacity
~~~
# 日志采集到HDFS配置
## 說明1(sink端)
~~~
#定義sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#時間類型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按條數生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按時間生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量寫入hdfs的個數
a1.sinks.k1.hdfs.batchSize = 10000
flume操作hdfs的線程數(包括新建,寫入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超時時間
a1.sinks.k1.hdfs.callTimeout=30000
~~~
## 說明2 (sink端)
| hdfs.round | false | 如果時間戳向下舍入(如果為true,則會影響除%t之外的所有基于時間的轉義序列) |
| --- | --- | --- |
| hdfs.roundValue | 1 | 舍入到最高倍數(在使用hdfs.roundUnit配置的單位中),小于當前時間 |
| hdfs.roundUnit | second | 舍入值的單位 - second,分鐘或小時 |
* round: 默認值:false 是否啟用時間上的”舍棄”,這里的”舍棄”,類似于”四舍五入”
* roundValue:默認值:1 時間上進行“舍棄”的值;
* roundUnit: 默認值:seconds時間上進行”舍棄”的單位,包含:second,minute,hour
案例(1):
~~~
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
~~~
當時間為`2015-10-16 17:38:59`時候,hdfs.path依然會被解析為:
~~~
/flume/events/2015-10-16/17:30/00
/flume/events/2015-10-16/17:40/00
/flume/events/2015-10-16/17:50/00
~~~
因為設置的是舍棄10分鐘內的時間,因此,該目錄每10分鐘新生成一個。
案例(2):
~~~
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H:%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
~~~
現象:10秒為時間梯度生成對應的目錄,目錄下面包括很多小文件!!!
HDFS產生的數據目錄格式如下:
~~~
/flume/events/2016-07-28/18:45/10
/flume/events/2016-07-28/18:45/20
/flume/events/2016-07-28/18:45/30
/flume/events/2016-07-28/18:45/40
/flume/events/2016-07-28/18:45/50
/flume/events/2016-07-28/18:46/10
/flume/events/2016-07-28/18:46/20
/flume/events/2016-07-28/18:46/30
/flume/events/2016-07-28/18:46/40
/flume/events/2016-07-28/18:46/50
~~~
# 斷點續傳
日志采集使用tail -F 監控一個文件新增的內容
(詳細見案例:flume的第6個配置案例-分類收集數據-使用static攔截器)
Source端的代碼:
~~~
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/data/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
~~~
這里會出現這樣一個情況,當你的這個flume agent程序掛了或者是服務器宕機了,那么隨著文件內容的增加,下次重啟時,會消費到重復的數據, 怎么辦呢?
解決方案:使用改進版的配置信息,修改信息
~~~
a1.sources.r2.command= tail??-n?+$(tail?-n1?/root/log)?-F?/root/data/nginx.log?|?awk?'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print?$0;print?i?>>?"/root/log";fflush("")}'?/root/log-?
~~~
意思就是說:Source每次讀取一條信息,就往/root/log文件記住當前消息的行數。這樣的話當你的程序掛了之后,重啟時先獲取上次讀取所在的行數,依次從下讀,這樣避免了數據重復。
而在flume1.7已經集成了該功能
配置文件:
~~~
a1.channels = ch1
a1.sources = s1
a1.sinks = hdfs-sink1
#channel
a1.channels.ch1.type = memory
a1.channels.ch1.capacity=100000
a1.channels.ch1.transactionCapacity=50000
#source
a1.sources.s1.channels = ch1
#監控一個目錄下的多個文件新增的內容
a1.sources.s1.type = taildir
#通過 json 格式存下每個文件消費的偏移量,避免從頭消費
a1.sources.s1.positionFile = /var/local/apache-flume-1.7.0-bin/taildir_position.json
a1.sources.s1.filegroups = f1 f2 f3
a1.sources.s1.filegroups.f1 = /root/data/access.log
a1.sources.s1.filegroups.f2 = /root/data/nginx.log
a1.sources.s1.filegroups.f3 = /root/data/web.log
a1.sources.s1.headers.f1.headerKey = access
a1.sources.s1.headers.f2.headerKey = nginx
a1.sources.s1.headers.f3.headerKey = web
a1.sources.s1.fileHeader = true
##sink
a1.sinks.hdfs-sink1.channel = ch1
a1.sinks.hdfs-sink1.type = hdfs
a1.sinks.hdfs-sink1.hdfs.path =hdfs://master:9000/demo/data
a1.sinks.hdfs-sink1.hdfs.filePrefix = event_data
a1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
a1.sinks.hdfs-sink1.hdfs.rollSize = 10485760
a1.sinks.hdfs-sink1.hdfs.rollInterval =20
a1.sinks.hdfs-sink1.hdfs.rollCount = 0
a1.sinks.hdfs-sink1.hdfs.batchSize = 1500
a1.sinks.hdfs-sink1.hdfs.round = true
a1.sinks.hdfs-sink1.hdfs.roundUnit = minute
a1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
a1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
a1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
a1.sinks.hdfs-sink1.hdfs.fileType =DataStream
a1.sinks.hdfs-sink1.hdfs.writeFormat = Text
a1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
~~~
# flume的header參數配置講解
~~~
#配置信息test-header.conf
a1.channels?=?c1
a1.sources?=?r1
a1.sinks?=?k1
#channel
a1.channels.c1.type?=?memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels?=?c1
a1.sources.r1.type?=?spooldir
a1.sources.r1.spoolDir?=?/var/tmp
a1.sources.r1.batchSize=?100
a1.sources.r1.inputCharset?=?UTF-8
a1.sources.r1.fileHeader?=?true
# 控制臺的key,mmm,文件的絕對路徑
a1.sources.r1.fileHeaderKey?=?mmm?
a1.sources.r1.basenameHeader?=?true
# 文件的名稱,看下面的控制臺
a1.sources.r1.basenameHeaderKey?=?nnn
#sink
a1.sinks.k1.type?=?logger
a1.sinks.k1.channel?=?c1
~~~
執行腳本:
~~~
bin/flume-ng?agent?-c?conf?-f?conf/test-header.conf??-name?a1?-Dflume.root.logger=DEBUG,console
~~~
看到內容控制臺打印的信息:
~~~
Event:?{?headers:{mmm=/var/tmp/bbb,?nnn=bbb}?body:?30?30?30?000?}
Event:?{?headers:{mmm=/var/tmp/aaa,?nnn=aaa}?body:?31?31?31?111?}
~~~
其中aaa,?bbb?為目錄/var/tmp?下面的2個文件名稱
官網描述:

- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介