ProxySQLをつかったRDSの切り替え

ProxySQLとは

ProxySQLはMySQL用のL7のプロキシサーバで、プロキシサーバのレイヤでR/W Splittingできたり、クエリの書き換えをできたり、負荷分散などができたりする便利ミドルウェアです。

www.proxysql.com

Dropboxの中の人が書いているみたいで、Perconaの推しミドルウェアみたいです。(開発にも関わっているのかな?)

あとQiitaにもいくつか記事が上がってます。

https://qiita.com/search?q=ProxySQL

設定の管理が結構独特で、MySQLっぽく振る舞うsqliteで管理されていて、動的にバックエンドのサーバを書き換えたりすることができます。設定まわりの概念的なものは『ProxySQL触ってみた - Qiita』がわかりやすいかも。

動的なバックエンドの切り替え

管理用インターフェースに対して、以下のようなクエリを流すと、バックエンドが瞬間的に切り替わります。

# 元の設定
$ mysql -uadmin -p -S /tmp/proxysql_admin.sock
mysql> select * from runtime_mysql_servers;
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| hostgroup_id | hostname                                           | port | status | weight | compression | max_connections | max_replication_lag | use_ssl | max_latency_ms | comment |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| 0            | mydb.xxx.ap-northeast-1.rds.amazonaws.com | 3306 | ONLINE | 1      | 0           | 10000           | 0                   | 0       | 0              |         |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);

load mysql servers to runtime; /* ここで実際の設定が書き換わる */

RDSの瞬間的な切り替え

RDSのフェイルオーバーは通常、1〜2分で完了し、フェイルオーバー後はエンドポイントのIPアドレスが変わります。大抵のユースケースでは問題ないと思うんですが、「瞬間的な切り替えにしたい」とか「DNSTTLに依存したくない」等というときに、間にProxySQLを挟むことで(+別クラスタのスレーブを用意することで)瞬間的にフェイルオーバーすることができます。*1

サーバ構成

f:id:winebarrel:20171230123633p:plain

上図のmydb2mydbのリードレプリカ、という訳ではなくてスナップショットからリストアしたRDSをmysql.rds_set_external_masterレプリケーションしたものです。

クラスタのスレーブは以下の手順で作ります。

  1. mydbのリードレプリカを作る
  2. リードレプリカのレプリケーションを止める
  3. レプリケーションのポジションを記録する
  4. リードレプリカのスナップショットをとる
  5. スナップショットからリストアしてmydb2を作る
  6. 記録したポジションで、mydb2mysql.rds_set_external_masterを実行する

リードレプリカをpromoteしてmysql.rds_set_external_masterを実行すれば良さそう、、、な感じなんですが、promoteしたリードレプリカでmysql.rds_set_external_masterを実行すると、マスタのホスト名が172...と内部のIPにすり替わる、という現象が発生したのでやらない方がよいかと。mysql.rds_set_external_masterの実行前にmysql.rds_reset_external_masterが必要だったりするのを見るに、元のクラスタの情報がきちんと初期化されないのかも。

切り替えスクリプト

#!/usr/bin/env ruby
require 'logger'
require 'mysql2'
require 'open3'
require 'optparse'

