ミツモア Tech blog

「ミツモア」を運営する株式会社ミツモアの技術ブログです

KARTE データパイプライン:マルチチャネルマーケティング分析基盤の構築

ℹ️ この記事は dbt Advent Calendar 2025 21日&MeetsMore Advent Calendar 2025 21日の記事です。

ミツモア データマネージャの古田 (@crazysrot) です。

はじめに

ミツモアでは、マルチチャネルマーケティングツールである KARTE を活用し、Email、プッシュ通知、LINE などの様々なチャネルを通じてユーザーとのコミュニケーションを実施しています。 KARTEではそのプロダクト内でさまざまな施策のPDCAが行えるように設計されていますが、自社データを完全に活用した検証や分析をすることまではできないです。そのため、ミツモアではそのデータをBigQueryへELTしデータパイプラインを構築しました。それにより、より詳細な可視化/分析が可能となりました ※本記事は株式会社プレイド様に監修をいただいております

KARTE とは

KARTE は、株式会社プレイドが提供する顧客体験プラットフォームです。その中でも KARTE Message は、Email、SMS、プッシュ通知、LINE など複数のチャネルを統合的に管理できる CRM ツールで、以下の特徴を持ちます:

  • マルチチャネル対応: 一つのプラットフォームで Email、プッシュ通知、LINE などを統合管理
  • セグメント配信: ユーザー属性や行動データに基づいた精密なターゲティング
  • リアルタイム配信: イベントドリブンな即時配信とスケジュール配信の両立
  • 効果測定: 送信、開封、クリック、コンバージョンまでを一元的に追跡
  • A/B テスト: 配信内容やタイミングの最適化をサポート

KARTE Message では、これらの配信結果がイベントデータとして記録され、KARTE Datahub を通じて外部のデータ基盤と連携できます

BigQuery へ構築した KARTE データパイプラインの概要

システム構成

KARTE dbt data pipeline

技術的な詳細

1. データ取り込み層

KARTE のイベントデータは、KARTE Datahubを用いてBigQueryへデータを連携しています。

主要なテーブルは以下の通りです:

  • masspush_event_log_regional: キャンペーンイベントログ(メイン)
  • masspush_setting_value_campaign: キャンペーン設定
  • masspush_setting_value_push_content: プッシュコンテンツ設定
  • masspush_setting_value_campaign_folder: キャンペーンフォルダ管理
  • masspush_setting_value_address_profile: アドレスプロファイル設定
  • karte_information_schema_column_field_paths: 上記tablesのSchema定義

各テーブルについてはこちらに詳細が記載してあります

masspush_event_log_regionalを例に手順を記載します

Step 1. 権限設定

こちらの手順に沿って、KARTE が 連携先のBigQueryの編集権限を持つように設定をします

Step 2. KARTE Datahub のQueryでBigQueryへ連携したいデータを出力するためのクエリを作成します

クエリ例

SELECT
  event_hash,
  event_name,
  api_key,
  ARRAY(
    SELECT AS STRUCT
      user_id,
      target
    FROM UNNEST(user_data_list)
  ) as user_data_list,
  timestamp,
  campaign_id,
  push_content_id,
  plugin_type,
  schedule_id,
  schedule_task_id,
  error_type,
  error_code,
  error_message,
  values,
  ARRAY(
    SELECT AS STRUCT
      recovery_type,
      timestamp
    FROM UNNEST(error_recoveries)
  ) as error_recoveries,
  current_timestamp() as _sdc_batched_at
FROM
  `prd-karte-message-per-client.csv_upload_{projectID}.masspush_event_log_regional`
WHERE 1=1
  and timestamp > timestamp_sub(current_timestamp(), interval 2 day)

KARTEの仕様変更によりSchema変更が行われることがあるため、select * をするのはやめました。

karte_information_schema_column_field_pathsにて変更は検知したり確認はできるのですが、変更があった際にDestinationのBigQuery tableとSchema定義が一致せずにエラーになってしまい変更対応をする必要があるためです。追従はkarte_information_schema_column_field_pathsから変更を確認し必要があれば事後対応で対応するようにしています

Step 3. Jobflowを作成する

Datahub > Jobflow の https://admin.karte.io/datahub/jobflow/list?project={your project} から新しいJobflowを作成してください

