<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [TOC] # HDFS操作API > hdfs在生產應用中主要是客戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的訪問客戶端對象,然后通過該客戶端對象操作(增刪改查)HDFS上的文件 ## 1 搭建開發環境 1) 引入依賴 ~~~ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.4</version> </dependency> ~~~ > 注:如需手動引入jar包,hdfs的jar包----hadoop的安裝目錄的share下 2) window下開發的說明 > 建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上做客戶端應用開發,需要設置以下環境: 1. 用老師給的windows平臺下編譯的hadoop安裝包解壓一份到windows的任意一個目錄下 2. 在window系統中配置HADOOP_HOME指向你解壓的安裝包目錄 3. 在windows系統的path變量中加入HADOOP_HOME的bin目錄 > 注:一般需要重啟才會生效 ## 2 獲取api中的客戶端對象 > 在java中操作hdfs,首先要獲得一個客戶端實例 ~~~ Configuration conf = new Configuration() FileSystem fs = FileSystem.get(conf) ~~~ > 而我們的操作目標是HDFS,所以獲取到的fs對象應該是DistributedFileSystem的實例; > get方法是從何處判斷具體實例化那種客戶端類呢? > ——從conf中的一個參數 fs.defaultFS的配置值判斷; > 如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應的配置,conf中的默認值就來自于hadoop的jar包中的core-default.xml,默認值為: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象 ## 3 DistributedFileSystem實例對象所具備的方法 ![](https://box.kancloud.cn/38c954184eec287c3cc9404d37c91147_740x728.png) ## 4 HDFS客戶端操作數據代碼示例: ### 4.1 文件的增刪改查 ~~~ public class HdfsClient { FileSystem fs = null; @Before public void init() throws Exception { // 構造一個配置參數對象,設置一個參數:我們要訪問的hdfs的URI // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址 // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml // 然后再加載classpath下的hdfs-site.xml Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hdp-node01:9000"); /** * 參數優先級: 1、客戶端代碼中設置的值 2、classpath下的用戶自定義配置文件 3、然后是服務器的默認配置 */ conf.set("dfs.replication", "3"); // 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例 // fs = FileSystem.get(conf); // 如果這樣去獲取,那conf里面就可以不要配"fs.defaultFS"參數,而且,這個客戶端的身份標識已經是hadoop用戶 fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop"); } /** * 往hdfs上傳文件 * * @throws Exception */ @Test public void testAddFileToHdfs() throws Exception { // 要上傳的文件所在的本地路徑 Path src = new Path("g:/redis-recommend.zip"); // 要上傳到hdfs的目標路徑 Path dst = new Path("/aaa"); fs.copyFromLocalFile(src, dst); fs.close(); } /** * 從hdfs中復制文件到本地文件系統 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testDownloadFileToLocal() throws IllegalArgumentException, IOException { fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/")); fs.close(); } @Test public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException { // 創建目錄 fs.mkdirs(new Path("/a1/b1/c1")); // 刪除文件夾 ,如果是非空文件夾,參數2必須給值true fs.delete(new Path("/aaa"), true); // 重命名文件或文件夾 fs.rename(new Path("/a1"), new Path("/a2")); } /** * 查看目錄信息,只顯示文件 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException { // 思考:為什么返回迭代器,而不是List之類的容器 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while (listFiles.hasNext()) { LocatedFileStatus fileStatus = listFiles.next(); System.out.println(fileStatus.getPath().getName()); System.out.println(fileStatus.getBlockSize()); System.out.println(fileStatus.getPermission()); System.out.println(fileStatus.getLen()); BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for (BlockLocation bl : blockLocations) { System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset()); String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println("--------------為angelababy打印的分割線--------------"); } } /** * 查看文件及文件夾信息 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException { FileStatus[] listStatus = fs.listStatus(new Path("/")); String flag = "d-- "; for (FileStatus fstatus : listStatus) { if (fstatus.isFile()) flag = "f-- "; System.out.println(flag + fstatus.getPath().getName()); } } } ~~~ ### 4.2 通過流的方式訪問hdfs ~~~ /** * 相對那些封裝好的方法而言的更底層一些的操作方式 * 上層那些mapreduce spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api * @author * */ public class StreamAccess { FileSystem fs = null; @Before public void init() throws Exception { Configuration conf = new Configuration(); fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop"); } /** * 通過流的方式上傳文件到hdfs * @throws Exception */ @Test public void testUpload() throws Exception { FSDataOutputStream outputStream = fs.create(new Path("/angelababy.love"), true); FileInputStream inputStream = new FileInputStream("c:/angelababy.love"); IOUtils.copy(inputStream, outputStream); } @Test public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{ //先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); //再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz")); //再將輸入流中數據傳輸到輸出流 IOUtils.copyBytes(in, out, 4096); } /** * hdfs支持隨機定位進行文件讀取,而且可以方便地讀取指定長度 * 用于上層分布式運算框架并發處理數據 * @throws IllegalArgumentException * @throws IOException */ @Test public void testRandomAccess() throws IllegalArgumentException, IOException{ //先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/iloveyou.txt")); //可以將流的起始偏移量進行自定義 in.seek(22); //再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt")); IOUtils.copyBytes(in,out,19L,true); } /** * 顯示hdfs上文件的內容 * @throws IOException * @throws IllegalArgumentException */ @Test public void testCat() throws IllegalArgumentException, IOException{ FSDataInputStream in = fs.open(new Path("/iloveyou.txt")); IOUtils.copyBytes(in, System.out, 1024); } } ~~~ ### 4.3 場景編程 > 在mapreduce 、spark等運算框架中,有一個核心思想就是將運算移往數據,或者說,就是要在并發計算中盡可能讓運算本地化,這就需要獲取數據所在位置的信息并進行相應范圍讀取 > 以下模擬實現:獲取一個文件的所有block位置信息,然后讀取指定block中的內容 ~~~ @Test public void testCat() throws IllegalArgumentException, IOException{ FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10")); //拿到文件信息 FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10")); //獲取這個文件的所有block的信息 BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen()); //第一個block的長度 long length = fileBlockLocations[0].getLength(); //第一個block的起始偏移量 long offset = fileBlockLocations[0].getOffset(); System.out.println(length); System.out.println(offset); //獲取第一個block寫入輸出流 // IOUtils.copyBytes(in, System.out, (int)length); byte[] b = new byte[4096]; FileOutputStream os = new FileOutputStream(new File("d:/block0")); while(in.read(offset, b, 0, 4096)!=-1){ os.write(b); offset += 4096; if(offset>=length) return; }; os.flush(); os.close(); in.close(); } ~~~ # 案例1:開發shell采集腳本 ## 1需求說明 > 點擊流日志每天都10T,在業務應用服務器上,需要準實時上傳至(Hadoop HDFS)上 ## 2需求分析 > 一般上傳文件都是在凌晨24點操作,由于很多種類的業務數據都要在晚上進行傳輸,為了減輕服務器的壓力,避開高峰期。 > 如果需要偽實時的上傳,則采用定時上傳的方式 ## 3技術分析 ~~~ HDFS SHELL: hadoop fs –put xxxx.log /data 還可以使用 Java Api 滿足上傳一個文件,不能滿足定時、周期性傳入。 定時調度器: Linux crontab crontab -e */5 * * * * $home/bin/command.sh //五分鐘執行一次 ~~~ > 系統會自動執行腳本,每5分鐘一次,執行時判斷文件是否符合上傳規則,符合則上傳 ## 4實現流程 ### 4.1日志產生程序 > 日志產生程序將日志生成后,產生一個一個的文件,使用滾動模式創建文件名。 ![](https://box.kancloud.cn/5ae3d8cc73afb2216e2ef11711bb1a62_266x148.png) > 日志生成的邏輯由業務系統決定,比如在log4j配置文件中配置生成規則,如:當xxxx.log 等于10G時,滾動生成新日志 ~~~ log4j.logger.msg=info,msg log4j.appender.msg=cn.maoxiangyi.MyRollingFileAppender log4j.appender.msg.layout=org.apache.log4j.PatternLayout log4j.appender.msg.layout.ConversionPattern=%m%n log4j.appender.msg.datePattern='.'yyyy-MM-dd log4j.appender.msg.Threshold=info log4j.appender.msg.append=true log4j.appender.msg.encoding=UTF-8 log4j.appender.msg.MaxBackupIndex=100 log4j.appender.msg.MaxFileSize=10GB log4j.appender.msg.File=/home/hadoop/logs/log/access.log ~~~ > 細節: 1. 如果日志文件后綴是1\2\3等數字,該文件滿足需求可以上傳的話。把該文件移動到準備上傳的工作區間。 2. 工作區間有文件之后,可以使用hadoop put命令將文件上傳。 階段問題: 1) 待上傳文件的工作區間的文件,在上傳完成之后,是否需要刪除掉。 ### 4.2偽代碼 ~~~ 使用ls命令讀取指定路徑下的所有文件信息, ls | while read line //判斷line這個文件名稱是否符合規則 if line=access.log.* ( 將文件移動到待上傳的工作區間 ) //批量上傳工作區間的文件 hadoop fs –put xxx ~~~ > 腳本寫完之后,配置linux定時任務,每5分鐘運行一次。 ## 5代碼實現 > 代碼第一版本,實現基本的上傳功能和定時調度功能 ![](https://box.kancloud.cn/6cf9795e1c013d6d7bbac1a921cb04f6_554x233.png) > 代碼第二版本:增強版V2(基本能用,還是不夠健全) ![](https://box.kancloud.cn/33778409cbe21d8d785efcadb27cde3d_554x93.png) ![](https://box.kancloud.cn/0eae25b386c1375a9900aac4dcb98e88_554x275.png) ## 6效果展示及操作步驟 1) 日志收集文件收集數據,并將數據保存起來,效果如下: ![](https://box.kancloud.cn/a985574b2fb75dfdf02c6af3ba4e1451_554x48.png) 2) 上傳程序通過crontab定時調度 ![](https://box.kancloud.cn/16eae4aa94c9334796e357d20a4469b1_554x40.png) 3) 程序運行時產生的臨時文件 ![](https://box.kancloud.cn/669041e6a91d77d76a5ae161d1fbc23b_554x99.png) 4) Hadoo hdfs上的效果 ![](https://box.kancloud.cn/0d16a49d1a47eaa271327f3b6681ff43_554x47.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看