MySQLからMongoDBへレプリケーションする

ruby-binlogを使ったサンプル。

mysql2mongo.rbは以下の通り。

#!/usr/bin/env ruby
require "rubygems"
require "binlog"
require "mongo"

# TODO: it may be better to fetch the column names from MySQL instead of
# hard-coding them here.
#
# Maps each replicated MySQL table name to its column names. Binlog row values
# are positional (they are zipped with these names), so each list must follow
# the table's column definition order exactly. Frozen so the shared mapping
# cannot be mutated at runtime.
TABLE_COLUMNS = {
  "employees" => %w(id name age).freeze,
}.freeze

# Replicates a binlog Write_rows event: inserts every written row as a
# document into the MongoDB collection named after the MySQL table, inside the
# MongoDB database named after the MySQL schema.
#
# event - row event; reads table_name, db_name, columns (column type names,
#         positionally aligned with each row) and rows.
# mongo - Mongo::Connection (anything responding to #db).
#
# Events for tables without an entry in TABLE_COLUMNS are skipped.
def insert(event, mongo)
  column_names = TABLE_COLUMNS[event.table_name]
  # Not a table we replicate: skip instead of crashing on nil.zip.
  return unless column_names

  rows = event.rows.map do |row|
    pairs = column_names.zip(event.columns, row).map do |name, column_type, value|
      # Cast integer columns, but keep SQL NULL as nil (nil.to_i would be 0).
      if column_type == "LONG" && !value.nil?
        [name, value.to_i]
      else
        [name, value]
      end
    end
    # Hash[pairs] instead of Hash[*pairs.flatten]: a deep flatten would also
    # split Array-valued columns and break the key/value pairing.
    Hash[pairs]
  end

  collection = mongo.db(event.db_name)[event.table_name]
  rows.each { |doc| collection.insert(doc) }
end

# Replicates a binlog Update_rows event: for each (before, after) row pair,
# replaces the MongoDB document whose "id" equals the row's OLD id with the
# new row image.
#
# event - row event; reads table_name, db_name, columns (column type names)
#         and rows, where each element of rows is an [old_row, new_row] pair.
# mongo - Mongo::Connection (anything responding to #db).
#
# Events for tables without an entry in TABLE_COLUMNS are skipped.
def update(event, mongo)
  column_names = TABLE_COLUMNS[event.table_name]
  # Not a table we replicate: skip instead of crashing on nil.zip.
  return unless column_names

  # Converts one positional binlog row into a {column name => value} Hash.
  to_doc = lambda do |row|
    pairs = column_names.zip(event.columns, row).map do |name, column_type, value|
      # Cast integer columns, but keep SQL NULL as nil (nil.to_i would be 0).
      if column_type == "LONG" && !value.nil?
        [name, value.to_i]
      else
        [name, value]
      end
    end
    # Hash[pairs] instead of Hash[*pairs.flatten]: a deep flatten would also
    # split Array-valued columns and break the key/value pairing.
    Hash[pairs]
  end

  changes = event.rows.map do |old_row, new_row|
    [to_doc.call(old_row), to_doc.call(new_row)]
  end

  collection = mongo.db(event.db_name)[event.table_name]
  changes.each do |old_doc, new_doc|
    # Select by the OLD id so an UPDATE that changes the primary key still
    # finds the previously replicated document.
    collection.update({ "id" => old_doc["id"] }, new_doc)
  end
end

# Replicates a binlog Delete_rows event: removes, for every deleted row, the
# MongoDB documents whose "id" equals that row's id.
#
# event - row event; reads table_name, db_name, columns (column type names,
#         positionally aligned with each row) and rows.
# mongo - Mongo::Connection (anything responding to #db).
#
# Events for tables without an entry in TABLE_COLUMNS are skipped.
def delete(event, mongo)
  column_names = TABLE_COLUMNS[event.table_name]
  # Not a table we replicate: skip instead of crashing on nil.zip.
  return unless column_names

  rows = event.rows.map do |row|
    pairs = column_names.zip(event.columns, row).map do |name, column_type, value|
      # Cast integer columns (so "id" matches the integer stored on insert),
      # but keep SQL NULL as nil (nil.to_i would be 0).
      if column_type == "LONG" && !value.nil?
        [name, value.to_i]
      else
        [name, value]
      end
    end
    # Hash[pairs] instead of Hash[*pairs.flatten]: a deep flatten would also
    # split Array-valued columns and break the key/value pairing.
    Hash[pairs]
  end

  collection = mongo.db(event.db_name)[event.table_name]
  rows.each do |row|
    # Collection#remove takes (selector, opts); the second argument is an
    # options hash, not a document, so only the selector is passed.
    collection.remove({ "id" => row["id"] })
  end
