Postfixでの流量制限について

そろそろ腐りそうな知見なので、腐りきる前にメモだけ残しておく。

まあ、基本的にはSendGridなどのクラウドサービスを使った方がいいと思うけど、どこかの誰かの役に立つかもしれないので…


Postfixでメール配信の流量制限を行う場合、送る量を制限するか、MTAに入ってくる量を制限するかの2通りの方法があると思う。 送る量を制限しようと思うとslow_destination_rate_delay=1sみたいな設定値を追加して、送信毎に遅延をかけるような感じになると思う。

knowledge4linux.blogspot.com

しかし、この方法だと1s以下に設定することができないので、毎分何万通も送るようなシステムだと極端に配信量が下がってしまう。 また、流量制限を行う場合は単に遅延を設けるだけでなく「毎分xxx通」といった厳密な指定がしたい。

そこで、MTAに入ってくる量を制限する。

単純にMTAに入ってくる量を制限すると、上限超えたときにクライアントでエラーが出る。 なので、以下のようにPostfixを2段構成にする。

f:id:winebarrel:20180829040934p:plain

インターネットに面したPostfixでは、master.cfで送信先毎の流量を設定する

11125     inet  n       -       n       -       -       smtpd
  -o syslog_name=postfix/gmail
  -o smtp_destination_concurrency_limit=1
  -o smtp_destination_recipient_limit=1
  -o smtpd_client_message_rate_limit=1000
11126     inet  n       -       n       -       -       smtpd
  -o syslog_name=postfix/docomo
  -o smtp_destination_concurrency_limit=1
  -o smtp_destination_recipient_limit=1
  -o smtpd_client_message_rate_limit=400

クライアントに面したPostfixでは流量制限したい宛先のポートをtransportで変更する。

gmail.com smtp:[outside-load-balancer]:11125
docomo.ne.jp smtp:[outside-load-balancer]:11126

また、main.cfで再送間隔を短くしておく。

minimal_backoff_time = 3m
maximal_backoff_time = 15m
queue_run_delay = 3m

こういう構成にしておくと、gmail宛には1000通/分、docomo宛には400通/分、みたいな感じで流量を制限することができる。

経験上、3キャリアとGMailについては流量に制限をかけておいた方が無難だと思う…

またこの構成でのもう一つのメリットは、流量制限によってメールが溜まるキューがインターネット側のPostfixではなく、クライアント側のPostfixであるという点だ。 インターネット側のPostfixはたぶんグローバルIPを持つことになると思うが、特定のIP のPostfixのキューにメールが溜まってしまうと、そのIPが大量配信などで受け取り拒否されたときに困ったことになってしまう。 クライアント側のキューの方にメールが溜まっている状態なら、流量制限のリミットの引っかかった場合、再送でロードバランサーが別のPostfixに適度に振り分けてくれる。

DatadogでのECSタスクのCPUの監視について

ECSタスクのDockerコンテナのCPU使用率については、監視自体に意味があるのか微妙なところもあるけれど、タスクの制限いっぱいまで常にCPUを使い切っているコンテナは不健康な場合もありそうだということで、Datadogで監視しようとしたことがあった。

docker.cpu.usageはホストの1コアに対するCPUの使用率なので、50%となっていても、ECSのタスク定義でCPU=512としていたら、リミットいっぱいいっぱいまで使っていることになるので、タスクやイメージによって閾値を変える必要が出てくる。そうなるとmulti alertが使えない。

docker.cpu.throttledというメトリクスがあって

www.datadoghq.com

これはnr_thresholdの値で、割り当てられたCPU時間の限界に達した数をカウントしてくれるのでこれを見ると良さそうだ…と思ったんだけれど、この値が変動するのは(たしか)--cpusだったか--cpu-quotaだったかのオプションの時で、ECSのタスク定義でつかうcpu--cpu-sharesの時には変動しない。

どうしたものか、と思っていたところ /sys/fs/cgroup/cpu/cpu.shares からcpu.sharesの値が取れたので、それで正規化すればいいじゃんということで、パッチを投げた。

github.com

めでたくマージされ、v6も変更が入って、docker.cpu.sharesというメトリクスが入った。

これで、

avg:docker.cpu.usage{*} by {container_name} * 1024 / avg:docker.cpu.shares{*} by {container_name}

というモニタを作れば、100%に正規化された値を閾値として、multi alertが作れる…はずだったんだけれど

  • cpuが定義されていないと、使えない
  • たまに cpu.shares=3 みたいなタスクがあって、値が1000%みたいなことになる

ということがあって、結果は微妙だった…

あんまりこの手の知見が見当たらなかったので、とりあえずメモとして残しておく。

srvdというデーモンを書いた

一身上の都合によりsrvdというデーモンを書いた。

github.com

これは何?

DNSSRVレコードをバックエンドにしたconfdみたいなものです。

SRVレコードの値に合わせてミドルウェアの設定ファイルを書き換えて、設定ファイルが変更されたらミドルウェアをリロードする、みたいな。

ことの発端

某所ではMySQLのスレーブへのロードバランサーとして、Railsサーバに同居しているHAProxyを使ってるんですよ。 中央集権的ロードバランサーに比べて、スループットがよいとか、大量のコネクションが一カ所に集中しないとか性能的にはいいんですが、いかんせん設定ファイルをRailsサーバにばらまくのがめんどくさい。設定ファイルをばらまいた後は大量のHAProxyのリロードとRailsのリロード。

それをなんとかしたいと思ったので、いろいろ試してみたんですよ。

四苦八苦していたら同僚がHAProxyのserver-template(とそのデメリット)を教えてくれたので、さらに検証。

www.haproxy.com

HAProxy 1.8だと

server-template db 10 _mysql._tcp.example.com:3306 check port 3306 resolvers dns

みたいな設定を書くと、SRVレコードの値に合わせてバックエンドを設定してくれるんですよね。

