AWS Lambda Go without Go

とある勉強会用にLTネタを作っていたのですが、発表できなくなったので腐る前にブログに書いておきます。

お前は何を(ry

記事のタイトルについて お前は何を言っているんだ なのですが、元ネタは以下のツイートです。

LTのネタになりそうだったのでやってみたわけです。

aws-lambda-goについて

じゃあ、まあaws-lambda-goは一体どういう仕組みで動いているんだろうと、ソースを読んでみました。

で、entry.gofunction.goあたりでだいたい分かりましたが、net/rpcパッケージをつかってFunction#InvokeFunction#Pingを呼んでいる感じでした。

ミニマムなハンドラ

必要なrpcのメソッドさえ実装すればaws-lambda-goがなくても動きます。 いろいろと削ってみて、ほぼ最小のコードは以下のようになりました。

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net"
    "net/rpc"
    "os"
)

type PingRequest struct {
}

type PingResponse struct {
}

type Function struct {
    // handler lambdaHandler
}

type InvokeRequest struct {
    Payload []byte
    //RequestId             string
    //XAmznTraceId          string
    //Deadline              InvokeRequest_Timestamp
    //InvokedFunctionArn    string
    //CognitoIdentityId     string
    //CognitoIdentityPoolId string
    //ClientContext         []byte
}

type InvokeResponse struct {
    Payload []byte
    //Error   *InvokeResponse_Error
}

func (fn *Function) Ping(req *PingRequest, res *PingResponse) (err error) {
    *res = PingResponse{}
    return
}

func (fn *Function) Invoke(req *InvokeRequest, response *InvokeResponse) error {
    response.Payload, _ = json.Marshal(100)
    return nil
}

func main() {
    port := os.Getenv("_LAMBDA_SERVER_PORT")

    l, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", port))

    if err != nil {
        log.Fatal(err)
    }

    f := new(Function)
    rpc.Register(f)
    rpc.Accept(l)
    log.Fatal("accept should not have returned")
}

構造体のメンバを結構削っても、一応動くんですよね…

ローカルでハンドラを動かす

ハンドラはrpcのメソッドを呼ばれてるだけなので、ローカルから実行することもできます。

  • ハンドラ
package main

import (
    "github.com/aws/aws-lambda-go/lambda"
)

func hello(event interface{}) (int, error) {
    return 1, nil
}

func main() {
    lambda.Start(hello)
}
  • ローカル実行用のクライアント
package main

import (
    "fmt"
    "github.com/aws/aws-lambda-go/lambda/messages"
    "log"
    "net/rpc"
)


func ping(client *rpc.Client) {
    req := &messages.PingRequest{}
    var res *messages.PingResponse

    err := client.Call("Function.Ping", req, &res)

    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Ping: %v\n", *res)
}

func invoke(client *rpc.Client) {
    req := &messages.InvokeRequest{Payload: []byte("{\"foo\":100}")}
    res := messages.InvokeResponse{}

    err := client.Call("Function.Invoke", req, &res)

    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Invoke: %v\n", string(res.Payload))
}

func main() {
    client, err := rpc.Dial("tcp", "localhost:1234")

    if err != nil {
        log.Fatal("dialing:", err)
    }

    ping(client)
    invoke(client)
}

実行はこんな感じで

_LAMBDA_SERVER_PORT=1234 ./hello
./client
Ping: {}
Invoke: 1

RustのハンドラをAWS Lambda Goで動かす

それで本題なのですが、net/rpcパッケージはGoに特化しているとはいえ、シリアライズされたデータをネットワーク経由でやりとりしているので、やろうと思えばほかの言語とも通信ができるはずです。 ただシリアライズにつかっているエンコーディングgobで、さすがにこのエンコーダの他言語実装を見つけることはできませんでした。

仕方ないので、net/rpcのサーバ・クライアント間にプロキシ立ててパケットをキャプチャして流れているデータを調べた上で、そのデータをそのまま返すようなサーバを作ってみました。

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::Read;
use std::io::Write;
use std::env;

