import requests
import json
import os
import time
import sys
def send_dingtalk_message(webhook_url, success_count, fail_count, fail_module, job_name, build_number, build_user, build_url):
"""
发送Markdown格式的钉钉通知消息,支持更丰富的排版样式
"""
# 获取当前时间
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 计算总脚本数和成功率
total_scripts = success_count + fail_count
success_rate = f"{(success_count / total_scripts * 100):.1f}%" if total_scripts > 0 else "0%"
# 根据结果设置状态标识和颜色
if fail_count == 0:
status_text = "构建成功"
status_color = "green" # 绿色表示成功
status_emoji = "✅"
else:
status_text = "构建失败"
status_color = "red" # 红色表示失败
status_emoji = "❌"
# 构造Markdown内容(支持标题、加粗、颜色、列表等格式)
markdown_content = f"""
test
### {status_emoji} Jenkins构建通知 [{job_name}]
> **状态**:<font color="{status_color}">{status_text}</font>
> **构建编号**:#{build_number}
| 信息类型 | 详情 |
|----------------|--------------------------|
| 📌 项目名称 | {job_name} |
| 👤 构建人员 | {build_user} |
| ⏰ 结束时间 | {end_time} |
| 🖥 构建环境 | 老版本-预发布环境 |
| 🔗 报告链接 | [查看详细日志]({build_url})|
#### 构建结果统计
- 📈 总脚本数:{total_scripts} 个
- ✅ 成功脚本:{success_count} 个
- ❌ 失败脚本:{fail_count} 个
- ❌ 失败脚本:{fail_module}
- 📊 成功率:<font color="blue">{success_rate}</font>
""".strip() # 去除首尾空行
# 构造Markdown类型消息
message = {
"msgtype": "markdown",
"markdown": {
"title": f"Jenkins构建通知 - {job_name} #{build_number}", # 消息标题(通知栏显示)
"text": markdown_content
},
"at": {
"isAtAll": False # 不@所有人,如需@特定人可添加"atMobiles": ["手机号"]
}
}
headers = {"Content-Type": "application/json"}
# VPN代理
proxies = {"https": None}
try:
response = requests.post(
webhook_url,
headers=headers,
data=json.dumps(message),
proxies=proxies,
timeout=10
)
'''response.raise_for_status() 是一个 “主动报错” 的方法,用于显式检查 HTTP 请求是否成功,
避免错误的响应被静默处理。在需要确保请求成功的场景(如接口调用、数据爬取)中非常实用'''
response.raise_for_status()
result = response.json()
if result.get("errcode") == 0:
print("钉钉通知发送成功")
else:
print(f"钉钉通知发送失败: {result.get('errmsg')}")
return result
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None
if __name__ == "__main__":
# 从环境变量获取构建信息
job_name = os.getenv("JOB_NAME", "未知项目")
build_number = os.getenv("BUILD_NUMBER", "0")
build_user = os.getenv("BUILD_USER", "debug")
# build_url = os.getenv("BUILD_URL", "#)
build_url = f'http://192.168.3.165:8080/jenkins/job/ceshi//{build_number}/console'
# jenkins-test钉钉内部群通知-0001
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=9ac34e44f528d56d6cbd7d4a50ac6a25e8264750bd7401a3543a43c816c1816f"
# 初始化计数
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from case.game_scrip import success_count, fail_count, fail_files
success_count = success_count
fail_count = fail_count
fail_module = fail_files
send_dingtalk_message(webhook_url, success_count, fail_count, fail_module, job_name, build_number, build_user, build_url)