Step 4. 上記で作成したJobflow下にてjobを追加する。次のような入力が行われていたら良いと思います。

JobflowのJob入力画面

全てのtableに対して設定を終えると次のような形になると思います

これで分析環境のBigQueryへデータが連携できるはずです。

Step 5. スケジュールの設定

スケジュールの設定をJobflowに対して設定をする。実行されると、実行ログも見れます

Jobflow実行ログ
">
Jobflow実行ログ

ミツモアではスケジュール設定はDailyにて行っています。分析要件として頻度はDailyで十分だという理由と、KARTEのNumber of jobfolows executedとQuery resources consumptionの上限制限によりDailyを採用しました。

ここまでで、BigQueryには次のtableが作成されるようになりました

  • <your-project-name>.<your-dataset-name>.masspush_event_log_regional
  • <your-project-name>.<your-dataset-name>.masspush_setting_value_campaign
  • <your-project-name>.<your-dataset-name>.masspush_setting_value_push_content
  • <your-project-name>.<your-dataset-name>.masspush_setting_value_campaign_folder
  • <your-project-name>.<your-dataset-name>.masspush_setting_value_address_profile
  • <your-project-name>.<your-dataset-name>.karte_information_schema_column_field_paths

ここまでで、イベントのトランザクションデータと各設定のスナップショットデータがBigQueryに入ってきました

2. dbt による変換処理

このあとは、分析可能なtableに変換していきます

Staging Layer(基礎層)

Staging モデルでは、生データの正規化と基本的なクレンジング処理を行います。以下、各モデルの詳細を説明します:

1. karte_stg_masspush_event_log_regional.sql

メインのイベントログテーブルで、すべてのキャンペーンイベントを処理 基本的には重複を排除し、後段で扱いやすくするための変更のみの思想

{{ config(
    materialized = 'incremental',
    unique_key = '_id',
    partition_by = {
        'field': 'date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by = ['event_name', 'campaign_id', 'schedule_task_id', 'event_hash'],
    tags = ["daily_karte_job"]
) }}

WITH source_data AS (
    SELECT
        * except(created, user_data_list, error_recoveries),
        -- タイムスタンプの多次元変換(JST)
        DATETIME(created, 'Asia/Tokyo') as created,
        DATE(created, 'Asia/Tokyo') as date,
        -- JSON データの解析
        {{ ref('udfs.parseMasspushEventLogRegionalUserDataList') }}(user_data_list) AS parsed_user_data_list,
        {{ ref('udfs.parseMasspushEventLogRegionalErrorRecoveries') }}(error_recoveries) AS parsed_error_recoveries
    FROM {{ source('karte_mm_jobflow', 'masspush_event_log_regional') }}
    {% if is_incremental() %}
    WHERE 
        -- ELT遅延対応のための4日間バッファ
        DATE(_sdc_batched_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 4 DAY)
        -- イベントタイムスタンプによる2日間フィルタ
        AND TIMESTAMP_SECONDS(CAST(JSON_VALUE(value, '$.timestamp') AS INT64)) >= 
            TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY)
    {% endif %}
)

特徴:

  • トランザクションデータのためデータ量が一番多いテーブル
  • クラスタリングによるクエリ性能の最適化
  • エラーリカバリ情報の構造化
  • イベントタイプ: message_send, message_fail, message_open, message_click など
2. karte_stg_masspush_setting_value_campaign.sql

キャンペーン設定情報最新の状態を保持するように加工

{{ config(
    materialized = 'incremental',
    partition_by = {
        'field': 'date',
        'data_type': 'date',
        'granularity': 'day'
    },
    unique_key = '_id',
    tags = ["daily_karte_job"]
) }}