fn handle_client(mut stream: TcpStream) {
    let mut buf;
    loop {
        buf = [0; 2048];
        let _ = match stream.read(&mut buf) {
            Err(e) => panic!("Got an error: {}", e),
            Ok(m) => {
                if m == 0 {
                    break;
                }
                m
            }
        };

        let s = String::from_utf8_lossy(&buf);
        let ret: &[u8];

        if s.contains("Ping") {
            ret = b":\xFF\x81\x03\x01\x01\x08Response\x01\xFF\x82\x00\x01\x03\x01\rServiceMethod\x01\x0c\x00\x01\x03Seq\x01\x06\x00\x01\x05Error\x01\x0c\x00\x00\x00\x12\xFF\x82\x01\rFunction.Ping\x00\x18\xFF\x83\x03\x01\x01\x0cPingResponse\x01\xFF\x84\x00\x00\x00\x03\xFF\x84\x00";
        } else {
            ret = b"\x16\xFF\x82\x01\x0FFunction.Invoke\x01\x01\x00(\xFF\x85\x03\x01\x01\x0EInvokeResponse\x01\xFF\x86\x00\x01\x01\x01\x07Payload\x01\n\x00\x00\x00\x08\xFF\x86\x01\x03111\x00";
        }

        match stream.write(ret) {
            Err(_) => break,
            Ok(_) => continue,
        }
    }
}

fn main() {
    let port = env::var("_LAMBDA_SERVER_PORT").unwrap();

    let listener = TcpListener::bind(format!("localhost:{}", port)).unwrap();
    for stream in listener.incoming() {
        match stream {
            Err(e) => println!("failed: {}", e),
            Ok(stream) => {
                thread::spawn(move || handle_client(stream));
            }
        }
    }
}

コードはRust Echo Server Example | Andrei Vacariu, Software Developerをほぼそのままコピーしています。 飛んできたメッセージを無理矢理Stringにして「Ping」という文字が入っていたらPing用のデータ、それ以外のデータはInveke用のデータを返すようにしています。

これ、linux-amd64でビルドしてLambdaにGolangとして登録すると、普通に動きます(「111」という値が返ってきます)

ということで、AWS Lambda Goはほかの言語でも動きます! やったぜ!

その他

Goに特化したrpcに対応するなら、先にgRPCに対応してもよかったのでは…と思わなくもない。

追記

そういえばRustはstatic linkにしてません。 ぱっとリンクしているライブラリを調べてみると

[ec2-user@ip-10-0-1-204 release]$ ldd hello
    linux-vdso.so.1 =>  (0x00007fff055a1000)
    libdl.so.2 => /lib64/libdl.so.2 (0x00007fee696d1000)
    librt.so.1 => /lib64/librt.so.1 (0x00007fee694c9000)
    libpthread.so.0 => /lib64/libpthread.so.0 (0x00007fee692ad000)
    libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007fee69097000)
    libc.so.6 => /lib64/libc.so.6 (0x00007fee68cd3000)
    /lib64/ld-linux-x86-64.so.2 (0x00007fee69b4f000)

こんな感じでした。 まあGoでもオプション指定しないとdynamic linkになるので、libcくらいは使えますね…と。

ProxySQLをつかったRDSの切り替え

ProxySQLとは

ProxySQLはMySQL用のL7のプロキシサーバで、プロキシサーバのレイヤでR/W Splittingできたり、クエリの書き換えをできたり、負荷分散などができたりする便利ミドルウェアです。

www.proxysql.com

Dropboxの中の人が書いているみたいで、Perconaの推しミドルウェアみたいです。(開発にも関わっているのかな?)

あとQiitaにもいくつか記事が上がってます。

https://qiita.com/search?q=ProxySQL

設定の管理が結構独特で、MySQLっぽく振る舞うsqliteで管理されていて、動的にバックエンドのサーバを書き換えたりすることができます。設定まわりの概念的なものは『ProxySQL触ってみた - Qiita』がわかりやすいかも。

動的なバックエンドの切り替え

管理用インターフェースに対して、以下のようなクエリを流すと、バックエンドが瞬間的に切り替わります。

