首页 / 博客 / 机器人·功能实战

把微信消息实时同步到钉钉/飞书(消息转发实践)

分类:机器人·功能实战 · 标签:微信消息转发、钉钉机器人、飞书Webhook

前言

很多团队同时运营微信群和企业内部的钉钉/飞书工作台。客户消息打进微信,运营同学还要手动复制粘贴转到钉钉群通知同事,既费时又容易遗漏。更理想的方式是:微信收到消息,服务端自动识别来源,再通过 Webhook 把内容推送到钉钉或飞书频道,全程无需人工干预。

本文完整介绍这套"微信 → 中间服务 → 钉钉/飞书"消息转发系统的设计思路与代码实现,适合有一定 Python 基础的后端同学参考落地。


一、整体架构与数据流

消息转发系统可以拆成三个环节:

微信消息(回调推送)
      ↓
中间服务(Flask/FastAPI)
      ↓
钉钉 Webhook / 飞书 Webhook

各环节职责:

环节技术选型核心任务
微信消息接入HTTP 回调(Webhook)接收微信消息原始数据
中间服务Python + Flask解析、过滤、路由、格式转换
下行推送钉钉/飞书 Webhook把格式化消息投递到对应群

为什么需要"中间服务"?

直接把钉钉 Webhook 地址填到微信回调里是行不通的——微信回调是私有格式,钉钉/飞书无法识别。必须有一个中间服务做翻译:

  1. 接收微信回调 POST 请求;
  2. 解析消息类型(文字、图片、文件、语音……);
  3. 按路由规则(哪个群转到哪里)决定目标频道;
  4. 转换成钉钉/飞书各自要求的 JSON 格式;
  5. 调用目标 Webhook 推送。

二、微信侧:接入消息回调

使用微信 HTTP API 驱动个人微信账号时,平台会把收到的所有消息以 POST 请求推送到你预先注册的回调地址。回调字段结构(示例,具体以官方文档为准):

json{
  "appId": "设备ID",
  "fromWxid": "发送方wxid",
  "toWxid": "接收方wxid(群或个人)",
  "type": 1,
  "content": "消息正文",
  "msgId": "唯一消息ID",
  "createTime": 1718000000
}

type 字段表示消息类型,常见值对照:

type 值含义
1文字
3图片
34语音
43视频
49文件/链接卡片
10000系统通知(如有人入群)
代码为示例,具体接口与字段以官方文档为准。

2.1 设置回调地址

通过 setCallback 接口把你的中间服务地址注册到平台:

pythonimport requests

BASE  = "https://你的接口域名"   # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"token": TOKEN}       # 鉴权字段名以官方文档为准

def set_callback(callback_url: str):
    url = f"{BASE}/setCallback"
    payload = {
        "appId": APPID,
        "callbackUrl": callback_url
    }
    resp = requests.post(url, json=payload, headers=HEADERS)
    return resp.json()

# 示例:把你的服务器地址注册进去
result = set_callback("https://your-server.example.com/wechat/callback")
print(result)

回调地址必须公网可达,且在收到 POST 后必须返回 HTTP 200,否则平台会重试或停止推送。建议在部署前用 ngrok 或类似工具做本地调试,确认回调能正常接收再上线到生产服务器。


三、中间服务:解析与路由

使用 Flask 搭建接收端,处理微信回调并做消息路由。

3.1 项目依赖

flask
requests

3.2 接收回调

pythonfrom flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/wechat/callback", methods=["POST"])
def wechat_callback():
    data = request.get_json(force=True, silent=True)
    if not data:
        return jsonify({"code": 400}), 400

    msg_type = data.get("type", -1)
    from_wxid = data.get("fromWxid", "")
    content   = data.get("content", "")
    to_wxid   = data.get("toWxid", "")

    # 仅处理文字消息(其他类型按需扩展)
    if msg_type == 1:
        handle_text_message(from_wxid, to_wxid, content)

    return jsonify({"code": 200}), 200

3.3 路由规则(群 → 目标平台映射)

在配置文件或字典里维护"哪个微信群 → 转发到哪个 Webhook"的映射,是整个系统最灵活的部分:

python# 路由规则:微信群wxid → (平台类型, Webhook地址)
ROUTE_TABLE = {
    "xxxxxxxx@chatroom": ("dingtalk", "https://oapi.dingtalk.com/robot/send?access_token=xxx"),
    "yyyyyyyy@chatroom": ("feishu",   "https://open.feishu.cn/open-apis/bot/v2/hook/yyy"),
    # 私聊也可路由
    "wxid_zzz": ("feishu", "https://open.feishu.cn/open-apis/bot/v2/hook/zzz"),
}