一件、なかなかよいのですが

  • バックエンドのサーバ数が固定
    • 例えば、server-template db 10と書くとSRVレコードの返す値が2個でもdb1〜db10までのバックエンドが作られて、db1・db2以外は「ヘルスチェック失敗」というステータスになる
  • 減ったサーバはメンテ状態になって残る
    • SRVレコードがdb-001・db-002と返していたのが、db-002だけになると、db-001のバックエンドは「メンテ状態」のステータスで残り続ける

バックエンドが大きく変わるような場合だと、指定したサーバ数からあふれたり、残り続けたバックエンドがどうなるのかがよく分からない…なんとなーく、よくない匂いを感じる…

それで、consul-template使うか、いやconsulのクラスタを管理したいわけじゃないんだよな、もっとシンプルにやりたいんだよ、そういえばconfdってSRVレコード対応してたっけ…と調べてみると

confd/dns-srv-records.md at master · kelseyhightower/confd · GitHub

etcdやconsulのノードを見つけるためにSRVレコードは使えるけど、バックエンドとしては使えない。 たしかに、DNSはKVSじゃないしね…

confdのバックエンドに追加する修正を投げようかと思ったけれど、なんとなくポリシーが違いそうだし、confdのソースをざっと眺めた感じ、これくらいなら実装できるか、と思って作った次第です。

使い方

srvdの設定ファイルがこんな感じ。

src = "/etc/haproxy/haproxy.cfg.tmpl"
dest = "/etc/haproxy/haproxy.cfg"
domain = "_mysql._tcp.example.com"
reload_cmd = "/bin/systemctl reload haproxy.service"
check_cmd = "/usr/sbin/haproxy -c -V -f {{ .src }}"
interval = 1
timeout = 3
#resolv_conf = "/etc/resolv.conf"
cooldown = 60
#status_port = 8080

haproxy.cfgのテンプレートはこんな感じ。

backend nodes
  mode tcp
  # see https://godoc.org/github.com/miekg/dns#SRV
  {{ range .srvs }}
  server {{ .Target }} {{ .Target }}:{{ .Port }} check
  {{ end }}

SRVレコードを

10 10 3306 db-001.example.com.

と設定して、srvdを起動すると、以下のようなhaproxy.cfgが作成される。

backend nodes
  mode tcp
  # see https://godoc.org/github.com/miekg/dns#SRV

  server db-001.example.com. db-001.example.com.:3306 check

RaisのUnitファイルはこんな感じ。

[Unit]
Description=Rails
After=network.target
ReloadPropagatedFrom=haproxy.service

[Service]
User=ubuntu
WorkingDirectory=/home/ubuntu/hello
ExecStart=/usr/local/bin/bundle exec puma
ExecReload=/bin/kill -s USR2 $MAINPID

[Install]
WantedBy=multi-user.target

RailslocalhostのHAProxyを参照するように設定。

default: &default
  adapter: mysql2
  encoding: utf8
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
  username: scott
  password: tiger
  host: 127.0.0.1

コントローラで接続先が分かるようにして

  def index
    render plain: Item.connection.execute("show variables like 'hostname'").first.last + "\n"
    ActiveRecord::Base.clear_all_connections!
  end

curlで叩き続けると

$ while true; do curl localhost:3000; sleep 1; done
db-001
db-001
db-001

db-001に接続していることが分かる。


そうしたらSRVレコードにdb-001を追加。

10 10 3306 db-001.example.com.
10 10 3306 db-002.example.com.

変更して30秒〜1分ぐらい待つと、haproxy.cfgが書き換わってHAProxyとRailsがリロード。

Aug 03 02:35:01 app-101 srvd[20298]: 2018/08/03 02:35:01 The configuration has been changed. Update /etc/haproxy/haproxy.cfg
Aug 03 02:35:01 app-101 srvd[20298]: 2018/08/03 02:35:01 Run '/usr/sbin/haproxy -c -V -f {{ .src }}' for checking
Aug 03 02:35:01 app-101 srvd[20298]: 2018/08/03 02:35:01 /usr/sbin/haproxy: stdout: Configuration file is valid
Aug 03 02:35:01 app-101 srvd[20298]: 2018/08/03 02:35:01 Run '/bin/systemctl reload haproxy.service' for reloading
Aug 03 02:35:01 app-101 systemd[1]: Reloading HAProxy Load Balancer.
Aug 03 02:35:01 app-101 systemd[1]: Reloading Rails.
Aug 03 02:35:01 app-101 bundle[22391]: * Restarting...
Aug 03 02:35:01 app-101 haproxy[30792]: Configuration file is valid
Aug 03 02:35:01 app-101 haproxy-systemd-wrapper[21157]: haproxy-systemd-wrapper: re-executing
Aug 03 02:35:01 app-101 systemd[1]: Reloaded HAProxy Load Balancer.
Aug 03 02:35:01 app-101 haproxy-systemd-wrapper[21157]: haproxy-systemd-wrapper: executing /usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -Ds -sf 30587
Aug 03 02:35:01 app-101 systemd[1]: Reloaded Rails.
Aug 03 02:35:01 app-101 haproxy[30802]: Proxy localnodes started.
Aug 03 02:35:01 app-101 haproxy[30802]: Proxy localnodes started.
Aug 03 02:35:01 app-101 haproxy[30802]: Proxy nodes started.
Aug 03 02:35:01 app-101 haproxy[30802]: Proxy nodes started.
Aug 03 02:35:01 app-101 bundle[22391]: Puma starting in single mode...
Aug 03 02:35:01 app-101 bundle[22391]: * Version 3.12.0 (ruby 2.3.1-p112), codename: Llamas in Pajamas
Aug 03 02:35:01 app-101 bundle[22391]: * Min threads: 5, max threads: 5
Aug 03 02:35:01 app-101 bundle[22391]: * Environment: development
Aug 03 02:35:02 app-101 bundle[22391]: * Inherited tcp://0.0.0.0:3000
Aug 03 02:35:02 app-101 bundle[22391]: Use Ctrl-C to stop