WITH source AS (
    SELECT
        * except(created, modified, modified_by, content),
        -- タイムスタンプの多次元変換(JST)
        DATETIME(created, 'Asia/Tokyo') as created,
        DATE(created, 'Asia/Tokyo') as date,
        DATETIME(modified, 'Asia/Tokyo') as modified,
        -- キャンペーンコンテンツとスケジュールのJSON解析
        udfs.parseMasspushSettingValueCampaignContent(
            {{ to_json_string('content') }}
        ) as content,
        udfs.parseMasspushSettingValueCampaignSchedule(
            {{ to_json_string('schedule') }}
        ) as schedule,
        STRUCT(
            modified_by.account_id,
            modified_by.username
        ) as modified_by,
        STRUCT(
            option.force_send_to_unsubscribe_list,
            option.max_sendable_size_per_minute,
            option.is_e2e
        ) as option,
        row_number() over (PARTITION by _id order by _sdc_batched_at desc, modified desc) as row_num,
    FROM {{ source('karte_mm_jobflow', 'masspush_setting_value_campaign') }}
    {% if is_incremental() %}
    WHERE {{ incremental_condition('modified') }}
    {% endif %}
)
SELECT * EXCEPT(row_num)
FROM source
WHERE row_num = 1

特徴:

  • キャンペーンのメタデータ(名前、説明、フォルダ、ステータス)
  • コンテンツとスケジュールの詳細解析
3. karte_stg_masspush_setting_value_push_content.sql

プッシュ通知とメールコンテンツの設定:

{{ config(
    materialized = 'incremental',
    partition_by = {
        'field': 'date',
        'data_type': 'date',
        'granularity': 'day'
    },
    unique_key = '_id',
    tags = ["daily_karte_job"]
) }}

WITH source AS (
    SELECT
        * except(created, modified, modified_by, content),
        -- タイムスタンプの多次元変換(JST)
        DATETIME(created, 'Asia/Tokyo') as created,
        DATE(created, 'Asia/Tokyo') as date,
        DATETIME(modified, 'Asia/Tokyo') as modified,
        -- コンテンツフィールドの構造化
        STRUCT(
            -- プッシュ通知コンテンツ
            content.title,
            content.body,
            content.url,
            -- iOS固有設定
            content.badge_for_ios,
            content.sound_for_ios,
            -- メールコンテンツ(SES)
            content.SES_SUBJECT,
            content.SES_CONTENT_HTML,
            content.SES_CONTENT_TEXT,
            content.use_bee_html,
            -- その他メタデータ
            content.custom_data,
            content.attachment_type,
            content.attachment_url,
        ) as content,
        STRUCT(
            modified_by.account_id,
            modified_by.username
        ) as modified_by,
    FROM {{ source('karte_mm_jobflow', 'masspush_setting_value_push_content') }}
    {% if is_incremental() %}
    WHERE {{ incremental_condition('modified') }}
    {% endif %}
)
SELECT * EXCEPT(row_num)
FROM source
WHERE row_num = 1

特徴:

  • マルチチャネル対応(プッシュ通知、メール)
  • プラットフォーム固有設定(iOS バッジ、サウンド) HTML/テキストコンテンツの管理
4. karte_stg_masspush_setting_value_address_profile.sql

メール送信プロファイル設定:

{{ config(
    materialized = 'table',
    unique_key = '_id',
    tags = ["daily_karte_job"]
) }}

WITH source AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY _id 
            ORDER BY _sdc_batched_at DESC
        ) AS row_num
    FROM {{ source('karte_mm_jobflow', 'masspush_setting_value_address_profile') }}
)
SELECT * EXCEPT(row_num)
FROM source
WHERE row_num = 1

特徴:

  • メール送信者情報の管理
  • フルリフレッシュによる最新状態の維持
5. karte_stg_masspush_setting_value_campaign_folder.sql

キャンペーンフォルダ管理:

{{ config(
    materialized = 'table',
    unique_key = '_id',
    tags = ["daily_karte_job"]
) }}

WITH source AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY _id 
            ORDER BY _sdc_batched_at DESC
        ) AS row_num
    FROM {{ source('karte_mm_jobflow', 'masspush_setting_value_campaign_folder') }}
)
SELECT * EXCEPT(row_num)
FROM source
WHERE row_num = 1

特徴:

  • キャンペーンの論理的な組織化
  • フォルダ階層による管理

共通パターン:

  • タイムゾーン: すべて Asia/Tokyo で統一
  • 重複排除: ROW_NUMBER() によるウィンドウ関数で最新レコードを選択
  • インクリメンタル更新: incremental_condition マクロによる効率的な差分更新
  • 命名規則: karte_stg_masspush_* で統一
  • 実行スケジュール: daily_karte_job タグで実行

Intermediate Layer(中間層)