def handle_text_message(from_wxid: str, to_wxid: str, content: str):
    # 群消息:to_wxid 以 @chatroom 结尾;私聊:to_wxid 是自己的 wxid
    target_id = to_wxid if to_wxid.endswith("@chatroom") else from_wxid

    if target_id not in ROUTE_TABLE:
        return  # 不在路由表里的消息忽略

    platform, webhook_url = ROUTE_TABLE[target_id]
    sender_label = from_wxid  # 实际可维护 wxid→昵称 的映射

    if platform == "dingtalk":
        push_to_dingtalk(webhook_url, sender_label, content)
    elif platform == "feishu":
        push_to_feishu(webhook_url, sender_label, content)

实际落地时,路由表建议从外部配置文件(如 YAML 或数据库)加载,方便运营人员在不重启服务的情况下动态调整转发规则。同时可以在路由层加过滤逻辑,例如只转发包含特定关键词的消息,或屏蔽某些发送人,减少无效通知对钉钉/飞书频道的干扰。


四、推送到钉钉

钉钉自定义机器人使用 textmarkdown 格式。推荐用 markdown,可以展示发送人和时间。

pythonimport requests
import datetime

def push_to_dingtalk(webhook_url: str, sender: str, content: str):
    """
    通过钉钉自定义机器人 Webhook 推送消息。
    钉钉 Webhook 格式以钉钉开发者文档为准。
    """
    now = datetime.datetime.now().strftime("%H:%M:%S")
    body = {
        "msgtype": "markdown",
        "markdown": {
            "title": "微信消息同步",
            "text": (
                f"**微信消息同步**\n\n"
                f"**发送人:** {sender}\n\n"
                f"**时间:** {now}\n\n"
                f"**内容:**\n\n> {content}"
            )
        },
        "at": {
            "isAtAll": False
        }
    }
    try:
        resp = requests.post(webhook_url, json=body, timeout=5)
        result = resp.json()
        if result.get("errcode") != 0:
            print(f"[钉钉推送失败] {result}")
    except Exception as e:
        print(f"[钉钉推送异常] {e}")

注意: 钉钉机器人默认开启了安全设置(关键词/IP白名单/签名),如果启用了签名验证,还需要在请求头里附加时间戳和 HMAC-SHA256 签名,具体参考钉钉开发者文档。此外,钉钉对同一机器人的调用频率有限制(默认每分钟不超过 20 条),高并发场景下建议在服务端做令牌桶限速,超出限额的消息进入队列延迟发送,而不是直接丢弃。


五、推送到飞书

飞书自定义机器人同样使用 Webhook,支持 textpost(富文本)、interactive(卡片)等格式。

pythondef push_to_feishu(webhook_url: str, sender: str, content: str):
    """
    通过飞书自定义机器人 Webhook 推送消息。
    飞书 Webhook 格式以飞书开放平台文档为准。
    """
    body = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "微信消息同步",
                    "content": [
                        [{"tag": "text", "text": f"发送人:{sender}"}],
                        [{"tag": "text", "text": f"内容:{content}"}],
                    ]
                }
            }
        }
    }
    try:
        resp = requests.post(webhook_url, json=body, timeout=5)
        result = resp.json()
        if result.get("code") != 0:
            print(f"[飞书推送失败] {result}")
    except Exception as e:
        print(f"[飞书推送异常] {e}")

飞书的 post 富文本格式中,content 是二维数组——外层每个元素是一行,内层是该行的文字/链接/at 元素,灵活性比钉钉更高。如果需要在飞书侧 @指定成员,可以在 content 数组里加入 {"tag": "at", "user_id": "对应飞书user_id"} 元素,前提是机器人所在群已开启 at 权限。


六、处理图片与文件消息

文字消息最简单,但实际场景里图片和文件同样重要。对于非文字消息,有两种处理策略:

策略 A:仅推文字占位

当消息类型不是文字时,把"[图片]"、"[文件]"等占位文本推送到钉钉/飞书,不传输实际文件。这是最简单的实现,适合对内容完整性要求不高的场景。

pythonTYPE_LABEL = {
    3:  "[图片]",
    34: "[语音]",
    43: "[视频]",
    49: "[文件/链接]",
}

