AWS SDK for RubyがKinesisに対応したので、ちょっと触ってみました。
なにはさておき…
aws-sdkを1.31.3以上にしましょう。 1.30.0はバグがあってまともに動かないです。
Kinesisについて
コンセプトとかについてはドキュメントを読めばだいたい分かると思います。たぶん
コード
ProducerとConsumerを異なるスレッドで動かして、ProducerがputしたデータをConsumerでgetするだけです。 Shardの数は1なので、Shard IDは固定。 Iterator TypeはAT_SEQUENCE_NUMBER、AFTER_SEQUENCE_NUMBER、TRIM_HORIZON、LATESTがありますが、違いについてはまだ調べてません。
#!/usr/bin/env ruby require 'aws-sdk' require 'base64' AWS.config(region: 'us-east-1') client = AWS.kinesis.client STREAM_NAME = 'hello' SHARD_ID = 'shardId-000000000000' # shard_idの取得 #res = client.describe_stream(stream_name: STREAM_NAME } #p res[:stream_description][:shards].map {|i| i[:shard_id] } # Consumer Thread.start do shard_iterator = client.get_shard_iterator( stream_name: STREAM_NAME, shard_id: SHARD_ID, shard_iterator_type: 'LATEST' )[:shard_iterator] loop do res = client.get_records(shard_iterator: shard_iterator) records = res[:records] shard_iterator = res[:next_shard_iterator] datas = records.map {|i| Base64.decode64(i[:data])} puts "[Consumer] get_records: #{datas.inspect}" sleep 0.3 end end # Producer loop do data = "London Bridge Is Falling Down / #{Time.now}" client.put_record( stream_name: STREAM_NAME, data: Base64.strict_encode64(data), partition_key: Time.now.to_i.to_s ) puts "[Producer] put_record: #{data}" sleep 1 end
実行結果
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:06 +0900 [Consumer] get_records: [] [Consumer] get_records: [] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:09 +0900 [Consumer] get_records: [] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:10 +0900 [Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:06 +0900"] [Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:09 +0900"] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:12 +0900 [Consumer] get_records: [] [Consumer] get_records: [] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:13 +0900 [Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:10 +0900", "London Bridge Is Falling Down / 2013-12-22 19:59:12 +0900"] [Consumer] get_records: [] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:15 +0900 [Consumer] get_records: [] [Consumer] get_records: [] [Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:16 +0900 [Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:13 +0900"]
所感
最初の頃は「結局、Consumerに負荷が来るんじゃないかなぁ…」と思ったのですが、Shardできるから並列でなんとかしろってことなんでしょうねぇ。 ただ、結局データストア作業をConsumerががんばらないといけないところが、大ざっぱというか「さすがDIYのAWSさんやでぇー」って感じですね。
おまけ
本家SDKのデバックの過程で意図せずシンプルなクライアントができたので、リンクを張っておきます。 https://gist.github.com/winebarrel/8080643