LogStash管道通常有一個或多個輸入,過濾,輸出插件。本節中的場景通過Logstash配置文件來指定這些插件,并討論了每個插件在做什么。
Logstash配置文件定義了Logstash 管道,當你使用-f <path/to/file>啟動一個Logstash實例,其實使用了一個配置文件定義了管道實例。
一個Logstash管道有兩個必備元素,輸入和輸出,一個可選元素,過濾器。input插件從一個源攝取數據,filter插件按照你指定的方式修改數據,output插件寫出數據到一個目標數據庫。

下面是一個管道配置的模板:
```
# #是注釋,使用注釋描述配置
input {
}
# 該部分被注釋,表示filter是可選的
# filter {
#
# }
output {
}
```
這個模板是不具備功能的,因為輸入和輸出部分沒有定義的任何有效的選項。
將模板粘貼到你的Logstash根目錄,命名為first-pipeline.conf。
## 解析Apache日志輸出到Elasticsearch
這個例子創建一個Logstash管道,使用Apache web logs 作為輸入,解析這些日志到特定的命名字段,并且輸出這些解析數據到ElashticSearch集群
你可以下載樣例數據,[這里](https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz)
### 配置Logstash用于文件輸入
啟動Logstash管道,使用file input插件配置Logstash實例。
修改first-pipeline.conf 添加如下內容:
```
input {
file {
path => "/path/to/logstash-tutorial.log"
start_position => beginning
ignore_older => 0
}
}
```
* file input 插件的默認行為是監視文件中的末尾新內容,和Unix中的tail -f 命令異曲同工。改變這種默認行為的方式是指定Logstash從文件的什么位置開始處理文件。
* 文件輸入插件的默認行為是忽略最后時間距今超過86400秒的文件。要更改這種默認行為,并處理tutorial file(日期距今大于1天的文件),我們需要指定不能忽視舊文件。
用實際的logstash-tutorial.log日志文件目錄替換 /path/to/。
### 用Grok 過濾器插件解析web logs
gork 過濾器插件是Logstash的幾個默認可用插件之一。關于如何管理Logstash 插件的更多細節請參考:[reference documentation](https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html)
grok 過濾器插件在輸入日志中查找指定的模式,配置文件中需要指定哪些模式的識別是你的案例所感興趣的。一個代表性的web server 日志文件行信息,如下:
```
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
```
行首的IP地址信息很容易被識別,括號中的時間戳也是如此。在此教程中,使用```%{COMBINEDAPACHELOG} grok``` 模式,它的行信息結構有如下的一些字段。
|Information|Field Name|
|-|-|
|IP Address|clientip|
|User ID|ident|
|User Authentication|auth|
|timestamp|timestamp|
|HTTP Verb|verb|
|Request body|request|
|HTTP Version|httpversion|
|HTTP Status Code|response|
|Bytes served|bytes|
|Referrer URL|referrer|
|User agent|agent|
編寫 first-pipeline.conf添加如下的內容:
```
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
```
過濾器處理之后,一個行信息是如下的JSON的結構:
```
{
"clientip" : "83.149.9.216",
"ident" : ,
"auth" : ,
"timestamp" : "04/Jan/2015:05:13:42 +0000",
"verb" : "GET",
"request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"httpversion" : "HTTP/1.1",
"response" : "200",
"bytes" : "203023",
"referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/",
"agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
}
```
### 索引數據到ElasticSearch集群
Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file to add the following text after the input section:
現在,網絡日志被分解成具體的字段, 使用Logstash管道可以將數據索引到Elasticsearch集群。編輯first-pipeline.conf 文件,在input 段之后添加以下文字:
```
output {
elasticsearch {
}
}
```
用這樣的配置, Logstash使用http協議連接到Elasticsearch 。上面的例子假設Logstash和Elasticsearch運行在同一實例中。您可以使用主機配置像```hosts => "es-machine:9092"```指定遠程Elasticsearch實例。
### 使用geoip插件豐富你的數據內容
除了解析日志數據,filter插件可以從現有數據基礎上補充信息。例如: geoip插件查找IP地址,從而導出地址地理位置信息,并將該位置信息發送給日志。
使用geoip filter插件配置您的Logstash,如:通過添加以下行到first-pipeline.conf文件的filter部分:
```
geoip {
source => "clientip"
}
```
插件geoip的配置需要的數據已經被定義為獨立的字段中的數據。確保在配置文件中,geoip 配置段在grok配置段之后。
需要指定包含IP地址的字段的名稱。在本例中,字段名稱是clientip(grok插件字段) 。
### 測試一下
此時此刻,你的first-pipeline.conf文件有input,filter和output段,正確的配置看起來是這樣的:
```
input {
file {
path => "/Users/palecur/logstash-1.5.2/logstash-tutorial-dataset"
start_position => beginning
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {}
stdout {}
}
```
測試配置語法的正確性,使用下面的命令:
```
bin/logstash -f first-pipeline.conf --configtest
```
使用--configtest 選項解析你的配置文件并報告錯誤.當配置文件通過檢查之后,用如下命令啟動Logstash:
```
bin/logstash -f first-pipeline.conf
```
基于gork filter插件的字段索引,在Elasticsearch運行一個測試查詢,
```
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200'
```
使用當前日期替換$DATE(YYYY.MM.DD格式).因為我們的例子只有一條200 HTTP response,所以查詢命中一條結果:
```
{"took":2,
"timed_out":false,
"_shards":{"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.5351382,
"hits":[{"_index":"logstash-2015.07.30",
"_type":"logs",
"_id":"AU7gqOky1um3U6ZomFaF",
"_score":1.5351382,
"_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"@version":"1",
"@timestamp":"2015-07-30T20:30:41.265Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"83.149.9.216",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:13:45 +0000",
"verb":"GET",
"request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png",
"httpversion":"1.1",
"response":"200",
"bytes":"52878",
"referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}
}]
}
}
```
嘗試一個基于ip地址的地理信息查詢:
```
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo'
```
使用當前日期替換$DATE(YYYY.MM.DD格式).
只有一條log記錄是來自于城市Buffalo,所以查詢的結果是:
```
{"took":3,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.03399,
"hits":[{"_index":"logstash-2015.07.31",
"_type":"logs",
"_id":"AU7mK3CVSiMeBsJ0b_EP",
"_score":1.03399,
"_source":{
"message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"",
"@version":"1",
"@timestamp":"2015-07-31T22:11:22.347Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"108.174.55.234",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:27:45 +0000",
"verb":"GET",
"request":"/?flav=rss20",
"httpversion":"1.1",
"response":"200",
"bytes":"29941",
"referrer":"\"-\"",
"agent":"\"-\"",
"geoip":{
"ip":"108.174.55.234",
"country_code2":"US",
"country_code3":"USA",
"country_name":"United States",
"continent_code":"NA",
"region_name":"NY",
"city_name":"Buffalo",
"postal_code":"14221",
"latitude":42.9864,
"longitude":-78.7279,
"dma_code":514,
"area_code":716,
"timezone":"America/New_York",
"real_region_name":"New York",
"location":[-78.7279,42.9864]
}
}
}]
}
}
```
## 使用多個輸入和輸出插件
你需要管理的信息通常來自于不同的源,你的案例也可能將數據送到多個目標。Logstash 管道需要使用多個輸出或者輸出插件來滿足這些需求。
這個例子創建一個Logstash 管道來從Twitter feed 和Filebeat client獲取輸入信息,然后將這些信息送到Elasticsearch集群,同時寫入一個文件.
### 從Twitter feed讀數據
要添加一個Twitter feed,你需要一系列的條件:
* 一個consumer key :能唯一標識你的Twitter app(在這個例子中是Logstash)
* 一個consumer secret, 作為你的Twitter app的密碼
* 一個或多個 keywords用于查詢 feed.
* 一個oauth token,標志使用這個app的Twitter賬號
* 一個 oauth token secret, 作為 Twitter 賬號密碼.
訪問 https://dev.twitter.com/apps 注冊一個Twitter 賬號 并且生成你的 consumer key 和 secret, 還有你的 OAuth token 和 secret.
把如下的內容追加到first-pipeline.conf文件的input段:
```
twitter {
consumer_key =>
consumer_secret =>
keywords =>
oauth_token =>
oauth_token_secret =>
}
```
### Filebeat Client
filebeat 客戶端是一個輕量級的資源友好的文件服務端日志收集工具,并且把這些日志送到Logstash實例來處理。filebeat 客戶端使用安全Beats協議與Logstash實例通信。lumberjack協議被設計成可靠的低延遲的.Filebeat 使用托管源數據的計算機的計算資源,并且Beats input插件減少了 Logstash實例的資源需求.
>注意
一個典型的用法是,Filebeat運行在,與Logstash實例分開的一個單獨的計算機上.為了教學方便,我們將Logstash 和 Filebeat運行在同一個計算機上.
默認的Logstash配置包含Beats input插件,它被設計成資源友好的.安裝Filebeat在數據源所在的機器,從[Filebeat產品頁](https://www.elastic.co/downloads/beats/filebeat)下載合適的包.
創建一個和這個例子類似的配置文件:
```
filebeat:
prospectors:
-
paths:
- "/path/to/sample-log"
fields:
type: syslog
output:
logstash:
hosts: ["localhost:5043"]
tls:
certificate: /path/to/ssl-certificate.crt
certificate_key: /path/to/ssl-certificate.key
certificate_authorities: /path/to/ssl-certificate.crt
timeout: 15
```
* Filebeat 處理的一個或多個文件的路徑
* Logstash實例的SSL證書路徑
以filebeat.yml為名,保存配置文件
配置你的Logstash實例,加入Filebeat input插件.將如下內容添加到first-pipeline.conf文件的input段中.
```
beats {
port => "5043"
ssl => true
ssl_certificate => "/path/to/ssl-cert"
ssl_key => "/path/to/ssl-key"
}
```
* Logstash實例驗證自己到Filebeat的SSL證書的路徑。
* SSL證書的key的保存路徑。
### 將Logstash數據寫入一個文件
可以通過 file output插件配置讓Logstash管道將數據直接寫入一個文件.
將如下內容添加到first-pipeline.conf文件的output段,完成Logstash實例配置:
```
file {
path => /path/to/target/file
}
```
### 輸出數據到多個 Elasticsearch節點
輸出數據到多個 Elasticsearch節點,可以減輕某個給定節點的資源需求.當某一個節點掛掉的時候,提供了數據記錄輸入Elasticsearch的更多選擇.
修改你的first-pipeline.conf文件的output段,這樣就可以讓你的Logstash實例將數據寫入到多個Elasticsearch節點.
```
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
}
```
host參數要使用三個非master的Elasticsearch集群節點ip.當host參數列出多個ip地址時,Logstash負載均衡請求到列表中的地址.另外Elasticsearch的默認端口是9200,在上面的配置中可以省略.
### 測試管道
此時,你的 first-pipeline.conf 文件應該是下面的這樣:
```
input {
twitter {
consumer_key =>
consumer_secret =>
keywords =>
oauth_token =>
oauth_token_secret =>
}
beats {
port => "5043"
ssl => true
ssl_certificate => "/path/to/ssl-cert"
ssl_key => "/path/to/ssl-key"
}
}
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
file {
path => /path/to/target/file
}
}
```
Logstash從你配置的Twitter feed消費數據, 從 Filebeat獲取數據, 并且將這些數據索引到Elasticsearch的三個節點,并且同時將數據寫入一個文件.
在數據源節點上,使用如下命令啟動Filebeat:
```
sudo ./filebeat -e -c filebeat.yml -d "publish"
```
Filebeat嘗試連接5403端口,知道你的Logstash的Beats 插件生效,這個端口都沒有響應,所以現在可以將任何的連接異常看作是正常的.
校驗你的配置,使用如下命令:
```
bin/logstash -f first-pipeline.conf --configtest
```
使用--configtest 選項解析你的配置文件并報告錯誤.當配置文件通過檢查之后,用如下命令啟動Logstash:
```
bin/logstash -f first-pipeline.conf
```
使用grep命令在你的目標文件中查找,驗證Mozilla信息的存在.
```
grep Mozilla /path/to/target/file
```
在你的Elasticsearch集群中運行Elasticsearch查詢,驗證同樣的信息也存在!
```
curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla'
```