[TOC]
# HDFS操作API
> hdfs在生產應用中主要是客戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的訪問客戶端對象,然后通過該客戶端對象操作(增刪改查)HDFS上的文件
## 1 搭建開發環境
1) 引入依賴
~~~
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.4</version>
</dependency>
~~~
> 注:如需手動引入jar包,hdfs的jar包----hadoop的安裝目錄的share下
2) window下開發的說明
> 建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上做客戶端應用開發,需要設置以下環境:
1. 用老師給的windows平臺下編譯的hadoop安裝包解壓一份到windows的任意一個目錄下
2. 在window系統中配置HADOOP_HOME指向你解壓的安裝包目錄
3. 在windows系統的path變量中加入HADOOP_HOME的bin目錄
> 注:一般需要重啟才會生效
## 2 獲取api中的客戶端對象
> 在java中操作hdfs,首先要獲得一個客戶端實例
~~~
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(conf)
~~~
> 而我們的操作目標是HDFS,所以獲取到的fs對象應該是DistributedFileSystem的實例;
> get方法是從何處判斷具體實例化那種客戶端類呢?
> ——從conf中的一個參數 fs.defaultFS的配置值判斷;
> 如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應的配置,conf中的默認值就來自于hadoop的jar包中的core-default.xml,默認值為: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象
## 3 DistributedFileSystem實例對象所具備的方法

