S3 から BigQuery へ簡単にデータをロードする仕組み

こんにちは。データエンジニアリンググループの川崎です。まだここにない出会いを求めて、日々コードを書いています。

この記事では、 S3 から BigQuery へデータをロードするときに使っている汎用的な仕組みについて紹介したいと思います。

なぜそんな仕組みが必要なの?

弊社では、日時のバックアップや外部の協働者とのデータの受け渡しのために AWS の S3 をよく使っています。
一方でデータの分析には Redshift や BigQuery を使っています。
そのため受け取ったデータを分析するには適宜ロードする必要があるのですが、そのロードの方法がバラバラだったため、いくつか管理上の問題がありました。

データを取り込む手法がバラバラでメンテナンスしづらい

S3 のファイルを BigQuery へロードするには、いろいろな方法があります。
たとえば単純なコマンド(sed や gsutil や bq コマンドなど)を組み合わせたシェルスクリプトを実行する方法でもよいですし、
もう少しスマートにするなら、Embulk とデータの整形をする幾つかのプラグインと設定ファイルが入った Docker イメージを用意しておいて、
データの準備ができたら docker run コマンドを実行する方法でも実現できます。

どの方法が良い、悪い、という話は一旦置いておきますが、ファイルによってデータのロード方法が異なると、エラーを検知する方法やリトライの方法も異なってしまい、運用が煩雑になってしまいます。

元データとロード先のデータの関係が、管理しづらい

これまで必要に応じて、その都度データをロードしていたため S3 のバケット名やファイル名と、それをロードした BigQuery のプロジェクト名やデータセット名やテーブル名に関連がありませんでした。

そのため BigQuery の特定のテーブルの中に怪しいデータが紛れ込んでいた場合、調査しようにも、そのテーブルの元データがどこにあって、どうやってロードされているか、
社内のドキュメント管理システムや GitHub を検索する必要がありました。

どんな仕組みを作ったの?

これを解決するために、AWS の S3, SNS, SQS と、 GCP の Container Engine を使って、S3 のデータを BigQuery へロードするための汎用的な仕組みを作りました。

大まかには下図のような構成です。

概要図

この仕組みの利用方法を順番に説明します。

設定を GitHub に保存する

すべての設定は GitHub のリポジトリで管理されています。上図の左下のあたりです。
ここには BigQuery へロードするための設定や、エラーが発生した時に通知する Slack のチャネルなどの情報を JSON 形式で保存されています。

新しくファイルを BigQuery へ取り込みたい場合は、このリポジトリに feature/xxx (xxx は任意)という名前のブランチを作って、設定ファイルを追加して push します。

すると自動的に、その設定がテスト用の環境へデプロイされます。

設定に従って S3 にファイルをアップロードする

次に、テスト用の環境で S3 へファイルをアップロードして、正しく設定ができているか確認します。
上図の左上あたりです。

ファイルアップロードすると、そのファイルは自動的に GCS へコピーされ、さらに BigQuery へロードされます。

もし何か設定が間違っていれば、下図のようなメッセージが Slack へ飛んできます。

失敗した時の通知

ほとんど黒塗りでよくわからないですが、エラーの内容と、よくある設定ミスをまとめたページへのリンクを含んだメッセージが、指定した Slack のチャネルへ飛んできます。

master ブランチへプルリクエストを出す

テスト用の環境でうまくロードができたら、その設定を本番反映するために master ブランチへプルリクエストを出します。
そのプルリクエストは一瞬で担当者がレビューしてくれるので、問題がなければマージされ、すぐに本番環境へデプロイされます。

これで本番用の S3 へファイルをアップロードすると、自動的に BigQuery へロードできるようになりました。

工夫したところ

実装自体は簡単なのですが、いくつか工夫が必要なところがありました。

SQS に同じメッセージが複数送信される

実装の初期では AWS のスタンダードキューを使っていたためか、たまに同じメッセージが複数回送信されることがありました。
そこでロード処理をべき等にするために、コピーされた新しいファイルだけでなく、その日のパーティションにロードされるべきすべてのファイルをロードするように変更しました。
さらに、FIFO キューを試してみたところ、この問題は発生しなくなりました。

BigQuery の同一テーブルへのロードは 1 日 1000 回まで

BigQuery の制限で、1 日に同一テーブルへロードできるのは 1000 回までです。
1 日 1 回のコピーや 1 時間に 1 回のコピーでは問題ならないのですが、それ以上の頻度で頻繁にファイルをロードする場合には問題になることがあります。

たとえば、 Redshift の UNLOAD コマンドを使って S3 にファイルを出力すると、結果が複数のファイルに分割して書き込まれます。
このファイルを都度、BigQuery の同一のテーブルへロードしようとして、この制限を超えてしまい、ロードに失敗することがありました。

この問題の具体的な解決策はありませんが、 UNLOAD コマンドを実行する頻度を減らしてもらったり、
可能であれば PARALLEL OFF のオプションを指定にして、出力が 1 ファイルだけになるように調整してもらったりして回避しています。

デッドレターキューをちゃんと使う

上図にはありませんが、すべての SQS にはデッドレターキューが設定されています。

これのおかげで、意図しない理由(たとえば GCP の障害とか)でファイルのコピーに失敗したり、BigQuery のロードに失敗した場合に安全にリトライできるようになりました。

まとめ

BigQuery や Redshift は本当によくできたプロダクトなので、ここへデータをロードすれば、あとはよい SQL を書くだけで分析や集計ができます。
一方で、そこへ安定してデータをロードする方法は、チームやプロダクトによっていろいろ工夫する必要があるところかな、と感じています。

リクルートライフスタイルでは、必要に応じて複数のクラウドサービスを使い分けて開発したいエンジニアを募集しています。