ミツモア Tech blog

「ミツモア」を運営する株式会社ミツモアの技術ブログです

dbt Programmatic Invocationsとデータエンジニアリングエージェントによるデータパイプライン運用自動化への道

こんにちは、株式会社ミツモアでエンジニアをしている酒井です。 ミツモアでは、データパイプラインの運用を効率化するため、Slack通知によって起動されるデータエンジニアリングエージェント(以下、DEエージェント)を活用しています。本記事では、DEエージェントを使ってデータパイプラインの運用を自動化した一例をご紹介します。

こちらの記事ではdbt モデル単位でのアラートの通知方法についてご紹介しましたが、今回の記事はdbt job単位(workflow単位)でのアラートの通知方法になります。

技術スタック

  • ワークフローエンジン: Digdag
  • データ変換: dbt
  • データウェアハウス: BigQuery
  • 監視・可視化: Elementary

現在、約40のDigdagワークフローと約800のdbtモデルが日夜稼働しており、その中で発生するエラーに対してDEエージェントを使って自動対応できるようにしました。

課題: dbt実行時のエラーハンドリングの難しさ

従来の実装方法

もともと、Digdagからdbtを呼び出す際には、シェルスクリプト(shコマンド)で実行していました。以下はその例です。

#!/bin/sh
cd dbt
export DBT_PROFILES_DIR="$(pwd)"
uv run dbt run --target ${DIGDAG_ENV} -selector <セレクター名> --vars '{"JOB_NAME": "<ジョブ名>"}'

この方法では、以下の課題がありました:

  1. エラーメッセージの取得が困難: シェルスクリプトから実行した場合、エラーの詳細情報を構造化して取得することが難しく、DEエージェントに適切な情報を渡すことができませんでした。
  2. Invocation IDの取得が困難: dbtの実行ごとに割り当てられるInvocation IDを取得することが難しく、実行結果の追跡が困難でした。
  3. エラーハンドリングの柔軟性が低い: エラーの種類に応じた細かい制御ができず、エラー発生時の処理が限定的でした。

解決策: dbt Programmatic Invocationsの採用

Programmatic Invocationsとは

dbt Core v1.5以降では、CLIコマンドをPythonから直接呼び出すことができるProgrammatic Invocationsという機能が提供されています。これにより、dbtの実行をPythonコードから制御できるようになりました。

詳細はdbt公式ドキュメントを参照してください。

実装方法

1. dbtRunnerを使った実行

まず、dbtRunnerクラスを使用してdbtコマンドを実行します。

from dbt.cli.main import dbtRunner, dbtRunnerResult

def run_dbt_command(cli_args: List[str]) -> int:
    """
    与えられた CLI 風の引数で dbt を実行し、
    dbt CLI の慣習に互換の終了コード(0, 1, 2)を返します。
    """
    try:
        dbt = dbtRunner()
        res: dbtRunnerResult = dbt.invoke(cli_args)

        # dbt CLI の終了コードにマッピング
        # success=True の場合は 0
        # ハンドリング済みの失敗で success=False -> 1(res.exception は None)
        # 未処理の例外 -> 2
        if res.success:
            return 0

        # 実行は完了したが失敗がある場合、success は False かつ exception は None
        if res.exception is None:
            return 1

        # フォールバック(例外が発生していれば通常ここには到達しない)
        return 2

    except Exception as exc:  # 未処理のエラー(CLI の終了コード 2 相当)
        print(f"Unhandled dbt error: {exc}", file=sys.stderr)
        return 2

2. Invocation IDの取得

dbtの実行後、アーティファクトファイル(run_results.jsonmanifest.json)からInvocation IDを取得します。

def _find_invocation_id_from_artifacts(project_root: Path) -> str:
    """dbt のアーティファクトから invocation_id を安定した順序で探索して返す。"""
    candidate_paths = [
        project_root / "target" / "run_results.json",
        project_root / "target" / "manifest.json",
    ]
    for p in candidate_paths:
        data = _read_json_safe(p)
        inv_id = (data.get("metadata") or {}).get("invocation_id") if data else None
        if inv_id:
            return str(inv_id)
    return ""