新しいDBに接続するようになる。

backend nodes
  mode tcp
  # see https://godoc.org/github.com/miekg/dns#SRV

  server db-001.example.com. db-001.example.com.:3306 check

  server db-002.example.com. db-002.example.com.:3306 check
$ while true; do curl localhost:3000; sleep 1; done
...
db-001
db-001
db-001
db-002
db-001
db-002
db-001
db-002

その他

まあまあ使えるかな、と思いつつまだ実践に投入できていないのでなんともかんとも。 Dockerコンテナ内でのHAProxyのリロードも考えねば。

Ridgepoleのverboseオプションについて

Ridgepoleを使っていると、たまによく分からない差分が出てくることがあります。

そういうときにverboseオプションをつけて実行すると、内部でどのような比較を行っているのかが出力されるので、デバッグの助けになります。

例えば次のようなSchemafileがあったとして

create_table "users", force: :cascade do |t|
  t.string "name", null: false
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.index "lower(name)"
end

一回適用したのに、もう一度適用しようとすると差分(とエラー)が出てくる。

$ ridgepole -a
Apply `Schemafile`
-- create_table("users", {})
   -> 0.0109s
-- add_index("users", "lower(name)", {})
   -> 0.0054s

$ ridgepole -a --dry-run
Apply `Schemafile` (dry-run)
remove_index("users", {:name=>"index_users_on_lower_name"})
add_index("users", "lower(name)", {})

[ERROR] Index name 'index_users_on_lower_name' on table 'users' already exists
  1: remove_index("users", {:name=>"index_users_on_lower_name"})
* 2: add_index("users", "lower(name)", {})
    /Users/sugawara/.rbenv/versions/2.4.2/lib/ruby/gems/2.4.0/gems/activerecord-5.2.0/lib/active_record/connection_adapters/abstract/schema_statements.rb:1169:in `add_index_options'

なので--verboseをつけて実行してみます。

$ ridgepole -a --dry-run --verbose
Apply `Schemafile` (dry-run)
# Parse DSL
# Load tables
#   users
# Compare definitions
#   users
   {"created_at"=>{:options=>{:null=>false}, :type=>:datetime},
    "name"=>{:options=>{:null=>false}, :type=>:string},
    "updated_at"=>{:options=>{:null=>false}, :type=>:datetime}},
- :indices=>
-  {"index_users_on_lower_name"=>
-    {:column_name=>"lower((name)::text)",
-     :options=>{:name=>"index_users_on_lower_name"}}},
+ :indices=>{"lower(name)"=>{:column_name=>"lower(name)", :options=>{}}},
  :options=>{}}
remove_index("users", {:name=>"index_users_on_lower_name"})
add_index("users", "lower(name)", {})

[ERROR] Index name 'index_users_on_lower_name' on table 'users' already exists
  1: remove_index("users", {:name=>"index_users_on_lower_name"})
* 2: add_index("users", "lower(name)", {})
    /Users/sugawara/.rbenv/versions/2.4.2/lib/ruby/gems/2.4.0/gems/activerecord-5.2.0/lib/active_record/connection_adapters/abstract/schema_statements.rb:1169:in `add_index_options'

紫の部分が変更前、つまり実際のデータベースからエクスポートした情報、水色の部分が変更後、つまりSchemafileをパースした情報です。

差異を見てみると

  • lower(name)lower((name)::text)として解釈・適用されている
  • 自動的にインデックス名index_users_on_lower_nameが付けられている

ということが分かります。

なのでSchemafileを以下のように修正すると差分は出なくなります。

create_table "users", force: :cascade do |t|
  t.string "name", null: false
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.index "lower((name)::text)", name: "index_users_on_lower_name"
end
$ ridgepole -a --dry-run
Apply `Schemafile` (dry-run)
No change

ちなみに、このようなデータベースの暗黙的な変更(例えばMySQLだと外部キーに自動的にインデックスが貼られる件など)をRidgepole側で吸収するのはなかなか難しいので、できるだけデーターベース様の意向に沿ってください…というのが今のところのポリシーです。

京都に引っ越して一ヶ月ぐらいたった

京都に引っ越して一ヶ月ぐらいたった。

住所変更やバイクの運搬でバタバタしていたが、生活には大分慣れてきたように思う。 一応、京都市内ではあるが、所謂、洛中ではなく南の方なので、あまり「古式ゆかしい町並みが…」という感じではない。 近くに大きめの道路があって、消防車や救急車がしょっちゅう走り、夜はだいたい暴走族っぽい感じの人たちが走っている。

まわりを見渡せば住宅やら工場やらで、遠くに見える山々が唯一「遠くに来たんだ」と感じさせる。 似たような光景は横浜でも、さいたまでも見たような気がする。 京都駅付近は混んでいるのであまり近づかないし、唯一、観光的に感じるのは、たまに自転車で通る東寺をみて「変な鳥がいるな」と思うときぐらいだ。


京都に引っ越したのは、主にバイクが理由だ。 生まれてこのかた関東で暮らし、大学で二輪の免許を取って以来、関東の方々にバイクで走りに行って、大分飽きてきたので引っ越してみた。 別に休暇を取れば、日本のどこにでも行けるのだけれど、バイクという乗り物は天気に旅程がひじょーに左右されるので、長期的な計画を立てづらい。 普通の天気予報だとだいたい2週間ぐらいが限度だし、一ヶ月予報をやっているサイトもあるけれど、当てになるのかよく分からない… なのでまあ、バイクでちょっと出かけるときは「明日晴れじゃん。ツーリング行くか」という感じで、土日に早起きして日帰りでぷらっと旅に出るのがよいのです。

