前言
很多团队同时运营微信群和企业内部的钉钉/飞书工作台。客户消息打进微信,运营同学还要手动复制粘贴转到钉钉群通知同事,既费时又容易遗漏。更理想的方式是:微信收到消息,服务端自动识别来源,再通过 Webhook 把内容推送到钉钉或飞书频道,全程无需人工干预。
本文完整介绍这套"微信 → 中间服务 → 钉钉/飞书"消息转发系统的设计思路与代码实现,适合有一定 Python 基础的后端同学参考落地。
一、整体架构与数据流
消息转发系统可以拆成三个环节:
微信消息(回调推送)
↓
中间服务(Flask/FastAPI)
↓
钉钉 Webhook / 飞书 Webhook
各环节职责:
| 环节 | 技术选型 | 核心任务 |
|---|---|---|
| 微信消息接入 | HTTP 回调(Webhook) | 接收微信消息原始数据 |
| 中间服务 | Python + Flask | 解析、过滤、路由、格式转换 |
| 下行推送 | 钉钉/飞书 Webhook | 把格式化消息投递到对应群 |
为什么需要"中间服务"?
直接把钉钉 Webhook 地址填到微信回调里是行不通的——微信回调是私有格式,钉钉/飞书无法识别。必须有一个中间服务做翻译:
- 接收微信回调 POST 请求;
- 解析消息类型(文字、图片、文件、语音……);
- 按路由规则(哪个群转到哪里)决定目标频道;
- 转换成钉钉/飞书各自要求的 JSON 格式;
- 调用目标 Webhook 推送。
二、微信侧:接入消息回调
使用微信 HTTP API 驱动个人微信账号时,平台会把收到的所有消息以 POST 请求推送到你预先注册的回调地址。回调字段结构(示例,具体以官方文档为准):
json{
"appId": "设备ID",
"fromWxid": "发送方wxid",
"toWxid": "接收方wxid(群或个人)",
"type": 1,
"content": "消息正文",
"msgId": "唯一消息ID",
"createTime": 1718000000
}
type 字段表示消息类型,常见值对照:
| type 值 | 含义 |
|---|---|
| 1 | 文字 |
| 3 | 图片 |
| 34 | 语音 |
| 43 | 视频 |
| 49 | 文件/链接卡片 |
| 10000 | 系统通知(如有人入群) |
代码为示例,具体接口与字段以官方文档为准。
2.1 设置回调地址
通过 setCallback 接口把你的中间服务地址注册到平台:
pythonimport requests
BASE = "https://你的接口域名" # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"token": TOKEN} # 鉴权字段名以官方文档为准
def set_callback(callback_url: str):
url = f"{BASE}/setCallback"
payload = {
"appId": APPID,
"callbackUrl": callback_url
}
resp = requests.post(url, json=payload, headers=HEADERS)
return resp.json()
# 示例:把你的服务器地址注册进去
result = set_callback("https://your-server.example.com/wechat/callback")
print(result)
回调地址必须公网可达,且在收到 POST 后必须返回 HTTP 200,否则平台会重试或停止推送。建议在部署前用 ngrok 或类似工具做本地调试,确认回调能正常接收再上线到生产服务器。
三、中间服务:解析与路由
使用 Flask 搭建接收端,处理微信回调并做消息路由。
3.1 项目依赖
flask
requests
3.2 接收回调
pythonfrom flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/wechat/callback", methods=["POST"])
def wechat_callback():
data = request.get_json(force=True, silent=True)
if not data:
return jsonify({"code": 400}), 400
msg_type = data.get("type", -1)
from_wxid = data.get("fromWxid", "")
content = data.get("content", "")
to_wxid = data.get("toWxid", "")
# 仅处理文字消息(其他类型按需扩展)
if msg_type == 1:
handle_text_message(from_wxid, to_wxid, content)
return jsonify({"code": 200}), 200
3.3 路由规则(群 → 目标平台映射)
在配置文件或字典里维护"哪个微信群 → 转发到哪个 Webhook"的映射,是整个系统最灵活的部分:
python# 路由规则:微信群wxid → (平台类型, Webhook地址)
ROUTE_TABLE = {
"xxxxxxxx@chatroom": ("dingtalk", "https://oapi.dingtalk.com/robot/send?access_token=xxx"),
"yyyyyyyy@chatroom": ("feishu", "https://open.feishu.cn/open-apis/bot/v2/hook/yyy"),
# 私聊也可路由
"wxid_zzz": ("feishu", "https://open.feishu.cn/open-apis/bot/v2/hook/zzz"),
}
def handle_text_message(from_wxid: str, to_wxid: str, content: str):
# 群消息:to_wxid 以 @chatroom 结尾;私聊:to_wxid 是自己的 wxid
target_id = to_wxid if to_wxid.endswith("@chatroom") else from_wxid
if target_id not in ROUTE_TABLE:
return # 不在路由表里的消息忽略
platform, webhook_url = ROUTE_TABLE[target_id]
sender_label = from_wxid # 实际可维护 wxid→昵称 的映射
if platform == "dingtalk":
push_to_dingtalk(webhook_url, sender_label, content)
elif platform == "feishu":
push_to_feishu(webhook_url, sender_label, content)
实际落地时,路由表建议从外部配置文件(如 YAML 或数据库)加载,方便运营人员在不重启服务的情况下动态调整转发规则。同时可以在路由层加过滤逻辑,例如只转发包含特定关键词的消息,或屏蔽某些发送人,减少无效通知对钉钉/飞书频道的干扰。
四、推送到钉钉
钉钉自定义机器人使用 text 或 markdown 格式。推荐用 markdown,可以展示发送人和时间。
pythonimport requests
import datetime
def push_to_dingtalk(webhook_url: str, sender: str, content: str):
"""
通过钉钉自定义机器人 Webhook 推送消息。
钉钉 Webhook 格式以钉钉开发者文档为准。
"""
now = datetime.datetime.now().strftime("%H:%M:%S")
body = {
"msgtype": "markdown",
"markdown": {
"title": "微信消息同步",
"text": (
f"**微信消息同步**\n\n"
f"**发送人:** {sender}\n\n"
f"**时间:** {now}\n\n"
f"**内容:**\n\n> {content}"
)
},
"at": {
"isAtAll": False
}
}
try:
resp = requests.post(webhook_url, json=body, timeout=5)
result = resp.json()
if result.get("errcode") != 0:
print(f"[钉钉推送失败] {result}")
except Exception as e:
print(f"[钉钉推送异常] {e}")
注意: 钉钉机器人默认开启了安全设置(关键词/IP白名单/签名),如果启用了签名验证,还需要在请求头里附加时间戳和 HMAC-SHA256 签名,具体参考钉钉开发者文档。此外,钉钉对同一机器人的调用频率有限制(默认每分钟不超过 20 条),高并发场景下建议在服务端做令牌桶限速,超出限额的消息进入队列延迟发送,而不是直接丢弃。
五、推送到飞书
飞书自定义机器人同样使用 Webhook,支持 text、post(富文本)、interactive(卡片)等格式。
pythondef push_to_feishu(webhook_url: str, sender: str, content: str):
"""
通过飞书自定义机器人 Webhook 推送消息。
飞书 Webhook 格式以飞书开放平台文档为准。
"""
body = {
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": "微信消息同步",
"content": [
[{"tag": "text", "text": f"发送人:{sender}"}],
[{"tag": "text", "text": f"内容:{content}"}],
]
}
}
}
}
try:
resp = requests.post(webhook_url, json=body, timeout=5)
result = resp.json()
if result.get("code") != 0:
print(f"[飞书推送失败] {result}")
except Exception as e:
print(f"[飞书推送异常] {e}")
飞书的 post 富文本格式中,content 是二维数组——外层每个元素是一行,内层是该行的文字/链接/at 元素,灵活性比钉钉更高。如果需要在飞书侧 @指定成员,可以在 content 数组里加入 {"tag": "at", "user_id": "对应飞书user_id"} 元素,前提是机器人所在群已开启 at 权限。
六、处理图片与文件消息
文字消息最简单,但实际场景里图片和文件同样重要。对于非文字消息,有两种处理策略:
策略 A:仅推文字占位
当消息类型不是文字时,把"[图片]"、"[文件]"等占位文本推送到钉钉/飞书,不传输实际文件。这是最简单的实现,适合对内容完整性要求不高的场景。
pythonTYPE_LABEL = {
3: "[图片]",
34: "[语音]",
43: "[视频]",
49: "[文件/链接]",
}
def handle_any_message(from_wxid: str, to_wxid: str, msg_type: int, content: str):
if msg_type == 1:
text = content
else:
text = TYPE_LABEL.get(msg_type, f"[未知消息类型 {msg_type}]")
target_id = to_wxid if to_wxid.endswith("@chatroom") else from_wxid
if target_id not in ROUTE_TABLE:
return
platform, webhook_url = ROUTE_TABLE[target_id]
if platform == "dingtalk":
push_to_dingtalk(webhook_url, from_wxid, text)
elif platform == "feishu":
push_to_feishu(webhook_url, from_wxid, text)
策略 B:先下载再上传(完整转发)
通过 API 的 downloadImage / downloadFile 接口把文件下载到本地,再调用钉钉/飞书的文件上传接口,最后发送文件消息。
这种方案实现复杂度更高,但可以让接收方直接在钉钉/飞书里预览图片或下载文件。下载时建议做异步处理并加延迟,避免频繁下载触发风控:
pythonimport time
import threading
def async_download_and_push(webhook_url: str, platform: str, msg_id: str, sender: str):
"""
后台线程:下载文件后再推送,避免阻塞回调响应。
具体下载接口以官方文档为准。
"""
time.sleep(2) # 适当延迟,避免并发过高
# 1. 调用下载接口获取文件内容(略)
# 2. 上传到钉钉/飞书获取 file_key(略)
# 3. 发送文件消息(略)
pass
# 在回调处理里启动后台线程
threading.Thread(target=async_download_and_push,
args=(webhook_url, platform, msg_id, sender),
daemon=True).start()
七、使用 HTTP 接口驱动微信
本文示例中,微信侧的消息接收依赖能把个人微信账号暴露为 HTTP 接口的服务层。WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,适合本文这类需要接收回调并主动收发消息的场景,详情参考官方文档。
接口调用时有几点实践建议值得注意:
- 回调服务必须返回 200:如果你的 Flask 服务抛异常返回了 500,平台会重试,可能出现重复消息。建议在最外层
try/except兜底,始终返回 200。 - 不要在回调里做耗时操作:下载文件、调用外部接口等耗时逻辑放到后台线程或消息队列,回调函数本身只做接收和入队。
- 做消息去重:根据
msgId记录已处理的消息 ID(存 Redis 或本地字典),防止平台重试导致重复推送。 - 注意微信账号安全:频繁的自动化操作可能触发微信风控,建议控制消息发送频率,不要在极短时间内批量操作,同时避免在同一账号上同时挂载多个自动化任务。
八、常见问题排查
| 现象 | 可能原因 | 排查方向 |
|---|---|---|
| 收不到微信消息 | 回调地址不可达 | 检查公网IP/端口/防火墙;用 curl 从外网测试回调地址 |
| 收到消息但推钉钉失败 | 关键词安全设置不满足 | 消息内容里加上机器人配置的关键词;或改用签名模式 |
| 飞书推送返回 code 非0 | Webhook 失效或格式错误 | 重新获取 Webhook URL;对照飞书文档检查 JSON 结构 |
| 消息重复出现 | 回调响应超时导致重试 | 加快回调处理速度;加 msgId 去重逻辑 |
| 图片/文件消息丢失 | 仅处理了 type==1 | 在 handle_any_message 里补充其他 type 分支 |
| 钉钉推送触发频率限制 | 短时间内消息量过大 | 服务端加令牌桶限速,超限消息排队延迟发送 |
总结
微信到钉钉/飞书的消息转发,核心思路是:用 HTTP API 接入微信回调,在中间服务里做消息解析、路由分发和格式转换,再调用目标平台的 Webhook 完成推送。路由表配置是整个系统最关键的扩展点,可以按群维度灵活映射多个目标频道,也可以按消息类型做差异化处理。
落地时需要重点关注几个稳定性细节:回调必须快速响应并始终返回 200;耗时逻辑异步处理;利用 msgId 做去重防止重复推送;钉钉/飞书各平台的频率限制也要在服务端做好缓冲。把这几点处理好,整套系统就能稳定运行,切实降低团队跨平台沟通的人工成本。
