読者です 読者をやめる 読者になる 読者になる

RubyでKinesisを使ってみた

AWS SDK for RubyKinesisに対応したので、ちょっと触ってみました。

なにはさておき…

aws-sdkを1.31.3以上にしましょう。 1.30.0はバグがあってまともに動かないです。

Kinesisについて

コンセプトとかについてはドキュメントを読めばだいたい分かると思います。たぶん

コード

ProducerとConsumerを異なるスレッドで動かして、ProducerがputしたデータをConsumerでgetするだけです。 Shardの数は1なので、Shard IDは固定。 Iterator TypeはAT_SEQUENCE_NUMBER、AFTER_SEQUENCE_NUMBER、TRIM_HORIZON、LATESTがありますが、違いについてはまだ調べてません。

#!/usr/bin/env ruby
require 'aws-sdk'
require 'base64'

AWS.config(region: 'us-east-1')
client = AWS.kinesis.client
STREAM_NAME = 'hello'
SHARD_ID = 'shardId-000000000000'

# shard_idの取得
#res = client.describe_stream(stream_name: STREAM_NAME }
#p res[:stream_description][:shards].map {|i| i[:shard_id] }

# Consumer
Thread.start do
  shard_iterator = client.get_shard_iterator(
    stream_name: STREAM_NAME,
    shard_id: SHARD_ID,
    shard_iterator_type: 'LATEST'
  )[:shard_iterator]

  loop do
    res = client.get_records(shard_iterator: shard_iterator)
    records = res[:records]
    shard_iterator = res[:next_shard_iterator]

    datas = records.map {|i| Base64.decode64(i[:data])}

    puts "[Consumer] get_records: #{datas.inspect}"
    sleep 0.3
  end
end

# Producer
loop do
  data = "London Bridge Is Falling Down / #{Time.now}"

  client.put_record(
    stream_name: STREAM_NAME,
    data: Base64.strict_encode64(data),
    partition_key: Time.now.to_i.to_s
  )

  puts "[Producer] put_record: #{data}"

  sleep 1
end

実行結果

[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:06 +0900
[Consumer] get_records: []
[Consumer] get_records: []
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:09 +0900
[Consumer] get_records: []
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:10 +0900
[Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:06 +0900"]
[Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:09 +0900"]
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:12 +0900
[Consumer] get_records: []
[Consumer] get_records: []
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:13 +0900
[Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:10 +0900", "London Bridge Is Falling Down / 2013-12-22 19:59:12 +0900"]
[Consumer] get_records: []
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:15 +0900
[Consumer] get_records: []
[Consumer] get_records: []
[Producer] put_record: London Bridge Is Falling Down / 2013-12-22 19:59:16 +0900
[Consumer] get_records: ["London Bridge Is Falling Down / 2013-12-22 19:59:13 +0900"]

所感

最初の頃は「結局、Consumerに負荷が来るんじゃないかなぁ…」と思ったのですが、Shardできるから並列でなんとかしろってことなんでしょうねぇ。 ただ、結局データストア作業をConsumerががんばらないといけないところが、大ざっぱというか「さすがDIYのAWSさんやでぇー」って感じですね。

おまけ

本家SDKのデバックの過程で意図せずシンプルなクライアントができたので、リンクを張っておきます。 https://gist.github.com/winebarrel/8080643