そうはいっても、もう箱根も何度も行ってるし、富士山もいったし、秩父もいったし、奥多摩もいったし、群馬もいったし、平らな千葉もいったし、長野は遠いし…という感じでぷらっと行けるところはだいたい行ってしまって、正直飽きてきたなというところで、はたと弊社がリモートワークをやっていることを思い出し、「リモートワークなら別にどこに住んでいても仕事に支障はないよな…」と、引っ越してみることを思いついた。


引っ越しを思いつくまで、あまりリモートワークは活用してはいなかった。 体調不良やら、私用やらで例外的に出社しない時で自宅で作業はやっていたが、基本的には会社で作業をしていた。 どうも自宅にいるとなかなか仕事モードに入りにくく、会社に行くと自分に仕事をさせる強制力が働くと思ってのそんな作業スタイルだったが、関東外への引っ越しを決めたとき、まず自主的に「会社に出社しない」ということを心がけてみた。

最初の頃は、長居できる喫茶店をさまよう生活をしていたが、さすがに毎日だと厳しい。 自宅で仕事ができるのが一番なのだが、なかなか仕事モードになれない、どうするか…と思っていたが、作業用の机と椅子を購入したところ案外すっと仕事モードに入れるようになり、そこからずっと自宅で作業するようになった(机と椅子は5000円くらいの安いやつ)。 それで一ヶ月ぐらい出社しないで大丈夫そうな頃合いを見計らって上司に相談したところ、まずは様子見ということでさらに一ヶ月、会社に出社しない期間を設けて問題がないことを確認することになった。 それで、去年の11月ぐらいから今年の1月ぐらいまで、会社に出社しない日々を過ごし「問題ないであろう」というお墨付きをもらい、2月にぷらっと京都に出かけて引っ越し先を決め、3月に引っ越して現在に至る。


今のところ、仕事に支障は出ていない(と思ってはいる)。 元々会社にいても、もっぱらSlackとGitHubでのコミュニケーションが主だったし、基本的なオペレーションや日々の細かいタスクなどで困ることは、リモートワークを本格化した当初からあまりなかった。 対面でのコミュニケーションはもっぱらZoomやHangoutで、リモート特有の煩わしさ(音声が切れる・映像が乱れる)はあるものの、致命的に困ることはない。 たまに事務処理などで、どうしても紙の書類を所定の場所にだすような作業が発生するが、そこはPDFを作って提出するところだけ会社にいる同僚にお願いしている(申し訳ない)。 とはいえ、3月4月はこまごまとした用事で東京に行くことがしばしばあったので、特に用事もなくなる5月以降、なにか困ることは出てくるのかもしれない…


生活では、特にロケーションを感じることはない。 リモートワーク本格化以降、日がな一日家のデスクに座って壁に向かい合って仕事をするような感じなので、引っ越し前と後であんまり感覚的な差がない。 宅配便の受け取りや、コンビニ行ったときにの店員さんのイントネーションの違いで「そういえば関西だっけ」と意識するぐらいである。 洛中に暮らすと、もう少し感じるものが違ったんだろうか?

そういえば、京都で会う人会う人「冬は寒いし夏は暑いですよ」と言われたが、3月4月のせいかまだ暑い寒いと感じることはない。 むしろ、前に住んでいた部屋が、部屋にいながら寒いというものだったので、部屋が暖かいのが素晴らしい、などと思ってしまっている。 あと、いろんなところで言われていたぶぶ漬け的な何かについても、今のところ特に遭遇してはいない。 役所の人も親切だし、お店の人も親切だし。まあ、鈍いだけなのかもしれないけれど。


バイクについては、この間バイクをこっちに持ってきたばかりなのであまり走り回ってはいないけれど、1時間もかからずに山に行けて道の駅でソフトクリームを食べれたりするので、なかなか楽しめそうだなーと思っている。 最初に京都に下見に来たとき「山の地方都市っぽいな」と思っていたんだけれど(松本とか渋川とか)そんな感じですぐ行けば山、という環境がよい。 関東に住んでいたときは、どこかに出かけるときはとにかく首都高・高速を挟む必要があり、たとえば箱根に行くときは東名を抜けて小田原厚木道路を抜けたときには「やっとついたー」という感じだったのだけれど、京都はシームレスに山につながっている感じなので、その辺がなかなか面白い。 一方で、南のほうは大阪までずっと市街地が続いているのも面白い。関東でも神奈川の奥の方とか、八王子に住んだりすると、こんな感覚なんだろうか? とにかくバラエティに富んでいて、旅に出かけたくなる。


という感じで、今のところ京都でも支障なく暮らしてはいます。 まだきちんと会社として制度化はされていない(と、思う)のだれど、とりあえずモデルケースとして、今後もいろんなところで働ける人が増えるといいなぁ、となんとなく考えている(数は把握してないけど、完全リモートでやっている人は、僕以外にもいます)。

ので、まあ関西在中で弊社に興味があるかたは、お声がけいただければどういう感じで仕事をしてるか、みたいな話はできると思います。SREまわりしか分からないけど。 完全リモートワークができるかは分からないけど、別に作業に支障がないなら許可は下りる…と思う。たぶん。 あと、単純に酒飲みたいので、暇な方は声かけてください。

AWS Lambda Go without Go

とある勉強会用にLTネタを作っていたのですが、発表できなくなったので腐る前にブログに書いておきます。

お前は何を(ry

記事のタイトルについて お前は何を言っているんだ なのですが、元ネタは以下のツイートです。

LTのネタになりそうだったのでやってみたわけです。

aws-lambda-goについて

じゃあ、まあaws-lambda-goは一体どういう仕組みで動いているんだろうと、ソースを読んでみました。

で、entry.gofunction.goあたりでだいたい分かりましたが、net/rpcパッケージをつかってFunction#InvokeFunction#Pingを呼んでいる感じでした。

ミニマムなハンドラ

必要なrpcのメソッドさえ実装すればaws-lambda-goがなくても動きます。 いろいろと削ってみて、ほぼ最小のコードは以下のようになりました。

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net"
    "net/rpc"
    "os"
)

