コンテンツへスキップ

QualiArtsengineer blog

Cloud Pub/SubのBigQueryサブスクリプションを使用してログを取得する

Cloud Pub/SubのBigQueryサブスクリプションを使用してログを取得する

6 min read

はじめに

はじめまして、株式会社QualiArtsでバックエンドエンジニアをしている水村と申します。

今回は、Cloud Pub/SubのBigQueryサブスクリプションを使用して、BigQueryテーブルに直接ログデータを書き込む処理について、導入の流れや実際の処理結果を紹介します。

既存のロギングの概要

弊プロジェクトでは、インフラ環境にGoogle Cloudを利用しており、ゲームサーバから送られるAPIアクセスログやユーザの行動ログをBigQueryに集約しています。 このゲームサーバからBigQueryのテーブルにログを書き込むために、Cloud Pub/SubとGoogle Cloud Dataflowを採用しています。

既存のログインフラ概要図
既存のログインフラ概要図

ログデータをそのまま流すだけであれば、公式が提供するPub/Sub Subscription to BigQueryテンプレートを使用してDataflowジョブを作成すれば可能ですが、この場合ジョブにつき1つのテーブルにしかログを送信できません。 そのため、弊プロジェクトでは、Apache Beam SDKを使用してカスタムテンプレートを作成し、ログのタイプ毎に別のテーブルへ送信しています。

しかしこの方法では、SDKのサポートバージョンを追うためのアップデート作業、Dataflowのインフラコストなどのデメリットが存在しています。

そこで、新しいログインフラの構造として、BigQueryサブスクリプションの使用を検討しました。

BigQueryサブスクリプションとは

Cloud Pub/Sub BigQueryサブスクリプションとは、2022年にリリースされたエクスポートサブスクリプションの1つで、Pub/Subに送られたデータを、Dataflowなどのサブスクライバーを介さず直接BigQueryのテーブルに書き込むことが可能です。新しい技術ではありませんが、弊プロジェクトのログインフラ構築当時はなかったため、採用できませんでした。 このサブスクリプションを使用すれば、Dataflowのジョブに任せていた、サブスクリプションからメッセージをpullしてBigQueryテーブルに書き込む処理を他のサービスなしに実行することができます。 サブスクリプションの作成はpull/pushサブスクリプションの代わりに選択して生成できるため、マネージドでありバージョンを管理する必要がないため、アップデートなどの管理コストもかかりません。

ただし、1つのサブスクリプションにつき1つのテーブルにしかデータを送信できないため、送信するログテーブルの分だけサブスクリプションを生成する必要があります。 今回は、実際にTerraformを使用したログインフラの構築例と、データを送信して正常にBigQueryに書き込まれ、そのデータを取得するまでの一連の流れを紹介します。

BigQueryサブスクリプションを用いたログインフラの導入

BigQueryサブスクリプションを使用したログインフラ概要図
BigQueryサブスクリプションを使用したログインフラ概要図

図のインフラ構成をゴールとして、Terraformを使用してログインフラを構築していきます。

まず、作業ディレクトリのファイル構造を以下のように設定します。

.
├── main.tf
├── test1.json
└── test2.json

JSONファイルは、ゲームサーバを記述しているGolangの構造体から自動生成したものです。 これはBigQueryのテーブルスキーマとして使用します。

test1.json

[
  {
    "name": "timestamp",
    "description": "タイムスタンプ",
    "type": "TIMESTAMP",
    "mode": "NULLABLE"
  },
  {
    "name": "str_value",
    "description": "文字列",
    "type": "STRING",
    "mode": "NULLABLE"
  }
]

test2.json

[
  {
    "name": "timestamp",
    "description": "タイムスタンプ",
    "type": "TIMESTAMP",
    "mode": "NULLABLE"
  },
  {
    "name": "number",
    "description": "数字",
    "type": "INTEGER",
    "mode": "NULLABLE"
  }
]

次に、ログを保存するBigQueryのデータセットとテーブルを作成します。あくまで構築例なのでオプションはデフォルトにします。

main.tf

resource "google_bigquery_dataset" "test_dataset" {
  dataset_id = "test_dataset"
  location   = "asia-northeast1"
}

resource "google_bigquery_table" "test_table1" {
  table_id   = "test1"
  dataset_id = google_bigquery_table.test_dataset.dataset_id
  schema = file("./test1.json")
}

resource "google_bigquery_table" "test_table2" {
  table_id   = "test2"
  dataset_id = google_bigquery_table.test_dataset.dataset_id
  schema = file("./test2.json")
}

次に、ログを受け取るPub/Subのトピックと、test1,test2テーブルに対応するサブスクリプションを作成します。

locals {
  project_id = "test-pj"
}

resource "google_pubsub_topic" "test" {
  name = "test-topic"
}

