読者です 読者をやめる 読者になる 読者になる

fluent-plugin-kinesis-alt というのを作りました

勉強もかねてfluent-plugin-kinesis-altという、fluentdのプラグインを書いてみました。 ちょうあるふぁばんです。

2013/12/24 追記

fluent-plugin-kinesisにマージできないかお伺いを立ててます https://github.com/imaifactory/fluent-plugin-kinesis/issues/4

背景とか

実はすでにプラグインはあったりするんですが、ちょっといくつか微妙に思うところがあって変更の量が多くなりそうだったので、コントリビュートする代わりに自作しました。 主な変更点は以下の通りです。

  • AWS SDK for Rubyの使用
  • AWSの鍵の設定名をS3のプラグインに合わせて aws_key_id/aws_sec_key に変更
  • BufferedOutputの使用
  • Rubyのコードで動的にPartitionKeyを設定できるように

使い方

設定ファイルは以下のような感じです。

<source>
  type forward
</source>

<match kinesis.**>
  type kinesis_alt
  stream_name hello
  aws_key_id ...
  aws_sec_key ...
  region us-east-1
  partition_key_proc proc {|i| Time.now.to_i.to_s }
  flush_interval 3
  debug true
</match>

こんな感じのConsumerを書いて、fluent-catなどでデータを突っ込んでみるとstreamにデータが投げられていることが確認できます。

while true; do
  echo '{"hoge":"fuga"}' | fluent-cat kinesis.forward
  sleep 0.1
done
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805437}"]
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805438}", "{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805438}", "{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805438}"]
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805438}", "{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805439}"]
[Consumer] get_records: []
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805439}", "{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805439}"]
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805440}", "{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805440}"]
[Consumer] get_records: ["{\"hoge\":\"fuga\",\"__tag\":\"kinesis.forward\",\"__time\":1387805441}", "{\"hoge\":\"fuga\",\"__ta

また、「debug true」にしておくとfluentd側にいかのようなデバッグログが出力されます。

D, [2013-12-23T22:31:02.024826 #53186] DEBUG -- : [AWS Core 200 0.469973 0 retries] put_record("data"=>"eyJob2dlIjoiZnVnYSIsIl9fdGFnIjoia2luZXNpcy5mb3J3YXJkIiwiX190aW1lIjoxMzg3ODA1NDQ4fQ==","partition_key"=>"1387805448","stream_name"=>"hello")

D, [2013-12-23T22:31:02.501342 #53186] DEBUG -- : [AWS Core 200 0.475558 0 retries] put_record("data"=>"eyJob2dlIjoiZnVnYSIsIl9fdGFnIjoia2luZXNpcy5mb3J3YXJkIiwiX190aW1lIjoxMzg3ODA1NDQ4fQ==","partition_key"=>"1387805448","stream_name"=>"hello")

D, [2013-12-23T22:31:02.982184 #53186] DEBUG -- : [AWS Core 200 0.479238 0 retries] put_record("data"=>"eyJob2dlIjoiZnVnYSIsIl9fdGFnIjoia2luZXNpcy5mb3J3YXJkIiwiX190aW1lIjoxMzg3ODA1NDQ4fQ==","partition_key"=>"1387805448","stream_name"=>"hello")

所感

厳密にはかったわけではないですがBatchPutRecordみたいなActionがないとパフォーマンスが厳しそうです。 上記の例でもバッファにたまりまくってましたし。たぶんそのうち追加されるような気がします。

プラグインでは普通にaws-sdkを使おうとして固まるという症状がありました。どうも動的にメソッドを定義する時に問題になっているみたいですが詳細は分からず…。startでdescribe_streamを実行しているのはバグ回避の意味合いもあります。

あと、format<=> writeのデータの受け渡しにMessagePack使っていますが、なんか無駄っぽいきがしてます。もうしこしまともなやり方がありそうですが…