ミツモア Tech blog

「ミツモア」を運営する株式会社ミツモアの技術ブログです

dbtでメタデータ管理を完結させる!dbt sourceのメタデータをSTRUCT型カラムやポリシータグも含めBigQueryに連携する方法

ミツモア データグループの酒井です。データ分析基盤の構築や運用を担当しています。 今回はdbt sourceのメタデータをBigQueryに連携させる方法をご紹介します。

メタデータは重要です。 データ量が増え続ける中、メタデータが適切に管理されていないと、各テーブルやカラムの意味を把握するのが困難になり、データ活用の効率が大きく低下します。 特にミツモアではAIツールであるtext2SQLを内製しており、この精度を上げる為にもメタデータの効率的な管理が必要になります。

ミツモアのデータ分析基盤ではBigQueryをデータウェアハウスとして、データ変換ツールにdbt coreを使用しています。 dbt modelのスキーマ定義ファイルのメタデータは自動でBigQuery(INFORMATION_SCHEMA)に連携されますが、dbt sourceのメタデータは残念ながら連携されません。

これを反映させるための方法を調べたところ、こちらの記事を見つけ、非常に参考になりました。 記事の大枠は、dbt source定義ファイルからDDL文を作成し実行するGitHub Actionsワークフローを作るという内容です。

ただこのSQL(DDL)によって更新する方法では以下のような要件には対応できないことがわかりました。

1 STRUCT型内のネストされたカラムのメタデータの更新はできない

BigQueryでは、STRUCT型内のネストされたカラムに対してSQLのALTER文でdescriptionを更新することはできません。例えば以下のようなクエリは実行できません:

ALTER TABLE `project.dataset.table` 
ALTER COLUMN `fugafuga.hogehoge` 
SET OPTIONS(description = 'これはネストされたカラムです');

2 ポリシータグの付与ができない

ミツモアではセキュリティ要件に応じて、BigQueryのポリシータグを使用してカラムレベルでのアクセス制御を行っています。例えば、メールアドレスや電話番号などの個人情報を含むカラムには適切なポリシータグを付与し、IAMポリシーと連携してアクセスを制限しています。しかし、ポリシータグの付与はBigQuery Client API経由でのみ可能で、SQLのALTER文からは実行できません。

3. dbt source定義ファイルにJinja表記がある場合に未対応

dbt source定義ファイルでは、環境(production/devなど)によって異なるスキーマ名を参照するために、Jinjaテンプレートを使用しています。例えば以下のように、target.nameによってスキーマ名を切り替えています:

schema: |
  {%- if target.name == 'production' -%} production_db
  {%- else -%} staging_db
  {%- endif -%}

このようなJinja表記をそのままSQLに変換することはできず、事前にレンダリングをする実装が必要でした。

解決策

そこで、「persist_docsによってdbt modelが更新される時にはSTRUCT型にネストされたカラムのメタデータもBigQueryに反映されているはず」と考え、 dbt-coreのGitHubリポジトリを確認したところ、dbt-bigquery/dbt/adapters/bigquery/impl.pyのupdate_columns関数でBigQuery Client APIを使って更新処理を行っていました。

def update_columns(self, relation, columns):
    if len(columns) == 0:
        return

    conn = self.connections.get_thread_connection()
    table_ref = self.get_table_ref_from_relation(relation)
    table = conn.handle.get_table(table_ref)

    new_schema = []
    for bq_column in table.schema:
        bq_column_dict = bq_column.to_api_repr()
        new_bq_column_dict = self._update_column_dict(bq_column_dict, columns)
        new_schema.append(SchemaField.from_api_repr(new_bq_column_dict))

    new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema)
    conn.handle.update_table(new_table, ["schema"])

dbtはSQLの依存関係を解決して実行するツールというイメージが強かったのですが、メタデータの更新処理ではBigQuery Client APIを直接呼び出していることがわかりました。この実装を参考に、GitHub Actionsとそれを呼び出すPythonスクリプトの実装に着手しました。

実装の詳細

GitHub Actionsワークフロー(update_source_schema.yml)

dbt source定義ファイル(dbt/models/sources/**/*.yml)が変更された際に自動実行されるワークフローです。変更されたファイルのみを対象に、Pythonスクリプトを実行します。

name: update_source_schemas

on:
  push:
    branches:
      - master
    paths:
      - 'dbt/models/sources/**/*.yml'
      - '.github/workflows/update_source_schemas.py'

