こんにちは、株式会社ミツモアでエンジニアをしている酒井です。 ミツモアでは、データパイプラインの運用を効率化するため、Slack通知によって起動されるデータエンジニアリングエージェント(以下、DEエージェント)を活用しています。本記事では、DEエージェントを使ってデータパイプラインの運用を自動化した一例をご紹介します。
こちらの記事ではdbt モデル単位でのアラートの通知方法についてご紹介しましたが、今回の記事はdbt job単位(workflow単位)でのアラートの通知方法になります。
技術スタック
- ワークフローエンジン: Digdag
- データ変換: dbt
- データウェアハウス: BigQuery
- 監視・可視化: Elementary
現在、約40のDigdagワークフローと約800のdbtモデルが日夜稼働しており、その中で発生するエラーに対してDEエージェントを使って自動対応できるようにしました。
課題: dbt実行時のエラーハンドリングの難しさ
従来の実装方法
もともと、Digdagからdbtを呼び出す際には、シェルスクリプト(shコマンド)で実行していました。以下はその例です。
#!/bin/sh
cd dbt
export DBT_PROFILES_DIR="$(pwd)"
uv run dbt run --target ${DIGDAG_ENV} -selector <セレクター名> --vars '{"JOB_NAME": "<ジョブ名>"}'
この方法では、以下の課題がありました:
- エラーメッセージの取得が困難: シェルスクリプトから実行した場合、エラーの詳細情報を構造化して取得することが難しく、DEエージェントに適切な情報を渡すことができませんでした。
- Invocation IDの取得が困難: dbtの実行ごとに割り当てられるInvocation IDを取得することが難しく、実行結果の追跡が困難でした。
- エラーハンドリングの柔軟性が低い: エラーの種類に応じた細かい制御ができず、エラー発生時の処理が限定的でした。
解決策: dbt Programmatic Invocationsの採用
Programmatic Invocationsとは
dbt Core v1.5以降では、CLIコマンドをPythonから直接呼び出すことができるProgrammatic Invocationsという機能が提供されています。これにより、dbtの実行をPythonコードから制御できるようになりました。
詳細はdbt公式ドキュメントを参照してください。
実装方法
1. dbtRunnerを使った実行
まず、dbtRunnerクラスを使用してdbtコマンドを実行します。
from dbt.cli.main import dbtRunner, dbtRunnerResult def run_dbt_command(cli_args: List[str]) -> int: """ 与えられた CLI 風の引数で dbt を実行し、 dbt CLI の慣習に互換の終了コード(0, 1, 2)を返します。 """ try: dbt = dbtRunner() res: dbtRunnerResult = dbt.invoke(cli_args) # dbt CLI の終了コードにマッピング # success=True の場合は 0 # ハンドリング済みの失敗で success=False -> 1(res.exception は None) # 未処理の例外 -> 2 if res.success: return 0 # 実行は完了したが失敗がある場合、success は False かつ exception は None if res.exception is None: return 1 # フォールバック(例外が発生していれば通常ここには到達しない) return 2 except Exception as exc: # 未処理のエラー(CLI の終了コード 2 相当) print(f"Unhandled dbt error: {exc}", file=sys.stderr) return 2
2. Invocation IDの取得
dbtの実行後、アーティファクトファイル(run_results.jsonやmanifest.json)からInvocation IDを取得します。
def _find_invocation_id_from_artifacts(project_root: Path) -> str: """dbt のアーティファクトから invocation_id を安定した順序で探索して返す。""" candidate_paths = [ project_root / "target" / "run_results.json", project_root / "target" / "manifest.json", ] for p in candidate_paths: data = _read_json_safe(p) inv_id = (data.get("metadata") or {}).get("invocation_id") if data else None if inv_id: return str(inv_id) return ""
3. エラーメッセージの抽出
run_results.jsonからエラーメッセージを抽出します。
def _extract_first_error_message(project_root: Path) -> str: """run_results.json からエラーメッセージを返す(なければ空文字)。""" rr_path = project_root / "target" / "run_results.json" rr = _read_json_safe(rr_path) for r in rr.get("results", []): if r.get("status") == "error": msg = r.get("message") or (r.get("adapter_response") or {}).get("message") if msg: return str(msg) return ""
4. ワークフロー全体の実装
これらの機能を統合したワークフロー関数を実装します。
def execute_dbt_workflow(cli_args: List[str]) -> None: """ 汎用 dbt 実行ワークフロー。 - CWD を dbt プロジェクトに変更 - dbt 実行 - invocation_id / error_message / subscribers を抽出し Digdag に保存 """ project_dir = (Path(__file__).resolve().parent / ".." / "dbt").resolve() prev_cwd = Path.cwd() # DBT_PROFILES_DIR を dbt プロジェクト直下に固定 prev_profiles_dir = os.environ.get("DBT_PROFILES_DIR") os.environ["DBT_PROFILES_DIR"] = str(project_dir) os.chdir(project_dir) try: exit_code = run_dbt_command(cli_args) # アーティファクトから invocation_id を取得(順序を固定) run_invocation_id = _find_invocation_id_from_artifacts(project_root=project_dir) if run_invocation_id: print(f"run_invocation_id: {run_invocation_id}") # エラーメッセージを抽出(必要時のみ) error_message = "" if exit_code != 0: error_message = _extract_first_error_message(project_root=project_dir) or ( f"dbt run failed with exit code {exit_code}" ) # subscribers を抽出(購読者:モデル一覧 形式に整形) subscribers_json = "" try: subs_mapping = extract_failed_model_subscribers_mapping(project_root=project_dir) if subs_mapping: # key: subscriber, value: List[model_name] lines = [] for subscriber in sorted(subs_mapping.keys()): models = sorted(list(subs_mapping.get(subscriber, []))) line = f"{subscriber} : {','.join(models)}" if models else f"{subscriber} :" lines.append(line) subscribers_json = json.dumps(lines) except Exception: subscribers_json = "" # Digdag 変数を一括保存(存在すれば) try: import digdag # type: ignore digdag.env.store({ "exit_code": str(exit_code), "error_message": error_message, "invocation_id": run_invocation_id or "", "subscribers": subscribers_json, "recovery_job_url": recovery_job_url if 'recovery_job_url' in locals() else "", }) except Exception: pass return finally: # 元のCWDに戻す try: os.chdir(prev_cwd) except Exception: pass # DBT_PROFILES_DIR を元に戻す try: if prev_profiles_dir is None: os.environ.pop("DBT_PROFILES_DIR", None) else: os.environ["DBT_PROFILES_DIR"] = prev_profiles_dir except Exception: pass
5. Digdagワークフローでの使用例
Digdagワークフローからは、以下のようにPython関数を呼び出します。
+main: +run: skip_on_overtime: true py>: dbt_bihourly.main +check: if>: ${exit_code != null && exit_code != '0'} _do: +export_ctx: _export: invocation_id: ${invocation_id} subscribers: ${subscribers} recovery_job_url: ${recovery_job_url} +fail_task: fail>: ${error_message}
Python側では、各ワークフロー用のラッパー関数を定義します。
def main() -> None: """ dbt(隔2時間)のワークフロー用ラッパー。 以下のコマンド実行に相当: dbt run --selector bi_hourly_models --target ${DIGDAG_ENV} --vars '{"JOB_NAME": "dbt-job-biHourly"}' """ target = os.environ.get("DIGDAG_ENV", "dev") cli_args: List[str] = [ "run", "--selector", "bi_hourly_models", "--target", target, "--vars", '{"JOB_NAME": "dbt-job-biHourly"}', ] execute_dbt_workflow(cli_args)
6. Slack通知への連携
エラー発生時には、Digdagの_errorセクションでSlack通知を送信します。この際、取得したinvocation_idやerror_messageをSlack通知に含めることができます。
_error: +notification: py>: utilities.send_error_notification.main _env: error_message: ${error.message} workflow_name: ${workflow_name} session_time: ${session_time} session_id: ${session_id} invocation_id: ${invocation_id} subscribers: ${subscribers} recovery_job_url: ${recovery_job_url}
Slack通知では、Invocation IDを含めた詳細情報を送信します。
def send_detail_notification(workflow_name, session_id, thread_ts, error_message, invocation_id, subscribers, slack_bot_token, slack_channel): """ 詳細情報をスレッド返信として送信 """ fields = [] # invocation_id のフィールドを追加(あれば) if invocation_id: fields.append({ "title": "Invocation ID", "value": f"`{invocation_id}`", "short": False }) # エラーメッセージのフィールドを追加 error_chunks = split_text_for_slack(error_message, max_chars=1900) for i, chunk in enumerate(error_chunks): title = "Error Message" if i == 0 else f"Error Message (Part {i+1})" formatted_chunk = f"```\\n{chunk}\\n```" fields.append({ "title": title, "value": formatted_chunk, "short": False }) # ... その他のフィールド ...
Elementaryとの連携
取得したInvocation IDは、Elementaryによって自動管理されているdbt_elementary.dbt_run_resultsテーブルで詳細情報を取得する際にも活用できます。
このテーブルには、各dbt実行の詳細な情報(実行時間、成功/失敗、エラーメッセージなど)が保存されており、Invocation IDをキーとして実行結果を追跡できます。
SELECT invocation_id, name, status, message, execute_started_at FROM `dbt_elementary.dbt_run_results` WHERE invocation_id = '8b123fcd-aa98-4d37-9307-904e7f9e8fec' and status in ('error','skipped')
これにより、DEエージェントは以下の情報を取得できます:
- Invocation ID: 実行を一意に識別するID
- エラーメッセージ: 構造化されたエラー情報
- 失敗したモデル情報: どのモデルが失敗したか
- Subscribers情報: 失敗したモデルに関連する連絡先
これらの情報を基に、DEエージェントは適切なエラー対応(Pull Requestの作成など)を自動的に実行できます。
まとめ
本記事では、dbtのProgrammatic Invocations機能を活用して、Digdagワークフローからdbtを実行し、エラー情報を適切に取得・伝達する方法を紹介しました。
主なポイント:
- Programmatic Invocationsの採用:
dbtRunnerクラスを使用することで、Pythonからdbtを柔軟に制御できるようになりました。 - Invocation IDの取得: 実行結果を追跡し、Elementaryのテーブルと連携できるようになりました。
- 構造化されたエラー情報: エラーメッセージや失敗したモデル情報を構造化して取得し、DEエージェントに適切な情報を渡せるようになりました。
- Slack通知への連携: 取得した情報をSlack通知に含めることで、エラー発生時の対応が効率化されました。
この実装により、データパイプラインの運用がより自動化され、エラー対応の効率が大幅に向上しました。
ミツモアで一緒に働きませんか?
ミツモアでは、データやAIを活用してデータドリブンな風土のある会社にて一緒に働く仲間を募集中です。
「技術で課題を解くことにワクワクできる人」や「仕組みで社会を良くしたい、そんなエンジニアになりたい人」、ぜひご応募をお待ちしています!
ミツモア採用ページ: https://corp.meetsmore.com/