先日に引き続き、fluent-plugin-unit-time-filterというfluentdのプラグインを書きました。 流れてきたデータを単位時間ごとに集計するためのプラグインです。
使い方
fluentdの設定は以下のような感じ。
<match my.**> type unit_time_filter filter_path /foo/bar/my_filrer.rb unit_sec 10 #prefix filtered </match> <match filtered.my.**> type stdout </match>
filter_pathで参照しているフィルタではレコードを集約するためのProcオブジェクトを返します。 単位時間ごとのレコードに集約するなら以下の通り。
proc {|i| {'count' => i.count } }
while true; do echo '{"hoge":"fuga"}' | fluent-cat my.data; done
みたいにして連続したデータを送信すると、フィルタには単位時間にまとまったレコードが渡されます。
# 10sごとのレコードの配列が引数「i」に渡される [["my.data", 1391820170, {"hoge"=>"fuga"}], ["my.data", 1391820170, {"hoge"=>"fuga"}], ["my.data", 1391820170, {"hoge"=>"fuga"}], ["my.data", 1391820170, {"hoge"=>"fuga"}], ["my.data", 1391820171, {"hoge"=>"fuga"}], ...
集約されたデータは、タグにfiltered.
というプリフィックスを次のチェインに渡されます。
上記の例だと<match filtered.my.**>
ではフィルタで集約したデータが出力されます。
2014-02-08 09:45:20 +0900 filtered.my.data: {"count":42} 2014-02-08 09:45:30 +0900 filtered.my.data: {"count":43} 2014-02-08 09:45:40 +0900 filtered.my.data: {"count":41} ...