[TOC]
# 1. Parquet介紹
Apache Parquet 是 Hadoop 生態圈中一種<ins>新型列式存儲格式</ins>,它可以兼容Hadoop 生態圈中大多數計算框架(Mapreduce、Spark 等),被多種查詢引擎支持
(Hive、Impala、Drill 等),<ins>并且它是語言和平臺無關的</ins>。Parquet 最初是由Twitter 和 Cloudera 合作開發完成并開源,2015 年 5 月從 Apache 的孵化器里畢業成為 Apache 頂級項目。<br/>
Parquet 最初的靈感來自 Google 于 2010 年發表的 Dremel 論文,文中介紹了一種支持嵌套結構的存儲格式,并且使用了列式存儲的方式提升查詢性能,在Dremel 論文中還介紹了 Google 如何使用這種存儲格式實現并行查詢的,如果對此感興趣可以參考論文和開源實現 Drill。<br/>
**數據模型**
Parquet 支持嵌套的數據模型,類似于 Protocol Buffers,每一個數據模型的 schema 包含多個字段,<ins>每一個字段有三個屬性:重復次數、數據類型和字段
名,重復次數可以是以下三種:required(只出現 1 次),repeated(出現 0 次或多次),optional(出現 0 次或 1 次)</ins>。<br/>
每一個字段的數據類型可以分成兩種:group(復雜類型)和 primitive(基本類型)。<br/>
**存儲方式:列式存儲**
列式存儲優點:
? 按需讀取列
? 壓縮編碼可以降低磁盤存儲空間
<br/>
**文件結構**
Parquet 文件是以二進制方式存儲的,是不可以直接讀取和修改的,Parquet文件是自解析的,文件中包括該文件的數據和元數據。在 HDFS 文件系統和Parquet 文件中存在如下幾個概念:
<mark>HDFS 塊(Block)</mark>:它是 HDFS 上的最小的副本單位,HDFS 會把一個 Block 存儲在本地的一個文件并且維護分散在不同的機器上的多個副本。
<mark>HDFS 文件(File)</mark>:一個 HDFS 的文件,包括數據和元數據,數據分散存儲在多個 Block 中。
<mark>行組(Row Group)</mark>:按照行將數據物理上劃分為多個單元,每一個行組包含一定的行數,在一個 HDFS 文件中至少存儲一個行組,Parquet 讀寫的時候會將
整個行組緩存在內存中,所以如果每一個行組的大小是由內存的大小決定的。
<mark>列塊(Column Chunk)</mark>:在一個行組中每一列保存在一個列塊中,行組中的所有列連續的存儲在這個行組文件中。不同的列塊可使用不同的算法進行壓縮。
<mark>頁(Page)</mark>:每一個列塊劃分為多個頁,一個頁是最小的編碼的單位,在同一個列塊的不同頁可能使用不同的編碼方式。<br/>
通常情況下,在存儲 Parquet 數據的時候會按照 HDFS 的 Block 大小設置行組的大小,由于一般情況下每一個 Mapper 任務處理數據的最小單位是一個Block,這樣可以把每一個行組由一個 Mapper 任務處理,增大任務執行并行度。Parquet 文件的格式如下圖所示。

