<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 分析 采集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就需要把文件采集到HDFS中去 根據需求,首先定義以下3大要素 * 采集源,即source——監控文件目錄 : spooldir * 下沉目標,即sink——HDFS文件系統 : hdfs sink * source和sink之間的傳遞通道——channel,可用file channel 也可以用內存memory channel # 配置文件 ~~~ #定義三大組件的名稱 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置source組件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /root/data/ # 是否使用當前文件頭 agent1.sources.source1.fileHeader = false #配置攔截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = timestamp # 配置sink組件 # 類型是hdfs agent1.sinks.sink1.type = hdfs # 在hdfs上產生的目錄 agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M # 文件的前綴,在hdfs上的前置 agent1.sinks.sink1.hdfs.filePrefix = access_log # 最大文件打開數量 agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 # 批次大小,就是文件達到多少條才提交到hdfs agent1.sinks.sink1.hdfs.batchSize= 100 # 當前文件存儲數據類型,還可以用壓縮格式 agent1.sinks.sink1.hdfs.fileType = DataStream # 文件的格式類型 agent1.sinks.sink1.hdfs.writeFormat =Text # 達到下面的三個任何一個就按照那個標準生成一個新文件 #滾動生成的文件按大小生成 agent1.sinks.sink1.hdfs.rollSize = 102400 #滾動生成的文件按行數生成 agent1.sinks.sink1.hdfs.rollCount = 100 #滾動生成的文件按時間生成,秒 agent1.sinks.sink1.hdfs.rollInterval = 10 #開啟滾動生成目錄 agent1.sinks.sink1.hdfs.round = true #以10為一梯度滾動生成 agent1.sinks.sink1.hdfs.roundValue = 10 #單位為分鐘 agent1.sinks.sink1.hdfs.roundUnit = minute # Use a channel which buffers events in memory # 管道的類型 agent1.channels.channel1.type = memory # 管道的容量 agent1.channels.channel1.capacity = 500000 # 事務的類型 agent1.channels.channel1.transactionCapacity = 600 # 多久之后將數據從source移動到channel,channel移動到sink agent1.channels.channel1.keep-alive = 120 # Bind the source and sink to the channel # 對應的source,channel進行組裝 agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 ~~~ # 測試 啟動 ~~~ flume-ng agent -c conf -f fhd.conf -n agent1 -Dflume.root.logger=INFO,console ~~~ fhd.conf換成你自己寫的,不同的目錄加上目錄 -n代表上面定義的agent的名字 啟動后可以看到打印的日志,只要`/root/data/`下面有文件就會移動到hdfs **flume的source采用spoodir時! 目錄下面不允許存放同名的文件,否則報錯!** # 其他組件:Interceptor(攔截器) 用于Source的一組Interceptor,按照預設的順序在必要地方裝飾和過濾events。 內建的Interceptors允許增加event的headers比如:時間戳、主機名、靜態標記等等 定制的interceptors可以通過內省event payload(讀取原始日志),實現自己的業務邏輯(很強大)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看