class MySQL
  def initialize(host:, port:, user:, pass:, dry_run:, logger:)
    @client = Mysql2::Client.new(host: host, port: port.to_i, username: user, password: pass)
    @host = host
    @dry_run = dry_run
    @logger = logger
  end

  def enable_log_bin
    query('SET SQL_LOG_BIN = 1', force: true)
  end

  def disable_log_bin
    query('SET SQL_LOG_BIN = 0', force: true)

    if block_given?
      begin
        yield
      ensure
        begin
          enable_log_bin
        rescue => e
          @logger.warn(e.message)
        end
      end
    end
  end

  def enable_read_only
    query('SET GLOBAL READ_ONLY = 1')
  end

  def disable_read_only
    query('SET GLOBAL READ_ONLY = 0')
  end

  def read_only?
    query('SELECT @@read_only AS read_only').first.fetch('read_only').nonzero?
  end

  def enable_read_only_and_check
    enable_read_only

    if !@dry_run && !read_only?
      raise "[#{@host}] Failed to enable read only!"
    end
  end

  def find_users(user)
    query("SELECT user, host FROM mysql.user where user = '#{user}'", force: true).map {|row|
      "'#{row.fetch('user')}'@'#{row.fetch('host')}'"
    }.uniq
  end

  def drop_user(user)
    users = find_users(user)

    if users.empty?
      raise "Cannot find drop user: #{user}"
    end

    grants = users.flat_map do |user|
      privs = show_grants(user)
      query("DROP USER #{user}")
      privs
    end

    query('FLUSH PRIVILEGES')
    grants
  end

  def show_grants(user)
    query("SHOW GRANTS FOR #{user}", force: true).flat_map(&:values)
  end

  def create_user(grants, password:)
    grants.each do |sql|
      sql.sub!(/IDENTIFIED\b.*/, '')
      sql << " IDENTIFIED BY '#{password}'"
      query(sql)
    end
    query('FLUSH PRIVILEGES')
  end

  def while_threads(time_until, tstart: Time.now, user: nil)
    threads = get_threads_util(user: user)

    while time_until > 0 && threads.length > 0
      if (time_until % 5 ).zero?
        @logger.info 'Waiting all running %d threads are disconnected.. (max %d milliseconds)' % [threads.length + 1, time_until * 100]

        if threads.length < 5
          threads.each {|th| @logger.info th.inspect }
        end
      end

      sleep_until(tstart)
      tstart = Time.now
      time_until -= 1

      threads = get_threads_util(user: user)
    end

    return [threads, tstart]
  end

  def kill_threads(threads)
    threads.each do |th|
      begin
        thread_id = th.fetch('Id')
        @logger.info("KILL #{th}")
        query("CALL mysql.rds_kill(#{thread_id})")
      rescue Mysql2::Error => e
        raise e if e.error_number != 1094 # MYSQL_UNKNOWN_TID
      end
    end
  end

  def show_master_status
    query('SHOW MASTER STATUS', force: true, hidden: true).first
  end

  def show_slave_status
    query('SHOW SLAVE STATUS', force: true, hidden: true).first
  end

  def wait_replication(master, interval: 0.1)
    master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)

    @logger.info 'Waiting replication..'
    @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
    @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"

    #return if @dry_run

    loop do
      if master_log_file == slave_log_file && master_log_pos == slave_log_pos
        @logger.info 'Caught up!'
        @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
        @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"
        break
      end

      sleep interval
      master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)
      #_, _, slave_log_file, slave_log_pos = update_repl_info(master)
    end
  end

  private

  def query(sql, force: false, hidden: false)
    unless hidden
      log_msg = "[#{@host}] #{sql}"
      log_msg << ' (dry-run)' if @dry_run
      @logger.info log_msg
    end

    if force || !@dry_run
      @client.query(sql)
    end
  end

  def get_threads_util(running_time_threshold: 0, type: 0, user: nil)
    my_connection_id = @client.thread_id
    threads = []

    query('SHOW PROCESSLIST', force: true, hidden: true).each do |row|
      th_id         = row.fetch('Id')
      th_user       = row.fetch('User')
      th_host       = row.fetch('Host')
      th_command    = row.fetch('Command')
      th_state      = row.fetch('State')
      th_query_time = row.fetch('Time')
      th_info       = row.fetch('Info')

      th_info.sub!(/\A\s*(.*?)\s*\Z/) { $1 } if th_info

      next if my_connection_id == th_id
      next if th_query_time && th_query_time < running_time_threshold
      next if th_command && th_command == 'Binlog Dump'
      next if th_user && th_user == 'system user'
      next if user && th_user != user

      unless user
        if th_command && th_command == 'Sleep' && th_query_time && th_query_time >= 1
          next
        end
      end

      if type >= 1
        next if th_command && th_command == 'Sleep'
        next if th_command && th_command == 'Connect'
      end

      if type >= 2
        next if th_info && th_info =~ /\Aselect/i
        next if th_info && th_info =~ /\Ashow/i
      end

      threads << row
    end

    return threads
  end

  def sleep_until(tstart, running_interval: 0.1)
    elapsed = Time.now - tstart

    if running_interval > elapsed
      sleep(running_interval - elapsed)
    end
  end

  def update_repl_info(master)
    master_status = master.show_master_status
    slave_status = show_slave_status
    master_log_file = master_status.fetch('File')
    master_log_pos = master_status.fetch('Position')
    slave_log_file = slave_status.fetch('Relay_Master_Log_File')
    slave_log_pos = slave_status.fetch('Exec_Master_Log_Pos')
    [master_log_file, master_log_pos, slave_log_file, slave_log_pos]
  end