上圖展示了一個 Parquet 文件的結構,一個文件中可以存儲多個行組,文件的首位都是該文件的 Magic Code,用于校驗它是否是一個 Parquet 文件,Footer length 存儲了文件元數據的大小,通過該值和文件長度可以計算出元數據的偏移量,文件的元數據中包括每一個行組的元數據信息和當前文件的 Schema 信息。除了文件中每一個行組的元數據,每一頁的開始都會存儲該頁的元數據,在Parquet 中,有三種類型的頁:<ins>數據頁、字典頁和索引頁</ins>。數據頁用于存儲當前行組中該列的值,字典頁存儲該列值的編碼字典,每一個列塊中最多包含一個字典頁,索引頁用來存儲當前行組下該列的索引,目前 Parquet 中還不支持索引頁,但是在后面的版本中增加。<br/>
**數據類型**
```
BOOLEAN: 1 bit boolean
INT32: 32 bit signed ints
INT64: 64 bit signed ints
INT96: 96 bit signed ints
FLOAT: IEEE 32-bit floating point values
DOUBLE: IEEE 64-bit floating point values
BYTE_ARRAY: arbitrarily long byte arrays.
也可以全部指定類型為binary二進制
```
<br/>
# 2. Java讀寫Parquet
在 *`pom.xml`* 中引入下面的依賴
```xml
<!--Parquet-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.2</version>
</dependency>
```
Java代碼:
```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import java.io.IOException;
/**
* @Author Leo
* @Date 2019/5/7 11:49
**/
public class ParquetOps {
public static void main(String[] args) {
try {
write();
read();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void write() throws IOException {
Path file = new Path("/tmp/user-parquet/1.parquet");
String schemaStr = "message User{\n" +
" required binary name (UTF8);\n" +
" required int32 age;\n" +
" repeated group family{\n" +
" repeated binary father (UTF8);\n" +
" repeated binary mother (UTF8);\n" +
" optional binary sister (UTF8);\n" +
" }\n" +
"}\n";
MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withType(schema).build();
SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
Group group1 = groupFactory.newGroup();
group1.add("name", "jason");
group1.add("age", 9);
Group cGroup1 = group1.addGroup("family");
cGroup1.add("father", "XXX");
cGroup1.add("mother", "XXX");
Group group2 = groupFactory.newGroup();
group2.add("name", "tom");
group2.add("age", 18);
//添加子組
group2.addGroup("family")
.append("father", "ZZZ")
.append("mother", "ZZZ");//append與add返回值不同
writer.write(group1);
writer.write(group2);
writer.close();
}
private static void read() throws IOException {
Path file = new Path(
"/tmp/user-parquet/1.parquet");
ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), file);
ParquetReader<Group> reader = builder.build();
SimpleGroup group = (SimpleGroup) reader.read();
System.out.println("schema:" + group.getType().toString());
while (group != null) {
System.out.println("username:" + group.getString(0, 0));
System.out.println("age:" + group.getInteger(1, 0));
System.out.println("family.father:" + group.getGroup(2, 0).getString(0, 0));
System.out.println(group.toString());
group = (SimpleGroup) reader.read();
}
}
}
```
<br/>
# 3. 在Hive中使用Parquet
```sql
create external table parquet_table(
name string,
age int)
stored as parquet;
0: jdbc:hive2://hadoop101:10000> select * from parquet_table;
+---------------------+--------------------+--+
| parquet_table.name | parquet_table.age |
+---------------------+--------------------+--+
+---------------------+--------------------+--+
0: jdbc:hive2://hadoop101:10000> show create table parquet_table;
+----------------------------------------------------+--+
| createtab_stmt |
+----------------------------------------------------+--+
| CREATE EXTERNAL TABLE `parquet_table`( |
| `name` string, |
| `age` int) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://hadoop101:9000/home/hadoop/hive/warehouse/hivebook.db/parquet_table' |
| TBLPROPERTIES ( |
| 'transient_lastDdlTime'='1609154516') |
+----------------------------------------------------+--+
```
- Hadoop
- hadoop是什么?
- Hadoop組成
- hadoop官網
- hadoop安裝
- hadoop配置
- 本地運行模式配置
- 偽分布運行模式配置
- 完全分布運行模式配置
- HDFS分布式文件系統
- HDFS架構
- HDFS設計思想
- HDFS組成架構
- HDFS文件塊大小
- HDFS優缺點
- HDFS Shell操作
- HDFS JavaAPI
- 基本使用
- HDFS的I/O 流操作
- 在SpringBoot項目中的API
- HDFS讀寫流程
- HDFS寫流程
- HDFS讀流程
- NN和SNN關系
- NN和SNN工作機制
- Fsimage和 Edits解析
- checkpoint時間設置
- NameNode故障處理
- 集群安全模式
- DataNode工作機制
- 支持的文件格式
- MapReduce分布式計算模型
- MapReduce是什么?
- MapReduce設計思想
- MapReduce優缺點
- MapReduce基本使用
- MapReduce編程規范
- WordCount案例
- MapReduce任務進程
- Hadoop序列化對象
- 為什么要序列化
- 常用數據序列化類型
- 自定義序列化對象
- MapReduce框架原理
- MapReduce工作流程
- MapReduce核心類
- MapTask工作機制
- Shuffle機制
- Partition分區
- Combiner合并
- ReduceTask工作機制
- OutputFormat
- 使用MapReduce實現SQL Join操作
- Reduce join
- Reduce join 代碼實現
- Map join
- Map join 案例實操
- MapReduce 開發總結
- Hadoop 優化
- MapReduce 優化需要考慮的點
- MapReduce 優化方法
- 分布式資源調度框架 Yarn
- Yarn 基本架構
- ResourceManager(RM)
- NodeManager(NM)
- ApplicationMaster
- Container
- 作業提交全過程
- JobHistoryServer 使用
- 資源調度器
- 先進先出調度器(FIFO)
- 容量調度器(Capacity Scheduler)
- 公平調度器(Fair Scheduler)
- Yarn 常用命令
- Zookeeper
- zookeeper是什么?
- zookeeper完全分布式搭建
- Zookeeper特點
- Zookeeper數據結構
- Zookeeper 內部原理
- 選舉機制
- stat 信息中字段解釋
- 選擇機制中的概念
- 選舉消息內容
- 監聽器原理
- Hadoop 高可用集群搭建
- Zookeeper 應用
- Zookeeper Shell操作
- Zookeeper Java應用
- Hive
- Hive是什么?
- Hive的優缺點
- Hive架構
- Hive元數據存儲模式
- 內嵌模式
- 本地模式
- 遠程模式
- Hive環境搭建
- 偽分布式環境搭建
- Hive命令工具
- 命令行模式
- 交互模式
- Hive數據類型
- Hive數據結構
- 參數配置方式
- Hive數據庫
- 數據庫存儲位置
- 數據庫操作
- 表的創建
- 建表基本語法
- 內部表
- 外部表
- 臨時表
- 建表高階語句
- 表的刪除與修改
- 分區表
- 靜態分區
- 動態分區
- 分桶表
- 創建分桶表
- 分桶抽樣
- Hive視圖
- 視圖的創建
- 側視圖Lateral View
- Hive數據導入導出
- 導入數據
- 導出數據
- 查詢表數據量
- Hive事務
- 事務是什么?
- Hive事務的局限性和特點
- Hive事務的開啟和設置
- Hive PLSQL
- Hive高階查詢
- 查詢基本語法
- 基本查詢
- distinct去重
- where語句
- 列正則表達式
- 虛擬列
- CTE查詢
- 嵌套查詢
- join語句
- 內連接
- 左連接
- 右連接
- 全連接
- 多表連接
- 笛卡爾積
- left semi join
- group by分組
- having刷選
- union與union all
- 排序
- order by
- sort by
- distribute by
- cluster by
- 聚合運算
- 基本聚合
- 高級聚合
- 窗口函數
- 序列窗口函數
- 聚合窗口函數
- 分析窗口函數
- 窗口函數練習
- 窗口子句
- Hive函數
- Hive函數分類
- 字符串函數
- 類型轉換函數
- 數學函數
- 日期函數
- 集合函數
- 條件函數
- 聚合函數
- 表生成函數
- 自定義Hive函數
- 自定義函數分類
- 自定義Hive函數流程
- 添加JAR包的方式
- 自定義臨時函數
- 自定義永久函數
- Hive優化
- Hive性能調優工具
- EXPLAIN
- ANALYZE
- Fetch抓取
- 本地模式
- 表的優化
- 小表 join 大表
- 大表 join 大表
- 開啟Map Join
- group by
- count(distinct)
- 笛卡爾積
- 行列過濾
- 動態分區調整
- 分區分桶表
- 數據傾斜
- 數據傾斜原因
- 調整Map數
- 調整Reduce數
- 產生數據傾斜的場景
- 并行執行
- 嚴格模式
- JVM重用
- 推測執行
- 啟用CBO
- 啟動矢量化
- 使用Tez引擎
- 壓縮算法和文件格式
- 文件格式
- 壓縮算法
- Zeppelin
- Zeppelin是什么?
- Zeppelin安裝
- 配置Hive解釋器
- Hbase
- Hbase是什么?
- Hbase環境搭建
- Hbase分布式環境搭建
- Hbase偽分布式環境搭建
- Hbase架構
- Hbase架構組件
- Hbase數據存儲結構
- Hbase原理
- Hbase Shell
- 基本操作
- 表操作
- namespace
- Hbase Java Api
- Phoenix集成Hbase
- Phoenix是什么?
- 安裝Phoenix
- Phoenix數據類型
- Phoenix Shell
- HBase與Hive集成
- HBase與Hive的對比
- HBase與Hive集成使用
- Hbase與Hive集成原理
- HBase優化
- RowKey設計
- 內存優化
- 基礎優化
- Hbase管理
- 權限管理
- Region管理
- Region的自動拆分
- Region的預拆分
- 到底采用哪種拆分策略?
- Region的合并
- HFile的合并
- 為什么要有HFile的合并
- HFile合并方式
- Compaction執行時間
- Compaction相關控制參數
- 演示示例
- Sqoop
- Sqoop是什么?
- Sqoop環境搭建
- RDBMS導入到HDFS
- RDBMS導入到Hive
- RDBMS導入到Hbase
- HDFS導出到RDBMS
- 使用sqoop腳本
- Sqoop常用命令
- Hadoop數據模型
- TextFile
- SequenceFile
- Avro
- Parquet
- RC&ORC
- 文件存儲格式比較
- Spark
- Spark是什么?
- Spark優勢
- Spark與MapReduce比較
- Spark技術棧
- Spark安裝
- Spark Shell
- Spark架構
- Spark編程入口
- 編程入口API
- SparkContext
- SparkSession
- Spark的maven依賴
- Spark RDD編程
- Spark核心數據結構-RDD
- RDD 概念
- RDD 特性
- RDD編程
- RDD編程流程
- pom依賴
- 創建算子
- 轉換算子
- 動作算子
- 持久化算子
- RDD 與閉包
- csv/json數據源
- Spark分布式計算原理
- RDD依賴
- RDD轉換
- RDD依賴
- DAG工作原理
- Spark Shuffle原理
- Shuffle的作用
- ShuffleManager組件
- Shuffle實踐
- RDD持久化
- 緩存機制
- 檢查點
- 檢查點與緩存的區別
- RDD共享變量
- 廣播變量
- 累計器
- RDD分區設計
- 數據傾斜
- 數據傾斜的根本原因
- 定位導致的數據傾斜
- 常見數據傾斜解決方案
- Spark SQL
- SQL on Hadoop
- Spark SQL是什么
- Spark SQL特點
- Spark SQL架構
- Spark SQL運行原理
- Spark SQL編程
- Spark SQL編程入口
- 創建Dataset
- Dataset是什么
- SparkSession創建Dataset
- 樣例類創建Dataset
- 創建DataFrame
- DataFrame是什么
- 結構化數據文件創建DataFrame
- RDD創建DataFrame
- Hive表創建DataFrame
- JDBC創建DataFrame
- SparkSession創建
- RDD、DataFrame、Dataset
- 三者對比
- 三者相互轉換
- RDD轉換為DataFrame
- DataFrame轉換為RDD
- DataFrame API
- DataFrame API分類
- Action 操作
- 基礎 Dataset 函數
- 強類型轉換
- 弱類型轉換
- Spark SQL外部數據源
- Parquet文件
- Hive表
- RDBMS表
- JSON/CSV
- Spark SQL函數
- Spark SQL內置函數
- 自定SparkSQL函數
- Spark SQL CLI
- Spark SQL性能優化
- Spark GraphX圖形數據分析
- 為什么需要圖計算
- 圖的概念
- 圖的術語
- 圖的經典表示法
- Spark Graphix簡介
- Graphx核心抽象
- Graphx Scala API
- 核心組件
- 屬性圖應用示例1
- 屬性圖應用示例2
- 查看圖信息
- 圖的算子
- 連通分量
- PageRank算法
- Pregel分布式計算框架
- Flume日志收集
- Flume是什么?
- Flume官方文檔
- Flume架構
- Flume安裝
- Flume使用過程
- Flume組件
- Flume工作流程
- Flume事務
- Source、Channel、Sink文檔
- Source文檔
- Channel文檔
- Sink文檔
- Flume攔截器
- Flume攔截器概念
- 配置攔截器
- 自定義攔截器
- Flume可靠性保證
- 故障轉移
- 負載均衡
- 多層代理
- 多路復用
- Kafka
- 消息中間件MQ
- Kafka是什么?
- Kafka安裝
- Kafka本地單機部署
- Kafka基本命令使用
- Topic的生產與消費
- 基本命令
- 查看kafka目錄
- Kafka架構
- Kafka Topic
- Kafka Producer
- Kafka Consumer
- Kafka Partition
- Kafka Message
- Kafka Broker
- 存儲策略
- ZooKeeper在Kafka中的作用
- 副本同步
- 容災
- 高吞吐
- Leader均衡機制
- Kafka Scala API
- Producer API
- Consumer API
- Kafka優化
- 消費者參數優化
- 生產者參數優化
- Spark Streaming
- 什么是流?
- 批處理和流處理
- Spark Streaming簡介
- 流數據處理架構
- 內部工作流程
- StreamingContext組件
- SparkStreaming的編程入口
- WordCount案例
- DStream
- DStream是什么?
- Input DStream與Receivers接收器
- DStream API
- 轉換操作
- 輸出操作
- 數據源
- 數據源分類
- Socket數據源
- 統計HDFS文件的詞頻
- 處理狀態數據
- SparkStreaming整合SparkSQL
- SparkStreaming整合Flume
- SparkStreaming整合Kafka
- 自定義數據源
- Spark Streaming優化策略
- 優化運行時間
- 優化內存使用
- 數據倉庫
- 數據倉庫是什么?
- 數據倉庫的意義
- 數據倉庫和數據庫的區別
- OLTP和OLAP的區別
- OLTP的特點
- OLAP的特點
- OLTP與OLAP對比
- 數據倉庫架構
- Inmon架構
- Kimball架構
- 混合型架構
- 數據倉庫的解決方案
- 數據ETL
- 數據倉庫建模流程
- 維度模型
- 星型模式
- 雪花模型
- 星座模型
- 數據ETL處理
- 數倉分層術語
- 數據抽取方式
- CDC抽取方案
- 數據轉換
- 常見的ETL工具