Mid モデルでは、Staging Layer のデータを元に、ビジネスロジックの適用と高度な集約処理を実施します。特に重要なのは、ネストされた配列データの展開(UNNEST)処理です。

1. karte_int_masspush_messages.sql

Karte の全メッセージイベントを集約する中核モデル unique keyを1ユーザへの配信あたりに変換し、open, clickなどのイベントを集約

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'date',
        'data_type': 'date'
    },
    unique_key = 'unique_id',
    tags = ["daily_karte_job"]
    cluster_by = ['user_id', 'campaign_id', 'schedule_task_id', 'sendAt'],
) }}

WITH schedule_task_info AS (
    -- キャンペーン実行のメタデータを取得
    SELECT
        campaign_id,
        schedule_task_id,
        MIN(CASE WHEN event_name = 'system_start' THEN datetime END) AS system_start_datetime
    FROM {{ ref('karte_stg_masspush_event_log_regional') }}
    where 1=1
        and event_name = 'system_start'
        {% if is_incremental() %}
            and date >= DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 34 DAY) -- Concerned about trouble of source data ELT 
        {% else %}
            and datetime <= CURRENT_DATETIME('Asia/Tokyo')
        {% endif %}
    GROUP BY 1, 2
),

messages AS (
    select
        distinct
        elr.event_hash,
        s.datetime as system_start_datetime,
        s.date as system_start_date,
        s.week as system_start_week,
        s.month as system_start_month,
        elr.datetime,
        elr.date,
        elr.week,
        elr.month,
        user_data.user_id,
        elr.campaign_id,
        vc.name campaign_name,
        elr.event_name,
        elr.values,
        elr.plugin_type,
        elr.schedule_task_id,
        elr.error_type,
        elr.error_code,
        elr.error_message,
        concat(elr.error_type, '-$-', elr.error_code, '-$-', elr.error_message) error_type_code_message,
        json_extract_scalar(values, '$.ses_message_id') ses_message_id,
    from
        {{ ref('karte_base_masspush_event_log_regional') }} elr,
        UNNEST(elr.user_data_list) AS user_data
        inner join schedules s
        on elr.schedule_task_id = s.schedule_task_id
        left join {{ ref('karte_base_masspush_setting_value_campaign') }} vc
        on elr.campaign_id = vc._id
    where 1=1
        {% if is_incremental() %}
            and elr.date >= DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 34 DAY) -- Concerned about trouble of source data ELT 
        {% else %}
            and elr.date <= CURRENT_DATE('Asia/Tokyo')
        {% endif %}
        and elr.event_name in (
            'message_ses_send',
            'message_send',
            'message_open',
            'message_click',
            'message_fail',
            'message_unsubscribe'
        )
)
select
    concat(send_req.event_hash, '-', send_req.user_id) as id,
    send_req.event_hash,
    send_req.system_start_datetime,
    send_req.system_start_date,
    send_req.system_start_week,
    send_req.system_start_month,
    send_req.user_id,
    send_req.campaign_id,
    send_req.campaign_name,
    send_req.event_name,
    send_req.plugin_type,
    send_req.schedule_task_id,

    -- ses message
    send_req.ses_message_id,
    send_req.datetime ses_message_sendAt,

    send.user_id is not null send,
    send.datetime sendAt,
    -- open
    open.user_id is not null open,
    open.datetime openedAt,
    -- click
    click.user_id is not null click,
    click.datetime clickedAt,
    -- fail
    fail.user_id is not null fail,
    fail.datetime failedAt,
    -- unsubscribe
    unsubscribe.user_id is not null unsubscribe,
    unsubscribe.datetime unsubscribedAt,
    -- error
    error_tc.error_type_code_messages,
