在本系列文章的第3部分關(guān)于實(shí)時(shí)流處理的文章中,我們學(xué)習(xí)了如何使用ElasticSearch的批量API以及利用REST API將.json航班數(shù)據(jù)文件導(dǎo)入ElasticSearch。
在這篇文章中,我們將介紹另一種方式,Logstash。
Logstash介紹
Logstash是一個(gè)開源的數(shù)據(jù)收集引擎,具有實(shí)時(shí)流水線功能。
它從多個(gè)源頭接收數(shù)據(jù),進(jìn)行數(shù)據(jù)處理,然后將轉(zhuǎn)化后的信息發(fā)送到stash,即存儲。
Logstash允許我們將任何格式的數(shù)據(jù)導(dǎo)入到任何數(shù)據(jù)存儲中,不僅僅是ElasticSearch。
它可以用來將數(shù)據(jù)并行導(dǎo)入到其他NoSQL數(shù)據(jù)庫,如MongoDB或Hadoop,甚至導(dǎo)入到AWS。
數(shù)據(jù)可以存儲在文件中,也可以通過流等方式進(jìn)行傳遞。
Logstash對數(shù)據(jù)進(jìn)行解析、轉(zhuǎn)換和過濾。它還可以從非結(jié)構(gòu)化數(shù)據(jù)中推導(dǎo)出結(jié)構(gòu),對個(gè)人數(shù)據(jù)進(jìn)行匿名處理,可以進(jìn)行地理位置查詢等等。
一個(gè)Logstash管道有兩個(gè)必要的元素,輸入和輸出,以及一個(gè)可選的元素,過濾器。
輸入組件從源頭消耗數(shù)據(jù),過濾組件轉(zhuǎn)換數(shù)據(jù),輸出組件將數(shù)據(jù)寫入一個(gè)或多個(gè)目的地。
所以,我們的示例場景的Logstash架構(gòu)基本如下。
我們從.json文件中讀取我們的航班數(shù)據(jù),我們對它們進(jìn)行處理/轉(zhuǎn)換,應(yīng)用一些過濾器并將它們存儲到ElasticSearch中。
Logstash安裝
有幾種選擇來安裝Logstash。
一種是訪問網(wǎng)站下載你平臺的存檔,然后解壓到一個(gè)文件夾。
你也可以使用你的平臺的包管理器來安裝,比如yum、apt-get或homebrew,或者作為docker鏡像來安裝。
確保你已經(jīng)定義了一個(gè)環(huán)境變量JAVA_HOME,指向JDK 8或11或14的安裝(Logstash自帶嵌入式AdoptJDK)。
Logstash工作流
一旦你安裝了它,讓我們通過運(yùn)行最基本的Logstash工作流來測試你的Logstash安裝情況。
bin/logstash -e ‘input { stdin { } } output { stdout {} }’
上面的工作流接受來自stdin(即你的鍵盤)的輸入,并將其輸出到stdout(即你的屏幕)。
上面的工作流中沒有定義任何過濾器。一旦你看到logstash被成功啟動(dòng)的消息,輸入一些東西(我輸入的是Hello world),按ENTER鍵,你應(yīng)該看到產(chǎn)生的消息的結(jié)構(gòu)格式,像下面這樣。
[2021-02-11T21:52:57,120][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}Hello world{ “message” => “Hello world”, “@version” => “1”, “@timestamp” => 2021-02-11T19:57:46.208Z, “host” => “MacBook-Pro.local”}
然而,通常Logstash是通過配置文件來工作的,配置文件告訴它該做什么,即在哪里找到它的輸入,如何轉(zhuǎn)換它,在哪里存儲它。Logstash配置文件的結(jié)構(gòu)基本上包括三個(gè)部分:輸入、過濾和輸出。
你在輸入部分指定數(shù)據(jù)的來源,在輸出部分指定目的地。在過濾器部分,你可以使用支持的過濾器插件來操作、測量和創(chuàng)建事件。
配置文件的結(jié)構(gòu)如下面的代碼示例所示。
input {…}filter {…}output{…}
你需要?jiǎng)?chuàng)建一個(gè)配置文件,指定你要使用的組件和每個(gè)組件的設(shè)置。在config文件夾中已經(jīng)存在一個(gè)配置文件樣本,logstash-sample.conf。
其內(nèi)容如下所示。
# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline. input { beats { port => 5044 }} output { elasticsearch { hosts => [“http://localhost:9200”] index => “%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}” #user => “elastic” #password => “changeme” }}
這里input部分定義了Logstash應(yīng)該從哪里獲取數(shù)據(jù)。這里有一個(gè)可用的輸入插件列表。
我們的輸入不是來自Beats組件,而是來自文件系統(tǒng),所以我們使用文件輸入組件。
input { file { start_position => “beginning” path => “/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json” codec => “json” }}
我們使用start_position參數(shù)來告訴插件從頭開始讀取文件。
需要注意,數(shù)據(jù)路徑必須是絕對的。
我們使用的是json編解碼器,除了json,還可以使用純文本形式。
在下載的數(shù)據(jù)中,可以找到一個(gè)名為test.json的文件。它只由2條航班數(shù)據(jù)組成的文件。
輸出塊定義了Logstash應(yīng)該在哪里存儲數(shù)據(jù)。我們將使用ElasticSearch來存儲我們的數(shù)據(jù)。
我們添加了第二個(gè)輸出作為我們的控制臺,并使用rubydebugger格式化輸出,第三個(gè)輸出作為文件系統(tǒng),最后兩個(gè)用于測試我們的輸出。 我們將輸出存儲在output.json中。
output { elasticsearch { hosts => [“http://localhost:9200”] index => “testflight” } file { path => “/usr/local/Cellar/logstash-full/7.11.0/data/output.json” } stdout { codec => rubydebug }}
此外,還可以定義過濾器來對數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
Logstash提供了大量的過濾器,下面介紹一些非常常用的的過濾器:
- grok:解析任何任意文本并添加結(jié)構(gòu),它包含120種內(nèi)置模式
- mutate:對字段進(jìn)行一般的轉(zhuǎn)換,例如重命名、刪除、替換和修改字段
- drop:丟棄一個(gè)數(shù)據(jù)
- clone:復(fù)制一個(gè)數(shù)據(jù),可能增加或刪除字段
- geoip:添加IP地址的地理位置信息
- split:將多行消息、字符串或數(shù)組分割成不同的數(shù)據(jù)
可以通過執(zhí)行下方命令查看 Logstash 安裝中安裝的全部插件列表。
$ bin/logstash-plugin list
你會(huì)注意到,有一個(gè)JSON過濾器插件。這個(gè)插件可以解析.json文件并創(chuàng)建相應(yīng)的JSON數(shù)據(jù)結(jié)構(gòu)。
正確地選擇和配置過濾器是非常重要的,否則,你最終的輸出中沒有數(shù)據(jù)。
所以,在我們的過濾塊中,我們啟用json插件,并告訴它我們的數(shù)據(jù)在消息字段中。
filter { json { source => “message” } }
到此為止,完成的配置文件config/testflight.conf內(nèi)容如下:
input { file { start_position => “beginning” path => “/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json” codec => “json” }} filter { json { source => “message” }} output {# elasticsearch {# hosts => [“http://localhost:9200/”]# index => “testflight” # } file { path => “/usr/local/Cellar/logstash-full/7.11.0/data/output.json” } stdout { codec => rubydebug }}
你可以通過如下命令進(jìn)行一下測試:
bin/logstash -f config/testflight.conf –config.test_and_exit…Configuration OK[2021-02-11T23:15:38,997][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
如果配置文件通過了配置測試,用以下命令啟動(dòng)Logstash。
bin/logstash -f config/testflight.conf –config.reload.automatic…
–config.reload.automatic配置選項(xiàng)可以實(shí)現(xiàn)自動(dòng)重載配置,這樣你就不必每次修改配置文件時(shí)都要停止并重新啟動(dòng)Logstash。
如果一切順利,你應(yīng)該會(huì)看到如下的輸出結(jié)果。
{ “CMsgs” => 1, “@version” => “1”, “PosTime” => 1467378028852, “Rcvr” => 1, “EngMount” => 0, “Tisb” => false, “Mil” => false, “Trt” => 2, “Icao” => “A0835D”, “Long” => -82.925616, “InHg” => 29.9409447, “VsiT” => 1, “ResetTrail” => true, “CallSus” => false, “@timestamp” => 2021-02-14T18:32:16.337Z, “host” => “MacBook-Pro.local”, “OpIcao” => “RPA”, “Man” => “Embraer”, “GAlt” => 2421, “TT” => “a”, “Bad” => false, “HasSig” => true, “TSecs” => 1, “Vsi” => 2176, “EngType” => 3, “Reg” => “N132HQ”, “Alt” => 2400, “Species” => 1, “FlightsCount” => 0, “WTC” => 2, “Cos” => [ [0] 39.984322, [1] -82.925616, [2] 1467378028852.0, [3] nil ],”message” => “{“Id”:10519389,”Rcvr”:1,”HasSig”:true,”Sig”:0,”Icao”:”A0835D”,”Bad”:false,”Reg”:”N132HQ”,”FSeen”:”/Date(1467378028852)/”,”TSecs”:1,”CMsgs”:1,”Alt”:2400,”GAlt”:2421,”InHg”:29.9409447,”AltT”:0,”Lat”:39.984322,”Long”:-82.925616,”PosTime”:1467378028852,”Mlat”:true,”Tisb”:false,”Spd”:135.8,”Trak”:223.2,”TrkH”:false,”Type”:”E170″,”Mdl”:”2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR”,”Man”:”Embraer”,”CNum”:”17000216″,”Op”:”REPUBLIC AIRLINE INC – INDIANAPOLIS, IN”,”OpIcao”:”RPA”,”Sqk”:””,”Vsi”:2176,”VsiT”:1,”WTC”:2,”Species”:1,”Engines”:”2″,”EngType”:3,”EngMount”:0,”Mil”:false,”Cou”:”United States”,”HasPic”:false,”Interested”:false,”FlightsCount”:0,”Gnd”:false,”SpdTyp”:0,”CallSus”:false,”ResetTrail”:true,”TT”:”a”,”Trt”:2,”Year”:”2008″,”Cos”:[39.984322,-82.925616,1467378028852.0,null]}”, “Lat” => 39.984322, “TrkH” => false, “Op” => “REPUBLIC AIRLINE INC – INDIANAPOLIS, IN”, “Engines” => “2”, “Sqk” => “”, “Id” => 10519389, “Gnd” => false, “CNum” => “17000216”, “path” => “/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json”, “Cou” => “United States”, “HasPic” => false, “FSeen” => “/Date(1467378028852)/”, “Interested” => false, “Mdl” => “2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR”, “Spd” => 135.8, “Sig” => 0, “Trak” => 223.2, “Year” => “2008”, “SpdTyp” => 0, “AltT” => 0, “Type” => “E170”, “Mlat” => true}
數(shù)據(jù)轉(zhuǎn)換
首先,讓我們從輸出中刪除path, @version, @timestamp, host和message,這些都是logstash添加的。
filter { json { source => “message” } mutate { remove_field => [“path”, “@version”, “@timestamp”, “host”, “message”] }}
mutate過濾器組件可以刪除不需要的字段。
重新運(yùn)行:
bin/logstash -f config/flightdata-logstash.conf –-config.test_and_exitbin/logstash -f config/flightdata-logstash.conf –config.reload.automatic
接下來,我們將_id設(shè)置為Id。
output { elasticsearch { hosts => [“http://localhost:9200”] index => “testflight” document_id => “%{Id}” }
我們在輸出組件中通過設(shè)置document_id來實(shí)現(xiàn)。
然而,如果你重新運(yùn)行l(wèi)ogstash,你會(huì)發(fā)現(xiàn)Id字段仍然存在。
有一個(gè)竅門,在過濾插件中把它改名為[@metadata][Id],然后在輸出中使用,@metadata字段被自動(dòng)刪除。
filter { json { source => “message” } mutate { remove_field => [“path”, “@version”, “@timestamp”, “host”, “message”] rename => { “[Id]” => “[@metadata][Id]” } }} output { elasticsearch { hosts => [“http://localhost:9200”] index => “flight-logstash” document_id => “%{[@metadata][Id]}” }…
現(xiàn)在讓我們嘗試解析日期。如果你還記得,這是我們在上一篇文章中沒有做的事情,我們需要將日期轉(zhuǎn)換為更適合人們熟悉的格式。
例如:
“FSeen” => “/Date(1467378028852)/”
需要將時(shí)間1467378028852轉(zhuǎn)化成容易閱讀的格式,并且去掉前后多余的字符串,通過gsub組件可以實(shí)現(xiàn)這項(xiàng)功能:
gsub => [ # get rid of /Date( “FSeen”, “/Date(“, “”, # get rid of )/ “FSeen”, “)/”, “” ]
這里通過gsub去掉了數(shù)據(jù)中/Date()等多余部分,輸出結(jié)果為:
“FSeen” : “1467378028852”
然后把時(shí)間戳轉(zhuǎn)換成熟悉的格式:
date { timezone => “UTC” match => [“FSeen”, “UNIX_MS”] target => “FSeen”}
UNIX_MS是UNIX時(shí)間戳,單位是毫秒。我們匹配字段FSeen并將結(jié)果存儲在同一字段中,輸出結(jié)果為:
“FSeen” : “2016-07-01T13:00:28.852Z”,
上述轉(zhuǎn)換的完整代碼如下:
mutate { gsub => [ # get rid of /Date( “FSeen”, “/Date(“, “”, # get rid of )/ “FSeen”, “)/”, “” ] }date { timezone => “UTC” match => [“FSeen”, “UNIX_MS”] target => “FSeen”}
在這部分中,我們學(xué)習(xí)了如何使用Logstash將.json航班數(shù)據(jù)批量文件導(dǎo)入到ElasticSearch中。Logstash是一個(gè)非常方便的方式,它有很多過濾器,支持很多數(shù)據(jù)類型,你只需要學(xué)習(xí)如何編寫一個(gè)配置文件就可以了!
Logstash是否適合實(shí)時(shí)數(shù)據(jù)處理?
答案是:要看情況
Logstash主要是為批處理數(shù)據(jù)而設(shè)計(jì)的,比如日志數(shù)據(jù),也許不適合處理來自傳感器的實(shí)時(shí)航班數(shù)據(jù)。
不過,你可以參考一些參考資料,這些資料描述了如何創(chuàng)建可以擴(kuò)展的Logstash部署,并使用Redis作為Logstash代理和Logstash中央服務(wù)器之間的中介,以便處理許多事件并實(shí)時(shí)處理它們。