jobs:
  update_source_schemas:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3
      
      - name: Get changed yml files
        run: |
          git fetch origin ${{ github.event.before }}
          CHANGED=$(git diff --name-only ${{ github.event.before }} ${{ github.sha }} -- 'dbt/models/sources/**/*.yml')
          CHANGED_YMLS=$(echo $CHANGED | tr ' ' ',')
          echo "CHANGED_YMLS=$CHANGED_YMLS" >> $GITHUB_ENV
      
      - name: Set up Python 3.11.3
        uses: actions/setup-python@v4
        with:
          python-version: '3.11.3'
      
      - name: Install dependencies
        run: |
          pip install google-cloud-bigquery pyyaml Jinja2
      
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCLOUD_SERVICE_KEY }}
      
      - name: Set up Google Cloud SDK
        uses: google-github-actions/setup-gcloud@v1
      
      - name: Run script
        run: python .github/workflows/update_source_schemas.py

Pythonスクリプト(update_source_schemas.py)の実装詳細

スクリプトは以下の処理フローで動作します:

  1. 変更されたsource定義ファイルの特定 → 2. YAMLファイルの読み込み → 3. dbt変数の展開 → 4. Jinjaテンプレートのレンダリング → 5. BigQueryスキーマの取得と更新

各処理を詳しく見ていきましょう。

1. dbt変数の読み込みと展開

まず、dbt_project.ymlから変数(特にpolicy_tags)を読み込みます:

def load_dbt_vars() -> dict:
    """
    dbt_project.ymlからvars設定を読み込む
    """
    dbt_project_path = "dbt/dbt_project.yml"
    if os.path.exists(dbt_project_path):
        with open(dbt_project_path, "r") as file:
            dbt_project = yaml.safe_load(file)
            return dbt_project.get("vars", {})
    return {}

dbt source定義ファイルでは、ポリシータグを{{ var('policy_tags')['email'] }}のようなJinja表記で指定することがあります。これを実際のポリシータグ値に展開する処理がexpand_dbt_variables関数です:

def expand_dbt_variables(value, dbt_vars):
    """
    dbt変数展開を行う
    {{ var('policy_tags')['email'] }} のような構文を展開する
    """
    if not isinstance(value, str):
        return value
    
    # {{ var('policy_tags')['key'] }} パターンをマッチ
    pattern = r"\{\{\s*var\(['\"]policy_tags['\"]\)\s*\[['\"]([^'\"]+)['\"]\]\s*\}\}"
    
    def replace_var(match):
        var_key = match.group(1)
        policy_tags = dbt_vars.get("policy_tags", {})
        return policy_tags.get(var_key, match.group(0))  # 見つからない場合は元の文字列を返す
    
    return re.sub(pattern, replace_var, value)

例えば、dbt source定義ファイルで以下のように指定されている場合:

columns:
  - name: email
    policy_tags:
      - "{{ var('policy_tags')['email'] }}"

dbt_project.ymlvars.policy_tags.emailの値(例:"projects/project_id/locations/us/taxonomies/.../policyTags/...")に展開されます。

2. Jinjaテンプレートのレンダリング

source定義ファイルのschemaフィールドにJinja表記がある場合、production環境として固定でレンダリングします:

# Jinja2 環境を準備(production 固定)
jinja_env = Environment()
jinja_context = {"target": {"name": "production"}}

raw_schema = source.get("schema") or source.get("database") or source.get("name")
# source.schema が Jinja の場合に production 固定でレンダリング
if isinstance(raw_schema, str):
    try:
        template = jinja_env.from_string(raw_schema)
        rendered_schema = template.render(**jinja_context).strip()
        dataset = rendered_schema or raw_schema
    except Exception:
        dataset = raw_schema

例えば、以下のような定義:

schema: |
  {%- if target.name == 'production' -%} production_db
  {%- else -%} staging_db
  {%- endif -%}

は、production_dbにレンダリングされます。

3. ネストカラム対応のメタデータ更新

最も重要な処理が_update_column_dict関数です。この関数は、BigQueryのスキーマを再帰的に走査し、ネストされたカラム(STRUCT型内のフィールド)のメタデータを更新します:

def _update_column_dict(bq_column_dict, dbt_columns, parent=""):
    """
    Helper function to recursively traverse the schema of a table.
    dbt-coreの実装を参考にしたネストカラム対応のヘルパー関数。
    
    bq_column_dict should be a dict as obtained by the to_api_repr()
    function of a SchemaField object.
    """
    # ドット記法でカラム名を構築(例: "convenicode.expiredat")
    if parent:
        dotted_column_name = "{}.{}".format(parent, bq_column_dict["name"])
    else:
        dotted_column_name = bq_column_dict["name"]

    # dbt source定義に該当するカラムがある場合、descriptionとポリシータグを更新
    if dotted_column_name in dbt_columns:
        column_config = dbt_columns[dotted_column_name]
        description = column_config.get("description")
        if description:
            bq_column_dict["description"] = description
        
        # ポリシータグの処理(RECORD型(STRUCT型)以外のカラムのみ)
        if bq_column_dict["type"] != "RECORD":
            policy_tags = column_config.get("policy_tags", [])
            if policy_tags:
                bq_column_dict["policyTags"] = {"names": policy_tags}

    # 子フィールド(ネストされたカラム)を再帰的に処理
    new_fields = []
    for child_col_dict in bq_column_dict.get("fields", list()):
        new_child_column_dict = _update_column_dict(
            child_col_dict, dbt_columns, parent=dotted_column_name
        )
        new_fields.append(new_child_column_dict)

    bq_column_dict["fields"] = new_fields
    return bq_column_dict

このように再帰的に処理することで、任意の深さのネストカラムに対応できます。

4. BigQuery Client APIによる更新

最後に、更新されたスキーマをBigQueryに反映します:

def update_table_and_columns_with_api(client, project, dataset, table, table_desc, columns):
    """
    BigQuery Client APIを使用してテーブルとカラム(ネストカラム含む)のdescriptionを更新する。
    dbt-coreのupdate_columnsメソッドを参考にした実装。
    """
    table_ref = client.dataset(dataset, project=project).table(table)
    
    # 現在のテーブルスキーマを取得
    bigquery_table = client.get_table(table_ref)
    
    # テーブルdescriptionを更新(必要に応じて)
    if table_desc:
        bigquery_table.description = table_desc
    
    # カラムが指定されている場合のみスキーマを更新
    if columns:
        # カラム設定を辞書形式に変換(ドット記法対応)
        dbt_columns = {}
        for col in columns:
            col_name = col.get("name")
            col_desc = col.get("description", "")
            col_policy_tags = col.get("policy_tags", [])
            
            if col_name and (col_desc or col_policy_tags):
                column_config = {}
                if col_desc:
                    column_config["description"] = col_desc
                if col_policy_tags:
                    column_config["policy_tags"] = col_policy_tags
                dbt_columns[col_name] = column_config
        
        # 新しいスキーマを構築
        new_schema = []
        for bq_column in bigquery_table.schema:
            bq_column_dict = bq_column.to_api_repr()
            new_bq_column_dict = _update_column_dict(bq_column_dict, dbt_columns)
            new_schema.append(SchemaField.from_api_repr(new_bq_column_dict))
        
        # テーブルスキーマを更新
        bigquery_table.schema = new_schema
    
    # BigQueryテーブルを更新(スキーマとdescriptionの両方)
    fields_to_update = []
    if table_desc:
        fields_to_update.append("description")
    if columns:
        fields_to_update.append("schema")
        
    if fields_to_update:
        client.update_table(bigquery_table, fields_to_update)

この処理により、SQLでは不可能だった以下の操作が実現できます:

  • STRUCT型内のネストカラムのdescription更新
  • ポリシータグの付与(SQLのALTER文では不可)
  • 既存スキーマを保持したまま、必要な部分のみを更新

まとめ

今回は、dbt sourceの定義ファイルに記載したメタデータをBigQueryのINFORMATION_SCHEMAに自動で反映する仕組みを構築しました。これにより、dbt source由来のテーブルについてもdbt modelと同様にメタデータがBigQuery上で参照できるようになり、データの意味や利用方法を容易に把握できる状態になりました。

ミツモアで一緒に働きませんか?

ミツモアでは、データやAIを活用してデータドリブンな風土のある会社にて一緒に働く仲間を募集中です。

「技術で課題を解くことにワクワクできる人」や「仕組みで社会を良くしたい、そんなエンジニアになりたい人」、ぜひご応募をお待ちしています!

ミツモア採用ページ: https://corp.meetsmore.com/