前言
很多人都有这样的需求:微信消息量太大,靠人工一条条回复既耗时又容易遗漏。无论是客服场景、私域运营,还是个人助理,如果能让微信自动识别消息内容并做出响应,效率会大幅提升。
本文从零开始,用 Python 搭建一套完整的微信自动回复机器人,涵盖环境搭建、消息接收、关键词匹配、自动发送全流程。不依赖任何桌面注入工具,全程通过 HTTP 接口驱动,部署后可在服务器后台长期稳定运行。整篇文章所有代码均可直接运行,读完即可动手复现。
一、整体架构与思路
在正式写代码之前,先梳理清楚这套机器人的工作原理,避免走弯路。
1.1 核心流程
微信消息 → 平台回调(HTTP POST)→ 你的服务器 → 解析/判断 → 调用发消息接口 → 回复给用户
整个流程分三段:
- 消息接收:微信收到消息后,平台会把消息内容以 HTTP POST 的形式推送到你预先设置好的回调地址。你需要一台有公网 IP 的服务器来接收这个推送。
- 逻辑处理:你的服务收到回调后,解析消息内容,根据关键词或规则决定如何回复。
- 消息发送:调用平台提供的发消息接口,把回复内容推送给对方。
1.2 技术选型
| 模块 | 选型 | 理由 |
|---|---|---|
| Web 框架 | Flask | 轻量,适合回调服务 |
| HTTP 客户端 | requests | 调用发消息接口 |
| 规则匹配 | 正则 + 字典 | 灵活、易扩展 |
| 部署 | 公网 VPS | 回调地址必须公网可达 |
Python 版本建议 3.8+。需要特别说明的是,这套方案的核心是"接收回调 + 调用发消息接口"两个环节,两者是完全解耦的——即使发消息接口调用失败,回调服务依然正常运行,不会相互影响。
二、环境准备
2.1 安装依赖
bashpip install flask requests
2.2 项目目录结构
wechat-bot/
├── app.py # 主服务,接收回调 + 调用发消息接口
├── rules.py # 自动回复规则配置
├── sender.py # 封装发消息逻辑
└── config.py # 配置(域名、Token、appId)
目录结构刻意保持扁平,避免过早引入复杂的包结构。实际项目扩大后可以按需拆分,但对于入门阶段,四个文件已经足够清晰。
2.3 配置文件 config.py
python# config.py
# 以下均为占位符,注册后请以官方文档为准填写真实值
BASE = "https://你的接口域名" # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId" # 扫码登录后在后台获取
HEADERS = {"token": TOKEN} # 鉴权字段名以官方文档为准
代码为示例,具体接口地址、字段名以官方文档为准。
关于 appId 和 Token 的获取方式:appId 是每个登录设备的唯一标识,扫码登录后由平台分配,不同账号对应不同 appId。Token 是调用接口的鉴权凭证,通常在注册账户后从控制台获取,部分平台支持定期轮换以提升安全性。两者都不应硬编码在代码里提交到公开仓库,建议通过环境变量注入,例如:
pythonimport os
TOKEN = os.environ.get("WECHAT_TOKEN", "")
APPID = os.environ.get("WECHAT_APPID", "")
三、消息接收:搭建回调服务
平台会把微信收到的消息以 POST 请求推送到你设置的回调地址。下面用 Flask 写一个极简的接收服务。
3.1 基础回调接收
python# app.py
from flask import Flask, request, jsonify
from sender import send_text
from rules import get_reply
app = Flask(__name__)
@app.route("/callback", methods=["POST"])
def callback():
data = request.get_json(silent=True) or {}
# 基本字段(字段名以官方文档为准)
app_id = data.get("appId", "")
from_wxid = data.get("fromWxid", "")
msg_type = data.get("type", 0)
content = data.get("content", "")
# 只处理文本消息(type 值以文档为准,此处以 1 为例)
if msg_type == 1 and from_wxid:
reply = get_reply(content)
if reply:
send_text(app_id, from_wxid, reply)
# 必须返回 200,否则平台会重试
return jsonify({"code": 200})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)
3.2 回调服务的关键注意事项
这里有几个细节容易被忽略,逐一说清楚:
必须快速返回 200:回调函数要在几秒内给平台返回 HTTP 200,否则平台会判断回调失败并重试。如果你的处理逻辑比较耗时(比如查数据库、调 AI 接口),不要在回调函数里同步等待结果,应该把任务丢进消息队列(如 Redis Queue、Celery),异步处理完再发回复。
自己主动发出的消息不会触发回调:只有对方发给你的消息才会推回调,机器人自动发出的消息不会再次触发,不用担心死循环。
回调地址必须公网可达:本地 127.0.0.1 不行。本地开发时可以用 ngrok 做内网穿透,生产环境必须部署到有公网 IP 的服务器,并确保防火墙开放对应端口。
消息类型字段要对照文档确认:不同平台对消息类型的编码不同,文本、图片、语音、视频、位置等各有对应的 type 值,代码里的 msg_type == 1 只是示例,实际需查阅平台文档确认。
3.3 设置回调地址
服务启动后,调用平台的 setCallback 接口把你的回调地址注册上去:
pythonimport requests
from config import BASE, APPID, HEADERS
def register_callback(callback_url: str):
url = f"{BASE}/setCallback"
payload = {
"appId": APPID,
"callbackUrl": callback_url
}
resp = requests.post(url, json=payload, headers=HEADERS)
print(resp.json())
# 示例
register_callback("http://你的公网IP:8080/callback")
成功后平台返回 {"ret": 200, "msg": "操作成功"},后续每条消息都会推到这个地址。回调地址只需注册一次,除非你换了服务器或端口。
四、自动回复规则引擎
规则引擎决定机器人"够不够聪明"。本节先实现关键词匹配,再扩展到正则和默认回复。
4.1 规则配置 rules.py
python# rules.py
import re
# 精确关键词 → 回复内容
EXACT_RULES = {
"你好": "你好!有什么可以帮你的?",
"Hello": "Hi~ 有什么问题请直接说。",
"价格": "请移步官网查看最新定价,或直接说明需求,我来为你解答。",
"帮助": "常用指令:\n- 发送「价格」了解收费\n- 发送「教程」获取使用文档\n- 发送「人工」转接真人客服",
"人工": "正在为您转接,请稍候…",
}
# 正则关键词 → 回复内容(按顺序匹配,先匹配先返回)
REGEX_RULES = [
(r"订单[号码编]?\s*(\d+)", "已收到您的订单查询,订单号 {match} 正在处理,请耐心等候。"),
(r"退款|退货|退钱", "退款申请请提供订单号和原因,我们会在 1 个工作日内处理。"),
(r"发货|物流|快递", "请提供订单号,我来帮您查询物流状态。"),
]
# 兜底回复
DEFAULT_REPLY = "感谢您的消息,我们会尽快回复。如需紧急处理请拨打客服电话。"
def get_reply(content: str) -> str:
"""根据消息内容返回自动回复,无匹配时返回默认回复"""
content = content.strip()
# 1. 精确匹配(不区分大小写)
for keyword, reply in EXACT_RULES.items():
if keyword.lower() in content.lower():
return reply
# 2. 正则匹配
for pattern, reply_tpl in REGEX_RULES:
m = re.search(pattern, content)
if m:
return reply_tpl.format(match=m.group(0))
# 3. 默认回复
return DEFAULT_REPLY
4.2 规则设计的几个实操要点
精确匹配顺序有讲究:Python 字典在 3.7+ 保持插入顺序,所以 EXACT_RULES 里靠前的关键词优先级更高。如果"你好"和"你好啊"都在规则里,要把更具体的放在前面,避免被宽泛规则截断。
正则里的捕获组用于动态回复:上面 订单[号码编]?\s*(\d+) 中的 (\d+) 会捕获订单号,然后通过 .format(match=...) 填入回复模板,实现"带上下文"的回复效果。如果回复模板里不需要引用匹配内容,直接写固定字符串即可,不需要捕获组。
兜底回复要谨慎:DEFAULT_REPLY 在所有规则都不匹配时触发。如果机器人面向的是陌生人群体,兜底回复要写得自然,避免让人一眼看出是机器人。也可以把兜底设为 None,然后在 app.py 里判断 if reply is None: return,对不认识的消息完全不响应。
规则热更新:如果希望不重启服务就能更新规则,可以把 EXACT_RULES 和 REGEX_RULES 存到 Redis 或数据库,每次 get_reply 被调用时从缓存读取,设置合理的缓存过期时间(比如 60 秒)。
这套规则引擎足够应对大多数场景。后期可以把规则存到数据库,或者接入大模型做语义理解。
五、消息发送模块
5.1 封装发送函数 sender.py
python# sender.py
import requests
from config import BASE, APPID, HEADERS
def send_text(app_id: str, to_wxid: str, content: str) -> dict:
"""
发送文本消息
接口路径以官方文档为准,此处仅为示例
"""
url = f"{BASE}/message/postText"
payload = {
"appId": app_id or APPID,
"toWxid": to_wxid,
"content": content,
}
try:
resp = requests.post(url, json=payload, headers=HEADERS, timeout=10)
result = resp.json()
if result.get("ret") != 200:
print(f"[发送失败] {result}")
return result
except Exception as e:
print(f"[网络异常] {e}")
return {}
def send_image(app_id: str, to_wxid: str, img_url: str) -> dict:
"""
发送图片消息
"""
url = f"{BASE}/message/postImage"
payload = {
"appId": app_id or APPID,
"toWxid": to_wxid,
"imgUrl": img_url,
}
try:
resp = requests.post(url, json=payload, headers=HEADERS, timeout=10)
return resp.json()
except Exception as e:
print(f"[网络异常] {e}")
return {}
代码为示例,具体接口路径、请求字段请以官方文档为准。
5.2 发送接口的参数细节
toWxid 是消息目标的微信 ID,来源于回调数据的 fromWxid 字段(私聊)或群 ID(群消息),格式不同,不能混用。content 字段支持纯文本,包括换行符 \n;如果需要在文本里 @ 群成员,通常有独立的 ats 参数(值为被 @ 成员的 wxid 列表),具体以文档为准。
timeout=10 是个重要的保险设置。如果接口服务器响应慢,没有超时限制会导致回调函数一直挂起,最终把 Flask 的工作线程耗尽,整个服务无响应。10 秒超时在大多数网络环境下已足够宽裕。
六、进阶:群消息处理与频率控制
6.1 区分私聊与群聊
群消息的 toWxid 通常以特定字符串结尾(具体格式以文档为准)。可以简单判断:
pythondef is_group(wxid: str) -> bool:
"""判断是否为群消息,规则以文档为准"""
return wxid.endswith("@chatroom")
@app.route("/callback", methods=["POST"])
def callback():
data = request.get_json(silent=True) or {}
from_wxid = data.get("fromWxid", "")
to_wxid = data.get("toWxid", "")
msg_type = data.get("type", 0)
content = data.get("content", "")
if msg_type != 1:
return jsonify({"code": 200})
# 群消息:只有 @机器人 时才回复
if is_group(to_wxid):
app_id = data.get("appId", "")
# 判断是否被 @ 的逻辑以实际字段为准
if "被@" in content or content.startswith("@机器人"):
reply = get_reply(content)
if reply:
send_text(app_id, to_wxid, reply)
else:
# 私聊直接回复
app_id = data.get("appId", "")
reply = get_reply(content)
if reply:
send_text(app_id, from_wxid, reply)
return jsonify({"code": 200})
群聊场景下,建议只在被 @ 时才触发自动回复,而不是对群里所有消息都响应。原因有两点:一是群消息量大,全量响应容易被认为是刷屏,触发群成员举报;二是无关消息也会进入规则引擎,误匹配率高,回复质量差。
6.2 频率控制,避免触发风控
批量或高频发消息是微信账号被限制的主要原因之一。建议在发送函数里加简单的速率限制:
pythonimport time
import threading
_lock = threading.Lock()
_last_send_time = 0
MIN_INTERVAL = 1.5 # 两次发消息最短间隔(秒),可按需调整
def send_text_safe(app_id: str, to_wxid: str, content: str) -> dict:
global _last_send_time
with _lock:
now = time.time()
gap = now - _last_send_time
if gap < MIN_INTERVAL:
time.sleep(MIN_INTERVAL - gap)
_last_send_time = time.time()
return send_text(app_id, to_wxid, content)
除速率限制外,还有几条实践建议值得关注:
被动触发优先:自动回复应该以收到消息为触发条件,尽量不要写主动定时推送逻辑。主动批量推送是风控重点打击的行为,被动响应风险则低得多。
去重缓存:短时间内同一个 wxid 发来相同内容时,不必每次都回复。可以用一个简单的字典记录"最近一次回复时间",60 秒内收到重复消息不重复作答,既节省接口调用次数,又显得不那么机械。
新账号过渡期:新登录的账号建议在线观察 3 天,让账号有正常的人工操作记录,再逐步开启自动化功能,能有效降低触发风控的概率。
七、完整启动流程
把上面所有文件准备好之后,按以下步骤启动:
bash# 第一步:启动回调服务
python app.py
# 服务监听在 0.0.0.0:8080
# 第二步:注册回调地址(只需执行一次)
python -c "
from config import BASE, APPID, HEADERS
import requests
r = requests.post(
f'{BASE}/setCallback',
json={'appId': APPID, 'callbackUrl': 'http://你的公网IP:8080/callback'},
headers=HEADERS
)
print(r.json())
"
# 第三步:确认微信在线
python -c "
from config import BASE, APPID, HEADERS
import requests
r = requests.post(f'{BASE}/checkOnline', json={'appId': APPID}, headers=HEADERS)
print(r.json())
"
三步全部返回 ret: 200 后,给微信账号发一条「你好」,机器人应该会自动回复。
生产环境建议用进程守护工具(如 supervisor 或 systemd)管理 Flask 进程,确保服务器重启后自动拉起,不需要手动介入。同时建议把 Flask 前面加一层 nginx 做反向代理,处理 HTTPS 证书和请求头转发,让回调地址看起来更正规,部分平台也要求回调地址必须是 HTTPS。
如果不想自己维护接口服务器,WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,上面的 Flask 回调服务和规则引擎部分保持不变。
八、常见问题排查
| 现象 | 可能原因 | 排查方法 |
|---|---|---|
| 收不到回调 | 回调地址不可达 / 微信掉线 | curl 自测回调接口;checkOnline 确认在线 |
| 回调收到但不回复 | 消息 type 判断错误 | 打印原始 data,对照文档确认 type 值 |
| 发消息返回非 200 | Token 过期 / appId 错误 | 检查 config.py;重新获取 Token |
| 发消息成功但对方收不到 | to_wxid 格式错误 | 确认 wxid 来源,群 wxid 和个人 wxid 格式不同 |
| 运行一段时间后停止回复 | 微信账号掉线 | 加定时任务轮询 checkOnline,掉线告警 |
| 回调被重复推送 | 处理时间超时导致平台重试 | 加消息 ID 去重;把耗时操作移到异步队列 |
| 群消息全部被回复 | 未做群聊 @ 判断 | 参考第六节,只在被 @ 时响应群消息 |
关于回调重复推送值得多说一句。如果你的处理逻辑偶尔耗时超过平台等待上限,平台会重发同一条消息,导致机器人回复了两次。解决办法是在收到回调时先用 msgId 做去重检查,已处理过的 msgId 直接返回 200 跳过,不重复处理。可以用 Redis 的 SET NX EX 命令实现分布式去重,单机场景用内存字典配合 TTL 清理即可。
总结
至此,一套基于 Python + Flask + HTTP 回调的微信自动回复机器人已经完整实现。核心思路是:注册回调地址接收消息,在本地做关键词和正则匹配,再调用发消息接口返回结果。代码结构分层清晰,规则、发送、配置各自独立,后续扩展接入数据库或大模型都不需要改动核心流程。实际落地时最容易踩的坑集中在两处:回调地址的公网可达性,以及发消息的频率控制——把这两点处理好,机器人就能长期稳定运行。
