[TOC]
# 簡介
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor(時間戳)、HostInterceptor(主機)、RegexExtractorInterceptor(正則)等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理
# 1自定義攔截器
根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷
# 實現
二部分
## 編寫java代碼,自定義攔截器;
內容包括:
1. 定義一個類CustomParameterInterceptor實現Interceptor接口。
2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。
3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。
4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。
5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。
6. 定義一個靜態類,類中封裝MD5加密方法
7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中

## 修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,內容為:
~~~
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
# 監控/root/data/下的文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
# 攔截器
a1.sources.r1.interceptors =i1 i2
# 自己定義的java類, $表示內部類
a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder
# 自定義攔截器的屬性,這個是代表分隔符,ulecode編碼的
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
# 當前列我需要取哪些列的下標
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
# 當前索引用的什么分隔符,用的是逗號,寫urlencode編碼
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
# 具體加密字段的參數
a1.sources.r1.interceptors.i1.encrypted_field_index =0
# 這個攔截器類型是時間戳
a1.sources.r1.interceptors.i2.type = timestamp
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
# 當前類hdfs里面的屬性值
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
~~~
# 代碼
先看內部Builder類,里面有configure方法
字段的默認值有CustomParameterInterceptor.Constants這個內部類提供
Builder類里面的builder是構造攔截器,用里面的類來構建
先調用這個類的構造函數
然后`List<Event> intercept`會調用當個intercept
~~~
package com.hive;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.hive.CustomParameterInterceptor.Constants.*;
public class CustomParameterInterceptor implements Interceptor {
//指明每一行字段的分隔符
private final String fields_separator;
//通過分割符分割后,指明需要那列的字段,下標
private final String indexs;
//多個下標的分割符
private final String indexs_separator;
//需要加密的字段下標
private final String encrypted_field_index;
public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) {
//每一行字段的分隔符
String f = fields_separator.trim();
//多個下標的分割符
String i = indexs_separator.trim();
//通過分割符分割后,指明需要那列的字段,下標
this.indexs = indexs;
//需要加密的字段下標
this.encrypted_field_index = encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
//指明每一行字段的分隔符
this.fields_separator = f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
//多個下標的分割符
this.indexs_separator = i;
}
/**
* unicode轉換為string
* \t 制表符 ('\u0009') \n 新行(換行)符 (' ') \r 回車符 (' ') \f 換頁符 ('\u000C') \a 報警
* (bell) 符 ('\u0007') \e 轉義符 ('\u001B') \cx 空格(\u0020)對應于 x 的控制符
*/
private String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
str = str.replace(matcher.group(1), ch + "");
}
return str;
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
//Body是具體存放數據的內容,獲取每一行數據
String line = new String(event.getBody(), Charsets.UTF_8);
//分隔符切分
String[] fields_spilts = line.split(fields_separator);
//對應所需要的列的下標
String[] indexs_split = indexs.split(indexs_separator);
String newLine = "";
//循環下標數組
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//對字段進行加密
if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) {
//將數據最終加密為md5
newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]);
} else {
newLine += fields_spilts[parseInt];
}
//拼接字段分割符
if (i != indexs_split.length - 1) {
newLine += fields_separator;
}
}
//把這個新的一行數據設置進去
event.setBody(newLine.getBytes());
return event;
} catch (Exception e) {
return event;
}
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
/**
* The fields_separator.指明每一行字段的分隔符
*/
private String fields_separator;
/**
* The indexs.通過分隔符分割后,指明需要那列的字段 下標
*/
private String indexs;
/**
* The indexs_separator. 多個下標下標的分隔符
*/
private String indexs_separator;
/**
* The encrypted_field. 需要加密的字段下標
*/
private String encrypted_field_index;
//構建對應的攔截器
@Override
public Interceptor build() {
//用上面一個類來構建
return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index);
}
//能夠幫我們獲取配置文件定義的參數
@Override
public void configure(Context context) {
//后面的值是默認值
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
}
}
public static class Constants {
/**
* The Constant FIELD_SEPARATOR.
*/
public static final String FIELD_SEPARATOR = "fields_separator";
/**
* The Constant DEFAULT_FIELD_SEPARATOR.
*/
public static final String DEFAULT_FIELD_SEPARATOR = " ";
/**
* The Constant INDEXS.
*/
public static final String INDEXS = "indexs";
/**
* The Constant DEFAULT_INDEXS.
*/
public static final String DEFAULT_INDEXS = "0";
/**
* The Constant INDEXS_SEPARATOR.
*/
public static final String INDEXS_SEPARATOR = "indexs_separator";
/**
* The Constant DEFAULT_INDEXS_SEPARATOR.
*/
public static final String DEFAULT_INDEXS_SEPARATOR = ",";
/**
* The Constant ENCRYPTED_FIELD_INDEX.
*/
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
/**
* The Constant DEFAUL_TENCRYPTED_FIELD_INDEX.
*/
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
/**
* The Constant PROCESSTIME.
*/
public static final String PROCESSTIME = "processTime";
/**
* The Constant PROCESSTIME.
*/
public static final String DEFAULT_PROCESSTIME = "a";
}
/**
* 字符串md5加密
*/
public static class StringUtils {
// 全局數組
private final static String[] strDigits = {"0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};
// 返回形式為數字跟字符串
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < 0) {
iRet += 256;
}
int iD1 = iRet / 16;
int iD2 = iRet % 16;
return strDigits[iD1] + strDigits[iD2];
}
// 返回形式只為數字
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < 0) {
iRet += 256;
}
return String.valueOf(iRet);
}
// 轉換字節數組為16進制字串
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}
public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() 該函數返回值為存放哈希值結果的byte數組
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}
~~~
然后把這個jar包上傳到hive的lib目錄
啟動:
~~~
flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介