from
    messages as send_req
    -- error
    left join (select schedule_task_id, user_id, array_agg(distinct error_type_code_message) as error_type_code_messages, min(datetime) as datetime, min(date) as date from messages where error_code is not null group by 1,2) as error_tc
    on send_req.schedule_task_id = error_tc.schedule_task_id and send_req.user_id = error_tc.user_id and date_add(send_req.system_start_date, interval 30 day) >= error_tc.date
    -- send
    left join (select schedule_task_id, user_id, min(datetime) datetime, min(date) date from messages where event_name = 'message_send' group by 1,2) as send
    on send_req.schedule_task_id = send.schedule_task_id and send_req.user_id = send.user_id and date_add(send_req.system_start_date, interval 30 day) >= send.date
    -- open
    left join (select schedule_task_id, user_id, min(datetime) datetime, min(date) date from messages where event_name = 'message_open' group by 1,2) as open
    on send_req.schedule_task_id = open.schedule_task_id and send_req.user_id = open.user_id and date_add(send_req.system_start_date, interval 30 day) >= open.date
    -- click
    left join (select schedule_task_id, user_id, min(datetime) datetime, min(date) date from messages where event_name = 'message_click' group by 1,2) as click
    on send_req.schedule_task_id = click.schedule_task_id and send_req.user_id = click.user_id and date_add(send_req.system_start_date, interval 30 day) >= click.date
    -- fail
    left join (select schedule_task_id, user_id, min(datetime) datetime, min(date) date from messages where event_name = 'message_fail' group by 1,2) as fail
    on send_req.schedule_task_id = fail.schedule_task_id and send_req.user_id = fail.user_id and date_add(send_req.system_start_date, interval 30 day) >= fail.date
    -- unsub
    left join (select schedule_task_id, user_id, min(datetime) datetime, min(date) date from messages where event_name = 'message_unsubscribe' group by 1,2) as unsubscribe
    on send_req.schedule_task_id = unsubscribe.schedule_task_id and send_req.user_id = unsubscribe.user_id and date_add(send_req.system_start_date, interval 30 day) >= unsubscribe.date
where 1=1
    and (
        -- message_ses_send はメールのみトランザクションが発生するためメールとそれ以外それぞれのトランザクションを取得する
        send_req.event_name = 'message_ses_send'
        or
        (send_req.event_name = 'message_send' and send_req.plugin_type != 'ses')
    )

特徴:

  • 1ユーザあたりの配信単位に変換: 1レコードにsend, open, click, unsubscribe, errorを全て集約し横持ちで表現
  • イベントフラグの生成: 各イベントタイプを boolean カラムとして表現
  • エラー情報の集約: 複数のエラーを文字列として連結
  • 30日間のルックバック: インクリメンタル処理での適切なウィンドウ設定。Unsubscribeなどユーザ行動により変更のありうるイベントをなるべく反映できるように設計
2. karte_int_line_delivery_list.sql

LINE チャネル専用の配信リスト 自社のLINE IDと結合した上で分析を可能にするためのテーブル

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'date',
        'data_type': 'date'
    },
    tags = ["daily_karte_job"],
    cluster_by = ['campaign_id', 'schedule_task_id', 'user_id'],
) }}

-- KARTEのマスプッシュ配信ログから、LINE配信の送信対象ユーザーのLINE_IDを抽出

-- キャンペーンID/配信ID(schedule_task_id)ごとに、配信したユーザーとLINE IDを列挙する

with line_message as (
    select
        mm.system_start_date,
        mm.system_start_week,
        mm.system_start_month,
        mm.campaign_id,
        mm.campaign_name,
        mm.schedule_task_id,
        mm.user_id,
        mm.send,
        mm.sendAt,
        mm.click,
        mm.clickedAt,
        mm.fail,
        mm.failedAt,
        mm.unsubscribe,
        mm.unsubscribedAt
    from {{ ref('karte_int_masspush_messages') }} mm
    where 1=1
        and mm.plugin_type = 'line'
        {% if is_incremental() %}
            and mm.system_start_date >= date_sub(current_date('Asia/Tokyo'), interval 33 day)
        {% else %}
            and mm.system_start_date <= current_date('Asia/Tokyo')
        {% endif %}
)
, with_line as (
  select
    lm.*,
    user.line_id,
    user.blocked
  from line_message lm
  left join {{ <your company user data> }} user
    on lm.user_id = user.user_id
)
select distinct
  system_start_date,
  system_start_week,
  system_start_month,
  campaign_id,
  campaign_name,
  schedule_task_id,
  user_id,
  line_id,
  deactivate,
  blocked,
  send,
  sendAt,
  click,
  clickedAt,
  fail,
  failedAt,
  unsubscribe,
  unsubscribedAt
from with_line
where 1=1

