DynamoDB Streamsを触ってみた / ツール作った

なぜかDynamoDB StreamsのLimited Previewの利用申請が通ったので、DynamoDB Streamsを触ってみ…ようとしたが、 何をするにもAPIたたくのは不便なので、とりあえず各種ツールを作った。

ddbcli 0.4.2.beta2

https://github.com/winebarrel/ddbcli/tree/dynamodb-stream-support http://rubygems.org/gems/ddbcli/versions/0.4.2.beta2

まず、ddbcliを修正してストリームが有効なテーブルを作成できるようにした。 以下のような感じで、ストリーム付きのテーブルを作れる。

$ ddbcli --url (エンドポイント)
us-east-1> CREATE TABLE `test` (
        ->   `id` STRING HASH
        -> ) read=2 write=2 stream=KEYS_ONLY;
us-east-1> desc test;
{
  "AttributeDefinitions": [
    {
      "AttributeName": "id",
      "AttributeType": "S"
    }
  ],
  ...
  "StreamSpecification": {
    "StreamEnabled": true,
    "StreamViewType": "KEYS_ONLY"
  },
  ...
}
// 10 rows in set (0.87 sec)

StreamViewTypeはテーブルを操作したときにストリームに流れるデータのタイプ(キーだけとか、新しいのだけとか)のよう。

dynamodb-streams-client

https://github.com/winebarrel/dynamodb-streams-client http://rubygems.org/gems/dynamodb-streams-client

こっちはストリームを受け取る方のツールRubyのClientライブラリ+CLI。 本当はddbcliにストリーム操作用のコマンドを追加しようと思ったけど、エンドポイントが違うようなので別ツールにした。

以下のような感じで、tailのようにストリームに流れるデータを追うことができる。

$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export DYNAMODB_STREAMS_ENDPOINT=...
$ for stream_id in `dynamodb-streams list_streams | jq -r '.StreamIds[]'`
do dynamodb-streams describe_stream $stream_id | jq '.StreamDescription | {StreamId, TableName, StreamStatus, Shards:(.Shards[] | {ShardId, StartingSequenceNumber:.SequenceNumberRange.StartingSequenceNumber})} | select(.StreamStatus=="ENABLED")'
done
{
  "StreamId": "...",
  "TableName": "test",
  "StreamStatus": "ENABLED",
  "Shards": {
    "ShardId": "shardId-...",
    "StartingSequenceNumber": "..."
  }
}
~$ dynamodb-streams get_shard_iterator (ストリームID) shardId-... AT_SEQUENCE_NUMBER --sequence-number ...
{
  "ShardIterator": "..."
}

$ dynamodb-streams get_records "(シャードイテレーター)" -f --limit 10
{
  "NextShardIterator": "...",
  "Records": [

  ]
}
{
  "NextShardIterator": "...",
  "Records": [

  ]
}
...

ShardIterator長い…。あと、list_streamsでテーブル名とステータスが分からないのが地味にめんどい。

他のターミナルからこんな感じでレコードを追加してやると

for i in {1..10000}
do
ddbcli --url ... -e "insert into test (id, val) values ('$i', 1)"
done

追加されたレコードの情報がストリームに流れてくるのが分かる。

{
  "NextShardIterator": "...",
  "Records": [
    {
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "id": {
            "S": "9"
          }
        },
        "SequenceNumber": "...",
        "SizeBytes": 3,
        "StreamViewType": "KEYS_ONLY"
      },
      "eventID": "...",
      "eventName": "INSERT",
      "eventSource": "aws:dynamodb",
      "eventVersion": "1.0"
    }
  ]
}
{
  "NextShardIterator": "...",
  "Records": [
    {
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "id": {
            "S": "10"
          }
        },
        "SequenceNumber": "...",
        "SizeBytes": 4,
        "StreamViewType": "KEYS_ONLY"
      },
      "eventID": "...",
      "eventName": "INSERT",
      "eventSource": "aws:dynamodb",
      "eventVersion": "1.0"
    }
  ]
}
...

所感

  • 相変わらずストリーム系のAPIの使い勝手は悪い…
  • とりあえずデータを追うことはできたものの、取りこぼしがないか等厳密な動作については要検証

DynamoDB Streamsプレビュー版のDynamoDB Localでも、一応動作するはずなので、どうぞご利用ください。