def handle_any_message(from_wxid: str, to_wxid: str, msg_type: int, content: str):
    if msg_type == 1:
        text = content
    else:
        text = TYPE_LABEL.get(msg_type, f"[未知消息类型 {msg_type}]")

    target_id = to_wxid if to_wxid.endswith("@chatroom") else from_wxid
    if target_id not in ROUTE_TABLE:
        return
    platform, webhook_url = ROUTE_TABLE[target_id]
    if platform == "dingtalk":
        push_to_dingtalk(webhook_url, from_wxid, text)
    elif platform == "feishu":
        push_to_feishu(webhook_url, from_wxid, text)

策略 B:先下载再上传(完整转发)

通过 API 的 downloadImage / downloadFile 接口把文件下载到本地,再调用钉钉/飞书的文件上传接口,最后发送文件消息。

这种方案实现复杂度更高,但可以让接收方直接在钉钉/飞书里预览图片或下载文件。下载时建议做异步处理并加延迟,避免频繁下载触发风控:

pythonimport time
import threading

def async_download_and_push(webhook_url: str, platform: str, msg_id: str, sender: str):
    """
    后台线程:下载文件后再推送,避免阻塞回调响应。
    具体下载接口以官方文档为准。
    """
    time.sleep(2)  # 适当延迟,避免并发过高
    # 1. 调用下载接口获取文件内容(略)
    # 2. 上传到钉钉/飞书获取 file_key(略)
    # 3. 发送文件消息(略)
    pass

# 在回调处理里启动后台线程
threading.Thread(target=async_download_and_push,
                 args=(webhook_url, platform, msg_id, sender),
                 daemon=True).start()

七、使用 HTTP 接口驱动微信

本文示例中,微信侧的消息接收依赖能把个人微信账号暴露为 HTTP 接口的服务层。WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,适合本文这类需要接收回调并主动收发消息的场景,详情参考官方文档。

接口调用时有几点实践建议值得注意:

  1. 回调服务必须返回 200:如果你的 Flask 服务抛异常返回了 500,平台会重试,可能出现重复消息。建议在最外层 try/except 兜底,始终返回 200。
  2. 不要在回调里做耗时操作:下载文件、调用外部接口等耗时逻辑放到后台线程或消息队列,回调函数本身只做接收和入队。
  3. 做消息去重:根据 msgId 记录已处理的消息 ID(存 Redis 或本地字典),防止平台重试导致重复推送。
  4. 注意微信账号安全:频繁的自动化操作可能触发微信风控,建议控制消息发送频率,不要在极短时间内批量操作,同时避免在同一账号上同时挂载多个自动化任务。

八、常见问题排查

现象可能原因排查方向
收不到微信消息回调地址不可达检查公网IP/端口/防火墙;用 curl 从外网测试回调地址
收到消息但推钉钉失败关键词安全设置不满足消息内容里加上机器人配置的关键词;或改用签名模式
飞书推送返回 code 非0Webhook 失效或格式错误重新获取 Webhook URL;对照飞书文档检查 JSON 结构
消息重复出现回调响应超时导致重试加快回调处理速度;加 msgId 去重逻辑
图片/文件消息丢失仅处理了 type==1在 handle_any_message 里补充其他 type 分支
钉钉推送触发频率限制短时间内消息量过大服务端加令牌桶限速,超限消息排队延迟发送

总结

微信到钉钉/飞书的消息转发,核心思路是:用 HTTP API 接入微信回调,在中间服务里做消息解析、路由分发和格式转换,再调用目标平台的 Webhook 完成推送。路由表配置是整个系统最关键的扩展点,可以按群维度灵活映射多个目标频道,也可以按消息类型做差异化处理。

落地时需要重点关注几个稳定性细节:回调必须快速响应并始终返回 200;耗时逻辑异步处理;利用 msgId 做去重防止重复推送;钉钉/飞书各平台的频率限制也要在服务端做好缓冲。把这几点处理好,整套系统就能稳定运行,切实降低团队跨平台沟通的人工成本。

想动手试试?

WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,注册后几分钟跑通。

立即免费注册查看开发文档

相关产品页

🔗 微信机器人开发(产品页)🔗 微信群管理机器人(产品页)🔗 微信Hook(产品页)

相关文章

30 分钟做一个微信自动回复机器人(完整实战)微信机器人接入 GPT,实现智能自动回复微信群管理机器人开发实战:自动迎新、答疑、踢人微信客服机器人怎么做?7×24自动应答+转人工方案
© 2025 WechatApi · 企业级微信智能机器人接入平台
官网价格帮助文档博客
苏ICP备2024128799号 · 苏ICP备2023038368号