まえがき
GCPでログルーターを使用し、LoggingバケットからBigQueryへログを転送していましたが、フィルタの更新を忘れていたため、約4日分のログが転送されていませんでした。そこで、手動でBigQueryに転送した手順を備忘録として残しておきます。
手動でBigQueryに転送するまでの手順
Loggingから直接BigQueryの転送はできないため、下記の手順で転送を行います。
- Loggingからjson形式でログをダウンロードしてファイルに出力する
- Newline Delimited JSON形式に変換(BigQueryはNewline Delimited JSONの形式しか受け付けないため)
- キー項目の".“と”/“を”_“に変換(カラム名に”.“や”/“は使用できないため)
- Cloud Storageにバケットを作成する
- 作成したバケットに出力したファイルを転送
- Cloud StorageからBigQueryにインサート
では順番に解説していきます。
1.Loggingからjson形式でログをダウンロードしてファイルに出力する
下記コマンドでBigQueryに取り込みたいログを出力します。
1
2
3
4
5
6
7
8
9
10
11
| $ gcloud logging read \
'timestamp >= "2025-03-12T00:00:00+09:00" AND timestamp <= "2025-03-16T00:00:00+09:00"
AND resource.type="gce_instance"
AND logName="LOG_NAME"
AND resource.labels.instance_id="INSTANCE_ID"
AND NOT jsonPayload.httpRequest.userAgent="HealthCheckBot/1.0"' \
--project=PROJECT_ID \
--bucket=_Default \
--location=global \
--view=_Default \
--format=json > logs.json
|
timestamp
で期間を区切り、その後はログルーターで設定しているフィルタを記載します。
今回設定していた包含フィルタと除外フィルタはそれぞれ下記のとおりです。
包含フィルタ
1
2
3
| resource.type="gce_instance"
AND logName="LOG_NAME"
AND resource.labels.instance_id="INSTANCE_ID"
|
除外フィルタ
1
| jsonPayload.httpRequest.userAgent="HealthCheckBot/1.0"
|
PROJECT_ID
やフィルタの条件などは適宜変更してください。
バケットの指定をする場合
また、バケットの指定は--bucket
オプションで指定します。
今回は_Default
バケットを使用しています。
--backet
を指定すると--location
と--view
の指定も必要になります。
ロケーション(Location)を確認する
バケットのロケーションは下記コマンドで一覧が取得できるので、「LOCATION」項目で確認できます。
1
| gcloud logging buckets list --project=PROJECT_ID
|
1
2
3
| LOCATION BUCKET_ID RETENTION_DAYS CMEK RESTRICTED_FIELDS INDEX_CONFIGS LIFECYCLE_STATE LOCKED CREATE_TIME UPDATE_TIME
global _Default 30 ACTIVE
global _Required 400 ACTIVE True
|
ビュー(View)を確認する
ビュー(View)は、バケット内のログをフィルタリングして特定のユーザーに制限付きで提供する機能です。
下記コマンドで確認できます。
1
| gcloud logging views list --bucket=_Default --location=global --project=PROJECT_ID
|
1
2
3
| VIEW_ID FILTER CREATE_TIME UPDATE_TIME
_AllLogs
_Default NOT LOG_ID("cloudaudit.googleapis.com/data_access") AND NOT LOG_ID("externalaudit.googleapis.com/data_access")
|
いつも見てる_Default
バケットのログは_Default
のビューが設定されていて、必要なさそうなログはあらかじめフィルタリングされているということですね。
2.Newline Delimited JSON形式に変換
1で出力したjsonをNewline Delimited JSON形式に変換します。
1
| $ cat logs.json | jq -c '.[]' > logs_jsonl.json
|
3.キー項目の”.“と”/“を”_“に変換
こちらは少し複雑なので、Pythonで処理します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| import argparse
import json
def replace_keys(obj):
"""再帰的に辞書やリスト内のキーから '.' と '/' を '_' に置換する関数"""
if isinstance(obj, dict):
new_dict = {}
for k, v in obj.items():
new_key = k.replace(".", "_").replace("/", "_")
new_dict[new_key] = replace_keys(v)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Newline Delimited JSONファイルのキーの変換を行います。")
parser.add_argument("input_file", help="入力NDJSONファイルのパス")
parser.add_argument("output_file", help="出力NDJSONファイルのパス")
args = parser.parse_args()
fixed_data = []
with open(args.input_file, "r", encoding="utf-8") as f:
for line in f:
log_entry = json.loads(line)
renamed_entry = replace_keys(log_entry)
fixed_data.append(renamed_entry)
with open(args.output_file, "w", encoding="utf-8") as f:
for entry in fixed_data:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"Fixed JSON file saved to {args.output_file}")
|
上記コードをndjson_key_converter.py
ファイルに保存し、
下記のように入力ファイルと出力ファイルを指定して実行します。
1
| python3 ndjson_key_converter.py logs_jsonl.json logs_jsonl_renamed.json
|
4.Cloud Storageにバケットを作成する
下記コマンドで今回の転送に使用するバケットを作成します。
1
| gsutil mb -l REGION gs://YOUR_BUCKET_NAME
|
※REGION
と YOUR_BUCKET_NAME
は適宜設定
5.作成したバケットに出力したファイルを転送
下記コマンドで4で作成したバケットにlogs_jsonl_renamed.json
ファイルを転送します。
1
| gsutil cp logs_jsonl_renamed.json gs://YOUR_BUCKET_NAME/
|
※YOUR_BUCKET_NAME
は適宜設定
6.Cloud StorageからBigQueryにインサート
下記コマンドで出力したファイルをBigQueryにインサートします。
1
2
3
| bq load \
--source_format=NEWLINE_DELIMITED_JSON \
PROJECT_ID:DATASET_ID.TABLE_ID gs://YOUR_BUCKET_NAME/logs_jsonl_renamed.json
|
※PROJECT_ID:DATASET_ID.TABLE_ID
と YOUR_BUCKET_NAME
は適宜設定
--autodetect
オプションを付けると型が自動で変換され、既に作成されているカラムとの整合性が取れないケースが出てくるため、私は付けないで実行しました。
もし上記オプションを付けなければいけない場合は、データの加工やテーブルのカラムのデータ型を変更する必要があります。
注意点
転送用に作成したバケットは、データを保存している間は課金対象になります。そのため、必要がなくなった場合は 忘れずに削除 しましょう。
バケットを削除するコマンド
1
| gsutil rm -r gs://YOUR_BUCKET_NAME
|
※ YOUR_BUCKET_NAME
は作成したバケット名に置き換えてください。
※ -r
を付けることで、バケット内のすべてのファイルも削除されます。
あとがき
今回の手順では、Cloud Logging から直接 BigQuery に転送できない問題を解決する方法 をまとめました。ログルーターの設定ミスでログが転送されていなかった場合でも、手動でログを取得し、Cloud Storage を経由して BigQuery に取り込むことが可能です。
また、Cloud Logging から取得した JSON を BigQuery で利用できる形式(Newline Delimited JSON)に変換する作業 や、カラム名の適切な変更(.
や /
を _
に置換) も必須となるため、スクリプトを活用すると作業がスムーズになります。
今後、同様の問題が発生した際に、この手順が役に立てば幸いです。