ストリームを処理するfluentdプラグインを書いてみました

実験的にストリームを処理するFluentdプラグインを書いてみました。

※追記: 実装と機能を諸々修正しました

実用に耐えうるかは不明ですが、単純なストリームの処理なら書けそうだったので、書いてみたという感じです。

fluendの設定は以下の通り。

<match stream.**>
  type copy
  <store>
    type udp_stream
    #host 127.0.0.1
    #port 25000
  </store>
  <store>
    type stdout
  </store>
</match>

クライアント側のコードは以下の通り。

require 'fluent_udp_stream'
require 'pp'

# Get records of every 10 seconds
FluentUdpStream.new.unit(10).each do |records|
  pp records
end

動作例

while true; do
  echo '{"hoge":"fuga"}' | fluent-cat stream.data
done
$ ruby client.rb
[["stream.data", 1391707950, {"hoge"=>"fuga"}],
 ["stream.data", 1391707950, {"hoge"=>"fuga"}],
 ["stream.data", 1391707950, {"hoge"=>"fuga"}],
 ["stream.data", 1391707950, {"hoge"=>"fuga"}],
 ["stream.data", 1391707951, {"hoge"=>"fuga"}],
 ["stream.data", 1391707951, {"hoge"=>"fuga"}],
 ["stream.data", 1391707951, {"hoge"=>"fuga"}],
 ["stream.data", 1391707951, {"hoge"=>"fuga"}],
 ...

たとえば10sごとに投げられたレコードの数えるコードは以下のようになります。

FluentUdpStream.new.unit(10).map {|records|
  records.count
}.each do |i|
  p i
end

lazyを使っているのでいるので無応答にはなりません。

その他

別にフィルタにしなくても、copyプラグインでコピーすればいいだけかも…