## 4 HDFS客戶端操作數據代碼示例:
### 4.1 文件的增刪改查
~~~
public class HdfsClient {
FileSystem fs = null;
@Before
public void init() throws Exception {
// 構造一個配置參數對象,設置一個參數:我們要訪問的hdfs的URI
// 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址
// new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml
// 然后再加載classpath下的hdfs-site.xml
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
/**
* 參數優先級: 1、客戶端代碼中設置的值 2、classpath下的用戶自定義配置文件 3、然后是服務器的默認配置
*/
conf.set("dfs.replication", "3");
// 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例
// fs = FileSystem.get(conf);
// 如果這樣去獲取,那conf里面就可以不要配"fs.defaultFS"參數,而且,這個客戶端的身份標識已經是hadoop用戶
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/**
* 往hdfs上傳文件
*
* @throws Exception
*/
@Test
public void testAddFileToHdfs() throws Exception {
// 要上傳的文件所在的本地路徑
Path src = new Path("g:/redis-recommend.zip");
// 要上傳到hdfs的目標路徑
Path dst = new Path("/aaa");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
* 從hdfs中復制文件到本地文件系統
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 創建目錄
fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除文件夾 ,如果是非空文件夾,參數2必須給值true
fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夾
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
* 查看目錄信息,只顯示文件
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什么返回迭代器,而不是List之類的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------為angelababy打印的分割線--------------");
}
}
/**
* 查看文件及文件夾信息
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
}
}
~~~
### 4.2 通過流的方式訪問hdfs
~~~
/**
* 相對那些封裝好的方法而言的更底層一些的操作方式
* 上層那些mapreduce spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api
* @author
*
*/
public class StreamAccess {
FileSystem fs = null;
@Before
public void init() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/**
* 通過流的方式上傳文件到hdfs
* @throws Exception
*/
@Test
public void testUpload() throws Exception {
FSDataOutputStream outputStream = fs.create(new Path("/angelababy.love"), true);
FileInputStream inputStream = new FileInputStream("c:/angelababy.love");
IOUtils.copy(inputStream, outputStream);
}
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先獲取一個文件的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構造一個文件的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再將輸入流中數據傳輸到輸出流
IOUtils.copyBytes(in, out, 4096);
}
/**
* hdfs支持隨機定位進行文件讀取,而且可以方便地讀取指定長度
* 用于上層分布式運算框架并發處理數據
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先獲取一個文件的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以將流的起始偏移量進行自定義
in.seek(22);
//再構造一個文件的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/**
* 顯示hdfs上文件的內容
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024);
}
}
~~~
### 4.3 場景編程
> 在mapreduce 、spark等運算框架中,有一個核心思想就是將運算移往數據,或者說,就是要在并發計算中盡可能讓運算本地化,這就需要獲取數據所在位置的信息并進行相應范圍讀取
> 以下模擬實現:獲取一個文件的所有block位置信息,然后讀取指定block中的內容
~~~
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
//拿到文件信息
FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
//獲取這個文件的所有block的信息
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
//第一個block的長度
long length = fileBlockLocations[0].getLength();
//第一個block的起始偏移量
long offset = fileBlockLocations[0].getOffset();
System.out.println(length);
System.out.println(offset);
//獲取第一個block寫入輸出流
// IOUtils.copyBytes(in, System.out, (int)length);
byte[] b = new byte[4096];
FileOutputStream os = new FileOutputStream(new File("d:/block0"));
while(in.read(offset, b, 0, 4096)!=-1){
os.write(b);
offset += 4096;
if(offset>=length) return;
};
os.flush();
os.close();
in.close();
}
~~~
# 案例1:開發shell采集腳本
## 1需求說明
> 點擊流日志每天都10T,在業務應用服務器上,需要準實時上傳至(Hadoop HDFS)上
## 2需求分析
> 一般上傳文件都是在凌晨24點操作,由于很多種類的業務數據都要在晚上進行傳輸,為了減輕服務器的壓力,避開高峰期。
> 如果需要偽實時的上傳,則采用定時上傳的方式
## 3技術分析
~~~
HDFS SHELL: hadoop fs –put xxxx.log /data 還可以使用 Java Api
滿足上傳一個文件,不能滿足定時、周期性傳入。
定時調度器:
Linux crontab
crontab -e
*/5 * * * * $home/bin/command.sh //五分鐘執行一次
~~~
> 系統會自動執行腳本,每5分鐘一次,執行時判斷文件是否符合上傳規則,符合則上傳
## 4實現流程
### 4.1日志產生程序
> 日志產生程序將日志生成后,產生一個一個的文件,使用滾動模式創建文件名。

> 日志生成的邏輯由業務系統決定,比如在log4j配置文件中配置生成規則,如:當xxxx.log 等于10G時,滾動生成新日志
~~~
log4j.logger.msg=info,msg
log4j.appender.msg=cn.maoxiangyi.MyRollingFileAppender
log4j.appender.msg.layout=org.apache.log4j.PatternLayout
log4j.appender.msg.layout.ConversionPattern=%m%n
log4j.appender.msg.datePattern='.'yyyy-MM-dd
log4j.appender.msg.Threshold=info
log4j.appender.msg.append=true
log4j.appender.msg.encoding=UTF-8
log4j.appender.msg.MaxBackupIndex=100
log4j.appender.msg.MaxFileSize=10GB
log4j.appender.msg.File=/home/hadoop/logs/log/access.log
~~~
> 細節:
1. 如果日志文件后綴是1\2\3等數字,該文件滿足需求可以上傳的話。把該文件移動到準備上傳的工作區間。
2. 工作區間有文件之后,可以使用hadoop put命令將文件上傳。
階段問題:
1) 待上傳文件的工作區間的文件,在上傳完成之后,是否需要刪除掉。
### 4.2偽代碼
~~~
使用ls命令讀取指定路徑下的所有文件信息,
ls | while read line
//判斷line這個文件名稱是否符合規則
if line=access.log.* (
將文件移動到待上傳的工作區間
)
//批量上傳工作區間的文件
hadoop fs –put xxx
~~~
> 腳本寫完之后,配置linux定時任務,每5分鐘運行一次。
## 5代碼實現
> 代碼第一版本,實現基本的上傳功能和定時調度功能

> 代碼第二版本:增強版V2(基本能用,還是不夠健全)


## 6效果展示及操作步驟
1) 日志收集文件收集數據,并將數據保存起來,效果如下:

2) 上傳程序通過crontab定時調度

3) 程序運行時產生的臨時文件

4) Hadoo hdfs上的效果

- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