なぜかDynamoDB StreamsのLimited Previewの利用申請が通ったので、DynamoDB Streamsを触ってみ…ようとしたが、 何をするにもAPIたたくのは不便なので、とりあえず各種ツールを作った。
ddbcli 0.4.2.beta2
https://github.com/winebarrel/ddbcli/tree/dynamodb-stream-support http://rubygems.org/gems/ddbcli/versions/0.4.2.beta2
まず、ddbcliを修正してストリームが有効なテーブルを作成できるようにした。 以下のような感じで、ストリーム付きのテーブルを作れる。
$ ddbcli --url (エンドポイント) us-east-1> CREATE TABLE `test` ( -> `id` STRING HASH -> ) read=2 write=2 stream=KEYS_ONLY; us-east-1> desc test; { "AttributeDefinitions": [ { "AttributeName": "id", "AttributeType": "S" } ], ... "StreamSpecification": { "StreamEnabled": true, "StreamViewType": "KEYS_ONLY" }, ... } // 10 rows in set (0.87 sec)
StreamViewTypeはテーブルを操作したときにストリームに流れるデータのタイプ(キーだけとか、新しいのだけとか)のよう。
dynamodb-streams-client
https://github.com/winebarrel/dynamodb-streams-client http://rubygems.org/gems/dynamodb-streams-client
こっちはストリームを受け取る方のツール。RubyのClientライブラリ+CLI。 本当はddbcliにストリーム操作用のコマンドを追加しようと思ったけど、エンドポイントが違うようなので別ツールにした。
以下のような感じで、tailのようにストリームに流れるデータを追うことができる。
$ export AWS_ACCESS_KEY_ID=... $ export AWS_SECRET_ACCESS_KEY=... $ export DYNAMODB_STREAMS_ENDPOINT=... $ for stream_id in `dynamodb-streams list_streams | jq -r '.StreamIds[]'` do dynamodb-streams describe_stream $stream_id | jq '.StreamDescription | {StreamId, TableName, StreamStatus, Shards:(.Shards[] | {ShardId, StartingSequenceNumber:.SequenceNumberRange.StartingSequenceNumber})} | select(.StreamStatus=="ENABLED")' done { "StreamId": "...", "TableName": "test", "StreamStatus": "ENABLED", "Shards": { "ShardId": "shardId-...", "StartingSequenceNumber": "..." } } ~$ dynamodb-streams get_shard_iterator (ストリームID) shardId-... AT_SEQUENCE_NUMBER --sequence-number ... { "ShardIterator": "..." } $ dynamodb-streams get_records "(シャードイテレーター)" -f --limit 10 { "NextShardIterator": "...", "Records": [ ] } { "NextShardIterator": "...", "Records": [ ] } ...
ShardIterator長い…。あと、list_streamsでテーブル名とステータスが分からないのが地味にめんどい。
他のターミナルからこんな感じでレコードを追加してやると
for i in {1..10000} do ddbcli --url ... -e "insert into test (id, val) values ('$i', 1)" done
追加されたレコードの情報がストリームに流れてくるのが分かる。
{ "NextShardIterator": "...", "Records": [ { "awsRegion": "us-east-1", "dynamodb": { "Keys": { "id": { "S": "9" } }, "SequenceNumber": "...", "SizeBytes": 3, "StreamViewType": "KEYS_ONLY" }, "eventID": "...", "eventName": "INSERT", "eventSource": "aws:dynamodb", "eventVersion": "1.0" } ] } { "NextShardIterator": "...", "Records": [ { "awsRegion": "us-east-1", "dynamodb": { "Keys": { "id": { "S": "10" } }, "SequenceNumber": "...", "SizeBytes": 4, "StreamViewType": "KEYS_ONLY" }, "eventID": "...", "eventName": "INSERT", "eventSource": "aws:dynamodb", "eventVersion": "1.0" } ] } ...
所感
- 相変わらずストリーム系のAPIの使い勝手は悪い…
- とりあえずデータを追うことはできたものの、取りこぼしがないか等厳密な動作については要検証
DynamoDB Streamsプレビュー版のDynamoDB Localでも、一応動作するはずなので、どうぞご利用ください。