type PingRequest struct {
}

type PingResponse struct {
}

type Function struct {
    // handler lambdaHandler
}

type InvokeRequest struct {
    Payload []byte
    //RequestId             string
    //XAmznTraceId          string
    //Deadline              InvokeRequest_Timestamp
    //InvokedFunctionArn    string
    //CognitoIdentityId     string
    //CognitoIdentityPoolId string
    //ClientContext         []byte
}

type InvokeResponse struct {
    Payload []byte
    //Error   *InvokeResponse_Error
}

func (fn *Function) Ping(req *PingRequest, res *PingResponse) (err error) {
    *res = PingResponse{}
    return
}

func (fn *Function) Invoke(req *InvokeRequest, response *InvokeResponse) error {
    response.Payload, _ = json.Marshal(100)
    return nil
}

func main() {
    port := os.Getenv("_LAMBDA_SERVER_PORT")

    l, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", port))

    if err != nil {
        log.Fatal(err)
    }

    f := new(Function)
    rpc.Register(f)
    rpc.Accept(l)
    log.Fatal("accept should not have returned")
}

構造体のメンバを結構削っても、一応動くんですよね…

ローカルでハンドラを動かす

ハンドラはrpcのメソッドを呼ばれてるだけなので、ローカルから実行することもできます。

  • ハンドラ
package main

import (
    "github.com/aws/aws-lambda-go/lambda"
)

func hello(event interface{}) (int, error) {
    return 1, nil
}

func main() {
    lambda.Start(hello)
}
  • ローカル実行用のクライアント
package main

import (
    "fmt"
    "github.com/aws/aws-lambda-go/lambda/messages"
    "log"
    "net/rpc"
)


func ping(client *rpc.Client) {
    req := &messages.PingRequest{}
    var res *messages.PingResponse

    err := client.Call("Function.Ping", req, &res)

    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Ping: %v\n", *res)
}

func invoke(client *rpc.Client) {
    req := &messages.InvokeRequest{Payload: []byte("{\"foo\":100}")}
    res := messages.InvokeResponse{}

    err := client.Call("Function.Invoke", req, &res)

    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Invoke: %v\n", string(res.Payload))
}

func main() {
    client, err := rpc.Dial("tcp", "localhost:1234")

    if err != nil {
        log.Fatal("dialing:", err)
    }

    ping(client)
    invoke(client)
}

実行はこんな感じで

_LAMBDA_SERVER_PORT=1234 ./hello
./client
Ping: {}
Invoke: 1

RustのハンドラをAWS Lambda Goで動かす

それで本題なのですが、net/rpcパッケージはGoに特化しているとはいえ、シリアライズされたデータをネットワーク経由でやりとりしているので、やろうと思えばほかの言語とも通信ができるはずです。 ただシリアライズにつかっているエンコーディングgobで、さすがにこのエンコーダの他言語実装を見つけることはできませんでした。

仕方ないので、net/rpcのサーバ・クライアント間にプロキシ立ててパケットをキャプチャして流れているデータを調べた上で、そのデータをそのまま返すようなサーバを作ってみました。

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::Read;
use std::io::Write;
use std::env;

fn handle_client(mut stream: TcpStream) {
    let mut buf;
    loop {
        buf = [0; 2048];
        let _ = match stream.read(&mut buf) {
            Err(e) => panic!("Got an error: {}", e),
            Ok(m) => {
                if m == 0 {
                    break;
                }
                m
            }
        };

        let s = String::from_utf8_lossy(&buf);
        let ret: &[u8];

        if s.contains("Ping") {
            ret = b":\xFF\x81\x03\x01\x01\x08Response\x01\xFF\x82\x00\x01\x03\x01\rServiceMethod\x01\x0c\x00\x01\x03Seq\x01\x06\x00\x01\x05Error\x01\x0c\x00\x00\x00\x12\xFF\x82\x01\rFunction.Ping\x00\x18\xFF\x83\x03\x01\x01\x0cPingResponse\x01\xFF\x84\x00\x00\x00\x03\xFF\x84\x00";
        } else {
            ret = b"\x16\xFF\x82\x01\x0FFunction.Invoke\x01\x01\x00(\xFF\x85\x03\x01\x01\x0EInvokeResponse\x01\xFF\x86\x00\x01\x01\x01\x07Payload\x01\n\x00\x00\x00\x08\xFF\x86\x01\x03111\x00";
        }

        match stream.write(ret) {
            Err(_) => break,
            Ok(_) => continue,
        }
    }
}

fn main() {
    let port = env::var("_LAMBDA_SERVER_PORT").unwrap();

    let listener = TcpListener::bind(format!("localhost:{}", port)).unwrap();
    for stream in listener.incoming() {
        match stream {
            Err(e) => println!("failed: {}", e),
            Ok(stream) => {
                thread::spawn(move || handle_client(stream));
            }
        }
    }
}

コードはRust Echo Server Example | Andrei Vacariu, Software Developerをほぼそのままコピーしています。 飛んできたメッセージを無理矢理Stringにして「Ping」という文字が入っていたらPing用のデータ、それ以外のデータはInveke用のデータを返すようにしています。

これ、linux-amd64でビルドしてLambdaにGolangとして登録すると、普通に動きます(「111」という値が返ってきます)

ということで、AWS Lambda Goはほかの言語でも動きます! やったぜ!

その他

Goに特化したrpcに対応するなら、先にgRPCに対応してもよかったのでは…と思わなくもない。

追記

そういえばRustはstatic linkにしてません。 ぱっとリンクしているライブラリを調べてみると

