0%

[ELK] 利用 ELK 架構分析 log

ELK

前言

通常我們架設網站或服務時,會把 Log 寫在檔案中,方便追蹤系統狀態,當發生問題時也能夠查看紀錄,但是當我們的服務是分散在許多台機器時,每次發生問題就要到各個機器開啟 Log file 查看,這樣的過程很繁瑣又很沒效率,所以這一篇文章主要紀錄如何利用 ELK 架構來分析 Log,透過 ELK 架構可以更方便地追蹤系統狀態、分析 Log.

ELK 是由 ElasticsearchLogstashKibana 所組成的 Log 收集、分析和查詢的架構:

  • Elasticsearch: 是一個以 Apache Lucene 為核心,分散式的 RESTful 風格的搜尋和數據分析引擎。它是以 JSON 的形式儲存資料,並提供即時的分析及搜尋。在此架構中, 主要作為儲存和查詢 Log 的搜尋引擎。
  • Logstash: 是開源的 Log 收集、處理的工具,並將處理後的 Log 資料儲存到 Elasticsearch.
  • Kibana: 將 Elasticsearch 中的資料以視覺化的方式呈現,並提供操作 Elastic Stack 的 UI 介面。

安裝及設定

Elasticsearch & Kibana

可以參考 Elasticsearch 使用 Elasticsearch + Kibana 實現中文全文檢索 來安裝和設定 Elasticsearch 和 Kibana.

Logstash

直接從 Logstash 網站中下載並解壓縮:

1
2
3
$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.tar.gz
$ tar zxvf logstash-6.2.3.tar.gz
$ cd logstash-6.2.3

使用 ELK 架構

依照上面步驟安裝完 Elasticsearch、Kibana 和 Logstash 之後,可以開始來設定 Logstash 蒐集 Log 資料,將資料處理過後儲存到 Elasticsearch, 最後透過 Kibana 呈現 Log 內容,流程如下:

ELK流程

設定 Logstash

Logstash 處理資料的流程: 輸入資料 → 過濾/處理資料 → 輸出資料, 設定檔格式如下:

1
2
3
4
5
6
7
8
9
input {

}
filter {

}
output {

}

這裡稍微介紹一下 Logstash 常用的 plugins:

Input plugins

我們主要透過 Input plugins 來讀取 Log 資料,以下是幾個常用的 Input plugins 及使用方式,更多 Input plugins 可以參考官方網站 - Input plugins.

tcp

tcp plugin 是可以接收來自 TCP socket 的資料,將各個機器的 Log 送到指定的 port 就可以蒐集不同來源的資料了。

範例:

1
2
3
4
5
6
7
8
9
10
11
12
input {
tcp {
port => 9250
codec => json # vaule type, default is "plain"
type => dev_log # 分類, 可以用在 filter 進行資料處理
}
tcp {
port => 9260
codec => json
type => sys_log
}
}
file

file plugin 主要是用來讀取已經寫成純文字檔的 Log file, 預設會持續觀察指定檔案,如果檔案有更新則會觸發 Logstash 進行資料擷取。

範例:

1
2
3
4
5
6
7
8
9
input {
file {
path => ["/var/log/*.log"] # path can be an array
type => "sys_log"
start_position => "beginning" # 如果是第一次啟動,可以設定讀取全部檔案內容
discover_interval => 15 # 多久察看一次指定檔案, 預設15秒
exclude => ["*.tar.gz"] # 排除的檔案
}
}
elasticsearch

elasticsearch plugin 可以用來讀取 Elasticsearch 的 query result, 也可以排程定期執行一次 query.

範例:

1
2
3
4
5
6
7
8
input {
elasticsearch {
hosts => ["localhost:9200"] # hosts can be an array
index => "mylog" # default is "logstash-*"
query => '{ \"query\": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
schedule => "0 * * * *" # cron format, 每小時執行一次
}
}

上面範例等同於每小時執行一次以下指令:

1
2
3
4
5
6
7
8
$ curl 'http://localhost:9200/mylog/_search?&scroll=1m&size=1000' -d '{
"query": {
"match": {
"statuscode": 200
}
},
"sort": [ "_doc" ]
}'

Filter plugins

透過 Filter plugins 可以過濾資料,這裡介紹一些 Filter plugins 的使用方式,更多 Filter plugins 可以參考官方網站 - Filter plugins:

grok

我們所要過濾的資料都在 message 這個欄位,也就是 Log file 的完整內容,而 grok 這個 plugin 有方便的字串處理功能,可以使用正規表達式及 grok 語法來處理資料。範例:

test.log:

1
[2018-04-16 13:51:17.482] INFO MyServer - client:127.0.0.1, Server start.

使用範例:

1
2
3
4
5
6
7
filter {
grok {
match => {
"message" => "\[(?<date>.+?)\] %{LOGLEVEL:level} %{DATA:logger} - client:%{IPV4:client_ip}, %{DATA:message}"
}
}
}

上面的範例使用正規表達式切割出日期 date,剩下的則是使用 grok 語法來切割。grok 語法的格式如: %{grok patterns:自訂屬性名稱},透過以上設定就可以將 Log 資料處理成:

1
2
3
4
5
6
7
{
"date": "2018-04-16 13:51:17.482",
"level": "INFO",
"logger": "MyServer",
"client_ip": "127.0.0.1",
"message": "Server start."
}

詳細的 grok patterns 可以參考: grok patterns, 這裡也提供一個方便的工具 grok debugger, 可以用來測試 grok 語法是否正確。

json

透過 json plugin, 可以處理 json 格式的資料,範例:

test.log:

1
{"date":"2018-04-16 13:51:17.482","loglevel":"INFO","message":"Server start.","more":["test1", "test2"]}

filter 設定:

1
2
3
4
5
6
filter {
json {
source => "message"
target => "jsoncontent"
}
}

上面的範例是將 json 格式的資料處理後,儲存到 jsoncontent 屬性,結果如下:

1
2
3
4
5
6
7
8
{
"jsoncontent": {
"date": "2018-04-16 13:51:17.482",
"loglevel": "INFO",
"message": "Server start.",
"more": ["test1", "test2"]
}
}
mutate

mutate 這個 plugin 除了有字串切割的功能之外,還有型態轉換和基本運算的功能,使用範例如下:

test.log:

1
2018-04-16 13:51:17.482,INFO,MyServer,client:127.0.0.1,Server start.

我們可以將 grok 處理後的資料取出來,再透過 mutate convert 轉換至其他型態:

1
2
3
4
5
filter {
mutate {
convert => ["exec_time", "float"]
}
}

也可以將 log file 內容以指定符號切割:

1
2
3
4
5
filter {
mutate {
split => ["message", ","]
}
}

也可以取代特定字串:

1
2
3
4
5
filter {
mutate {
gsub => ["message", " ", "_"]
}
}

另外還可以重新命名屬性欄位:

1
2
3
4
5
filter {
mutate {
rename => ["exec_time", "sys_exec_time"]
}
}
geoip

geoip 可以分析 IPv4 或 IPv6, 提供 IP 的所在位置及相關資訊。

使用範例:

1
2
3
4
5
6
7
8
9
10
filter {
grok {
"match" => {
"message" => "%{IPV4:ip}"
}
}
geoip {
source => "ip"
}
}

會產生 geoip 屬性,裡面包含 IP 的相關資訊:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
"geoip": {
"city_name": "Hanoi",
"timezone": "Asia/Ho_Chi_Minh",
"ip": "123.30.238.16",
"latitude": 21.0333,
"country_code2": "VN",
"country_name": "Vietnam",
"continent_code": "AS",
"country_code3": "VN",
"region_name": "Thanh Pho Ha Noi",
"location": [ 105.85, 21.0333 ],
"longitude": 105.85,
"region_code": "64"
}

Output plugins

最後我們透過 Output plugins 來將處理後的資料輸出,在 ELK 架構中,我們搭配 elasticsearch plugin 來將資料輸出到 Elasticsearch, 其他的 Output plugins 可以參考: 官方網站 - Output plugins.

完整的 Logstash config 範例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
input {
file {
path => ["/home/user/test.log"]
type => "test_log"
}
}
filter {
grok {
match => {
"message" => "\[(?<date>.+?)\] %{LOGLEVEL:level} %{DATA:logger} - client:%{IPV4:client_ip}, %{DATA:detail}"
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"] # hosts can be an array
index => "mylog" # Elasticsearch index name
}
}

寫好設定檔之後,啟動 Logstash:

1
$ ./bin/logstash -f ./config/log.conf

瀏覽 Log 資料

接著我們開啟 Kibana 的 Discover 頁面, 就可以看到由 Logstash 輸出的 Log 資料囉!
如果是第一次使用,記得要先到 Management 建立 Index pattern, 就能在 Discover 中看到 Elasticsearch 中的資料了!

Kibana UI

參考資料