こんにちは、株式会社ミツモアでエンジニアをしている酒井です。 本記事では、dbtのpost_hook機能とBigQueryのUser-defined functions (UDF) in Pythonを組み合わせることで、データパイプライン内で自動的にSlack通知を送信する方法と簡単な使用例を紹介します。
この仕組みが解決する課題
データパイプラインを運用する中で、以下のような課題を抱えていました:
- データ品質の問題を早期に検出したい: rawデータのスキーマ変更や異常値の混入などを早期に検出し、下流への影響を最小限に抑えたい
- 問題発生時に即座に通知を受けたい: Slackなど、チームが日常的に使うツールで即座に通知を受け取りたい
- 問題の原因を素早く特定したい: アラートが発生した際に、どのテーブルに問題があったのかをすぐに判断したい
そこで考えたのがdbtモデルのpost_hookにアラート機能を組み込む今回の仕組みです。 こうすることでdbtモデルがbuildされた直後に通知を受け取ることができます。 また、dbtのリネージュ機能を活用して、アラートが発生したモデルからそのモデルが依存しているテーブルやモデルを遡って確認できるため、問題の根本原因となっているテーブルを迅速に特定できます。
一方、アラート機能を別のワークフロー(例:独立したSQLクエリや外部スクリプト)として実装した場合、dbtのリネージュグラフには含まれないため、どのテーブルに依存したアラートなのかをすぐには判断できません。
またミツモアではこのアラート通知をAIエージェントが受け取り、自動的にPull Requestを作成する仕組みに活用しています。 アラートが発生すると、Slack通知に含まれる情報(モデル名、エラー内容など)を基に、AIエージェントが適切な修正内容を判断し、自動的にPull Requestを作成します。
例えば、rawデータのスキーマ変更の検出アラートでは、AIエージェントが新しいカラムの追加を検出し、dbtのsource定義ファイルを自動的に更新するPull Requestを作成します。これにより、データパイプラインの運用をより効率的かつ自動化されたものにしています。
アーキテクチャ
この自動通知システムは以下の3つのコンポーネントで構成されています:
- Python UDF: BigQuery上で実行されるPython関数。Secret ManagerからSlack webhook URLを取得し、Slackに通知を送信します。
- dbtマクロ: dbtモデルの実行後に、条件に応じて通知を送信するSQLを生成します。
- dbtモデル: 監視対象のデータをクエリし、アラート条件に該当するレコードを検出します。
実装手順
1. Python UDFの作成
まず、Slack通知を送信するためのPython UDFを作成します。このUDFは、Secret Managerからwebhook URLを取得し、Slackに通知を送信します。
notification_udfs.sql:
{%- macro notification_udfs() -%}
-- Slack通知用のPython UDFを作成
{% set query %}
CREATE OR REPLACE FUNCTION `{{ target.project }}.udfs.notify_slack`(
project_id STRING,
secret_name STRING,
message STRING
)
RETURNS STRING
LANGUAGE python
WITH CONNECTION `{{ target.project }}.us.slack_notification_connection`
OPTIONS (
runtime_version='python-3.11',
entry_point='notify_slack',
packages=['google-cloud-secret-manager>=2.16.0']
)
AS r'''
import json
import urllib.request
import urllib.error
from google.cloud import secretmanager
def notify_slack(project_id: str, secret_name: str, message: str) -> str:
"""
Secret ManagerからSlack webhook URLを取得してSlackに通知を送信する
Args:
project_id: GCPプロジェクトID
secret_name: Secret Managerに保存されたSlack webhook URLのシークレット名
message: 通知文
Returns:
成功時: "OK"
Raises:
Exception: Secret Managerからの取得失敗、Slack通知送信失敗など、エラーが発生した場合
"""
try:
# メッセージのバリデーション
if not message or not message.strip():
raise Exception("Message cannot be empty")
# Secret Managerからwebhook URLを取得
client = secretmanager.SecretManagerServiceClient()
name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
response = client.access_secret_version(request={"name": name})
webhook_url = response.payload.data.decode("utf-8").strip()
# Slackに通知を送信
payload = {
"text": message
}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(
webhook_url,
data=data,
headers={'Content-Type': 'application/json'}
)
try:
with urllib.request.urlopen(req, timeout=10) as response:
result = response.read().decode('utf-8')
if response.status == 200:
return "OK"
else:
raise Exception(f"Slack API returned status {response.status}: {result}")
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8') if e.fp else "No error body"
raise Exception(f"HTTP Error {e.code}: {e.reason}. Response: {error_body}")
except Exception as e:
raise Exception(str(e))
''';
{% endset %}
{{ return(query) }}
{%- endmacro %}
2. dbtマクロの作成
次に、dbtモデルの実行後に条件に応じて通知を送信するマクロを作成します。
notify_slack_if_records_exist.sql:
{%- macro notify_slack_if_records_exist(secret_name, message_column_name='message') -%}
{#
post_hook用のマクロ: レコードが存在する場合にSlack通知を送信するSQLを返す
指定されたカラムの1行目の値をメッセージとして使用する
Args:
secret_name: Secret Managerに保存されたSlack webhook URLのシークレット名
message_column_name: メッセージを取得するカラム名(デフォルト: 'message')
#}
{# モデル名を取得(this.identifierを使用) #}
{%- set model_name = this.identifier -%}
{# レコードが存在する場合のみUDFを呼び出すSQLを生成 #}
{%- set hook_sql -%}
-- レコードが存在する場合のみSlack通知を送信
-- 指定されたカラムの1行目の値をメッセージとして使用し、先頭にモデル名を追加
select `{{ target.project }}.udfs.notify_slack`(
'{{ target.project }}',
'{{ secret_name }}',
concat(
'Notification by {{ model_name }}\\n\\n',
(select {{ message_column_name }} from {{ this }} where {{ message_column_name }} is not null limit 1 )
)
) as result
from unnest([1])
where (select count(*) from {{ this }}) > 0
{%- endset -%}
{{ return(hook_sql) }}
{%- endmacro %}
動作の仕組み:
- dbtモデルが実行された後(post_hook)、このマクロが呼び出されます
- モデルにレコードが存在する場合(
count(*) > 0)のみ、Python UDFを呼び出してSlack通知を送信します messageカラムの1行目の値を通知メッセージとして使用します
3. dbtモデルでの使用例
簡単な例として、エラーレコードを検出して通知するdbtモデルを作成します。
mart_detect_errors.sql:
{{
config(
materialized='view',
tags=['monitoring'],
post_hook = "{{ notify_slack_if_records_exist('slack_webhook_urlが格納されているsecret managerのsecret名', 'message') }}"
)
}}
-- ログテーブルからエラーステータスのレコードを検出する例
select
concat(
'⚠️ エラーレコードが検出されました\n\n',
'エラー件数: ', cast(count(*) as string), '件'
) as message
from
{{ ref('log_table') }}
where
status = 'error'
使用例の説明:
post_hookでnotify_slack_if_records_existマクロを呼び出します- モデルが実行され、エラーレコードが1行以上存在する場合、自動的にSlack通知が送信されます
messageカラムに通知内容を設定します
実行フロー
- dbtモデルの実行: dbtモデルが実行され、監視対象のデータをクエリします
- レコード検出: アラート条件に該当するレコードが検出されます
- post_hookの実行: モデル実行後、
post_hookで指定されたマクロが実行されます - 条件チェック: マクロ内でレコードの存在を確認します
Slack通知: レコードが存在する場合、Python UDFを呼び出してSlack通知を送信します

まとめ
User-defined functions in Pythonとdbtのhooks機能を組み合わせることで、データパイプライン内で自動的にSlack通知を送信する仕組みを実現できます。この仕組みにより、データ品質の問題や異常を早期に検出し、迅速に対応できるようになりました。さらに、AIエージェントとの連携により、問題の検出から修正までを自動化し、より効率的なデータパイプラインの運用を実現しています。
ミツモアで一緒に働きませんか?
ミツモアでは、データやAIを活用してデータドリブンな風土のある会社にて一緒に働く仲間を募集中です。
「技術で課題を解くことにワクワクできる人」や「仕組みで社会を良くしたい、そんなエンジニアになりたい人」、ぜひご応募をお待ちしています!
ミツモア採用ページ: https://corp.meetsmore.com/