Cloud Pub/SubのBigQueryサブスクリプションを使用してログを取得する

はじめに
はじめまして、株式会社QualiArtsでバックエンドエンジニアをしている水村と申します。
今回は、Cloud Pub/SubのBigQueryサブスクリプションを使用して、BigQueryテーブルに直接ログデータを書き込む処理について、導入の流れや実際の処理結果を紹介します。
既存のロギングの概要
弊プロジェクトでは、インフラ環境にGoogle Cloudを利用しており、ゲームサーバから送られるAPIアクセスログやユーザの行動ログをBigQueryに集約しています。 このゲームサーバからBigQueryのテーブルにログを書き込むために、Cloud Pub/SubとGoogle Cloud Dataflowを採用しています。

ログデータをそのまま流すだけであれば、公式が提供するPub/Sub Subscription to BigQueryテンプレートを使用してDataflowジョブを作成すれば可能ですが、この場合ジ ョブにつき1つのテーブルにしかログを送信できません。 そのため、弊プロジェクトでは、Apache Beam SDKを使用してカスタムテンプレートを作成し、ログのタイプ毎に別のテーブルへ送信しています。
しかしこの方法では、SDKのサポートバージョンを追うためのアップデート作業、Dataflowのインフラコストなどのデメリットが存在しています。
そこで、新しいログインフラの構造として、BigQueryサブスクリプションの使用を検討しました。
BigQueryサブスクリプションとは
Cloud Pub/Sub BigQueryサブスクリプションとは、2022年にリリースされたエクスポートサブスクリプションの1つで、Pub/Subに送られたデータを、Dataflowなどのサブスクライバーを介さず直接BigQueryのテーブルに書き込むことが可能です。新しい技術ではありませんが、弊プロジェクトのログインフラ構築当時はなかったため、採用できませんでした。 このサブスクリプションを使用すれば、Dataflowのジョブに任せていた、サブスクリプションからメッセージをpullし てBigQueryテーブルに書き込む処理を他のサービスなしに実行することができます。 サブスクリプションの作成はpull/pushサブスクリプションの代わりに選択して生成できるため、マネージドでありバージョンを管理する必要がないため、アップデートなどの管理コストもかかりません。
ただし、1つのサブスクリプションにつき1つのテーブルにしかデータを送信できないため、送信するログテーブルの分だけサブスクリプションを生成する必要があります。 今回は、実際にTerraformを使用したログインフラの構築例と、データを送信して正常にBigQueryに書き込まれ、そのデータを取得するまでの一連の流れを紹介します。
BigQueryサブスクリプションを用いたログインフラの導入

