実験的にストリームを処理するFluentdプラグインを書いてみました。
fluent-plugin-stream-filterfluent_stream- fluent-plugin-udp-stream
- fluent_udp_stream
※追記: 実装と機能を諸々修正しました
実用に耐えうるかは不明ですが、単純なストリームの処理なら書けそうだったので、書いてみたという感じです。
fluendの設定は以下の通り。
<match stream.**> type copy <store> type udp_stream #host 127.0.0.1 #port 25000 </store> <store> type stdout </store> </match>
クライアント側のコードは以下の通り。
require 'fluent_udp_stream' require 'pp' # Get records of every 10 seconds FluentUdpStream.new.unit(10).each do |records| pp records end
動作例
while true; do echo '{"hoge":"fuga"}' | fluent-cat stream.data done
$ ruby client.rb [["stream.data", 1391707950, {"hoge"=>"fuga"}], ["stream.data", 1391707950, {"hoge"=>"fuga"}], ["stream.data", 1391707950, {"hoge"=>"fuga"}], ["stream.data", 1391707950, {"hoge"=>"fuga"}], ["stream.data", 1391707951, {"hoge"=>"fuga"}], ["stream.data", 1391707951, {"hoge"=>"fuga"}], ["stream.data", 1391707951, {"hoge"=>"fuga"}], ["stream.data", 1391707951, {"hoge"=>"fuga"}], ...
たとえば10sごとに投げられたレコードの数えるコードは以下のようになります。
FluentUdpStream.new.unit(10).map {|records| records.count }.each do |i| p i end
lazyを使っているのでいるので無応答にはなりません。
その他
別にフィルタにしなくても、copyプラグインでコピーすればいいだけかも…