# 元の設定
$ mysql -uadmin -p -S /tmp/proxysql_admin.sock
mysql> select * from runtime_mysql_servers;
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| hostgroup_id | hostname                                           | port | status | weight | compression | max_connections | max_replication_lag | use_ssl | max_latency_ms | comment |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| 0            | mydb.xxx.ap-northeast-1.rds.amazonaws.com | 3306 | ONLINE | 1      | 0           | 10000           | 0                   | 0       | 0              |         |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);

load mysql servers to runtime; /* ここで実際の設定が書き換わる */

RDSの瞬間的な切り替え

RDSのフェイルオーバーは通常、1〜2分で完了し、フェイルオーバー後はエンドポイントのIPアドレスが変わります。大抵のユースケースでは問題ないと思うんですが、「瞬間的な切り替えにしたい」とか「DNSTTLに依存したくない」等というときに、間にProxySQLを挟むことで(+別クラスタのスレーブを用意することで)瞬間的にフェイルオーバーすることができます。*1

サーバ構成

f:id:winebarrel:20171230123633p:plain

上図のmydb2mydbのリードレプリカ、という訳ではなくてスナップショットからリストアしたRDSをmysql.rds_set_external_masterレプリケーションしたものです。

クラスタのスレーブは以下の手順で作ります。

  1. mydbのリードレプリカを作る
  2. リードレプリカのレプリケーションを止める
  3. レプリケーションのポジションを記録する
  4. リードレプリカのスナップショットをとる
  5. スナップショットからリストアしてmydb2を作る
  6. 記録したポジションで、mydb2mysql.rds_set_external_masterを実行する

リードレプリカをpromoteしてmysql.rds_set_external_masterを実行すれば良さそう、、、な感じなんですが、promoteしたリードレプリカでmysql.rds_set_external_masterを実行すると、マスタのホスト名が172...と内部のIPにすり替わる、という現象が発生したのでやらない方がよいかと。mysql.rds_set_external_masterの実行前にmysql.rds_reset_external_masterが必要だったりするのを見るに、元のクラスタの情報がきちんと初期化されないのかも。

切り替えスクリプト

#!/usr/bin/env ruby
require 'logger'
require 'mysql2'
require 'open3'
require 'optparse'