3. エラーメッセージの抽出

run_results.jsonからエラーメッセージを抽出します。

def _extract_first_error_message(project_root: Path) -> str:
    """run_results.json からエラーメッセージを返す(なければ空文字)。"""
    rr_path = project_root / "target" / "run_results.json"
    rr = _read_json_safe(rr_path)
    for r in rr.get("results", []):
        if r.get("status") == "error":
            msg = r.get("message") or (r.get("adapter_response") or {}).get("message")
            if msg:
                return str(msg)
    return ""

4. ワークフロー全体の実装

これらの機能を統合したワークフロー関数を実装します。

def execute_dbt_workflow(cli_args: List[str]) -> None:
    """
    汎用 dbt 実行ワークフロー。
    - CWD を dbt プロジェクトに変更
    - dbt 実行
    - invocation_id / error_message / subscribers を抽出し Digdag に保存
    """
    project_dir = (Path(__file__).resolve().parent / ".." / "dbt").resolve()
    prev_cwd = Path.cwd()
    # DBT_PROFILES_DIR を dbt プロジェクト直下に固定
    prev_profiles_dir = os.environ.get("DBT_PROFILES_DIR")
    os.environ["DBT_PROFILES_DIR"] = str(project_dir)
    os.chdir(project_dir)

    try:
        exit_code = run_dbt_command(cli_args)

        # アーティファクトから invocation_id を取得(順序を固定)
        run_invocation_id = _find_invocation_id_from_artifacts(project_root=project_dir)
        if run_invocation_id:
            print(f"run_invocation_id: {run_invocation_id}")

        # エラーメッセージを抽出(必要時のみ)
        error_message = ""
        if exit_code != 0:
            error_message = _extract_first_error_message(project_root=project_dir) or (
                f"dbt run failed with exit code {exit_code}"
            )

        # subscribers を抽出(購読者:モデル一覧 形式に整形)
        subscribers_json = ""
        try:
            subs_mapping = extract_failed_model_subscribers_mapping(project_root=project_dir)
            if subs_mapping:
                # key: subscriber, value: List[model_name]
                lines = []
                for subscriber in sorted(subs_mapping.keys()):
                    models = sorted(list(subs_mapping.get(subscriber, [])))
                    line = f"{subscriber} : {','.join(models)}" if models else f"{subscriber} :"
                    lines.append(line)
                subscribers_json = json.dumps(lines)
        except Exception:
            subscribers_json = ""

        # Digdag 変数を一括保存(存在すれば)
        try:
            import digdag  # type: ignore
            digdag.env.store({
                "exit_code": str(exit_code),
                "error_message": error_message,
                "invocation_id": run_invocation_id or "",
                "subscribers": subscribers_json,
                "recovery_job_url": recovery_job_url if 'recovery_job_url' in locals() else "",
            })
        except Exception:
            pass

        return
    finally:
        # 元のCWDに戻す
        try:
            os.chdir(prev_cwd)
        except Exception:
            pass
        # DBT_PROFILES_DIR を元に戻す
        try:
            if prev_profiles_dir is None:
                os.environ.pop("DBT_PROFILES_DIR", None)
            else:
                os.environ["DBT_PROFILES_DIR"] = prev_profiles_dir
        except Exception:
            pass

5. Digdagワークフローでの使用例

Digdagワークフローからは、以下のようにPython関数を呼び出します。

+main:
  +run:
    skip_on_overtime: true
    py>: dbt_bihourly.main

  +check:
    if>: ${exit_code != null && exit_code != '0'}
    _do:
      +export_ctx:
        _export:
          invocation_id: ${invocation_id}
          subscribers: ${subscribers}
          recovery_job_url: ${recovery_job_url}
      +fail_task:
        fail>: ${error_message}

Python側では、各ワークフロー用のラッパー関数を定義します。