特徴:

  • LINE 専用処理: plugin_type = 'line' でフィルタリング
  • LINE ID の紐付け: 自社ソースからの LINE ID マッピング
3. karte_int_masspush_subscribe_event.sql

購読/購読解除イベントの追跡用table

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'date',
        'data_type': 'date'
    },
    tags = ["daily_karte_job"],
    cluster_by = ['user_id', 'event_name'],
) }}

select
    distinct
    elr.event_hash,
    elr.datetime,
    elr.date,
    elr.week,
    elr.month,
    user_data.user_id,
    elr.event_name,
    elr.values,
from
  {{ ref('karte_base_masspush_event_log_regional') }} elr,
  UNNEST(elr.user_data_list) AS user_data
where 1=1
    {% if is_incremental() %}
        and elr.date >= DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 34 DAY) -- Concerned about trouble of source data ELT 
    {% else %}
        and elr.date <= CURRENT_DATE('Asia/Tokyo')
        and elr.date >= DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 34 DAY) -- Concerned about trouble of source data ELT 
    {% endif %}
    and elr.event_name in (
        'message_subscribe',
        'message_unsubscribe'
    )

特徴:

  • CROSS JOIN UNNEST: 配列データの効率的な展開
  • 購読管理: subscribe/unsubscribe イベントの追跡
  • 理由の記録: 購読解除理由の保持
4. karte_int_mail_softbounce.sql

メールのソフトバウンス検出:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'datetime',
        'data_type': 'timestamp'
    },
    tags = ["daily_karte_job"]
) }}

select
    r.datetime,
    r.date,
    r.week,
    r.month,
    r.event_hash,
    r.event_name,
    user_data.user_id as user_id,
    r.campaign_id,
    r.push_content_id,
    r.plugin_type,
    r.schedule_id,
    r.schedule_task_id,
    r.error_type,
    r.error_code,
    r.error_message,
    r.values,
    r.error_recoveries,
    m.soft_bounce,
    m.memo
from {{ ref('karte_base_masspush_event_log_regional') }} r
cross join unnest(r.user_data_list) as user_data
left join error_master m
  on r.error_type = m.error_type
  and r.error_code = m.error_code
  and r.error_message = m.error_message
where 1=1
    {% if is_incremental() %}
        and r.date >= DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 34 DAY) -- Concerned about trouble of source data ELT 
    {% else %}
        and r.date <= CURRENT_DATE('Asia/Tokyo')
    {% endif %}
  and r.plugin_type = 'ses'
  and m.soft_bounce = TRUE

特徴:

  • エラー分類: エラーマスタによるバウンスタイプの判定
  • ソフトバウンス特化: 一時的な配信失敗の追跡
  • 詳細分類: バウンス理由の分類

Mart Layer(分析層)

Mart モデルは、ビジネス分析用の最終的なデータセットを提供します。

1. karte_mart_masspush_messages.sql

キャンペーンレベルのパフォーマンスメトリクス

KARTEの管理画面で確認できる情報と同じ粒度のメトリクスを保有

自社データと組み合わせてご活用ください

{{ config(
    materialized = 'table',
    tags = ["daily_karte_job"]
) }}

