[TOC]
# 簡介
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor(時間戳)、HostInterceptor(主機)、RegexExtractorInterceptor(正則)等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理
# 自定義攔截器
根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,并對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷
# 需求
~~~
13601249301 100 200 300 400 500 600 700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700
13601249306 100 200 300 400 500 600 700
13601249307 100 200 300 400 500 600 700
13601249308 100 200 300 400 500 600 700
13601249309 100 200 300 400 500 600 700
13601249310 100 200 300 400 500 600 700
13601249311 100 200 300 400 500 600 700
13601249312 100 200 300 400 500 600 700
~~~
把這個變成這個樣子
第一行加密,中間有幾行舍棄

# 實現
二部分
## 編寫java代碼,自定義攔截器;
內容包括:
1. 定義一個類CustomParameterInterceptor實現Interceptor接口。
2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。
3. 添加CustomParameterInterceptor的有參構造方法。并對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。
4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。
5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。并給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。
6. 定義一個靜態類,類中封裝MD5加密方法
7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中

## 修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,內容為:
~~~
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
# 監控/root/data/下的文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
# 攔截器
a1.sources.r1.interceptors =i1 i2
# 自己定義的java類, $表示內部類
a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder
# 下面的定義屬性會傳遞到自定義的類中
# 自定義攔截器的屬性,這個是代表分隔符,unicode編碼的空格
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
# 當前列我需要取哪些列的下標
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
# 當前索引用的什么分隔符,用的是逗號,寫unicode編碼
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
# 具體加密字段的參數,第一列加密
a1.sources.r1.interceptors.i1.encrypted_field_index =0
# 這個攔截器類型是時間戳
a1.sources.r1.interceptors.i2.type = timestamp
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://master:9000/flume/%Y%m%d
# 當前類hdfs里面的屬性值
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
~~~
# 代碼
先看內部Builder類,里面有configure方法
字段的默認值有CustomParameterInterceptor.Constants這個內部類提供
Builder類里面的builder是構造攔截器,用里面的類來構建
先調用這個類的構造函數
然后`List<Event> intercept`會調用當個intercept
~~~
package com.hive;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.hive.CustomParameterInterceptor.Constants.*;
public class CustomParameterInterceptor implements Interceptor {
//指明每一行字段的分隔符
private final String fields_separator;
//通過分割符分割后,指明需要那列的字段,下標
private final String indexs;
//多個下標的分割符
private final String indexs_separator;
//需要加密的字段下標
private final String encrypted_field_index;
public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) {
//每一行字段的分隔符
String f = fields_separator.trim();
//多個下標的分割符
String i = indexs_separator.trim();
//通過分割符分割后,指明需要那列的字段,下標
this.indexs = indexs;
//需要加密的字段下標
this.encrypted_field_index = encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
//指明每一行字段的分隔符
this.fields_separator = f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
//多個下標的分割符
this.indexs_separator = i;
}
/**
* unicode轉換為string
* \t 制表符 ('\u0009') \n 新行(換行)符 (' ') \r 回車符 (' ') \f 換頁符 ('\u000C') \a 報警
* (bell) 符 ('\u0007') \e 轉義符 ('\u001B') \cx 空格(\u0020)對應于 x 的控制符
*/
private String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
str = str.replace(matcher.group(1), ch + "");
}
return str;
}
//初始化方法
@Override
public void initialize() {}
//處理單條事件
@Override
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
//getBody是具體存放數據的內容,獲取每一行數據
String line = new String(event.getBody(), Charsets.UTF_8);
//分隔符切分
String[] fields_spilts = line.split(fields_separator);
//對應所需要的列的下標
String[] indexs_split = indexs.split(indexs_separator);
String newLine = "";
//循環下標數組
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//對字段進行加密
if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) {
//將數據最終加密為md5
newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]);
} else {
newLine += fields_spilts[parseInt];
}
//拼接字段分割符
if (i != indexs_split.length - 1) {
newLine += fields_separator;
}
}
//把這個新的一行數據設置進去
event.setBody(newLine.getBytes());
return event;
} catch (Exception e) {
return event;
}
}
//處理很多事件
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> out = new ArrayList<Event>();
//循環這個時間列表
for (Event event : events) {
//調用單個事件
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
//這個Interceptor.Builder可以讀取flume的配置文件,因為Builder中繼承的Configurable可以讀取配置文件
public static class Builder implements Interceptor.Builder {
/**
* The fields_separator.指明每一行字段的分隔符
*/
private String fields_separator;
/**
* The indexs.通過分隔符分割后,指明需要那列的字段 下標
*/
private String indexs;
/**
* The indexs_separator. 多個下標的分隔符
*/
private String indexs_separator;
/**
* The encrypted_field. 需要加密的字段下標
*/
private String encrypted_field_index;
//構建對應的攔截器
@Override
public Interceptor build() {
//用上面一個類來構建,返回外面的那個類,把值傳遞給他的構造器
return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index);
}
//能夠幫我們獲取配置文件定義的參數
@Override
public void configure(Context context) {
//后面的值是默認值,context可以獲取到配置文件的值,在這里面用大寫常量代替,具體的值可參考下面的類Constants
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
}
}
public static class Constants {
/**
* The Constant FIELD_SEPARATOR.
*/
public static final String FIELD_SEPARATOR = "fields_separator";
/**
* The Constant DEFAULT_FIELD_SEPARATOR.
*/
public static final String DEFAULT_FIELD_SEPARATOR = " ";
/**
* The Constant INDEXS.
*/
public static final String INDEXS = "indexs";
/**
* The Constant DEFAULT_INDEXS.
*/
public static final String DEFAULT_INDEXS = "0";
/**
* The Constant INDEXS_SEPARATOR.
*/
public static final String INDEXS_SEPARATOR = "indexs_separator";
/**
* The Constant DEFAULT_INDEXS_SEPARATOR.
*/
public static final String DEFAULT_INDEXS_SEPARATOR = ",";
/**
* The Constant ENCRYPTED_FIELD_INDEX.
*/
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
/**
* The Constant DEFAUL_TENCRYPTED_FIELD_INDEX.
*/
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
/**
* The Constant PROCESSTIME.
*/
public static final String PROCESSTIME = "processTime";
/**
* The Constant PROCESSTIME.
*/
public static final String DEFAULT_PROCESSTIME = "a";
}
/**
* 字符串md5加密
*/
public static class StringUtils {
// 全局數組
private final static String[] strDigits = {"0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};
// 返回形式為數字跟字符串
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < 0) {
iRet += 256;
}
int iD1 = iRet / 16;
int iD2 = iRet % 16;
return strDigits[iD1] + strDigits[iD2];
}
// 返回形式只為數字
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < 0) {
iRet += 256;
}
return String.valueOf(iRet);
}
// 轉換字節數組為16進制字串
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}
public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() 該函數返回值為存放哈希值結果的byte數組
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}
~~~
然后把這個jar包上傳到hive的lib目錄
啟動:
~~~
flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
- 基礎
- 編譯和安裝
- classpath到底是什么?
- 編譯運行
- 安裝
- sdkman多版本
- jabba多版本
- java字節碼查看
- 數據類型
- 簡介
- 整形
- char和int
- 變量和常量
- 大數值運算
- 基本類型包裝類
- Math類
- 內存劃分
- 位運算符
- 方法相關
- 方法重載
- 可變參數
- 方法引用
- 面向對象
- 定義
- 繼承和覆蓋
- 接口和抽象類
- 接口定義增強
- 內建函數式接口
- 多態
- 泛型
- final和static
- 內部類
- 包
- 修飾符
- 異常
- 枚舉類
- 代碼塊
- 對象克隆
- BeanUtils
- java基礎類
- scanner類
- Random類
- System類
- Runtime類
- Comparable接口
- Comparator接口
- MessageFormat類
- NumberFormat
- 數組相關
- 數組
- Arrays
- string相關
- String
- StringBuffer
- StringBuilder
- 正則
- 日期類
- Locale類
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新時間日期API
- 簡介
- LocalDate,LocalTime,LocalDateTime
- Instant時間點
- 帶時區的日期,時間處理
- 時間間隔
- 日期時間校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判斷集合唯一
- Map和Entry
- stack類
- Collections集合工具類
- Stream數據流
- foreach不能修改內部元素
- of方法
- IO
- File類
- 字節流stream
- 字符流Reader
- IO流分類
- 轉換流
- 緩沖流
- 流的操作規律
- properties
- 序列化流與反序列化流
- 打印流
- System類對IO支持
- commons-IO
- IO流總結
- NIO
- 異步與非阻塞
- IO通信
- Unix的IO模型
- epoll對于文件描述符操作模式
- 用戶空間和內核空間
- NIO與普通IO的主要區別
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代碼
- 多線程
- 創建線程
- 線程常用方法
- 線程池相關
- 線程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的幾種線程池
- 線程安全
- 線程同步的幾種方法
- synchronized
- 死鎖
- lock接口
- ThreadLoad
- ReentrantLock
- 讀寫鎖
- 鎖的相關概念
- volatile
- 釋放鎖和不釋放鎖的操作
- 等待喚醒機制
- 線程狀態
- 守護線程和普通線程
- Lamda表達式
- 反射相關
- 類加載器
- 反射
- 注解
- junit注解
- 動態代理
- 網絡編程相關
- 簡介
- UDP
- TCP
- 多線程socket上傳圖片
- NIO
- JDBC相關
- JDBC
- 預處理
- 批處理
- 事務
- properties配置文件
- DBUtils
- DBCP連接池
- C3P0連接池
- 獲得MySQL自動生成的主鍵
- Optional類
- Jigsaw模塊化
- 日志相關
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 簡介
- 倉庫
- 目錄結構
- 常用命令
- 生命周期
- idea配置
- jar包沖突
- 依賴范圍
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多環境
- 自定義插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 亂碼
- session和cookie
- cookie
- session
- jsp
- 簡介
- 注釋
- 方法,成員變量
- 指令
- 動作標簽
- 隱式對象
- EL
- JSTL
- javaBean
- listener監聽器
- Filter過濾器
- 圖片驗證碼
- HttpUrlConnection
- 國際化
- 文件上傳
- 文件下載
- spring
- 簡介
- Bean
- 獲取和實例化
- 屬性注入
- 自動裝配
- 繼承和依賴
- 作用域
- 使用外部屬性文件
- spel
- 前后置處理器
- 生命周期
- 掃描規則
- 整合多個配置文件
- 注解
- 簡介
- 注解分層
- 類注入
- 分層和作用域
- 初始化方法和銷毀方法
- 屬性
- 泛型注入
- Configuration配置文件
- aop
- aop的實現
- 動態代理實現
- cglib代理實現
- aop名詞
- 簡介
- aop-xml
- aop-注解
- 代理方式選擇
- jdbc
- 簡介
- JDBCTemplate
- 事務
- 整合
- junit整合
- hibernate
- 簡介
- hibernate.properties
- 實體對象三種狀態
- 檢索方式
- 簡介
- 導航對象圖檢索
- OID檢索
- HQL
- Criteria(QBC)
- Query
- 緩存
- 事務管理
- 關系映射
- 注解
- 優化
- MyBatis
- 簡介
- 入門程序
- Mapper動態代理開發
- 原始Dao開發
- Mapper接口開發
- SqlMapConfig.xml
- map映射文件
- 輸出返回map
- 輸入參數
- pojo包裝類
- 多個輸入參數
- resultMap
- 動態sql
- 關聯
- 一對一
- 一對多
- 多對多
- 整合spring
- CURD
- 占位符和sql拼接以及參數處理
- 緩存
- 延遲加載
- 注解開發
- springMVC
- 簡介
- RequestMapping
- 參數綁定
- 常用注解
- 響應
- 文件上傳
- 異常處理
- 攔截器
- springBoot
- 配置
- 熱更新
- java配置
- springboot配置
- yaml語法
- 運行
- Actuator 監控
- 多環境配置切換
- 日志
- 日志簡介
- logback和access
- 日志文件配置屬性
- 開機自啟
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查詢
- 復雜查詢
- 多數據源的支持
- Repository分析
- JpaSpeci?cationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 動態sql
- 關聯映射
- 使用xml
- spring容器
- 整合druid
- 整合郵件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 請求
- restful
- 攔截器
- 常用注解
- 參數校驗
- 自定義filter
- websocket
- 響應
- 異常錯誤處理
- 文件下載
- 常用注解
- 頁面
- Thymeleaf組件
- 基本對象
- 內嵌對象
- 上傳文件
- 單元測試
- 模擬請求測試
- 集成測試
- 源碼解析
- 自動配置原理
- 啟動流程分析
- 源碼相關鏈接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 創建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本數據類型
- 函數
- 區間
- 區塊鏈
- 簡介
- linux
- ulimit修改
- 防止syn攻擊
- centos7部署bbr
- debain9開啟bbr
- mysql
- 隔離性
- sql執行加載順序
- 7種join
- explain
- 索引失效和優化
- 表連接優化
- orderby的filesort問題
- 慢查詢
- show profile
- 全局查詢日志
- 死鎖解決
- sql
- 主從
- IDEA
- mac快捷鍵
- 美化界面
- 斷點調試
- 重構
- springboot-devtools熱部署
- IDEA進行JAR打包
- 導入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文檔顯示
- sftp文件同步
- 書簽
- 代碼查看和搜索
- postfix
- live template
- git
- 文件頭注釋
- JRebel
- 離線模式
- xRebel
- github
- 連接mysql
- 選項沒有Java class的解決方法
- 擴展
- 項目配置和web部署
- 前端開發
- json和Inject language
- idea內存和cpu變高
- 相關設置
- 設計模式
- 單例模式
- 簡介
- 責任鏈
- JUC
- 原子類
- 原子類簡介
- 基本類型原子類
- 數組類型原子類
- 引用類型原子類
- JVM
- JVM規范內存解析
- 對象的創建和結構
- 垃圾回收
- 內存分配策略
- 備注
- 虛擬機工具
- 內存模型
- 同步八種操作
- 內存區域大小參數設置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 變量
- 運算符
- 模塊
- Rewrite規則
- Netty
- netty為什么沒用AIO
- 基本組件
- 源碼解讀
- 簡單的socket例子
- 準備netty
- netty服務端啟動
- 案例一:發送字符串
- 案例二:發送對象
- websocket
- ActiveMQ
- JMS
- 安裝
- 生產者-消費者代碼
- 整合springboot
- kafka
- 簡介
- 安裝
- 圖形化界面
- 生產過程分析
- 保存消息分析
- 消費過程分析
- 命令行
- 生產者
- 消費者
- 攔截器interceptor
- partition
- kafka為什么快
- kafka streams
- kafka與flume整合
- RabbitMQ
- AMQP
- 整體架構
- RabbitMQ安裝
- rpm方式安裝
- 命令行和管控頁面
- 消息生產與消費
- 整合springboot
- 依賴和配置
- 簡單測試
- 多方測試
- 對象支持
- Topic Exchange模式
- Fanout Exchange訂閱
- 消息確認
- java client
- RabbitAdmin和RabbitTemplate
- 兩者簡介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 詳解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 簡介
- 入門程序
- luke查看索引
- 分析器
- 索引庫維護
- elasticsearch
- 配置
- 插件
- head插件
- ik分詞插件
- 常用術語
- Mapping映射
- 數據類型
- 屬性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 單模式下CURD
- mget多個文檔
- 批量操作
- 版本控制
- 基本查詢
- Filter過濾
- 組合查詢
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 發布訂閱
- 事務
- 連接池
- 管道
- 分布式可重入鎖
- 配置文件翻譯
- 持久化
- RDB
- AOF
- 總結
- Lettuce
- zookeeper
- zookeeper簡介
- 集群部署
- Observer模式
- 核心工作機制
- zk命令行操作
- zk客戶端API
- 感知服務動態上下線
- 分布式共享鎖
- 原理
- zab協議
- 兩階段提交協議
- 三階段提交協議
- Paxos協議
- ZAB協議
- hadoop
- 簡介
- hadoop安裝
- 集群安裝
- 單機安裝
- linux編譯hadoop
- 添加新節點
- 退役舊節點
- 集群間數據拷貝
- 歸檔
- 快照管理
- 回收站
- 檢查hdfs健康狀態
- 安全模式
- hdfs簡介
- hdfs命令行操作
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案例-單詞統計
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定義排序
- 自定義Bean對象
- 排序的分類
- 案例-按總量排序需求
- 一次性完成統計和排序
- 分區
- 分區簡介
- 案例-結果分區
- 多表合并
- reducer端合并
- map端合并(分布式緩存)
- 分組
- groupingComparator
- 案例-求topN
- 全局計數器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat機制
- 自定義InputFormat
- 自定義outputFormat
- 多job串聯
- 倒排索引
- 共同好友
- 串聯
- 數據壓縮
- InputFormat接口實現類
- yarn簡介
- 推測執行算法
- 本地提交到yarn
- 框架運算全流程
- 數據傾斜問題
- mapreduce的優化方案
- HA機制
- 優化
- Hive
- 安裝
- shell參數
- 數據類型
- 集合類型
- 數據庫
- DDL操作
- 創建表
- 修改表
- 分區表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 嚴格模式
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transfrom實現
- having和where不同
- 壓縮
- 存儲
- 存儲和壓縮結合使用
- explain詳解
- 調優
- Fetch抓取
- 本地模式
- 表的優化
- GroupBy
- count(Distinct)去重統計
- 行列過濾
- 動態分區調整
- 數據傾斜
- 并行執行
- JVM重用
- 推測執行
- reduce內存和個數
- sql查詢結果作為變量(shell)
- youtube
- flume
- 簡介
- 安裝
- 常用組件
- 攔截器
- 案例
- 監聽端口到控制臺
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 單flume多channel,sink
- 自定義攔截器
- 高可用配置
- 使用注意
- 監控Ganglia
- sqoop
- 安裝
- 常用命令
- 數據導入
- 準備數據
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 打包腳本
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- oozie
- 安裝
- hbase
- 簡介
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- 安裝
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 過濾器查詢
- 建表高級屬性
- 與mapreduce結合
- 與sqoop結合
- 協處理器
- 參數配置優化
- 數據備份和恢復
- 節點管理
- 案例-點擊流
- 簡介
- HUE
- 安裝
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 單詞統計(接入kafka)
- 并行度和分組
- 啟動流程分析
- ACK容錯機制
- ACK簡介
- BaseRichBolt簡單使用
- BaseBasicBolt簡單使用
- Ack工作機制
- 本地目錄樹
- zookeeper目錄樹
- 通信機制
- 案例
- 日志告警
- 工具
- YAPI
- chrome無法手動拖動安裝插件
- 時間和空間復雜度
- jenkins
- 定位cpu 100%
- 常用腳本工具
- OOM問題定位
- scala
- 編譯
- 基本語法
- 函數
- 數組常用方法
- 集合
- 并行集合
- 類
- 模式匹配
- 異常
- tuple元祖
- actor并發編程
- 柯里化
- 隱式轉換
- 泛型
- 迭代器
- 流stream
- 視圖view
- 控制抽象
- 注解
- spark
- 企業架構
- 安裝
- api開發
- mycat
- Groovy
- 基礎