<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # filter ## 1. filter作用 > Logstash過濾器根據規則提取數據 ## 2. 過濾器種類 * [ ] date過濾器:獲得時間格式的數據 * [ ] extractnumbers過濾器:從字符串中提取數字 * [ ] grok過濾器:使用grok格式提取數據中的指定內容,比如從一長串的日志中,提取出ip地址 ### 2.1 date過濾器 > 用于解析事件中的時間數據,日期過濾器用于解析字段中的日期,然后使用該日期或時間戳作為事件的logstash時間戳。 > 默認情況下如果不加date插件的話,在kibana上創建索引模式的時候,選擇的時間戳字段是logstash處理日志的時間,如果日志輸出不是實時的,顯示日志時間與對應的事件是不符合的(即es存儲日志的時間字段是當前時間,不是log實際發生的時間) ~~~dart match => ["timestamp", "ISO8601", "dd-MMM-yyyy HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss"] 默認target指向的就是@timestamp,意思就是如果timestamp匹配上后面自定義的日志格式,就會替換對應的@times ~~~ 可配置的參數: | 參數 | 作用 | 參數類型 | | --- | ---| --- | | locale | 指定用于日期解析的區域設置 |string | | match| 如何匹配時間格式 |array | | tag_on_failure | 匹配失敗后追加的內容 | array | | target | 匹配成功后的內容需要設置的目標字段 | string | | timezone | 指定用于日期解析的時區規范ID |string | 配置 ``` input { redis { key => "logstash-date" host => "localhost" password => "dailearn" port => 6379 db => "0" data_type => "list" type => "date" } } filter { date { match => [ "message", "yyyy-MM-dd HH:mm:ss" ] locale => "Asia/Shanghai" timezone => "Europe/Paris" target => "messageDate" } } output { stdout { codec => rubydebug } } ``` 測試數據 2020-05-07 23:59:59 控制臺輸出 ``` { "@timestamp" => 2020-05-12T13:47:26.094Z, "type" => "date", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "message" => "2020-05-07 23:59:59", "messageDate" => 2020-05-07T21:59:59.000Z } ``` 需要注意我是在2020-05-12 21:47:26發送的消息,而服務器使用的時區為UTC/GMT 0。所以這個時候我插入了2020-05-07 23:59:59的時間,被指定為Europe/Paris時區,所以最后被轉為2020-05-07T21:59:59.000Z ### 2.2 extractnumbers過濾器 注意默認情況下次過濾器為捆綁需要執行bin/logstash-plugin install logstash-filter-extractnumbers操作安裝插件 此過濾器自動提取字符串中找到的所有數字 可配置的參數 ![](https://img.kancloud.cn/af/72/af728770dafcb72b8e461e04898733e3_964x82.png) 配置 ``` input { redis { key => "logstash-extractnumbers" host => "localhost" password => "dailearn" port => 6379 db => "0" data_type => "list" type => "extractnumbers" } } filter { extractnumbers { source => "message" target => "message2" } } ``` ``` output { stdout { codec => rubydebug } } ``` 測試數據 zhangSan5,age16 + 0.5 456 789 控制臺輸出 { "float1" => 0.5, "int2" => 789, "int1" => 456, "@timestamp" => 2020-05-17T03:51:23.695Z, "@version" => "1", "type" => "extractnumbers", "message" => "zhangSan5,age16 + 0.5 456 789", "tags" => [ [0] "_jsonparsefailure" ] } 雖然文檔中介紹,此過濾器會嘗試提取出字符串中所有的字段,但是實際中部分和字母結合的字符串結構并沒有被提取出來,而那些被字母和其他符號被包裹起來的數字沒有被完整的提取出來,比如之前測試的時候使用的zhangSan5,age16 + 0.5,456[789],888這樣的數據就沒有任何內容被提取出來。 ### 2.3 grok過濾器 > Grok 是 Logstash 最重要的插件。它的主要作用就是將文本格式的字符串,轉換成為具體的結構化的數據,配合正則表達式使用。 下面針對Apache日志來分割處理 ![](https://img.kancloud.cn/59/b1/59b1b84967304c71fffbf14cd8ca475e_712x268.png) 下面是apache日志 ![](https://img.kancloud.cn/0e/3a/0e3a07461bfed61b77d240fcc812bf71_710x64.png) 日志中每個字段之間空格隔開,分別對應message中的字段。 如:%{IPORHOST:addre} ?--> 192.168.10.197 ? 但問題是IPORHOST又不是正則表達式,怎么能匹配IP地址呢? 因為IPPRHOST是grok表達式,它代表的正則表達式如下: ![](https://img.kancloud.cn/8d/d2/8dd2e54a5ac6864329a0805cf1516358_725x382.png) IPORHOST代表的是ipv4或者ipv6或者HOSTNAME所匹配的grok表達式。 上面的IPORHOST有點復雜,我們來看看簡單點的,如USER ``` USERNAME \[a-zA-Z0-9.\_-\]+ ? ?? ``` #USERNAME是匹配由字母,數字,“.”, "\_", "-"組成的任意字符 USER %{USERNAME} #USER代表USERNAME的正則表達式 第一行,用普通的正則表達式來定義一個 grok 表達式; 第二行,通過打印賦值格式,用前面定義好的 grok 表達式來定義另一個 grok 表達式。 **grok的語法:** %{syntax:semantic} syntax代表的是正則表達式替代字段,semantic是代表這個表達式對應的字段名,你可以自由命名。這個命名盡量能簡單易懂的表達出這個字段代表的意思。 logstash安裝時就帶有已經寫好的正則表達式。路徑如下: /usr/local/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns 或者直接訪問https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns 上面IPORHOST,USER等都是在里面已經定義好的!當然還有其他的,基本能滿足我們的需求 **日志匹配** 當我們拿到一段日志,按照上面的grok表達式一個個去匹配時,我們如何確定我們匹配的是否正確呢? http://grokdebug.herokuapp.com/ 這個地址可以滿足我們的測試需求。就拿上面apache的日志測試。 ![](https://img.kancloud.cn/8c/fc/8cfc03fd858727ace5bdf38bb4669ede_942x290.png) 點擊后就出現如下數據,你寫的每個grok表達式都獲取到值了。為了測試準確,可以多測試幾條日志。 ~~~ { ??"addre":?[ ????[ ??????"192.168.10.97" ????] ??], ??"HOSTNAME":?[ ????[ ??????"192.168.10.97", ??????"192.168.10.175" ????] ...........中間省略多行........... ??"http_referer":?[ ????[ ??????"http://192.168.10.175/" ????] ??], ??"URIPROTO":?[ ????[ ??????"http" ????] ??], ??"URIHOST":?[ ????[ ??????"192.168.10.175" ????] ??], ??"IPORHOST":?[ ????[ ??????"192.168.10.175" ????] ??], ??"User_Agent":?[ ????[ ??????"Mozilla/5.0?(Windows?NT?6.1;?WOW64)?AppleWebKit/537.36?(KHTML,?like?Gecko)?Chrome/45.0.2454.101?Safari/537.36" ????] ??] } ~~~ 每條日志總有些字段是沒有數據顯示,然后以“-”代替的。所有我們在匹配日志的時候也要判斷。 如:(?:%{NUMBER:bytes}|-) ? 但是有些字符串是在太長,如:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36 我們可以使用%{GREEDYDATA browser}. ? ? ? ?? 對應的grok表達式: GREEDYDATA ?.\* ?? #GREEDYDATA表達式的意思能匹配任意字符串 **自定義grok表達式** 如果你感覺logstash自帶的grok表達式不能滿足需要,你也可以自己定義 如: ![](https://img.kancloud.cn/49/df/49dfd520d0fa2f833bbde64d5725ee5c_705x261.png) patterns\_dir為自定義的grok表達式的路徑。 自定義的patterns中按照logstash自帶的格式書寫。 APACHE\_LOG %{IPORHOST:addre} %{USER:ident} %{USER:auth} \\\[%{HTTPDATE:timestamp}\\\] \\"%{WORD:http\_method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\\" %{NUMBER:status} (?:%{NUMBER:bytes}|-) \\"(?:%{URI:http\_referer}|-)\\" \\"%{GREEDYDATA:User\_Agent}\\" 我只是把apache日志匹配的grok表達式寫入自定義文件中,簡化conf文件。單個字段的正則表達式匹配你可以自己書寫測試。 ### 2.3 提取Java日志字段 一般來說,我們從filebeat或者其他地方拿到下面的日志: ~~~text 2018-04-13 16:03:49.822 INFO o.n.p.j.c.XXXXX - Star Calculator ~~~ 然后想把它其中的一些信息提取出來再扔到es中存儲,我們就需要grok match把日志信息切分成[索引數據](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%95%B0%E6%8D%AE&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)(match本質是一個正則匹配) [grok match](https://www.zhihu.com/search?q=grok+match&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D): ~~~json match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{GREEDYDATA:log_content}"} ~~~ 切出來的數據 ~~~json { "log_date": [ [ "2018-04-13" ] ], "log_localtime": [ [ "16:03:49.822" ] ], "HOUR": [ [ "16" ] ], "MINUTE": [ [ "03" ] ], "SECOND": [ [ "49.822" ] ], "log_type": [ [ "INFO" ] ], "log_file": [ [ "o.n.p.j.c.XXXX" ] ], "log_content": [ [ "Star Calculator" ] ] } ~~~ 上面所有切出來的field都是es中[mapping index](https://www.zhihu.com/search?q=mapping+index&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D),都可以在用來做條件查詢. [grokdebug.herokuapp.com](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/)里面可以做測試. [grokdebug.herokuapp.com/patterns](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/patterns)所有可用的patterns都可以在這里查到. 現在我們在用的配置見/logstash/logstash-k8s.conf Q: 需要指定mapping index的數據類型怎么辦? A: grok match本質是一個正則匹配,默認出來的數據都是String.有些時候我們知道某個值其實是個數據類型,這時候可以直接指定數據類型. 不過match中僅支持直接轉換成int ,float,語法是 %{NUMBER:response\_time:int} 完整配置: ~~~json match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"} ~~~ Q:[索引文件](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%96%87%E4%BB%B6&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)想需要按日期分別存放,怎么辦? A: out中指定index格式,如 index=> "k8s-%{+YYYY.MM.dd}" 完整out如下: ~~~json output { elasticsearch { hosts => "${ES_URL}" manage_template => false index => "k8s-%{+YYYY.MM.dd}" } } ~~~ 完整[logstash.conf](https://www.zhihu.com/search?q=logstash.conf&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D) ~~~text input { beats { host => "0.0.0.0" port => 5043 } } filter { if [type] == "kube-logs" { mutate { rename => ["log", "message"] } date { match => ["time", "ISO8601"] remove_field => ["time"] } grok { match => { "source" => "/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log"} match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"} remove_field => ["source"] break_on_match => false } } } output { elasticsearch { hosts => "${ES_URL}" manage_template => false index => "k8s-%{+YYYY.MM.dd}" } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看