図のインフラ構成をゴールとして、Terraformを使用してログインフラを構築していきます。
まず、作業ディレクトリのファイル構造を以下のように設定します。
.
├── main.tf
├── test1.json
└── test2.json
JSONファイルは、ゲームサーバを記述しているGolangの構造体から自動生成したものです。 これはBigQueryのテーブルスキーマとして使用します。
test1.json
[
{
"name": "timestamp",
"description": "タイムスタンプ",
"type": "TIMESTAMP",
"mode": "NULLABLE"
},
{
"name": "str_value",
"description": "文字列",
"type": "STRING",
"mode": "NULLABLE"
}
]
test2.json
[
{
"name": "timestamp",
"description": "タイムスタンプ",
"type": "TIMESTAMP",
"mode": "NULLABLE"
},
{
"name": "number",
"description": "数字",
"type": "INTEGER",
"mode": "NULLABLE"
}
]
次に、ログを保存するBigQueryのデータセットとテーブルを作成します。あくまで構築例なのでオプションはデフォルトにします。
main.tf
resource "google_bigquery_dataset" "test_dataset" {
dataset_id = "test_dataset"
location = "asia-northeast1"
}
resource "google_bigquery_table" "test_table1" {
table_id = "test1"
dataset_id = google_bigquery_table.test_dataset.dataset_id
schema = file("./test1.json")
}
resource "google_bigquery_table" "test_table2" {
table_id = "test2"
dataset_id = google_bigquery_table.test_dataset.dataset_id
schema = file("./test2.json")
}
次に、ログを受け取るPub/Subのトピックと、test1,test2テーブルに対応するサブスクリプションを作成します。
locals {
project_id = "test-pj"
}
resource "google_pubsub_topic" "test" {
name = "test-topic"
}
resource "google_pubsub_subscription" "test1" {
name = "test1-subscription"
topic = google_pubsub_topic.test.id
bigquery_config {
table = "${local.project_id}.${google_bigquery_dataset.test_dataset.dataset_id}.${google_bigquery_table.test_table1.table_id}"
use_table_schema = true
}
filter = "attributes.table_name = \"${google_bigquery_table.test_table1.table_id}\""
}
resource "google_pubsub_subscription" "test2" {
name = "test2-subscription"
topic = google_pubsub_topic.test.id
bigquery_config {
table = "${local.project_id}.${google_bigquery_dataset.test_dataset.dataset_id}.${google_bigquery_table.test_table2.table_id}"
use_table_schema = true
}
filter = "attributes.table_name = \"${google_bigquery_table.test_table2.table_id}\""
}
filterは、属性などの条件を記述することで、その条件にマッチするメッセージのみをサブスクリプションで受け取ることができます。 今回の構成では1つのトピックにつき2つのサブスクリプションがありますが、ゲームサーバからのメッセージはトピックに送信するため、 そのままの設定だとそれぞれのサブスクリプションにメッセージが流れてしまいます。 これを防止するために、属性にテーブル名を送信してもらい、サブスクリプション側でフィルタリングで防ぐことで、任意のテーブルに対してログを書き込みます。
ゲームサーバ側では以下のように、メッセージをパブリッシュする際にAttributesにtable_nameを指定することで、サブスクリプション側でフィルタリングすることができます。
type PublishInfo struct {
topic *pubsub.Topic
...
}
func (p *PublishInfo) Publish(ctx context.Context, data []byte, tableName string) error {
// タイムアウト設定
tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
ret := p.topic.Publish(tctx, &pubsub.Message{
Data: data,
Attributes: map[string]string{
"table_name": tableName,
},
})
...
}
これでログを送信する準備が整いました。
ゲームサーバからログが送れるか動作確認
上述したインフラをTerraformで適用して構築したあと、以下の処理を呼び出すようにゲームサーバを実装・デプロイします。 実際にはClose処理などが必要ですが、今回は省略しています。
type Test1 struct {
Timestamp time.Time `json:"timestamp"`
StrValue string `json:"str_value"`
}
type Test2 struct {
Timestamp time.Time `json:"timestamp"`
Number int `json:"number"`
}
func publishTest(ctx context.Context) error {
publishInfo, err := pubsub.NewPublishInfo(ctx, "test-pj", "test-topic")
if err != nil {
return err
}
json1, err := json.Marshal(&Test1{
Timestamp: time.Now(),
StrValue: "test_str_value",
})
if err != nil {
return err
}
json2, err := json.Marshal(&Test2{
Timestamp: time.Now(),
Number: 100,
})
if err != nil {
return err
}
if err := publishInfo.Publish(ctx, json1, "test1"); err != nil {
return err
}
if err := publishInfo.Publish(ctx, json2, "test2"); err != nil {
return err
}
return nil
}
この関数を呼び出すと、まずtest-topicにメッセージをパブリッシュし、サブスクリプションが属性のtable_nameを見て、対応するテーブルのメッセージを受け取り、BigQueryテーブルに書き込みます。 結果的に、BigQueryで各テーブルのデータを検索したところ、以下のデータが取得できました。
クエリ
SELECT timestamp, str_value FROM `test-pj.test_dataset.test1` LIMIT 10;
SELECT timestamp, number FROM `test-pj.test_dataset.test2` LIMIT 10;
結果
[{
"timestamp": "2025-05-06 10:00:48.881522 UTC",
"str_value": "test_str_value"
}]
[{
"timestamp": "2025-05-06 10:00:48.881655 UTC",
"number": "100"
}]
最後に
今回はCloud Pub/Sub BigQueryサブスクリプションを利用したログの書き込みの、構築例と実際の処理を紹介しました。 あくまで最低限の動作なので、デッドレタートピックの設定や保持期間の設定など細かい調整が必要です。
また、データ書き込み部分などをBigQueryサブスクリプションに一任する都合上、トピックへメッセージをパブリッシュしてからさらにデータ処理をしたい場合には、 Dataflowなどのサブスクライバーを使用する必要があります。
ただ、運用時のサービスのバージョンアップデート作業が不要になるのは個人的にはとても大きいため、 要件次第では積極的な採用は十分に考えられると感じています。
この記事が皆様の開発に少しでもお役に立てれば幸いです。