Files
SmartUp/backend/app/utils/dingtalk.py
T
2026-05-12 17:51:53 +08:00

60 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""DingTalk webhook signing and message formatting (ported from monitor script)."""
import base64
import hashlib
import hmac
import json
import time
from typing import Any
from urllib.parse import quote_plus
def dingtalk_signed_url(webhook_url: str, secret: str) -> str:
timestamp = str(int(time.time() * 1000))
string_to_sign = f"{timestamp}\n{secret}".encode("utf-8")
digest = hmac.new(secret.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
sign = quote_plus(base64.b64encode(digest).decode("utf-8"))
sep = "&" if "?" in webhook_url else "?"
return f"{webhook_url}{sep}timestamp={timestamp}&sign={sign}"
def format_dingtalk_rate_changed(upstream_name: str, changed_at: str, changes: list[dict[str, Any]]) -> dict[str, Any]:
lines = [
f"### 📊 {upstream_name} 分组倍率变更",
"",
f"- **时间**{changed_at}",
f"- **变化数量**{len(changes)}",
"",
]
for ch in changes:
name = ch.get("group_name") or ch.get("group_id") or "unknown"
platform = ch.get("platform") or "-"
old = ch.get("old_rate")
new = ch.get("new_rate")
lines.append(f"- `{name}` ({platform})`{old}` → `{new}`")
return {
"msgtype": "markdown",
"markdown": {
"title": f"{upstream_name} 分组倍率变更",
"text": "\n".join(lines),
},
}
def format_dingtalk_status(upstream_name: str, event: str, changed_at: str, error: str = "") -> dict[str, Any]:
emoji = "🔴" if event == "upstream_unhealthy" else "🟢"
label = "服务异常" if event == "upstream_unhealthy" else "服务恢复"
lines = [
f"### {emoji} {upstream_name} {label}",
"",
f"- **时间**{changed_at}",
]
if error:
lines.append(f"- **错误**{error}")
return {
"msgtype": "markdown",
"markdown": {
"title": f"{upstream_name} {label}",
"text": "\n".join(lines),
},
}