[ec2-user@ip-10-0-1-204 release]$ ldd hello
    linux-vdso.so.1 =>  (0x00007fff055a1000)
    libdl.so.2 => /lib64/libdl.so.2 (0x00007fee696d1000)
    librt.so.1 => /lib64/librt.so.1 (0x00007fee694c9000)
    libpthread.so.0 => /lib64/libpthread.so.0 (0x00007fee692ad000)
    libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007fee69097000)
    libc.so.6 => /lib64/libc.so.6 (0x00007fee68cd3000)
    /lib64/ld-linux-x86-64.so.2 (0x00007fee69b4f000)

こんな感じでした。 まあGoでもオプション指定しないとdynamic linkになるので、libcくらいは使えますね…と。

ProxySQLをつかったRDSの切り替え

ProxySQLとは

ProxySQLはMySQL用のL7のプロキシサーバで、プロキシサーバのレイヤでR/W Splittingできたり、クエリの書き換えをできたり、負荷分散などができたりする便利ミドルウェアです。

www.proxysql.com

Dropboxの中の人が書いているみたいで、Perconaの推しミドルウェアみたいです。(開発にも関わっているのかな?)

あとQiitaにもいくつか記事が上がってます。

https://qiita.com/search?q=ProxySQL

設定の管理が結構独特で、MySQLっぽく振る舞うsqliteで管理されていて、動的にバックエンドのサーバを書き換えたりすることができます。設定まわりの概念的なものは『ProxySQL触ってみた - Qiita』がわかりやすいかも。

動的なバックエンドの切り替え

管理用インターフェースに対して、以下のようなクエリを流すと、バックエンドが瞬間的に切り替わります。

# 元の設定
$ mysql -uadmin -p -S /tmp/proxysql_admin.sock
mysql> select * from runtime_mysql_servers;
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| hostgroup_id | hostname                                           | port | status | weight | compression | max_connections | max_replication_lag | use_ssl | max_latency_ms | comment |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
| 0            | mydb.xxx.ap-northeast-1.rds.amazonaws.com | 3306 | ONLINE | 1      | 0           | 10000           | 0                   | 0       | 0              |         |
+--------------+-------------------------------------------+------+--------+--------+-------------+-----------------+---------------------+---------+----------------+---------+
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);

load mysql servers to runtime; /* ここで実際の設定が書き換わる */

RDSの瞬間的な切り替え

RDSのフェイルオーバーは通常、1〜2分で完了し、フェイルオーバー後はエンドポイントのIPアドレスが変わります。大抵のユースケースでは問題ないと思うんですが、「瞬間的な切り替えにしたい」とか「DNSTTLに依存したくない」等というときに、間にProxySQLを挟むことで(+別クラスタのスレーブを用意することで)瞬間的にフェイルオーバーすることができます。*1

サーバ構成

f:id:winebarrel:20171230123633p:plain

上図のmydb2mydbのリードレプリカ、という訳ではなくてスナップショットからリストアしたRDSをmysql.rds_set_external_masterレプリケーションしたものです。

クラスタのスレーブは以下の手順で作ります。

  1. mydbのリードレプリカを作る
  2. リードレプリカのレプリケーションを止める
  3. レプリケーションのポジションを記録する
  4. リードレプリカのスナップショットをとる
  5. スナップショットからリストアしてmydb2を作る
  6. 記録したポジションで、mydb2mysql.rds_set_external_masterを実行する

リードレプリカをpromoteしてmysql.rds_set_external_masterを実行すれば良さそう、、、な感じなんですが、promoteしたリードレプリカでmysql.rds_set_external_masterを実行すると、マスタのホスト名が172...と内部のIPにすり替わる、という現象が発生したのでやらない方がよいかと。mysql.rds_set_external_masterの実行前にmysql.rds_reset_external_masterが必要だったりするのを見るに、元のクラスタの情報がきちんと初期化されないのかも。

切り替えスクリプト

#!/usr/bin/env ruby
require 'logger'
require 'mysql2'
require 'open3'
require 'optparse'

