こんにちはミツモアエンジニアの ton です。
今回は PGSync というツールとそれを安全に運用するための工夫をご紹介したいと思います。
PGSync とは
PGSyncとは PostgreSQL のデータをほぼリアルタイムで Elasticsearch に同期するためのミドルウェアツールです。
- Web サイト https://pgsync.com
- GitHub https://github.com/toluaina/pgsync
※ ankane/pgsyncという同名のツールがありますがそちらではありません。
PGSync を使うことで、以下のようなメリットがあります。
- リアルタイム同期: PostgreSQL の レコードの Insert/Update/Delete を直接検知するため、ほぼリアルタイムに Elasticsearch に反映されます。
- コード実装不要: 後述する schema を定義し PGSync を起動するだけで同期が開始されるため、同期のための実装をしなくてすみます。
- 同期漏れの排除: 手動で Elasticsearch に同期する場合しばしば同期漏れが発生しますが、PGSync は PostgreSQL の変更を監視しているため、同期漏れがなくなります。また、PGSync が起動していない間に更新されたデータも同期漏れが発生しないような工夫がされています。
schema 定義
データを同期するために必要なことは、schema 定義を書くことだけです。 以下のような schema 定義を記述して PGSync を起動した場合、自動的に book テーブルと author テーブルを監視します。
{ "table": "book", "columns": ["isbn", "title", "description"], "children": [ { "table": "author", "columns": ["name"] } ] }
データが Insert された際には PGSync は自動的に以下のようなクエリを生成してデータを取得します。
SELECT JSON_BUILD_OBJECT( 'isbn', book_1.isbn, 'title', book_1.title, 'description', book_1.description, 'authors', anon_1.authors ) AS "JSON_BUILD_OBJECT_1", book_1.id FROM book AS book_1 LEFT OUTER JOIN (SELECT JSON_AGG(anon_2.anon) AS authors, book_author_1.book_isbn AS book_isbn FROM book_author AS book_author_1 LEFT OUTER JOIN (SELECT author_1.name AS anon, author_1.id AS id FROM author AS author_1) AS anon_2 ON anon_2.id = book_author_1.author_id GROUP BY book_author_1.book_isbn) AS anon_1 ON anon_1.book_isbn = book_1.isbn
そして、取得したレコードを Elasticsearch に POST し、以下のようなドキュメントが Elasticsearch に作成されます。
[ { "isbn": "9785811243570", "title": "Charlie and the chocolate factory", "description": "Willy Wonka’s famous chocolate factory is opening at last!", "author": ["Roald Dahl"] }, { "isbn": "9788374950978", "title": "Kafka on the Shore", "description": "Kafka on the Shore is a 2002 novel by Japanese author Haruki Murakami", "author": ["Haruki Murakami", "Philip Gabriel"] }, { "isbn": "9781471331435", "title": "1984", "description": "1984 was George Orwell’s chilling prophecy about the dystopian future", "author": ["George Orwell"] } ]
( Table with single child node - PGSync からの引用 )
主な使い方は Web サイトを見ていただければ理解できますが、仕組みが少し分かりづらいので、まず仕組みを解説し、それに起因する問題点と工夫を紹介していきます。
PGSync の同期の仕組み
PGSync は 以下の 2 つの仕組みを駆使してデータを Elasticsearch に同期しています。
- PostgreSQL のトリガと LISTEN/NOTIFY によるリアルタイム通知
- PostgreSQL の論理レプリケーションによる変更のトラッキング
PGSync 起動時はリアルタイム通知を受け取り同期し、PGSync が停止している間に変更されたデータは論理レプリケーションのログから同期します。
トリガと LISTEN/NOTIFY によるリアルタイム通知
PostgreSQL にはトリガという機能があります。
トリガはテーブルのレコードが Insert/Update/Delete されたときに任意の処理を実行させることができます。
PGSync は初期セットアップ時にトリガ関数を作成し、同期したいテーブルにトリガを設定していきます。 これにより、同期したいテーブルにデータが Insert/Update/Delete されたときにトリガ関数が実行されます。 (トリガ関数の中身 は trigger.py)
また、トリガ関数の中では NOTIFY を実行しています。PostgreSQL には LISTEN/NOTIFY という非同期通知の仕組みがあり、 PGSync はトリガ関数内で実行される NOTIFY を LISTEN することでリアルタイムにデータ変更通知を受け取ることができます。
論理レプリケーションによる変更のトラッキング
PostgreSQL の論理レプリケーションは、データベースの変更を他のシステムに転送するための機能です。 物理レプリケーションと異なり、テーブル単位でレプリケートすることができます。
PGSync では定期的に論理レプリケーションのログを確認し、どこまで同期を行ったかトラッキングしています。 もし PGSync が予期せず終了してしまいその間にデータが変更された場合でも、論理レプリケーションのログにはその内容が記録されています。 次に PGSync を起動した際に、最後に同期したポイントからログを読み込み未同期のデータを全て同期するようになっています。
PGSync を ECS(Fargate) で運用する際のポイント
PGSync の仕組みを紹介しましたが、ECS(Fargate) で運用するにあたっていくつか考慮しなければならないポイントがあります。
checkpoint ファイル
論理レプリケーションによってどこまで同期したのかをトラッキングしていると説明しましたが、PGSync はそれを checkpoint ファイルというテキストファイルで管理しています。
デフォルトでエフェメラルストレージが割り当てられる Fargate では、タスクが切り替わるたびにファイルが消失してしまうため問題です。
これは予め共有ストレージを作成して割り当てておくことで解決できます。
今回は Amazon EFS を使って常に同じ共有ストレージを割り当て、環境変数 CHECKPOINT_PATH
に共有ストレージをマウントしたパスを指定することで解決しました。
- Amazon EFS ファイルシステムを、Fargate 上の Amazon ECS コンテナまたはタスクにマウントする | AWS re:Post
- Environment variables - PGSync
PGSync の更新/再インデックス
PGSync を更新したい場合や、Elasticsearch のインデックスを更新したい場合、どのようにすればダウンタイムなしでデプロイすれば良いのか不明でした。
ちなみに PGSync のサイトには Re indexing のページがありますが、一度 Elasticsearch のインデックスと checkpoint ファイルを消してから PGSync を re-run しろとだけ書いてあります...
しかし実運用でそんなことは出来ないので、工夫してダウンタイムなしでデプロイする必要があります。
ゼロダウンタイムデプロイ
ここからは、PGSync のデプロイと Elasticsearch の再インデックスをダウンタイムなしでデプロイするための工夫について紹介します。
インデックス毎に PGSync を別タスクで起動する
PGSync には複数の Schema 定義を 1 つのファイルに記述し、1 つの PGSync で同期させることができます。
Multiple schemas in single config - PGSync
これは非常に楽ですが、個別のインデックス毎に更新できなくなるため、Schema 毎に別の Fargate タスクで起動するべきです。
Elasticsearch の Alias を使う
Elasticsearch には Alias という機能があります。これは文字通り、任意のインデックスのエイリアスを作成できるものです。
例えば、job というエイリアスを作成しておき、job_blue というインデックスを指すように指定しておきます。 すると、job に対して検索した場合、実際には job_blue に対して検索したのと同じ挙動になります。
一見意味のないように見えますが、ゼロダウンタイムデプロイには非常に重要です。
デプロイ時に新しく job_green というインデックスを作成し、job のエイリアスを job_green を指すように変更した場合、検索するクライアント側からすると常に job に対して検索しているように見えますが、裏では別のインデックスに切り替えることができます。 これによってクライアント側の修正なし、かつゼロダウンタイムでインデックスを切り替えることができます。
PGSync のデプロイ時にインデックス名を書き換える
上述の Alias を使うため、デプロイ時に別のインデックス名にする必要があります。
そこで、ECS のタスク定義にINDEX_SUFFIX: blue
のように環境変数をセットし、
起動した際にスクリプトで Schema 定義内のインデックス名に INDEX_SUFFIX
を追加して動的に名前を変更できるようにしました。
Schema 定義内のインデックス名が job
となっている場合、INDEX_SUFFIX: blue
なら job_blue
に、INDEX_SUFFIX: green
なら job_green
になってインデックスが作成されます。
外部デプロイメントコントローラーでブルー/グリーンデプロイを行う
これでダウンタイムなしでデプロイするための準備が整いました。
次は ECS の PGSync タスクをブルー/グリーンデプロイする必要があります。
ブルー/グリーンデプロイメントとは、現状の環境(ブルー)を起動したまま、新しい環境(グリーン)を起動、並列して起動した状態でロードバランサーなどの向き先を新しい環境に切り替えてダウンタイムなしでデプロイする方法のことを言います。
ECS は標準でブルー/グリーンデプロイできるのですが、今回は向き先の切り替え対象がロードバランサーではなく Elasticsearch の Alias であるため、少し特殊な方法が必要です。
それが外部デプロイメントコントローラーです。
あらかじめ ECS のデプロイ方法を外部デプロイメントコントローラーにしておくと、ブルー/グリーンデプロイを AWS API で制御しながら実行することができます。
詳しい方法は AWS の記事に記載がありますが、以下の AWS CLI コマンド群で簡単にブルー/グリーンデプロイを制御することが出来ます。
aws ecs create-task-set // グリーン環境を起動 aws ecs update-service-primary-task-set // グリーン環境をプライマリに設定 aws ecs delete-task-set // ブルー環境を停止
ざっくりとした流れは以下の gif のイメージです。
以下にもう少し詳細な手順を解説します。
1. 新しい Schema 定義を含む PGSync の Docker イメージをビルドして ECR にプッシュ
これは通常の ECS デプロイの準備と同様です。
2. 次にデプロイするインデックス名を決定してタスク定義を作成する
現在動作している PGSync タスク定義から INDEX_SUFFIX
環境変数を取得します。
現在が blue なら次は green、green なら次は blue になるようにしています。
こうしてインデックスが blue と green 交互に入れ替わるように INDEX_SUFFIX
を決定し、新しい INDEX_SUFFIX
でタスク定義を作成します。
3. 新しい PGSync タスクを起動する
通常の ECS のデプロイでは新しいタスクが起動してから古いタスクが終了するまでを自動でやってくれますが、
外部デプロイメントコントローラーの場合、aws ecs create-task-set
で新しいタスクを起動すると、新旧のタスクが並列で起動したままになります。
ここで PGSync のセットアップを行い、新しい論理レプリケーションが作成され Elasticsearch にも新しいインデックスが作成されます。
この段階ではインデックスが作成され PGSync による同期もされていますが、エイリアスは古いインデックスを指したままなので、検索に影響はありません。
ここで新しいインデックスに直接検索クエリを投げることで内部で動作確認を行うことができます。
4. Elasticsearch の Alias を切り替える
Elasticsearch の Aliases API で add と remove を 1 回の POST で行うとエイリアスを切り替えることができます。
切り替えた瞬間から新しインデックスに対して検索されるようになるため、切り替えたらすぐに動作確認をします。
不具合が見つかった場合、エイリアスをもう一度古いインデックスに切り替えるとすぐにロールバックすることができます。
5. ECS のプライマリタスクを切り替える
まだ新旧両方のタスクが起動したままで、古いタスクがプライマリとして設定されています。
aws ecs update-service-primary-task-set
コマンドで新しくデプロイした PGSync タスクをプライマリとして設定します。
6. 旧タスクを停止する
aws ecs delete-task-set
で古いタスクを停止します。
これで外部デプロイコントローラーによる手動でのブルー/グリーンデプロイは完了です。
7. 後片付け
タスクは停止しましたが、Elasticsearch の古いインデックスや論理レプリケーションは残ったままなので削除します。
これでデプロイ完了です!
現在はこの一連の流れを CircleCI の Trigger Pipeline と Approve 機能を使ってポチポチしながらデプロイできるようになっています。
まとめ
PGSync の仕組みと運用・デプロイ方法の工夫について紹介しました。
非常に便利な反面、実運用に持っていくまでの構築がなかなか大変でした。
PGSync も外部デプロイコントローラーもあまり資料がなく手探りでしたが、仕組みは非常に面白く色々と勉強になって楽しかったです!
ミツモアでは様々な職種のエンジニアを積極的に募集しています。
ご興味がある方はぜひ気軽にお話ししましょう!