class MySQL
  def initialize(host:, port:, user:, pass:, dry_run:, logger:)
    @client = Mysql2::Client.new(host: host, port: port.to_i, username: user, password: pass)
    @host = host
    @dry_run = dry_run
    @logger = logger
  end

  def enable_log_bin
    query('SET SQL_LOG_BIN = 1', force: true)
  end

  def disable_log_bin
    query('SET SQL_LOG_BIN = 0', force: true)

    if block_given?
      begin
        yield
      ensure
        begin
          enable_log_bin
        rescue => e
          @logger.warn(e.message)
        end
      end
    end
  end

  def enable_read_only
    query('SET GLOBAL READ_ONLY = 1')
  end

  def disable_read_only
    query('SET GLOBAL READ_ONLY = 0')
  end

  def read_only?
    query('SELECT @@read_only AS read_only').first.fetch('read_only').nonzero?
  end

  def enable_read_only_and_check
    enable_read_only

    if !@dry_run && !read_only?
      raise "[#{@host}] Failed to enable read only!"
    end
  end

  def find_users(user)
    query("SELECT user, host FROM mysql.user where user = '#{user}'", force: true).map {|row|
      "'#{row.fetch('user')}'@'#{row.fetch('host')}'"
    }.uniq
  end

  def drop_user(user)
    users = find_users(user)

    if users.empty?
      raise "Cannot find drop user: #{user}"
    end

    grants = users.flat_map do |user|
      privs = show_grants(user)
      query("DROP USER #{user}")
      privs
    end

    query('FLUSH PRIVILEGES')
    grants
  end

  def show_grants(user)
    query("SHOW GRANTS FOR #{user}", force: true).flat_map(&:values)
  end

  def create_user(grants, password:)
    grants.each do |sql|
      sql.sub!(/IDENTIFIED\b.*/, '')
      sql << " IDENTIFIED BY '#{password}'"
      query(sql)
    end
    query('FLUSH PRIVILEGES')
  end

  def while_threads(time_until, tstart: Time.now, user: nil)
    threads = get_threads_util(user: user)

    while time_until > 0 && threads.length > 0
      if (time_until % 5 ).zero?
        @logger.info 'Waiting all running %d threads are disconnected.. (max %d milliseconds)' % [threads.length + 1, time_until * 100]

        if threads.length < 5
          threads.each {|th| @logger.info th.inspect }
        end
      end

      sleep_until(tstart)
      tstart = Time.now
      time_until -= 1

      threads = get_threads_util(user: user)
    end

    return [threads, tstart]
  end

  def kill_threads(threads)
    threads.each do |th|
      begin
        thread_id = th.fetch('Id')
        @logger.info("KILL #{th}")
        query("CALL mysql.rds_kill(#{thread_id})")
      rescue Mysql2::Error => e
        raise e if e.error_number != 1094 # MYSQL_UNKNOWN_TID
      end
    end
  end

  def show_master_status
    query('SHOW MASTER STATUS', force: true, hidden: true).first
  end

  def show_slave_status
    query('SHOW SLAVE STATUS', force: true, hidden: true).first
  end

  def wait_replication(master, interval: 0.1)
    master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)

    @logger.info 'Waiting replication..'
    @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
    @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"

    #return if @dry_run

    loop do
      if master_log_file == slave_log_file && master_log_pos == slave_log_pos
        @logger.info 'Caught up!'
        @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
        @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"
        break
      end

      sleep interval
      master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)
      #_, _, slave_log_file, slave_log_pos = update_repl_info(master)
    end
  end

  private

  def query(sql, force: false, hidden: false)
    unless hidden
      log_msg = "[#{@host}] #{sql}"
      log_msg << ' (dry-run)' if @dry_run
      @logger.info log_msg
    end

    if force || !@dry_run
      @client.query(sql)
    end
  end

  def get_threads_util(running_time_threshold: 0, type: 0, user: nil)
    my_connection_id = @client.thread_id
    threads = []

    query('SHOW PROCESSLIST', force: true, hidden: true).each do |row|
      th_id         = row.fetch('Id')
      th_user       = row.fetch('User')
      th_host       = row.fetch('Host')
      th_command    = row.fetch('Command')
      th_state      = row.fetch('State')
      th_query_time = row.fetch('Time')
      th_info       = row.fetch('Info')

      th_info.sub!(/\A\s*(.*?)\s*\Z/) { $1 } if th_info

      next if my_connection_id == th_id
      next if th_query_time && th_query_time < running_time_threshold
      next if th_command && th_command == 'Binlog Dump'
      next if th_user && th_user == 'system user'
      next if user && th_user != user

      unless user
        if th_command && th_command == 'Sleep' && th_query_time && th_query_time >= 1
          next
        end
      end

      if type >= 1
        next if th_command && th_command == 'Sleep'
        next if th_command && th_command == 'Connect'
      end

      if type >= 2
        next if th_info && th_info =~ /\Aselect/i
        next if th_info && th_info =~ /\Ashow/i
      end

      threads << row
    end

    return threads
  end

  def sleep_until(tstart, running_interval: 0.1)
    elapsed = Time.now - tstart

    if running_interval > elapsed
      sleep(running_interval - elapsed)
    end
  end

  def update_repl_info(master)
    master_status = master.show_master_status
    slave_status = show_slave_status
    master_log_file = master_status.fetch('File')
    master_log_pos = master_status.fetch('Position')
    slave_log_file = slave_status.fetch('Relay_Master_Log_File')
    slave_log_pos = slave_status.fetch('Exec_Master_Log_Pos')
    [master_log_file, master_log_pos, slave_log_file, slave_log_pos]
  end
end

def parse_options(argv)
  options = argv.getopts(
    '',
    'orig-host:',
    'orig-port:',
    'orig-user:',
    'orig-pass:',
    'new-host:',
    'new-port:',
    'new-user:',
    'new-pass:',
    'drop-user:',
    'drop-user-pass:',
    'failover:',
    'execute'
  )

  if options.values.any?(&:nil?)
    raise "not enough options: #{options}"
  end

  Hash[options.map {|k, v| [k.tr(?-, ?_).to_sym, v] }]