class MySQL
  def initialize(host:, port:, user:, pass:, dry_run:, logger:)
    @client = Mysql2::Client.new(host: host, port: port.to_i, username: user, password: pass)
    @host = host
    @dry_run = dry_run
    @logger = logger
  end

  def enable_log_bin
    query('SET SQL_LOG_BIN = 1', force: true)
  end

  def disable_log_bin
    query('SET SQL_LOG_BIN = 0', force: true)

    if block_given?
      begin
        yield
      ensure
        begin
          enable_log_bin
        rescue => e
          @logger.warn(e.message)
        end
      end
    end
  end

  def enable_read_only
    query('SET GLOBAL READ_ONLY = 1')
  end

  def disable_read_only
    query('SET GLOBAL READ_ONLY = 0')
  end

  def read_only?
    query('SELECT @@read_only AS read_only').first.fetch('read_only').nonzero?
  end

  def enable_read_only_and_check
    enable_read_only

    if !@dry_run && !read_only?
      raise "[#{@host}] Failed to enable read only!"
    end
  end

  def find_users(user)
    query("SELECT user, host FROM mysql.user where user = '#{user}'", force: true).map {|row|
      "'#{row.fetch('user')}'@'#{row.fetch('host')}'"
    }.uniq
  end

  def drop_user(user)
    users = find_users(user)

    if users.empty?
      raise "Cannot find drop user: #{user}"
    end

    grants = users.flat_map do |user|
      privs = show_grants(user)
      query("DROP USER #{user}")
      privs
    end

    query('FLUSH PRIVILEGES')
    grants
  end

  def show_grants(user)
    query("SHOW GRANTS FOR #{user}", force: true).flat_map(&:values)
  end

  def create_user(grants, password:)
    grants.each do |sql|
      sql.sub!(/IDENTIFIED\b.*/, '')
      sql << " IDENTIFIED BY '#{password}'"
      query(sql)
    end
    query('FLUSH PRIVILEGES')
  end

  def while_threads(time_until, tstart: Time.now, user: nil)
    threads = get_threads_util(user: user)

    while time_until > 0 && threads.length > 0
      if (time_until % 5 ).zero?
        @logger.info 'Waiting all running %d threads are disconnected.. (max %d milliseconds)' % [threads.length + 1, time_until * 100]

        if threads.length < 5
          threads.each {|th| @logger.info th.inspect }
        end
      end

      sleep_until(tstart)
      tstart = Time.now
      time_until -= 1

      threads = get_threads_util(user: user)
    end

    return [threads, tstart]
  end

  def kill_threads(threads)
    threads.each do |th|
      begin
        thread_id = th.fetch('Id')
        @logger.info("KILL #{th}")
        query("CALL mysql.rds_kill(#{thread_id})")
      rescue Mysql2::Error => e
        raise e if e.error_number != 1094 # MYSQL_UNKNOWN_TID
      end
    end
  end

  def show_master_status
    query('SHOW MASTER STATUS', force: true, hidden: true).first
  end

  def show_slave_status
    query('SHOW SLAVE STATUS', force: true, hidden: true).first
  end

  def wait_replication(master, interval: 0.1)
    master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)

    @logger.info 'Waiting replication..'
    @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
    @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"

    #return if @dry_run

    loop do
      if master_log_file == slave_log_file && master_log_pos == slave_log_pos
        @logger.info 'Caught up!'
        @logger.info "master: file=#{master_log_file} pos=#{master_log_pos}"
        @logger.info "slave: file=#{slave_log_file} pos=#{slave_log_pos}"
        break
      end

      sleep interval
      master_log_file, master_log_pos, slave_log_file, slave_log_pos = update_repl_info(master)
      #_, _, slave_log_file, slave_log_pos = update_repl_info(master)
    end
  end

  private

  def query(sql, force: false, hidden: false)
    unless hidden
      log_msg = "[#{@host}] #{sql}"
      log_msg << ' (dry-run)' if @dry_run
      @logger.info log_msg
    end

    if force || !@dry_run
      @client.query(sql)
    end
  end

  def get_threads_util(running_time_threshold: 0, type: 0, user: nil)
    my_connection_id = @client.thread_id
    threads = []

    query('SHOW PROCESSLIST', force: true, hidden: true).each do |row|
      th_id         = row.fetch('Id')
      th_user       = row.fetch('User')
      th_host       = row.fetch('Host')
      th_command    = row.fetch('Command')
      th_state      = row.fetch('State')
      th_query_time = row.fetch('Time')
      th_info       = row.fetch('Info')

      th_info.sub!(/\A\s*(.*?)\s*\Z/) { $1 } if th_info

      next if my_connection_id == th_id
      next if th_query_time && th_query_time < running_time_threshold
      next if th_command && th_command == 'Binlog Dump'
      next if th_user && th_user == 'system user'
      next if user && th_user != user

      unless user
        if th_command && th_command == 'Sleep' && th_query_time && th_query_time >= 1
          next
        end
      end

      if type >= 1
        next if th_command && th_command == 'Sleep'
        next if th_command && th_command == 'Connect'
      end

      if type >= 2
        next if th_info && th_info =~ /\Aselect/i
        next if th_info && th_info =~ /\Ashow/i
      end

      threads << row
    end

    return threads
  end

  def sleep_until(tstart, running_interval: 0.1)
    elapsed = Time.now - tstart

    if running_interval > elapsed
      sleep(running_interval - elapsed)
    end
  end

  def update_repl_info(master)
    master_status = master.show_master_status
    slave_status = show_slave_status
    master_log_file = master_status.fetch('File')
    master_log_pos = master_status.fetch('Position')
    slave_log_file = slave_status.fetch('Relay_Master_Log_File')
    slave_log_pos = slave_status.fetch('Exec_Master_Log_Pos')
    [master_log_file, master_log_pos, slave_log_file, slave_log_pos]
  end
end

def parse_options(argv)
  options = argv.getopts(
    '',
    'orig-host:',
    'orig-port:',
    'orig-user:',
    'orig-pass:',
    'new-host:',
    'new-port:',
    'new-user:',
    'new-pass:',
    'drop-user:',
    'drop-user-pass:',
    'failover:',
    'execute'
  )

  if options.values.any?(&:nil?)
    raise "not enough options: #{options}"
  end

  Hash[options.map {|k, v| [k.tr(?-, ?_).to_sym, v] }]
end

def failover(script, logger, dry_run:)
  logger.info 'Failing over..'
  logger.info "Execute #{script}"

  unless dry_run
    out, err, status = Open3.capture3(script)
    logger.info out unless out.empty?
    logger.error err unless err.empty?
    raise err unless status.success?
  end
end