end

def parse_options(argv)
  options = argv.getopts(
    '',
    'orig-host:',
    'orig-port:',
    'orig-user:',
    'orig-pass:',
    'new-host:',
    'new-port:',
    'new-user:',
    'new-pass:',
    'drop-user:',
    'drop-user-pass:',
    'failover:',
    'execute'
  )

  if options.values.any?(&:nil?)
    raise "not enough options: #{options}"
  end

  Hash[options.map {|k, v| [k.tr(?-, ?_).to_sym, v] }]
end

def failover(script, logger, dry_run:)
  logger.info 'Failing over..'
  logger.info "Execute #{script}"

  unless dry_run
    out, err, status = Open3.capture3(script)
    logger.info out unless out.empty?
    logger.error err unless err.empty?
    raise err unless status.success?
  end
end

def main(argv)
  logger = Logger.new($stdout)
  options = parse_options(argv)

  if options.fetch(:execute)
    logger.info '!!! execute mode !!!'
  else
    logger.info '*** dry-run mode ***'
  end

  orig_mysql = MySQL.new(
    host: options.fetch(:orig_host),
    port: options.fetch(:orig_port),
    user: options.fetch(:orig_user),
    pass: options.fetch(:orig_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  new_mysql = MySQL.new(
    host: options.fetch(:new_host),
    port: options.fetch(:new_port),
    user: options.fetch(:new_user),
    pass: options.fetch(:new_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  #logger.info 'Set read_only on the new master..'
  #new_mysql.enable_read_only_and_check

  #orig_mysql.disable_log_bin do
    drop_user = options.fetch(:drop_user)
    grants = orig_mysql.drop_user(drop_user)
    threads, tstart = orig_mysql.while_threads(15, user: drop_user)
    #orig_mysql.enable_read_only_and_check
    threads, tstart = orig_mysql.while_threads(5, tstart: tstart, user: drop_user)
    logger.info 'Killing all application threads..'
    orig_mysql.kill_threads(threads) if threads.length > 0
  #end

  sleep 1
  new_mysql.wait_replication(orig_mysql)

  new_mysql.create_user(grants, password: options.fetch(:drop_user_pass))

  failover(options.fetch(:failover), logger, dry_run: !options.fetch(:execute))

  #new_mysql.disable_log_bin do
  #  logger.info 'Set read_only=0 on the new master.'
  #  new_mysql.disable_read_only
  #end
end

main(ARGV)

MHA用スクリプトの雑な流用です。 以下のような感じで使います。

#!/bin/bash
ORIG=mydb.xxx.ap-northeast-1.rds.amazonaws.com
NEW=mydb2.xxx.ap-northeast-1.rds.amazonaws.com
MYSQL_USER=root
MYSQL_PASS='...'

./mysql-online-switch.rb \
  --orig-host=$ORIG --orig-port=3306 --orig-user=$MYSQL_USER --orig-pass="$MYSQL_PASS" \
   --new-host=$NEW  --new-port=3306  --new-user=$MYSQL_USER  --new-pass="$MYSQL_PASS" \
  --drop-user=app_mysql_user --drop-user-pass='...' --failover ./switch.sh  #--execute

--executeオプションをつけないとdry-runモードで起動します

やっていることは

  1. RDSのアプリ用ユーザを削除する
  2. 当該ユーザの既存セッションをKILLする
  3. レプリケーションが追いつくのを待つ
  4. 移行先に新しくアプリ用ユーザを作る(レプリケーションで削除されるので)
  5. 切り替えスクリプトを実行

切り替えスクリプトは前述のSQLを実行するだけのものです。

#!/bin/bash
cat <<SQL | mysql -S /tmp/proxysql_admin.sock -u admin -pPASSWORD 2> /dev/null
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);
load mysql servers to runtime;
SQL

実行結果

感じで切り替わります。

$ ./mysql-online-switch.rb ...
I, [2017-10-01T11:30:04.277194 #23953]  INFO -- : !!! execute mode !!!
I, [2017-10-01T11:30:04.365187 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] SELECT user, host FROM mysql.user where user = 'app_mysql_user'
I, [2017-10-01T11:30:04.365557 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] DROP USER 'app_mysql_user'@'%'
I, [2017-10-01T11:30:04.365594 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] FLUSH PRIVILEGES
I, [2017-10-01T11:30:04.377357 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1500 milliseconds)
I, [2017-10-01T11:30:04.876041 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1000 milliseconds)
I, [2017-10-01T11:30:05.377405 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:05.887999 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:06.379264 #23953]  INFO -- : Killing all application threads..
I, [2017-10-01T11:30:06.379462 #23953]  INFO -- : KILL {"Id"=>72515760, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57520", "db"=>"mydb", "Command"=>"Sleep", "Time"=>401, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379504 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72515760)
I, [2017-10-01T11:30:06.379632 #23953]  INFO -- : KILL {"Id"=>72517447, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57536", "db"=>"mydb", "Command"=>"Sleep", "Time"=>38, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379668 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72517447)
I, [2017-10-01T11:30:06.379754 #23953]  INFO -- : KILL {"Id"=>72523822, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:58008", "db"=>"mydb", "Command"=>"Sleep", "Time"=>48, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379790 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72523822)
...
I, [2017-10-01T11:30:07.402341 #23953]  INFO -- : Waiting replication..
I, [2017-10-01T11:30:07.402428 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402455 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402476 #23953]  INFO -- : Caught up!
I, [2017-10-01T11:30:07.402511 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402561 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : [mydb2.xxx.ap-northeast-1.rds.amazonaws.com] GRANT USAGE ON *.* TO `app_mysql_user` IDENTIFIED BT '...'
...
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : Failing over..
I, [2017-10-01T11:30:07.402653 #23953]  INFO -- : Execute ./switch.sh

まとめ

MHA便利。 ProxySQL便利。

OFFLINE_SOFTについて

OFFLINE_HARDに対してOFFLINE_SOFTというステータスがあります。 これは、アクティブなトランザクションとコネクションはそのままで、新しいトラフィックは新しい別のバックエンドに送信するというものです。

コネクション張りっぱなしでも、一応、gracefulに新しいバックエンドに切り替えてくれるのですが

という感じで、トランザクションはさておき、セッションを操作すると、当たり前ですがバックエンドの切り替えは行われないようでした。 RailsでこのOFFLINE_SOFTを無理に活用しようと思うと、接続時にセッション変数を書いたり読んだりしているところを潰して、全部グローバル変数とかmy.cnfに設定する…とか? リクエストごとにコネクション切るような設計でないとあんまり活用できない気がしますね。

*1:もちろんクライアント側の再接続などのケアは必要ですが