ruby-binlogを使ったサンプル。
mysql2mongo.rbは以下の通り。
#!/usr/bin/env ruby require "rubygems" require "binlog" require "mongo" # カラム情報はMySQLから取得した方がよいかも… TABLE_COLUMNS = { "employees" => %w(id name age), } def insert(event, mongo) column_names = TABLE_COLUMNS[event.table_name] rows = event.rows.map do |row| alist = column_names.zip(event.columns, row).map do |name, column_type, value| if column_type == "LONG" [name, value.to_i] else [name, value] end end Hash[*alist.flatten] end db = mongo.db(event.db_name) rows.each do |row| db[event.table_name].insert(row) end end def update(event, mongo) column_names = TABLE_COLUMNS[event.table_name] rows = event.rows.map do |pair| old_row, new_row = pair alist = column_names.zip(event.columns, new_row).map do |name, column_type, value| if column_type == "LONG" [name, value.to_i] else [name, value] end end Hash[*alist.flatten] end db = mongo.db(event.db_name) rows.each do |row| row_id = row["id"] db[event.table_name].update({"id" => row_id}, row) end end def delete(event, mongo) column_names = TABLE_COLUMNS[event.table_name] rows = event.rows.map do |row| alist = column_names.zip(event.columns, row).map do |name, column_type, value| if column_type == "LONG" [name, value.to_i] else [name, value] end end Hash[*alist.flatten] end db = mongo.db(event.db_name) rows.each do |row| row_id = row["id"] db[event.table_name].remove({"id" => row_id}, row) end end # 起動時のポジションは最後尾 master_log_file = nil master_log_pos = nil mongo = Mongo::Connection.new('localhost', 27017) begin client = Binlog::Client.new("mysql://scott:tiger@localhost") sleep 0.3 until client.connect if master_log_file and master_log_pos client.set_position(master_log_file, master_log_pos) end while event = client.wait_for_next_event master_log_pos = event.next_position case event when Binlog::RowEvent case event.event_type when /Write/ insert(event, mongo) when /Update/ update(event, mongo) when /Delete/ delete(event, mongo) end when Binlog::RotateEvent master_log_file = event.binlog_file end end rescue Binlog::Error => e puts e retry if client.closed? raise e end
MySQLにはcompany.employeesを定義。
CREATE TABLE `employees` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
それから『binlog_format=row』を設定しておく。
動作確認
mysql2mongo.rbを立ち上げて、MySQLにデータを入れてみる。
mysql> insert into employees (name, age) values ('yamada', 21);
Query OK, 1 row affected (0.01 sec)mysql> insert into employees (name, age) values ('sato', 23);
Query OK, 1 row affected (0.02 sec)mysql> insert into employees (name, age) values ('suzuki', 27);
Query OK, 1 row affected (0.02 sec)mysql> insert into employees (name, age) values ('tanaka', 34);
Query OK, 1 row affected (0.02 sec)mysql> insert into employees (name, age) values ('ishikawa', 26);
Query OK, 1 row affected (0.02 sec)
MongoDBにレプリケーションされていることを確認。
MongoDB shell version: 1.8.2
connecting to: test
> use company
switched to db company
> db.employees.find()
{ "_id" : ObjectId("5041da21044ae05d59000001"), "age" : 21, "name" : "yamada", "id" : 1 }
{ "_id" : ObjectId("5041da21044ae05d59000002"), "age" : 23, "name" : "sato", "id" : 2 }
{ "_id" : ObjectId("5041da21044ae05d59000003"), "age" : 27, "name" : "suzuki", "id" : 3 }
{ "_id" : ObjectId("5041da21044ae05d59000004"), "age" : 34, "name" : "tanaka", "id" : 4 }
{ "_id" : ObjectId("5041da21044ae05d59000005"), "age" : 26, "name" : "ishikawa", "id" : 5 }