def main() -> None:
    """
    dbt(隔2時間)のワークフロー用ラッパー。
    以下のコマンド実行に相当:
      dbt run --selector bi_hourly_models --target ${DIGDAG_ENV} --vars '{"JOB_NAME": "dbt-job-biHourly"}'
    """
    target = os.environ.get("DIGDAG_ENV", "dev")
    cli_args: List[str] = [
        "run",
        "--selector",
        "bi_hourly_models",
        "--target",
        target,
        "--vars",
        '{"JOB_NAME": "dbt-job-biHourly"}',
    ]
    execute_dbt_workflow(cli_args)

6. Slack通知への連携

エラー発生時には、Digdagの_errorセクションでSlack通知を送信します。この際、取得したinvocation_iderror_messageをSlack通知に含めることができます。

_error:
  +notification:
    py>: utilities.send_error_notification.main
    _env:
      error_message: ${error.message}
      workflow_name: ${workflow_name}
      session_time: ${session_time}
      session_id: ${session_id}
      invocation_id: ${invocation_id}
      subscribers: ${subscribers}
      recovery_job_url: ${recovery_job_url}

Slack通知では、Invocation IDを含めた詳細情報を送信します。

def send_detail_notification(workflow_name, session_id, thread_ts, error_message, invocation_id, subscribers, slack_bot_token, slack_channel):
    """
    詳細情報をスレッド返信として送信
    """
    fields = []

    # invocation_id のフィールドを追加(あれば)
    if invocation_id:
        fields.append({
            "title": "Invocation ID",
            "value": f"`{invocation_id}`",
            "short": False
        })

    # エラーメッセージのフィールドを追加
    error_chunks = split_text_for_slack(error_message, max_chars=1900)
    for i, chunk in enumerate(error_chunks):
        title = "Error Message" if i == 0 else f"Error Message (Part {i+1})"
        formatted_chunk = f"```\\n{chunk}\\n```"
        fields.append({
            "title": title,
            "value": formatted_chunk,
            "short": False
        })

    # ... その他のフィールド ...

Elementaryとの連携

取得したInvocation IDは、Elementaryによって自動管理されているdbt_elementary.dbt_run_resultsテーブルで詳細情報を取得する際にも活用できます。

このテーブルには、各dbt実行の詳細な情報(実行時間、成功/失敗、エラーメッセージなど)が保存されており、Invocation IDをキーとして実行結果を追跡できます。

SELECT
  invocation_id,
  name,
  status,
  message,
  execute_started_at
FROM `dbt_elementary.dbt_run_results`
WHERE invocation_id = '8b123fcd-aa98-4d37-9307-904e7f9e8fec'
  and status in ('error','skipped')

これにより、DEエージェントは以下の情報を取得できます:

  1. Invocation ID: 実行を一意に識別するID
  2. エラーメッセージ: 構造化されたエラー情報
  3. 失敗したモデル情報: どのモデルが失敗したか
  4. Subscribers情報: 失敗したモデルに関連する連絡先

これらの情報を基に、DEエージェントは適切なエラー対応(Pull Requestの作成など)を自動的に実行できます。

まとめ

本記事では、dbtのProgrammatic Invocations機能を活用して、Digdagワークフローからdbtを実行し、エラー情報を適切に取得・伝達する方法を紹介しました。

主なポイント:

  1. Programmatic Invocationsの採用: dbtRunnerクラスを使用することで、Pythonからdbtを柔軟に制御できるようになりました。
  2. Invocation IDの取得: 実行結果を追跡し、Elementaryのテーブルと連携できるようになりました。
  3. 構造化されたエラー情報: エラーメッセージや失敗したモデル情報を構造化して取得し、DEエージェントに適切な情報を渡せるようになりました。
  4. Slack通知への連携: 取得した情報をSlack通知に含めることで、エラー発生時の対応が効率化されました。

この実装により、データパイプラインの運用がより自動化され、エラー対応の効率が大幅に向上しました。

ミツモアで一緒に働きませんか?

ミツモアでは、データやAIを活用してデータドリブンな風土のある会社にて一緒に働く仲間を募集中です。

「技術で課題を解くことにワクワクできる人」や「仕組みで社会を良くしたい、そんなエンジニアになりたい人」、ぜひご応募をお待ちしています!

ミツモア採用ページ: https://corp.meetsmore.com/