end

# Start position: both nil on startup, so set_position is not called and
# replication starts from the tail of the binlog. Both are updated as events
# arrive, so a reconnect resumes where we left off.
master_log_file = nil
master_log_pos = nil

mongo = Mongo::Connection.new('localhost', 27017)

begin
  client = Binlog::Client.new("mysql://scott:tiger@localhost")
  # Poll until the binlog connection is established.
  sleep 0.3 until client.connect

  # After a reconnect (retry below), resume from the last position seen.
  if master_log_file && master_log_pos
    client.set_position(master_log_file, master_log_pos)
  end

  while (event = client.wait_for_next_event)
    master_log_pos = event.next_position

    case event
    when Binlog::RowEvent
      # Dispatch on the row event type (Write/Update/Delete rows).
      case event.event_type
      when /Write/
        insert(event, mongo)
      when /Update/
        update(event, mongo)
      when /Delete/
        delete(event, mongo)
      end
    when Binlog::RotateEvent
      # The master switched to a new binlog file.
      master_log_file = event.binlog_file
    end
  end
rescue Binlog::Error => e
  # Diagnostics go to stderr, not stdout.
  warn e.message
  # Reconnect only when the connection was dropped. `client` can still be nil
  # if the error was raised before any client was created; in that case
  # re-raise the original error instead of failing with NoMethodError.
  retry if client && client.closed?
  raise
end


MySQL側には、companyデータベースにemployeesテーブルを定義しておく（MongoDB側では同名のデータベース・コレクションに複製される）。

 -- Table replicated by mysql2mongo.rb. Binlog row values are mapped to names by
 -- position, so the column order here must match TABLE_COLUMNS["employees"]
 -- (id, name, age). `id` is also the key used to find documents on UPDATE/DELETE.
 CREATE TABLE `employees` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

それから、MySQLに『binlog_format=row』を設定しておく。mysql2mongo.rbは行イベント（Binlog::RowEvent）だけを処理するので、行ベースのバイナリログが必要になる。

動作確認

mysql2mongo.rbを立ち上げて、MySQLにデータを入れてみる。


mysql> insert into employees (name, age) values ('yamada', 21);
Query OK, 1 row affected (0.01 sec)

mysql> insert into employees (name, age) values ('sato', 23);
Query OK, 1 row affected (0.02 sec)

mysql> insert into employees (name, age) values ('suzuki', 27);
Query OK, 1 row affected (0.02 sec)

mysql> insert into employees (name, age) values ('tanaka', 34);
Query OK, 1 row affected (0.02 sec)

mysql> insert into employees (name, age) values ('ishikawa', 26);
Query OK, 1 row affected (0.02 sec)

MongoDBにレプリケーションされていることを確認。


MongoDB shell version: 1.8.2
connecting to: test
> use company
switched to db company
> db.employees.find()
{ "_id" : ObjectId("5041da21044ae05d59000001"), "age" : 21, "name" : "yamada", "id" : 1 }
{ "_id" : ObjectId("5041da21044ae05d59000002"), "age" : 23, "name" : "sato", "id" : 2 }
{ "_id" : ObjectId("5041da21044ae05d59000003"), "age" : 27, "name" : "suzuki", "id" : 3 }
{ "_id" : ObjectId("5041da21044ae05d59000004"), "age" : 34, "name" : "tanaka", "id" : 4 }
{ "_id" : ObjectId("5041da21044ae05d59000005"), "age" : 26, "name" : "ishikawa", "id" : 5 }