def main(argv)
  logger = Logger.new($stdout)
  options = parse_options(argv)

  if options.fetch(:execute)
    logger.info '!!! execute mode !!!'
  else
    logger.info '*** dry-run mode ***'
  end

  orig_mysql = MySQL.new(
    host: options.fetch(:orig_host),
    port: options.fetch(:orig_port),
    user: options.fetch(:orig_user),
    pass: options.fetch(:orig_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  new_mysql = MySQL.new(
    host: options.fetch(:new_host),
    port: options.fetch(:new_port),
    user: options.fetch(:new_user),
    pass: options.fetch(:new_pass),
    dry_run: !options.fetch(:execute),
    logger: logger)

  #logger.info 'Set read_only on the new master..'
  #new_mysql.enable_read_only_and_check

  #orig_mysql.disable_log_bin do
    drop_user = options.fetch(:drop_user)
    grants = orig_mysql.drop_user(drop_user)
    threads, tstart = orig_mysql.while_threads(15, user: drop_user)
    #orig_mysql.enable_read_only_and_check
    threads, tstart = orig_mysql.while_threads(5, tstart: tstart, user: drop_user)
    logger.info 'Killing all application threads..'
    orig_mysql.kill_threads(threads) if threads.length > 0
  #end

  sleep 1
  new_mysql.wait_replication(orig_mysql)

  new_mysql.create_user(grants, password: options.fetch(:drop_user_pass))

  failover(options.fetch(:failover), logger, dry_run: !options.fetch(:execute))

  #new_mysql.disable_log_bin do
  #  logger.info 'Set read_only=0 on the new master.'
  #  new_mysql.disable_read_only
  #end
end

main(ARGV)

MHA用スクリプトの雑な流用です。 以下のような感じで使います。

#!/bin/bash
ORIG=mydb.xxx.ap-northeast-1.rds.amazonaws.com
NEW=mydb2.xxx.ap-northeast-1.rds.amazonaws.com
MYSQL_USER=root
MYSQL_PASS='...'

./mysql-online-switch.rb \
  --orig-host=$ORIG --orig-port=3306 --orig-user=$MYSQL_USER --orig-pass="$MYSQL_PASS" \
   --new-host=$NEW  --new-port=3306  --new-user=$MYSQL_USER  --new-pass="$MYSQL_PASS" \
  --drop-user=app_mysql_user --drop-user-pass='...' --failover ./switch.sh  #--execute

--executeオプションをつけないとdry-runモードで起動します

やっていることは

  1. RDSのアプリ用ユーザを削除する
  2. 当該ユーザの既存セッションをKILLする
  3. レプリケーションが追いつくのを待つ
  4. 移行先に新しくアプリ用ユーザを作る(レプリケーションで削除されるので)
  5. 切り替えスクリプトを実行

切り替えスクリプトは前述のSQLを実行するだけのものです。

#!/bin/bash
cat <<SQL | mysql -S /tmp/proxysql_admin.sock -u admin -pPASSWORD 2> /dev/null
delete from mysql_servers;
insert into mysql_servers (hostname, status, max_connections)
values
('mydb.xxx.ap-northeast-1.rds.amazonaws.com', 'offline_hard', 10000),
('mydb2.xxx.ap-northeast-1.rds.amazonaws.com', 'online', 10000);
load mysql servers to runtime;
SQL

実行結果

感じで切り替わります。

$ ./mysql-online-switch.rb ...
I, [2017-10-01T11:30:04.277194 #23953]  INFO -- : !!! execute mode !!!
I, [2017-10-01T11:30:04.365187 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] SELECT user, host FROM mysql.user where user = 'app_mysql_user'
I, [2017-10-01T11:30:04.365557 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] DROP USER 'app_mysql_user'@'%'
I, [2017-10-01T11:30:04.365594 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] FLUSH PRIVILEGES
I, [2017-10-01T11:30:04.377357 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1500 milliseconds)
I, [2017-10-01T11:30:04.876041 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 1000 milliseconds)
I, [2017-10-01T11:30:05.377405 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:05.887999 #23953]  INFO -- : Waiting all running 100 threads are disconnected.. (max 500 milliseconds)
I, [2017-10-01T11:30:06.379264 #23953]  INFO -- : Killing all application threads..
I, [2017-10-01T11:30:06.379462 #23953]  INFO -- : KILL {"Id"=>72515760, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57520", "db"=>"mydb", "Command"=>"Sleep", "Time"=>401, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379504 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72515760)
I, [2017-10-01T11:30:06.379632 #23953]  INFO -- : KILL {"Id"=>72517447, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:57536", "db"=>"mydb", "Command"=>"Sleep", "Time"=>38, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379668 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72517447)
I, [2017-10-01T11:30:06.379754 #23953]  INFO -- : KILL {"Id"=>72523822, "User"=>"app_mysql_user", "Host"=>"10.0.0.100:58008", "db"=>"mydb", "Command"=>"Sleep", "Time"=>48, "State"=>"", "Info"=>nil}
I, [2017-10-01T11:30:06.379790 #23953]  INFO -- : [mydb.xxx.ap-northeast-1.rds.amazonaws.com] CALL mysql.rds_kill(72523822)
...
I, [2017-10-01T11:30:07.402341 #23953]  INFO -- : Waiting replication..
I, [2017-10-01T11:30:07.402428 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402455 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402476 #23953]  INFO -- : Caught up!
I, [2017-10-01T11:30:07.402511 #23953]  INFO -- : master: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402561 #23953]  INFO -- : slave: file=log-bin.000001 pos=168900837
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : [mydb2.xxx.ap-northeast-1.rds.amazonaws.com] GRANT USAGE ON *.* TO `app_mysql_user` IDENTIFIED BT '...'
...
I, [2017-10-01T11:30:07.402605 #23953]  INFO -- : Failing over..
I, [2017-10-01T11:30:07.402653 #23953]  INFO -- : Execute ./switch.sh

まとめ

MHA便利。 ProxySQL便利。

OFFLINE_SOFTについて

OFFLINE_HARDに対してOFFLINE_SOFTというステータスがあります。 これは、アクティブなトランザクションとコネクションはそのままで、新しいトラフィックは新しい別のバックエンドに送信するというものです。

コネクション張りっぱなしでも、一応、gracefulに新しいバックエンドに切り替えてくれるのですが

という感じで、トランザクションはさておき、セッションを操作すると、当たり前ですがバックエンドの切り替えは行われないようでした。 RailsでこのOFFLINE_SOFTを無理に活用しようと思うと、接続時にセッション変数を書いたり読んだりしているところを潰して、全部グローバル変数とかmy.cnfに設定する…とか? リクエストごとにコネクション切るような設計でないとあんまり活用できない気がしますね。

*1:もちろんクライアント側の再接続などのケアは必要ですが