目次
データ推進室のsaka1です。 最近はデータ分析用の社内Webアプリケーションの開発……だけでなく、さまざまな開発案件に関わっています。
この記事では、BigQuery(BQ)上のテーブルをローカルにダウンロードするシチュエーションを扱います。
リクルート社内ではさまざまなデータウェアハウス製品が用いられますが、特にBQは大規模利用が行われていてリソースも潤沢です。もし、全てのデータ処理がBQ上で完結していれば困らないのですが、なんだかんだで一部の処理や実験、アドホックな操作では、ローカルPCや特定のインスタンス上でデータ操作を行わなければならない事もあります。
そんなBQデータのローカルダウンロードをPythonで取り扱う時の話です。
この記事に書かれていること
※ 以下の文章の意味がわかる人は、この記事を読まなくて大丈夫です。
BQのPythonクライアントを使い、データをSELECTしてダウンロードする際には、デフォルトではBigQuery Storage APIが使われないので圧倒的に遅くなる。
google-cloud-bigquery-storageを実行環境に足すのが大事。
to_dataframe()は内部で自動的にBigQueryReadClientを検出してくれて圧倒的に高速になるし、データフレームによる高水準な操作が可能。ただしto_dataframe_iterable()には自動検出がないので、必ず引数bqstorage_clientを明示的に渡すこと。
背景・課題
BQからのエクスポート手段
BQからのデータ出力・ダウンロードには、大きく分けて次の選択肢があります。
- 少量のデータならなんとでもなる:
- 深く考えず
SELECTを投げたり、Google Cloudのコンソールから直接データを取ったりするのも選択肢となります
- 深く考えず
- 大量のデータならGoogle Cloud Storage(GCS)へエクスポートするのが定石:
- 公式ガイドの テーブルデータを Cloud Storage にエクスポートする にあるように、GCSに書き出してからダウンロード等の後続作業を行うと効率的です
筆者の経験則ですが、数千行ぐらいならどんな方法でも困りません。一方で数億行以上のデータとなると、GCS経由でデータを操作するのが無難です。GCSへのエクスポートは並列処理が効くらしく、BQが効率的に処理してくれます。
今回問題にするのは「ほどほど」にデータ量が多い場合です。データが100万行やそれぐらいのオーダーだと、あまり考えずにSELECTすると時間がかかってしまうが、GCSバケットを用意するほどビッグなデータかというと疑問に感じるレンジです。
簡易ベンチマーク用データの準備
ここからは具体的な説明をするため、適当なサンプルデータを作っておきます。100万行のBQテーブルです。
CREATE OR REPLACE TABLE `<プロジェクト>.<データセット>.performance_test_1m` AS
SELECT
-- 適当なID
GENERATE_UUID() AS id,
-- 適当な整数と文字列を数カラムずつ用意しておきダミーデータとする
CAST(FLOOR(RAND() * 10000000) AS INT64) AS i1,
CAST(FLOOR(RAND() * 10000000) AS INT64) AS i2,
CAST(FLOOR(RAND() * 10000000) AS INT64) AS i3,
SHA512(CAST(x * 100000000 AS STRING)) AS s1,
SHA512(CAST(x * 200000000 AS STRING)) AS s2,
SHA512(CAST(x * 300000000 AS STRING)) AS s3,
FROM
-- 100万行
UNNEST(GENERATE_ARRAY(1, 1000000)) AS x;
以降はこのテーブルに対するSELECT文の測定値を交えて紹介していきます。
本文中で使われている値は実測したものですが、あまり厳密なものではない点に注意してください。利用したBQ環境のジョブの混み具合で実行時間は変わりますし、ネットワークその他のノイズとなる要素は多数あります。
「ほどほどのデータ」をナイーブに扱うと遅い
このテーブルをローカルダウンロードする問題を考えます。PythonにはBQ用の公式クライアントライブラリがあるため、SELECT文を投げること自体は簡単です。例えばCSVの形でダウンロードしようとすると、ナイーブにはこんな実装になると思います。
from google.cloud import bigquery
PROJECT_ID = "your-project-id"
DATASET_ID = "your_dataset"
TABLE_ID = "performance_test_1m"
def sql_download_to_csv(project_id: str, dataset_id: str, table_id: str, n: int):
client = bigquery.Client(project=project_id)
table_ref = f"{project_id}.{dataset_id}.{table_id}"
query = f"SELECT * FROM `{table_ref}` LIMIT {n}"
query_job = client.query(query)
df = query_job.to_dataframe()
df.to_csv("output.csv", header=False, index=False, encoding="utf-8")
import sys
n = int(sys.argv[1])
sql_download_to_csv(PROJECT_ID, DATASET_ID, TABLE_ID, n)
to_dataframe()によってPandasデータフレームが得られるため、以降はデータフレームを用いた高水準な操作が可能です。
同時に、この実装のデータフェッチの遅さに驚くと思います。データを読み込む行数を変化させて、実行時間を測定しました。
| N | 実行時間(s) |
|---|---|
| 10000 | 14.17 |
| 20000 | 23.33 |
| 30000 | 36.00 |
こんな調子なので、数万行なら実用できそうとはいえ、100万行はちょっと実用的じゃないぐらいの時間がかかってしまいます(この伸び方だと数十分オーダーになりそうですね)。
解決策
google-cloud-bigquery-storageの導入は有効
上記の遅さを改善する方法は、google-cloud-bigquery-storageを使うことです。実際ライブラリは、前述のスクリプトのような使い方であれば警告を出してくれます。
UserWarning: BigQuery Storage module not found, fetch data with the REST endpoint instead.
google-cloud-bigquery-storageの使い方は簡単で、ただ実行環境に( pip install google-cloud-bigquery-storage 等によって)足すだけです。BQクライアントライブラリはgoogle-cloud-bigquery-storageを自動検出して、内部処理をBigQuery Storage APIを用いた効率的な実装に切り替えてくれます。gRPCやApache Arrowといった技術が採用されているようです。
| N | 実行時間(s) |
|---|---|
| 10000 | 4.93 |
| 20000 | 4.64 |
| 30000 | 5.87 |
| 1000000(1M) | 37.08 |
明らかに数倍は高速化していることと、Nに応じた実行時間になっていない事に注意してください。おそらくBQのジョブ実行など別の部分が実行時間において支配的になっていそうです。
ちなみにN=1000000(100万)のとき実行時間は約37秒でした。これなら数百万行を扱うのも現実的です。
to_dataframe_iterable()は要注意
to_dataframe() は全データをメモリに乗せるため、メモリ消費量を心配したくなります。工夫の余地はないでしょうか?
処理内容によっては、全てのデータをメモリに保持する必要はありません。例えばCSV出力は全体の情報がなくても実行できるので、ストリーム処理できるはずです。BQクライアントにおいては、DataFrameを細切れにして扱うAPIとして、 to_dataframe_iterable() が提供されています。これを使えば効率的なはずです。
from google.cloud import bigquery
def sql_download_to_csv(project_id: str, dataset_id: str, table_id: str, n: int):
client = bigquery.Client(project=project_id)
table_ref = f"{project_id}.{dataset_id}.{table_id}"
query = f"SELECT * FROM `{table_ref}` LIMIT {n}"
query_job = client.query(query)
with open("output.csv", "w", encoding="utf-8") as f:
for df_chunk in query_job.result().to_dataframe_iterable():
df_chunk.to_csv(f, header=False, index=False, encoding="utf-8")
同様にベンチマークしてみます。
| N | 実行時間(s) |
|---|---|
| 10000 | 16.99 |
| 20000 | 23.34 |
| 30000 | 37.54 |
google-cloud-bigquery-storage が効いていない結果になってしまいました。
実は、執筆時点のBQクライアントライブラリは、to_dataframe_iterable()時にBigQuery Storage Readerを自動検出してくれません。以下のように明示的に渡す必要があります。
from google.cloud import bigquery
from google.cloud import bigquery_storage
def sql_download_to_csv(project_id: str, dataset_id: str, table_id: str, n: int):
client = bigquery.Client(project=project_id)
storage_client = bigquery_storage.BigQueryReadClient() # storage_clientの明示的な生成
table_ref = f"{project_id}.{dataset_id}.{table_id}"
query = f"SELECT * FROM `{table_ref}` LIMIT {n}"
query_job = client.query(query)
with open("output.csv", "w", encoding="utf-8") as f:
# bqstorage_clientに生成したstorage_clientを渡す
for df_chunk in query_job.result().to_dataframe_iterable(bqstorage_client=storage_client):
df_chunk.to_csv(f, header=False, index=False, encoding="utf-8")
このちょっとした修正で実行時間の爆発を避けられます。
| N | 実行時間(s) |
|---|---|
| 10000 | 6.21 |
| 20000 | 6.20 |
| 30000 | 6.04 |
実装詳細を簡単に追う: 自動検出の詳細
to_dataframe() と to_dataframe_iterable() でなぜこんなに違いが出るのか、実装を追いつつ確認していきます。コードリーディングしたバージョンはgoogle-cloud-bigquery v3.38.0です。
より正確には to_dataframe() にだけ便利な自動検出実装が付いていると見るのが妥当かもしれません。to_dataframe() には多数の引数がありますが、注目は create_bqstorage_client です。
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
is a faster way to fetch rows from BigQuery. See the
``bqstorage_client`` parameter for more information.
This argument does nothing if ``bqstorage_client`` is supplied.
.. versionadded:: 1.24.0
デフォルトで BigQuery Storage API client を使うように構成されそうですね。
コードパス的にはcreate_bqstorage_client が True だと _should_use_bqstorage が第2引数 True で呼ばれます。
if not self._should_use_bqstorage(bqstorage_client, create_bqstorage_client):
create_bqstorage_client = False
bqstorage_client = None
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
_should_use_bqstorage の内部では、細かな分岐はありますが、デフォルトの実行パスだとモジュールのimportを試してくれるようです。
try:
_versions_helpers.BQ_STORAGE_VERSIONS.try_import(raise_if_error=True)
except bq_exceptions.BigQueryStorageNotFoundError:
warnings.warn(
"BigQuery Storage module not found, fetch data with the REST "
"endpoint instead."
)
return False
except bq_exceptions.LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False
というわけで、 to_dataframe() はデフォルトだとモジュールのimportを試すコードが実行されます。一方でto_dataframe_iterable() にはそういったパスはなく、ただ bqstorage_client が渡されたらそれを使い、渡されなければ使わない(通常のREST API取得にフォールバックする)様子でした。
まとめ
「ほどほど」のデータは単一マシンでも十分に扱えることが多いです。一方で、ちょっとした工夫や注意点を守ると、処理はずっと効率的になります。
- PythonでBQクライアント(google-cloud-bigquery)を使う際には google-cloud-bigquery-storage も同時に導入するのがおすすめ
- パッケージを足すだけで自動的にPandasデータフレームの生成は高速になる
- ただし
to_dataframe_iterable()は自動検出が効かない-
bqstorage_clientを明示的に渡すこと
-
Appendix: メモリ消費量はイテレータで改善するのか
理屈上、 to_dataframe_iterable() は to_dataframe() よりも省メモリで効率的になりそうなものですが、今回の測定範囲では明確な優位性が見られませんでした。実際、100万行程度であれば、全体をメモリにロードするのも現実的でしょう。
ここでは実際にメモリ消費量が減っているのか、100万行ダウンロード時の time -l コマンドを見てみます。実行は筆者のローカルPC(MacBook)で行いました。出力を見ると今回注目したい maximum resident set size について:
-
to_dataframe()版: 901906432 -
to_dataframe_iterable()版: 203636736
つまり 902MB vs 204MB と大きな差が出ています。ある程度以上に大きなデータセットを扱う場合には、イテレータ版の有用性が高くなることが確かめられました。
# to_dataframe()版
$ /usr/bin/time -l uv run python simple.py 1000000
31.73 real 10.81 user 2.48 sys
901906432 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
183905 page reclaims
3929 page faults
0 swaps
0 block input operations
0 block output operations
88 messages sent
71567 messages received
1 signals received
4406 voluntary context switches
68786 involuntary context switches
144949685 instructions retired
78174140 cycles elapsed
9044928 peak memory footprint
# to_dataframe_iterable()版
$ /usr/bin/time -l uv run python simple_iter_opt.py 1000000
25.03 real 10.64 user 1.61 sys
203636736 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
46030 page reclaims
3933 page faults
0 swaps
0 block input operations
0 block output operations
100 messages sent
54553 messages received
1 signals received
4358 voluntary context switches
108712 involuntary context switches
130255972 instructions retired
67222875 cycles elapsed
9044928 peak memory footprint
お知らせRECRUIT TECH CONFERENCE 2026を開催します!(オンライン配信/参加無料)
リクルート主催の技術カンファレンス。
第3回目となる今回は「AI×プロダクト開発」をテーマに、急速な技術進化の中で生まれた多様な領域のナレッジから、技術者の活躍を引き出す土壌づくりまで、豊富なセッションをお届けします。是非お気軽にご参加ください!
RECRUIT TECH CONFERENCE 2026
-
開催日時
2026年2月27日(金) 12:00~19:30 (オンライン配信/途中入退場自由) -
社外ゲスト
和田 卓人 氏(タワーズ・クエスト株式会社 取締役社長)/岡野原 大輔 氏(株式会社Preferred Networks 共同創業者 代表取締役社長) ※ご登壇順