ミツモア Tech blog

「ミツモア」を運営する株式会社ミツモアの技術ブログです

S3内のCloudFrontのCSVログデータをBQに連携した話

※こちらはミツモアAdvent Calendar 2024の6日目の記事です

ミツモアデータチームの赤藤と申します。 主にデータ基盤の運用、データ加工、可視化を行なっています。

今回はAWSのS3内に保存しているCloudFrontのログファイルをBigQuery(BQ)に連携した話をしようかと思います。

採用したデータ連携法

AWSのS3ファイルをBQに連携する方法は様々かと思いますが、Google CloudのData Transfer Service(DTS)を使用しました。DTSはスケジュール設定も簡単に行うことができ、UI上でも設定ができるので簡単にPipelineを構築することができとても便利です。 後述しますが、jobを複数作成する必要があったため、今回はCLIを使ってjobを作成しました。その設定方法を紹介したいと思います。 DTSについては公式ページをご参照ください。

BigQuery Data Transfer Service とは  |  Google Cloud

S3側の下準備

DTSは一度に転送できるファイル数が1万ファイルまでという限りがあります。プロダクトに依るかもしれないですが、CloudFrontのログデータは膨大なので、転送jobを工夫する必要がありました。 ミツモアでは、過去分全てのファイルを一度に転送することが不可能だったので、必要な期間内で月毎にjobを区切って作成し、連携することにしました。

その際、S3のURIを指定するのですが、プレフィックス マッチングのみサポートされているのでファイル名の形式によっては上手くファイルを指定できない可能性もあるので注意が必要です。 ミツモアでは以下のようなファイル名形式で運用しています。

s3://{{ bucket_name }}/{{ cloudfront_id }}.2024-12-09.gz

DTSのS3転送に関する詳細についても公式ページをご参照ください。URIの指定方法についても記載されています。

Amazon S3 転送  |  BigQuery  |  Google Cloud

転送設定

  1. 転送先のデータセット、テーブルを準備する
    • スキーマは予め設定しておく必要があります。また、後ほど触れますが、作成時にpartitionを設定しておくと後続の加工でコストを抑えられるのでおすすめです。
  2. S3のアクセスキー、シークレットアクセスキーを取得する
  3. 転送jobを作成する
    • Cloud Shellで以下のコマンドを実行します。

        # データセット名
        DATASET_NAME="{{ dataset_name }}"
        # テーブル名
        TABLE_NAME="{{ table_name }}"
        # S3のURIパス
        DATA_PATH="s3://{{ bucket_name }}/{{ cloudfront_id }}.2024-12-*.gz"
        # アクセスキー
        ACCESS_KEY="{{ access_key }}"
        # シークレットアクセスキー
        SECRET_ACCESS_KEY="{{ secret_access_key }}"
      
        ## 12月のDTSのジョブを作成
        bq mk --transfer_config \
          --data_source=amazon_s3 \
          --display_name=s3tobq_202412 \
          --target_dataset="$DATASET_NAME" \
          --params="{
          \"destination_table_name_template\": \"$TABLE_NAME\",
          \"data_path\": \"$DATA_PATH\",
          \"access_key_id\": \"$ACCESS_KEY\",
          \"secret_access_key\": \"$SECRET_ACCESS_KEY\",
          \"file_format\": \"CSV\",
          \"write_disposition\": \"WRITE_APPEND\",
          \"field_delimiter\": \"\t\",
          \"skip_leading_rows\": \"2\"
        }"
      
    • このようなjobが作成されていれば成功です。 注意して頂きたいのは、AWSのS3は24時間毎でしかスケジュールを設定できないので、1日に何度も連携することはできません。 また、job作成時に一度連携が走りますが、その後は24時間後になるので、特定の時間を指定したい場合はUI上で変更する必要があります。例えば以下では毎日3:00に実行されるように設定されています。

    • ちなみにUIでは上から順に選択、記載していけば設定することができます。

    • 月別にjobを分けているので、月毎に新しくjobを作成する必要があります。毎月都度作成するのは手間ですし、失念する可能性もあるので、自動化するかjobを予め作成しておいても良いかと思います。例えば2025年1月のデータを転送するjobを作成して動かしておけば、データが入ってくるまではファイルがないため特に転送が行われず、2025年1月になってから転送が始まるということになり、job作成忘れを防止できます。

データ加工

CloudFrontのログデータをBQに連携できましたが、そのままだと扱いにくいです。 他の記事でも触れていますが、ミツモアではdbtを導入しているので、dbtでどのように加工しているのかについても簡単に紹介しようと思います。

CloudFrontのログデータは、日付と時間が別の列になっていたりして無加工だと扱いにくいので、データの加工が必要です。だた、前述でも触れた用にCloudFrontのログは膨大なため、毎回いちからテーブルを作成するとコストが嵩んでしまいます。 詳しくは触れませんがincremental更新のinsert_overwriteを採用し、コストを抑えながら日時で更新しています。

{{
  config(
    materialized = 'incremental',
    unique_key = 'x_edge_request_id',
    incremental_strategy = 'insert_overwrite',
    partition_by = {'field': 'date', 'data_type': 'date'},
  )
}}

with
base as (
select
    timestamp(concat(date, ' ', time)) createdAt,
    *
from
    {{ dataset_name }}.{{ table_name }}
where
    true
    {% if is_incremental() %}
      and date(timestamp(concat(date, ' ', time)), "Asia/Tokyo")  >= date_sub(current_date('Asia/Tokyo'), interval 3 day)
      and coalesce(_partitiontime, current_timestamp()) >= timestamp_sub(current_timestamp(), interval 3 day)
    {% else %}
      and timestamp(concat(date, ' ', time)) <= current_timestamp()
    {% endif %}
)
select
    date(createdAt, 'Asia/Tokyo') date,
    *
from
    base

以上がミツモアで実装したAWSのS3に配置しているCloudFrontのログファイルをBQに連携し、dbtで加工した話でした。皆さんの参考になれば幸いです。

さて、そんなミツモアでは様々な職種のエンジニアを積極的に採用しています! ご興味がある方はぜひ気軽に面談しましょう!