fluent-plugin-unit-time-filter

先日に引き続き、fluent-plugin-unit-time-filterというfluentdのプラグインを書きました。 流れてきたデータを単位時間ごとに集計するためのプラグインです。

使い方

fluentdの設定は以下のような感じ。

<match my.**>
  type unit_time_filter
  filter_path /foo/bar/my_filrer.rb
  unit_sec 10
  #prefix filtered
</match>

<match filtered.my.**>
  type stdout
</match>

filter_pathで参照しているフィルタではレコードを集約するためのProcオブジェクトを返します。 単位時間ごとのレコードに集約するなら以下の通り。

proc {|i|
  {'count' => i.count }
}

while true; do echo '{"hoge":"fuga"}' | fluent-cat my.data; done みたいにして連続したデータを送信すると、フィルタには単位時間にまとまったレコードが渡されます。

# 10sごとのレコードの配列が引数「i」に渡される
[["my.data", 1391820170, {"hoge"=>"fuga"}],
 ["my.data", 1391820170, {"hoge"=>"fuga"}],
 ["my.data", 1391820170, {"hoge"=>"fuga"}],
 ["my.data", 1391820170, {"hoge"=>"fuga"}],
 ["my.data", 1391820171, {"hoge"=>"fuga"}],
...

集約されたデータは、タグにfiltered.というプリフィックスを次のチェインに渡されます。 上記の例だと<match filtered.my.**>ではフィルタで集約したデータが出力されます。

2014-02-08 09:45:20 +0900 filtered.my.data: {"count":42}
2014-02-08 09:45:30 +0900 filtered.my.data: {"count":43}
2014-02-08 09:45:40 +0900 filtered.my.data: {"count":41}
...

misc.

  • Filberを使っているけど大丈夫だろうか…
  • fluentdのこの手のプラグインではタグ名を変えて次のチェインに渡すのが定石…ぽい
    • スタンダードなやり方なのか別のやり方があるのか不明
  • プラグインの並列処理具合がよく分からなかった。別のプラグインではインスタンス変数に対する操作をクリティカルセッションにしているものもあったけれど、並列で処理されるんだろうか?
    • 並列で処理されるとしたら、集約の方法も考えないと…
  • タグの書き換えで、一つのfluentd内でMapReduce的なことができるなぁ、とアホなことを考えたが未検証