resource "google_pubsub_subscription" "test1" {
  name  = "test1-subscription"
  topic = google_pubsub_topic.test.id

  bigquery_config {
    table            = "${local.project_id}.${google_bigquery_dataset.test_dataset.dataset_id}.${google_bigquery_table.test_table1.table_id}"
    use_table_schema = true
  }

  filter = "attributes.table_name = \"${google_bigquery_table.test_table1.table_id}\""
}

resource "google_pubsub_subscription" "test2" {
  name  = "test2-subscription"
  topic = google_pubsub_topic.test.id

  bigquery_config {
    table            = "${local.project_id}.${google_bigquery_dataset.test_dataset.dataset_id}.${google_bigquery_table.test_table2.table_id}"
    use_table_schema = true
  }

  filter = "attributes.table_name = \"${google_bigquery_table.test_table2.table_id}\""
}

filterは、属性などの条件を記述することで、その条件にマッチするメッセージのみをサブスクリプションで受け取ることができます。 今回の構成では1つのトピックにつき2つのサブスクリプションがありますが、ゲームサーバからのメッセージはトピックに送信するため、 そのままの設定だとそれぞれのサブスクリプションにメッセージが流れてしまいます。 これを防止するために、属性にテーブル名を送信してもらい、サブスクリプション側でフィルタリングで防ぐことで、任意のテーブルに対してログを書き込みます。

ゲームサーバ側では以下のように、メッセージをパブリッシュする際にAttributesにtable_nameを指定することで、サブスクリプション側でフィルタリングすることができます。

type PublishInfo struct {
	topic  *pubsub.Topic
	...
}

func (p *PublishInfo) Publish(ctx context.Context, data []byte, tableName string) error {
	// タイムアウト設定
	tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	ret := p.topic.Publish(tctx, &pubsub.Message{
		Data: data,
		Attributes: map[string]string{
			"table_name": tableName,
		},
	})
	...
}

これでログを送信する準備が整いました。

ゲームサーバからログが送れるか動作確認

上述したインフラをTerraformで適用して構築したあと、以下の処理を呼び出すようにゲームサーバを実装・デプロイします。 実際にはClose処理などが必要ですが、今回は省略しています。

type Test1 struct {
	Timestamp time.Time `json:"timestamp"`
	StrValue  string    `json:"str_value"`
}

type Test2 struct {
	Timestamp time.Time `json:"timestamp"`
	Number    int       `json:"number"`
}

func publishTest(ctx context.Context) error {
	publishInfo, err := pubsub.NewPublishInfo(ctx, "test-pj", "test-topic")
	if err != nil {
		return err
	}
	
	json1, err := json.Marshal(&Test1{
		Timestamp: time.Now(),
		StrValue:  "test_str_value",
	})
	if err != nil {
		return err
	}

	json2, err := json.Marshal(&Test2{
		Timestamp: time.Now(),
		Number:    100,
	})
	if err != nil {
		return err
	}

	if err := publishInfo.Publish(ctx, json1, "test1"); err != nil {
		return err
	}

	if err := publishInfo.Publish(ctx, json2, "test2"); err != nil {
		return err
	}

	return nil
}

この関数を呼び出すと、まずtest-topicにメッセージをパブリッシュし、サブスクリプションが属性のtable_nameを見て、対応するテーブルのメッセージを受け取り、BigQueryテーブルに書き込みます。 結果的に、BigQueryで各テーブルのデータを検索したところ、以下のデータが取得できました。

クエリ

SELECT timestamp, str_value FROM `test-pj.test_dataset.test1` LIMIT 10;
SELECT timestamp, number FROM `test-pj.test_dataset.test2` LIMIT 10;

結果

[{
  "timestamp": "2025-05-06 10:00:48.881522 UTC",
  "str_value": "test_str_value"
}]

[{
  "timestamp": "2025-05-06 10:00:48.881655 UTC",
  "number": "100"
}]

最後に

今回はCloud Pub/Sub BigQueryサブスクリプションを利用したログの書き込みの、構築例と実際の処理を紹介しました。 あくまで最低限の動作なので、デッドレタートピックの設定や保持期間の設定など細かい調整が必要です。

また、データ書き込み部分などをBigQueryサブスクリプションに一任する都合上、トピックへメッセージをパブリッシュしてからさらにデータ処理をしたい場合には、 Dataflowなどのサブスクライバーを使用する必要があります。

ただ、運用時のサービスのバージョンアップデート作業が不要になるのは個人的にはとても大きいため、 要件次第では積極的な採用は十分に考えられると感じています。

この記事が皆様の開発に少しでもお役に立てれば幸いです。

2023年にサイバーエージェントに新卒入社。株式会社QualiArtsにてバックエンドエンジニアとして従事。

OSZAR »