<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 分析 采集需求:比如業務系統使用log4j生成的日志,日志內容不斷增加,需要把追加到日志文件中的數據實時采集到hdfs,使用agent串聯 ![](https://box.kancloud.cn/88f4fb197886cbaf9c9220395918d357_932x270.png) 根據需求,首先定義以下3大要素 第一臺flume agent * 采集源,即source——監控文件內容更新 : `exec 'tail -F file'` * 下沉目標,即sink——數據的發送者,實現序列化 : avro sink * Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel 第二臺flume agent * 采集源,即source——接受數據。并實現反序列化 : avro source * 下沉目標,即sink——HDFS文件系統 : HDFS sink * Source和sink之間的傳遞通道——channel,可用file channel 也可以用 內存channel # 配置文件 第一臺配置 Flume-agent1 ~~~ #tail-avro-avro-logger.conf # Name the components on this agent # 定義名稱 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec # 監聽這個文件 a1.sources.r1.command = tail -F /root/logs/test.log # Describe the sink ##sink端的avro是一個數據發送者 a1.sinks.k1.type = avro # 推給這個機器,自己定義 a1.sinks.k1.hostname = master # 端口 a1.sinks.k1.port = 41414 # 批量大小 a1.sinks.k1.batch-size = 10 # Use a channel which buffers events in memory # 內存channels a1.channels.c1.type = memory # 管道的容量,字節 a1.channels.c1.capacity = 1000 # 事務的類型,多少條之后source推送到channel或者channel推送到sinks a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # 組裝起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~ Flume-agent2: avro-hdfs.conf ~~~ a1.sources = r1 a1.sinks =s1 a1.channels = c1 ##source中的avro組件是一個接收者服務 a1.sources.r1.type = avro # 綁定一個ip和端口 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 類型hdfs a1.sinks.s1.type=hdfs # hdfs目錄 a1.sinks.s1.hdfs.path=hdfs://master:9000/flumedata # 文件的前綴,在hdfs上前綴 a1.sinks.s1.hdfs.filePrefix = access_log # 批次大小,就是文件達到多少條才提交到hdfs a1.sinks.s1.hdfs.batchSize= 100 # 當前文件存儲數據類型,還可以用壓縮格式 a1.sinks.s1.hdfs.fileType = DataStream # 文件的格式類型 a1.sinks.s1.hdfs.writeFormat =Text # 達到下面的三個任何一個就按照那個標準生成一個新文件 #滾動生成的文件按大小生成 agent1.sinks.sink1.hdfs.rollSize = 10240 #滾動生成的文件按行數生成 agent1.sinks.sink1.hdfs.rollCount = 1000 #滾動生成的文件按時間生成,秒 agent1.sinks.sink1.hdfs.rollInterval = 10 # 整體就是每10分鐘滾動生成一個目錄 #開啟滾動生成目錄 agent1.sinks.sink1.hdfs.round = true #以10為一梯度滾動生成,單位在下面 agent1.sinks.sink1.hdfs.roundValue = 10 #單位為分鐘 agent1.sinks.sink1.hdfs.roundUnit = minute # 管道的類型 a1.channels.c1.type = memory # 管道的容量,字節 a1.channels.c1.capacity = 1000 # 事務的類型,多少條之后source推送到channel或者channel推送到sinks a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.s1.channel = c1 ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看