with send_campaigns as (
    select
        mm.system_start_date,
        mm.campaign_id,
        mm.campaign_name,
        mm.schedule_task_id,
        -- 基本メトリクス
        count(distinct mm.user_id) user_count,
        countif(mm.send) send_count,
        safe_divide(countif(mm.send), count(distinct mm.user_id)) send_rate,
        countif(mm.open) open_count,
        safe_divide(countif(mm.open), countif(mm.send)) open_rate,
        countif(mm.click) click_count,
        safe_divide(countif(mm.click), countif(mm.send)) click_rate,
        countif(mm.fail) fail_count,
        safe_divide(countif(mm.fail), countif(mm.send)) fail_rate,
        countif(mm.unsubscribe) unsubscribe_count,
        safe_divide(countif(mm.unsubscribe), countif(mm.send)) unsubscribe_rate,
        countif(array_to_string(mm.error_type_code_messages, ',') like '%Permanent:General%' or array_to_string(mm.error_type_code_messages, ',') like '%Permanent:Suppressed%') hard_bounce_count,
        safe_divide(countif(array_to_string(mm.error_type_code_messages, ',') like '%Permanent:General%' or array_to_string(mm.error_type_code_messages, ',') like '%Permanent:Suppressed%'), countif(mm.send)) hard_bounce_rate,
    from
        {{ ref('karte_mid_masspush_messages') }} mm
    where 1=1
        {% if is_incremental() %}
            and mm.system_start_date >= date_sub(current_date('Asia/Tokyo'), interval 33 day)
        {% else %}
            and mm.system_start_date <= current_date('Asia/Tokyo')
        {% endif %}
    group by 1,2,3,4
)
, error_counts as (
  select
    mm.system_start_date,
    mm.campaign_id,
    error_type_code_message,
    count(*) as error_count
  from
    {{ ref('karte_mid_masspush_messages') }} mm,
    unnest(mm.error_type_code_messages) as error_type_code_message
  where
    error_type_code_message is not null
  group by
    mm.system_start_date, mm.campaign_id, error_type_code_message
)
, company_data as (
select
    campaign_id,
    session_count,
    sales,
from
    <your own metrics table>
)
select
    sc.*,
    ir.* except(campaign_id),
    (
      select array_agg(struct(error_type_code_message as error_type, error_count as count))
      from error_counts ec
      where ec.system_start_date = sc.system_start_date
        and ec.campaign_id = sc.campaign_id
    ) as error_detail
from
    send_campaigns sc
    left join company_data cd
    on sc.campaign_id = cd.campaign_id

特徴:

  • 包括的なメトリクス: 送信、開封、クリック、失敗、購読解除の全指標
  • レート計算: SAFE_DIVIDE による安全な除算
  • エラー分析: 構造化されたエラー情報の集約
  • ハードバウンス検出: 正規表現によるバウンスタイプの識別

3. カスタム UDF とマクロの活用

カスタム UDF (User Defined Functions)

複雑な JSON 構造の解析には、JavaScript ベースのカスタム UDF を使用しています。これらは dbt/macros/udfs/karte_udfs.sql に定義されています:

1. parseMasspushEventLogRegionalUserDataList

ユーザーデータリストの配列を解析:

CREATE OR REPLACE FUNCTION udfs.parseMasspushEventLogRegionalUserDataList(input STRING)
RETURNS ARRAY<STRUCT<user_id STRING,
                     target STRING>>
LANGUAGE js AS """
return JSON.parse(input);
""";
2. parseMasspushEventLogRegionalErrorRecoveries

エラーリカバリ情報の配列を解析:

CREATE OR REPLACE FUNCTION udfs.parseMasspushEventLogRegionalErrorRecoveries(input STRING)
RETURNS ARRAY<STRUCT<recovery_type STRING,
                     timestamp STRING>>
LANGUAGE js AS """
return JSON.parse(input);
""";
3. parseMasspushSettingValueCampaignContent

キャンペーンコンテンツの複雑な構造を解析:

CREATE OR REPLACE FUNCTION udfs.parseMasspushSettingValueCampaignContent(input STRING)
RETURNS STRUCT<push_contents ARRAY<STRUCT<_id STRING,
                                          name STRING,
                                          push_content_id STRING,
                                          assign_rate INT64,
                                          is_archived BOOLEAN,
                                          thumbnail_url STRING,
                                          subject STRING,
                                          created STRING,
                                          modified STRING>>,
               ses_setting STRUCT<address_profile_id STRING>,
               native_app_setting STRUCT<show_on_inbox BOOLEAN,
                                         display_start_date_time STRING,
                                         display_end_date_time STRING>>
LANGUAGE js AS """
return JSON.parse(input);
""";
4. parseMasspushSettingValueCampaignSchedule

キャンペーンスケジュール情報を解析:

CREATE OR REPLACE FUNCTION udfs.parseMasspushSettingValueCampaignSchedule(input STRING)
RETURNS STRUCT<_id STRING,
               repeats BOOLEAN,
               repeat_days ARRAY<BOOLEAN>,
               repeat_hours ARRAY<BOOLEAN>,
               repeat_type STRING,
               repeat_day_in_month INT64,
               repeat_day_in_week INT64,
               repeat_hour_in_day INT64,
               repeat_minute INT64,
               has_end BOOLEAN,
               start_date STRING,
               end_date STRING,
               reservation_type STRING,
               repeat_every INT64,
               created STRING,
               modified STRING>