end

def failover(script, logger, dry_run:)
  logger.info 'Failing over..'
  logger.info "Execute #{script}"

  unless dry_run
    out, err, status = Open3.capture3(script)
    logger.info out unless out.empty?
    logger.error err unless err.empty?
    raise err unless status.success?
  end
end

def main(argv)
  logger = Logger.new($stdout)
  options = parse_options(argv)

  if options.fetch(:execute)
    logger.info '!!! execute mode !!!'
  else
    logger.info '*** dry-run mode ***'
  end

  orig_mysql = MySQL.new(
    host: options.fetch(:orig_host),
    port: options.fetch(:orig_port),
    user: options.fetch(:orig_user),
    pass: options.fetch(:orig_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  new_mysql = MySQL.new(
    host: options.fetch(:new_host),
    port: options.fetch(:new_port),
    user: options.fetch(:new_user),
    pass: options.fetch(:new_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  #logger.info 'Set read_only on the new master..'
  #new_mysql.enable_read_only_and_check

  #orig_mysql.disable_log_bin do
    drop_user = options.fetch(:drop_user)
    grants = orig_mysql.drop_user(drop_user)
    threads, tstart = orig_mysql.while_threads(15, user: drop_user)
    #orig_mysql.enable_read_only_and_check
    threads, tstart = orig_mysql.while_threads(5, tstart: tstart, user: drop_user)
    logger.info 'Killing all application threads..'
    orig_mysql.kill_threads(threads) if threads.length > 0
  #end

  sleep 1
  new_mysql.wait_replication(orig_mysql)

  new_mysql.create_user(grants, password: options.fetch(:drop_user_pass))

  failover(options.fetch(:failover), logger, dry_run: !options.fetch(:execute))

  #new_mysql.disable_log_bin do
  #  logger.info 'Set read_only=0 on the new master.'
  #  new_mysql.disable_read_only
  #end
end

main(ARGV)

MHA用スクリプトの雑な流用です。 以下のような感じで使います。

#!/bin/bash
ORIG=mydb.xxx.ap-northeast-1.rds.amazonaws.com
NEW=mydb2.xxx.ap-northeast-1.rds.amazonaws.com
MYSQL_USER=root
MYSQL_PASS='...'

./mysql-online-switch.rb \
  --orig-host=$ORIG --orig-port=3306 --orig-user=$MYSQL_USER --orig-pass="$MYSQL_PASS" \
   --new-host=$NEW  --new-port=3306  --new-user=$MYSQL_USER  --new-pass="$MYSQL_PASS" \
  --drop-user=app_mysql_user --drop-user-pass='...' --failover ./switch.sh  #--execute

--executeオプションをつけないとdry-runモードで起動します

やっていることは

  1. RDSのアプリ用ユーザを削除する
  2. 当該ユーザの既存セッションをKILLする
  3. レプリケーションが追いつくのを待つ
  4. 移行先に新しくアプリ用ユーザを作る(レプリケーションで削除されるので)
  5. 切り替えスクリプトを実行

切り替えスクリプトは前述のSQLを実行するだけのものです。

#!/bin/bash
cat <<SQL | mysql -S /tmp/proxysql_admin.sock -u admin -pPASSWORD 2> /dev/null
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);
load mysql servers to runtime;
SQL

実行結果

感じで切り替わります。

$ ./mysql-online-switch.rb ...
I, [2017-10-01T11:30:04.277194 #23953]  INFO -- : !!! execute mode !!!
I, [2017-10-01T11:30:04.365187 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] SELECT user, host FROM mysql.user where user = 'app_mysql_user'
I, [2017-10-01T11:30:04.365557 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] DROP USER 'app_mysql_user'@'%'
I, [2017-10-01T11:30:04.365594 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] FLUSH PRIVILEGES
I, [2017-10-01T11:30:04.377357 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1500 milliseconds)
I, [2017-10-01T11:30:04.876041 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1000 milliseconds)
I, [2017-10-01T11:30:05.377405 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:05.887999 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:06.379264 #23953]  INFO -- : Killing all application threads..
I, [2017-10-01T11:30:06.379462 #23953]  INFO -- : KILL {"Id"=>72515760, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57520", "db"=>"mydb", "Command"=>"Sleep", "Time"=>401, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379504 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72515760)
I, [2017-10-01T11:30:06.379632 #23953]  INFO -- : KILL {"Id"=>72517447, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57536", "db"=>"mydb", "Command"=>"Sleep", "Time"=>38, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379668 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72517447)
I, [2017-10-01T11:30:06.379754 #23953]  INFO -- : KILL {"Id"=>72523822, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:58008", "db"=>"mydb", "Command"=>"Sleep", "Time"=>48, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379790 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72523822)
...
I, [2017-10-01T11:30:07.402341 #23953]  INFO -- : Waiting replication..
I, [2017-10-01T11:30:07.402428 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402455 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402476 #23953]  INFO -- : Caught up!
I, [2017-10-01T11:30:07.402511 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402561 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : [mydb2.xxx.ap-northeast-1.rds.amazonaws.com] GRANT USAGE ON *.* TO `app_mysql_user` IDENTIFIED BT '...'
...
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : Failing over..
I, [2017-10-01T11:30:07.402653 #23953]  INFO -- : Execute ./switch.sh

まとめ

MHA便利。 ProxySQL便利。

OFFLINE_SOFTについて

OFFLINE_HARDに対してOFFLINE_SOFTというステータスがあります。 これは、アクティブなトランザクションとコネクションはそのままで、新しいトラフィックは新しい別のバックエンドに送信するというものです。

コネクション張りっぱなしでも、一応、gracefulに新しいバックエンドに切り替えてくれるのですが

という感じで、トランザクションはさておき、セッションを操作すると、当たり前ですがバックエンドの切り替えは行われないようでした。 RailsでこのOFFLINE_SOFTを無理に活用しようと思うと、接続時にセッション変数を書いたり読んだりしているところを潰して、全部グローバル変数とかmy.cnfに設定する…とか? リクエストごとにコネクション切るような設計でないとあんまり活用できない気がしますね。

*1:もちろんクライアント側の再接続などのケアは必要ですが

ES DeskというElasticsearch検証用Webアプリを作った

このエントリはElastic stack (Elasticsearch) Advent Calendar 2017の12/8分です。


ES DeskというElasticsearch用のWebアプリを作成しました

github.com

まずはデモサイトをご覧ください。

http://es-desk.winebarrel.jp/

これは何?

Elasticsearchのインデックスを編集しながら、トライアンドエラーで検証するためのWebアプリです。

以下のような機能があります。

  • ブラウザからのQuery DSLの送信と結果の保存
  • データセットの登録・インポート
  • インデックスの作成・編集
  • Query DSLの登録・編集

Elasticsearchのインデックスのアナライザを変更する検証を行うときに、いままでは「元のインデックスを消して」「curlで修正したインデックスを投入して」「データをロードして」みたいなことをやっていたんですが、ES Deskではブラウザから直接インデックスの定義を編集することができ、元のデータの残る*1ので、「こまごまとインデックスの編集→検索結果を見る」という作業をやるときに手間がかなり省けました。

また、検索結果を「クエリ+そのときインデックス定義+検索結果+そのときのデータセット」という感じでスナップショット的に保存しておけるので、他の人に「こういうアナライザをつかったら、こんな結果になりました」というのを共有しやすいです。

なかなか便利に使っております。

どうぞご利用ください。

*1:正確にはデータを再ロードしています

utsusemiというプロキシサーバを書いた

github.com

これは何?

リクエストが来たとき、バックエンド①が404ならバックエンド②にアクセスして、バックエンド②が404ならバックエンド③にアクセスして…みたいなプロキシサーバ。

開発用の画像配信サーバに画像がなかったときに、本番にアクセスしにいく、みたいな用途で使います。

それnginxで(ry

できます。 なんかこんなQAがあって、

server {
  listen 80;
  location / {
    proxy_pass http://foo.bar.dev/
    proxy_intercept_errors on;
    error_page 404 =200 /production/$uri;
  }

  location /production {
    internal;
    rewrite ^/production/(.*) /$1 break;
    proxy_pass http://foo.bar.prod/
  }
}

こんな設定にしたら、できました。

が、バックエンドが三つは無理でした。 できるのかもしれないけど、わからなかった。。。

それvarnishで(ry

できます。 return (restart);とかでできます。 というか、もともとそうやって実装されてたんですけど、オーバーキル感があるので、結局、自前で実装しました。

でまあ、utsusemiというプロキシサーバをGoで書きました。 Golang便利。

こういう設定を書いて、起動すると

#port = 11080

[[backend]]
target = "http://httpstat.us/404"
#ok = [200]

[[backend]]
target = "https://www.google.co.jp"
#ok = [200]

[[backend]]
target = "https://s.yimg.jp"
#ok = [200]

yahooの画像にもgoogleの画像にもlocalhost:11080でアクセスできます。

その他

はてなブログ、tomlのsyntax highlightをサポートしてくれないかなー (nginxをサポートしてて驚いた。すばらしい)

Rails / Active RecordでMySQLのgeometry型をサポートするライブラリを書いた

Armgという、ARをMySQLのgeometry型に対応させるためのライブラリを書きました。

github.com

使い方は以下の通り。

require 'active_record'
require 'armg'

ActiveRecord::Base.establish_connection(adapter: 'mysql2', database: 'my_db');

ActiveRecord::Migration.create_table :geoms, options: 'ENGINE=MyISAM' do |t|
  t.geometry 'location', null: false
  t.index ['location'], name: 'idx_location', type: :spatial
end

class Geom < ActiveRecord::Base; end

wkt_parser = RGeo::WKRep::WKTParser.new(nil, support_ewkt: true)
point = wkt_parser.parse('SRID=4326;Point(-122.1 47.3)')
Geom.create!(location: point)

Geom.first
#=> #<Geom id: 1, location: #<RGeo::Cartesian::PointImpl:0x... "POINT (-122.1 47.3)">>

ridgepoleにPRをもらって、コードを眺めていたら何となく対応できそうだったので、書いてみた感じです。当初は自前でWKBのパースをやっていたんですが、まじめに考えると大変そうだったので、RGeoを使うようにしました。 MySQL 5.7の最新版だとInnoDBでもgeometry型にインデックスを貼れるので、利用する機会も増えるかも。

ridgepoleでは以下のように-rオプションでarmgを渡してやると、ダンプできます(が、動作はきちんと確認できてません)

$ ridgepole -c mysql2://root:pass@127.0.0.1:13306/test -r armg -e
# Export Schema
create_table "geoms", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=latin1" do |t|
  t.geometry "location", null: false
  t.index ["location"], name: "idx_location", type: :spatial
end

ath: a interactive Amazon Athena shell

Webコンソールからパーティションをぽちぽち追加するのに疲れたので、Athena用のシェルを書きました。

github.com

使い方は以下のような感じです。

$ export ATH_OUTPUT_LOCATION=s3://my-bucket

$ ath

default> show databases;
default
sampledb

default> /use sampledb
sampledb> show tables;
elb_logs

sampledb> select * from elb_logs limit 3;
"request_timestamp","elb_name","request_ip","request_port","backend_ip","backend_port","request_processing_time","backend_processing_time","client_response_time","elb_response_code","backend_response_code","received_bytes","sent_bytes","request_verb","url","protocol","user_agent","ssl_cipher","ssl_protocol"
"2015-01-01T08:00:00.516940Z","elb_demo_009","240.136.98.149","25858","172.51.67.62","8888","9.99E-4","8.11E-4","0.001561","200","200","0","428","GET","https://www.example.com/articles/746","HTTP/1.1","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/602.1.50 (KHTML, like Gecko) Version/10.0 Safari/602.1.50""","DHE-RSA-AES128-SHA","TLSv1.2"
"2015-01-01T08:00:00.902953Z","elb_demo_008","244.46.184.108","27758","172.31.168.31","443","6.39E-4","0.001471","3.73E-4","200","200","0","4231","GET","https://www.example.com/jobs/688","HTTP/1.1","""Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1""","DHE-RSA-AES128-SHA","TLSv1.2"
"2015-01-01T08:00:01.206255Z","elb_demo_008","240.120.203.212","26378","172.37.170.107","8888","0.001174","4.97E-4","4.89E-4","200","200","0","2075","GET","http://www.example.com/articles/290","HTTP/1.1","""Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246""","-","-"

sampledb> select * from elb_logs limit 3 &
QueryExecution 2335c77b-d138-4c5d-89df-12f2781c311b

sampledb> /desc 2335c77b-d138-4c5d-89df-12f2781c311b
{
  "query_execution_id": "2335c77b-d138-4c5d-89df-12f2781c311b",
  "query": "select * from elb_logs limit 3",
  "result_configuration": {
    "output_location": "s3://sugawara-test/2335c77b-d138-4c5d-89df-12f2781c311b.csv"
  },
  "query_execution_context": {
    "database": "sampledb"
  },
  "status": {
    "state": "SUCCEEDED",
    "submission_date_time": "2017-07-02 16:29:57 +0900",
    "completion_date_time": "2017-07-02 16:29:58 +0900"
  },
  "statistics": {
    "engine_execution_time_in_millis": 719,
    "data_scanned_in_bytes": 422696
  }
}

sampledb> /result 2335c77b-d138-4c5d-89df-12f2781c311b
"request_timestamp","elb_name","request_ip","request_port","backend_ip","backend_port","request_processing_time","backend_processing_time","client_response_time","elb_response_code","backend_response_code","received_bytes","sent_bytes","request_verb","url","protocol","user_agent","ssl_cipher","ssl_protocol"
"2015-01-01T16:00:00.516940Z","elb_demo_009","242.76.140.141","18201","172.42.159.57","80","0.001448","8.46E-4","9.97E-4","302","302","0","2911","GET","https://www.example.com/articles/817","HTTP/1.1","""Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1""","DHE-RSA-AES128-SHA","TLSv1.2"
"2015-01-01T16:00:00.902953Z","elb_demo_005","246.233.91.115","1950","172.42.232.155","8888","9.59E-4","0.001703","8.93E-4","200","200","0","3027","GET","http://www.example.com/jobs/509","HTTP/1.1","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/602.1.50 (KHTML, like Gecko) Version/10.0 Safari/602.1.50""","-","-"
"2015-01-01T16:00:01.206255Z","elb_demo_002","250.96.73.238","12800","172.34.87.144","80","0.001549","9.68E-4","0.001908","200","200","0","888","GET","http://www.example.com/articles/729","HTTP/1.1","""Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246""","-","-"

どうぞご利用ください。

Ridgepole v0.7.0.beta2

Ridgepole v0.7.0.beta2をリリースしました。 開発中にコメントやフィードバックをしていただいた方にはありがとうございました。

github.com

主な変更点は以下の通りです。

  • Rails(ActiveRecord) 4.xのサポートを止めた
    • 5.xと両方のフォーマットをサポートする必要があったspecがだいぶきれいになりました
    • またactiverecord-mysql-awesomeの機能は5.xに取り込まれているので--enable-mysql-awesomeオプションを削除しました
  • Rails(ActiveRecord) 5.1に対応
  • Ruby 2.4のサポート…というかテストケースを追加
  • DROP TABLEをスキップする--skip-drop-tableオプションを追加
  • MySQLのテーブルオプションの差分を適用する--mysql-change-table-optionsオプションを追加
    • kamipoさんの実装をほとんどそのまま取り入れました
  • MySQL 5.7のサポート
    • JSON型とGenerated Columnsが使えるようになりました
  • URL形式の接続設定のサポート
  • 環境変数経由での接続設定の受け渡し(-c env:MY_DB_URL
  • 名無しの外部キーのサポート
  • 外部キーの適用順序の変更(FK削除→テーブル変更→FK追加)

AR 5.1サポート・MySQL 5.7サポート・FKまわりの改善が大きなところです。 特にFKまわりついては、はまる人が多いようだったので、それなりに使い勝手を良くしたつもりです。 (とはいえ、ARの名無しFKの実装についてはもやもやするところですが)

0.7系について、不具合や要望等があれば随時Issue上げていただけると助かります。