[TOC]
以下示例說明如何配置LogStash來過濾事件,對Apache日志和syslog進行處理,并且使用條件式來控制被filter或output處理的事件.
> <font color=#1E90FF size=4>TIP</font>:如果你需要構建grok匹配的幫助,試試我們的[Grok Debugger](https://www.elastic.co/guide/en/kibana/6.5/xpack-grokdebugger.html).Grok Debugger是基礎授權下的X-Pack的一個特性所以是可以免費使用的.
### 配置Filters
Filters是一種在線處理機制,可以靈活地對數據進行切片和切割,以滿足您的需求.讓我們看一些正在使用的filters.下面的配置文件設置了`grok`和`date`filters.
```ruby
input { stdin { } }
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
```
使用此配置文件運行Logstash:
```shell
bin/logstash -f logstash-filter.conf
```
現在,將下面的內容粘貼到你的終端上,并按下回車stdin input進行處理:
```ruby
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
```
你會看到類似下面的輸出:
```
{
"message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
"@timestamp" => "2013-12-11T08:01:45.000Z",
"@version" => "1",
"host" => "cadenza",
"clientip" => "127.0.0.1",
"ident" => "-",
"auth" => "-",
"timestamp" => "11/Dec/2013:00:01:45 -0800",
"verb" => "GET",
"request" => "/xampp/status.php",
"httpversion" => "1.1",
"response" => "200",
"bytes" => "3891",
"referrer" => "\"http://cadenza/xampp/navi.php\"",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}
```
如你所見,Logstash(在`grok`filter的幫助下)可以解析日志行(日志剛好是Apache "combined log"格式的)并將其分解為分散的信息位.這是非常有用的,一旦你開始查詢和分析我們的日志數據.比如,你可以更加輕松的管理關于HTTP響應碼,IP地址,referres等信息. Logstash中包含了相當多的開箱即用的grok匹配.因此,如果你需要解析一個通用的日志格式,很有可能有人已經為你做了這項工作。更多信息,可以查看我們在GitHub上的[Logstash grok patterns](https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns)列表.
另一個在本示例中使用的filter是`date`filter.這個filter解析出一個timestamp(時間戳)并將其作為事件的時間戳(不論你何時獲取的你的日志數據).你可能注意到了在這個示例中`@timestamp`字段被設置為2013.12.11盡管Logstash是在這之后的某個事件點接收的這個事件.
### 處理Apache日志
讓我們做一些真正有用的事情:處理Apache2的訪問日志文件!我們將從本地讀取一個文件到input,并且使用[條件式](https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals)來根據我們的需要處理事件.首先,創建一個名字類似于logstash-apache.conf的文件,內容如下(你可以根據自己的情況修改日志文件的路徑):
```ruby
input {
file {
path => "/tmp/access_log"
start_position => "beginning"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
```
然后,創建你在上述配置中的input文件(示例中為"/tmp/access_log")并使用下面的日志內容(或者你可以使用來自自己網站的內容):
```js
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
```
現在,用-f選項來使用指定的配置文件啟動Logstash
```shell
bin/logstash -f logstash-apache.conf
```
現在你應該可以在Elasticsearch中看到你的Apache日志數據.Logstash打開并讀取指定的input文件,處理每一個遇到的事件.所有向此文件追加的行也會被捕獲,被Logstash作為事件處理,然后存儲在Elasticsearch中.一個額外的好處是"type"字段被設置為"apache_access"(這是由 type => "apache_access"這一行決定的).
這個配置文件中,Logstash只監視了Apache訪問日志,不過你可以很輕松的監控訪問日志和錯誤日志(事實上,所有可以匹配`*log`的文件),通過修改上述配置中的一行:
```ruby
input {
file {
path => "/tmp/*_log"
}
}
```
當你重啟Logstash,它將同時處理錯誤日志和訪問日志.不過,如果你檢查你的數據(使用Elasticsearch-kopf之類的),你可能會發現access_log被解析成了多個字段,但是error_log不是.這是因為我們使用的`grok`filter只匹配了標準的apache日志格式,并自動將其分割成不同的字段.如果我們能根據每一行的格式來控制其解析方式不是更好嗎?當然,我們可以...
注意Logstash不會重新處理已經看到過的access_log文件中的時間.當從一個文件讀取的時候,Logstash會保存其位置并且只處理新增的行.Neat!
### 使用條件式
你可以使用條件式來控制被filter或output處理的事件.比如,你可以根據事件出現在的文件對其進行標記(access_log, error_log, 或者隨便其他以 "log"結尾的文件).
```ruby
input {
file {
path => "/tmp/*_log"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
```
這個示例使用`type`字段標記了所有事件,但并沒有真正解析`error`或`random`文件.錯誤日志的類型如此之多,如何給它們標記取決于你使用的日志文件.
同樣的,你可以使用條件式來將時間重定向到指定的outputs.比如,你可以:
+ 將apache狀態碼為5xx的時間發送給nagios報警
+ 將狀態碼4xx的記錄到Elasticsearch
+ 將所有的信息記錄到statsd
要通知nagios關于http 5xx的事件,首先需要檢查`type`字段.如果是apache,就看它的狀態碼是否包含5xx錯誤.如果是,發送給nagios.如果不是5xx錯誤,檢查是夠包含4xx錯誤,如果是,發送給Elasticsearch.最后將所有的apache狀態碼發送給statsd.
```ruby
output {
if [type] == "apache" {
if [status] =~ /^5\d\d/ {
nagios { ... }
} else if [status] =~ /^4\d\d/ {
elasticsearch { ... }
}
statsd { increment => "apache.%{status}" }
}
}
```
### 處理Syslog信息
處理Syslog是Logstash最常見的用例之一.并且處理的非常好(只要日志大致符合RFC3164).Syslog是事實上的UNIX網絡日志記錄標準,從客戶端機器向本地文件發送消息,或者通過rsyslog發送給集中式的日志服務器.對于這個例子,你不需要一個syslog實例,我們將從命令行中模擬它,以便您能夠了解所發生的事情。
首先,我們為Logstash+syslog創建一個簡單的配置文件,叫做logstash-syslog.conf.
```ruby
input {
tcp {
port => 5000
type => syslog
}
udp {
port => 5000
type => syslog
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
```
使用這個新的配置文件來運行Logstash
```shell
bin/logstash -f logstash-syslog.conf
```
通常,客戶端機器將連接到端口5000上的Logstash實例并發送其消息。在這個例子中,我們telnet到Logstash并輸入一個日志行(類似于我們之前在STDIN中輸入日志行).打開另一個shell窗口,與Logstash Syslog輸入進行交互,并輸入以下命令:
```shell
telnet localhost 5000
```
粘貼下面的示例內容(你可以使用自己的內容,但要注意,`grok`filter可能不能正確解析你的數據)
```ruby
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
```
現在你應該可以在你之前的Logstash窗口看到處理并解析過的信息.
```ruby
{
"message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"@timestamp" => "2013-12-23T22:30:01.000Z",
"@version" => "1",
"type" => "syslog",
"host" => "0:0:0:0:0:0:0:1:52617",
"syslog_timestamp" => "Dec 23 14:30:01",
"syslog_hostname" => "louis",
"syslog_program" => "CRON",
"syslog_pid" => "619",
"syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"received_at" => "2013-12-23 22:49:22 UTC",
"received_from" => "0:0:0:0:0:0:0:1:52617",
"syslog_severity_code" => 5,
"syslog_facility_code" => 1,
"syslog_facility" => "user-level",
"syslog_severity" => "notice"
}
```
- Emmm
- Logstash簡介
- 開始使用Logstash
- 安裝Logstash
- 儲存你的第一個事件
- 通過Logstash解析日志
- 多個輸入和輸出插件的混合使用
- Logstash是如何工作的
- 執行模型Execution Model
- 設置并運行Logstash
- Logstash目錄布局
- Logstash配置文件
- logstash.yml
- Secrets keystore for secure settings
- 從命令行運行Logstash
- 以服務的方式運行Logstash
- 在Docker中運行Logstash
- 配置容器版Logstash
- Logging
- 關閉Logstash
- 安裝X-Pack
- 設置X-Pack
- 升級Logstash
- 使用包管理升級
- 直接下載進行升級
- 升級至6.0
- Upgrading with the Persistent Queue Enabled
- 配置Logstash
- 管道配置文件的結構
- 訪問配置中的事件數據和字段
- 在配置中使用環境變量
- Logstash配置示例
- 多管道
- 管道間通信(beta)
- 重載配置文件
- 管理多行事件
- Glob Pattern Support
- Converting Ingest Node Pipelines
- Logstash間通信
- 配置集中式管道管理
- X-Pack Monitoring
- X-Pack Security
- X-Pack Settings
- Field References Deep Dive(深入字段引用)
- 管理Logstash
- 集中式管道管理
- 使用Logstash模塊
- 使用Elastic Cloud
- Logstash ArcSight模塊