LANGUAGE js AS """
return JSON.parse(input);
""";

これらの UDF は、KARTE から取得した JSON 文字列を BigQuery で扱いやすい構造化データに変換する役割を担っています。

dbt マクロ

データ変換処理で使用している主要なマクロ:

1. to_json_string マクロ

dbt/macros/convert.sql に定義:

{%- macro to_json_string(fieldname) -%}
  to_json_string( `{{ fieldname }}` )
{%- endmacro %}

このマクロは、BigQuery のカラムを JSON 文字列に変換する際に使用します。

2. インクリメンタル更新用マクロ

dbt/macros/date.sql には、インクリメンタル更新で使用する複数のマクロが定義されています:

-- 基本的なインクリメンタル条件(timestamp変換付き)
{%- macro base_incremental_condition(key) -%}
  {% if is_incremental() %}
    and {{ timestamp_to_datetime(key) }} > (select max({{ key }}) from {{ this }})
    and _sdc_batched_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY)
  {% endif %}
{%- endmacro %}

-- シンプルなインクリメンタル条件
{%- macro incremental_condition(key) -%}
  {% if is_incremental() %}
    and {{ key }} > (select max({{ key }}) from {{ this }})
  {% endif %}
{%- endmacro %}

実装のポイントとベストプラクティス

1. UNNEST による配列データの展開

KARTE のイベントデータは、複数ユーザーの情報を配列として含むため、UNNEST を使用した展開が必須:

-- 良い例: LEFT JOIN UNNEST で NULL-safe な展開
FROM stg_table
LEFT JOIN UNNEST(parsed_user_data_list) AS user_data

-- 悪い例: CROSS JOIN は配列が空の場合にレコードが消える
FROM stg_table
CROSS JOIN UNNEST(parsed_user_data_list) AS user_data

2. インクリメンタル更新の最適化

  • 適切なルックバック期間の設定(通常30-34日)
  • insert_overwrite 戦略による重複の回避
  • パーティション単位での効率的な更新

3. エラーハンドリングとデータ品質

  • UDF 内での例外処理による堅牢性の確保
  • SAFE_DIVIDE による除算エラーの防止
  • NULL 値の適切な処理

4. パフォーマンスチューニング

-- パーティション pruning を活用
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
  AND date < CURRENT_DATE()

-- クラスタリングを活用したクエリ
WHERE campaign_id = 'specific_campaign'
  AND event_name IN ('message_send', 'message_open')

5. マルチチャネル対応

各チャネルの特性に応じた処理:

  • Email: バウンスタイプの詳細な分類、SPF/DKIM 対応
  • Push: デバイストークンの有効性確認
  • LINE: ブロック状態の追跡、友達追加状態の確認

6. マルチチャネル対応

運用上の工夫

1. スケジューリング

# Digdag による定期実行設定
schedule:
  daily_karte_job:
    cron: "X Y * * *"
    timezone: "Asia/Tokyo"

2. モニタリング

  • dbt test による品質チェック
  • Redash アラートによる異常値検知
  • 実行ログの定期監視

3. コスト最適化

  • 必要最小限のカラムのみを SELECT
  • 適切なパーティション設計
  • マテリアライズ戦略の最適化

まとめ

KARTE データパイプラインは、マルチチャネルマーケティングの効果測定と最適化を支える重要な基盤です。特に、UNNEST を活用した配列データの展開と、チャネル固有の処理により、複雑なイベントデータを分析可能な形に変換しています。

ELT アプローチと dbt を活用することで、スケーラブルで保守性の高いパイプラインを実現し、ビジネスの意思決定を支援する価値の高いインサイトを提供しています。

今後も、ビジネス要件の変化に柔軟に対応しながら、より価値の高いインサイトを提供できるよう、継続的な改善を進めていきます。

さいごに

ミツモアでは、データやAIを活用してデータドリブンな風土のある会社にて一緒に働く仲間を募集中です。

「技術で課題を解くことにワクワクできる人」や「仕組みで社会を良くしたい、そんなエンジニアになりたい人」、ぜひご応募をお待ちしています!

ミツモア採用ページ: https://corp.meetsmore.com/


関連リンク - dbt Documentation - KARTE Platform - BigQuery Best Practices