前言
做过微信运营的人都清楚,手动群发是件既费时又容易出错的事——该发的时候忘了发,一着急又连续发了好几条,反而引来用户投诉甚至微信封号风险。于是,实现一套能"定好时间、自动推送、控制频率"的微信定时群发系统,几乎是所有微信业务开发绕不过去的课题。
本文从实际需求出发,讲清楚定时群发的整体架构、任务调度的核心实现、以及如何在消息节奏上做好频率控制,避免因发送过快、发送量过大触发微信的风控机制。代码示例以 Python 为主,APScheduler 作调度引擎,配合 HTTP 接口完成消息投递,思路可迁移到任意语言栈。
一、定时群发的整体架构
在着手写代码之前,先把架构想清楚,后期才不会改一个地方崩另一个地方。
1.1 核心模块划分
| 模块 | 职责 |
|---|---|
| 任务存储层 | 持久化定时任务(发送对象、内容、触发时间、频率规则) |
| 调度引擎 | 按计划触发任务,支持单次/重复/Cron 三种模式 |
| 消息投递层 | 调用微信 HTTP 接口,真正把消息发出去 |
| 频率控制层 | 在投递前做节流,确保每批消息间隔符合安全阈值 |
| 状态记录层 | 记录每条消息的发送结果,便于重试和审计 |
这五层之间解耦明确:调度引擎只管"什么时候触发",频率控制层只管"能不能发",消息投递层只管"怎么发",三者互不干扰,也方便单独升级。实际部署时,各层可以运行在同一进程里,也可以拆成独立服务通过消息队列通信,取决于业务体量。
1.2 消息类型与发送目标
定时群发通常涉及两种维度:
- 目标维度:单聊(一对一)、群聊(微信群)、好友批量(遍历联系人列表逐个发送)
- 内容维度:纯文字、图片、图文链接、文件、语音
不同组合对应不同的接口调用逻辑,架构上建议把"内容类型"和"目标类型"都抽象成参数,而非硬编码进业务逻辑。这样后续新增内容类型(比如小程序卡片、视频号),只需要扩展投递层,不用改调度逻辑。
二、定时任务调度实现
Python 生态里,APScheduler(Advanced Python Scheduler)是做定时任务的主流选择,支持三种触发器:DateTrigger(单次定时)、IntervalTrigger(固定间隔重复)、CronTrigger(Cron 表达式)。
2.1 安装与初始化
bashpip install apscheduler
pythonfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# 任务持久化到 SQLite,防止重启丢任务
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}
executors = {
'default': ThreadPoolExecutor(max_workers=5) # 并发度按需调整
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
scheduler.start()
使用 BackgroundScheduler 而非 BlockingScheduler,调度在后台线程运行,主线程可以继续处理其他逻辑(比如接收回调、提供 Web API)。任务存进 SQLite,重启后依然有效。
2.2 添加三种触发模式的任务
pythonfrom datetime import datetime
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
def add_once_task(run_time: datetime, wxids: list, content: str):
"""单次定时发送"""
scheduler.add_job(
func=send_batch,
trigger=DateTrigger(run_date=run_time),
args=[wxids, content],
id=f"once_{int(run_time.timestamp())}",
replace_existing=True
)
def add_interval_task(hours: int, wxids: list, content: str, task_id: str):
"""固定间隔重复发送,例如每 24 小时一次"""
scheduler.add_job(
func=send_batch,
trigger=IntervalTrigger(hours=hours),
args=[wxids, content],
id=task_id,
replace_existing=True
)
def add_cron_task(cron_expr: str, wxids: list, content: str, task_id: str):
"""Cron 表达式,例如每周一早上 9:00"""
# cron_expr 示例: "0 9 * * 1"
parts = cron_expr.split()
scheduler.add_job(
func=send_batch,
trigger=CronTrigger(
minute=parts[0], hour=parts[1],
day=parts[2], month=parts[3], day_of_week=parts[4]
),
args=[wxids, content],
id=task_id,
replace_existing=True
)
2.3 任务管理接口
pythondef pause_task(task_id: str):
scheduler.pause_job(task_id)
def resume_task(task_id: str):
scheduler.resume_job(task_id)
def remove_task(task_id: str):
scheduler.remove_job(task_id)
def list_tasks():
jobs = scheduler.get_jobs()
return [
{
"id": job.id,
"next_run": str(job.next_run_time),
"func": job.func.__name__
}
for job in jobs
]
三、消息投递层实现
任务被触发后,核心动作是调用微信 HTTP 接口把消息发出去。这里展示文字消息和图片消息两种最常用的类型。
pythonimport requests
BASE = "https://你的接口域名" # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"token": TOKEN} # 鉴权字段名以官方文档为准
def send_text(to_wxid: str, content: str) -> bool:
"""发送文字消息到单个 wxid"""
url = f"{BASE}/message/postText"
body = {
"appId": APPID,
"toWxid": to_wxid,
"content": content
}
try:
resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
result = resp.json()
return result.get("ret") == 200
except Exception as e:
print(f"[发送失败] {to_wxid}: {e}")
return False
def send_image_by_url(to_wxid: str, img_url: str) -> bool:
"""发送图片消息"""
url = f"{BASE}/message/postImage"
body = {
"appId": APPID,
"toWxid": to_wxid,
"imgUrl": img_url
}
try:
resp = requests.post(url, json=body, headers=HEADERS, timeout=15)
result = resp.json()
return result.get("ret") == 200
except Exception as e:
print(f"[图片发送失败] {to_wxid}: {e}")
return False
代码为示例,具体接口路径、请求字段以官方文档为准。
四、频率控制:定时群发最关键的一环
发送速度是防封的核心变量。微信对高频、批量、机械化的消息发送极为敏感,一旦触发风控轻则消息被屏蔽,重则账号被封。
4.1 频率控制原则
根据实际经验,批量发送时建议遵守以下节奏:
| 场景 | 推荐间隔 | 单日上限 |
|---|---|---|
| 群发好友文字 | 每条间隔 3~8 秒(随机) | ≤ 200 人/天 |
| 群发好友图片 | 每条间隔 5~12 秒(随机) | ≤ 100 人/天 |
| 发送群消息 | 每条间隔 5~15 秒(随机) | 视群数量,单群≤20条/天 |
| 新账号(在线<7天) | 间隔翻倍,总量减半 | 更保守 |
随机间隔比固定间隔更安全——真实用户的操作节奏从来不是严格等时的,机器人如果每隔整整 5 秒发一条,反而更像机器。此外,发送时段也很关键:凌晨集中投递大量消息,被风控识别的概率远高于白天分散发送,建议把群发任务集中在上午 9:00 到晚上 21:00 之间,夜间时段不安排高频任务。
4.2 带随机间隔的批量发送函数
pythonimport time
import random
from typing import List, Tuple
def send_batch(wxids: List[str], content: str,
min_gap: float = 3.0, max_gap: float = 8.0) -> dict:
"""
批量发送,自带随机间隔控频
:param wxids: 发送目标的 wxid 列表
:param content: 消息内容
:param min_gap: 最小间隔秒数
:param max_gap: 最大间隔秒数
:return: 发送统计结果
"""
success_count = 0
fail_list: List[str] = []
for i, wxid in enumerate(wxids):
ok = send_text(wxid, content)
if ok:
success_count += 1
else:
fail_list.append(wxid)
# 最后一条不用等待
if i < len(wxids) - 1:
gap = random.uniform(min_gap, max_gap)
time.sleep(gap)
return {
"total": len(wxids),
"success": success_count,
"fail": fail_list
}
4.3 令牌桶限流(更精细的控制)
如果系统里同时存在多个定时任务,单纯靠 sleep 可能造成多个任务同时触发、瞬间并发打爆发送频率。这时可以引入令牌桶做全局限流:
pythonimport threading
import time
class TokenBucket:
"""令牌桶限流器:控制每秒最多发送 N 条消息"""
def __init__(self, rate: float = 0.3):
"""
:param rate: 每秒生成令牌数,0.3 即每 ~3.3 秒一个令牌
"""
self.rate = rate
self.tokens = 0.0
self.last_time = time.monotonic()
self.lock = threading.Lock()
def acquire(self) -> float:
"""获取一个令牌,返回需要等待的秒数"""
with self.lock:
now = time.monotonic()
elapsed = now - self.last_time
self.tokens = min(1.0, self.tokens + elapsed * self.rate)
self.last_time = now
if self.tokens >= 1.0:
self.tokens -= 1.0
return 0.0
else:
wait_time = (1.0 - self.tokens) / self.rate
self.tokens = 0.0
return wait_time
# 全局令牌桶,限速 0.3 条/秒(约每 3.3 秒一条)
_bucket = TokenBucket(rate=0.3)
def safe_send_text(to_wxid: str, content: str) -> bool:
"""经过令牌桶限流的发送函数"""
wait = _bucket.acquire()
if wait > 0:
time.sleep(wait)
return send_text(to_wxid, content)
令牌桶的优势是即使有多个线程同时想发消息,也会被序列化到安全的速率之内,不会因任务并发而突刺。
五、接入 HTTP 接口的完整示例
把上面所有模块串起来,展示一个"每天早上 9:00 给指定群发通知"的完整用例。
需要指出的是,目前让微信程序化收发消息,通常依赖能将微信协议封装成 REST API 的工具。WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,详情参考官方文档。拿到接口后,业务层只需要关心"发什么、发给谁、什么时候发",不用关心协议细节。
python# ── 配置区(以官方文档提供的域名和字段为准)──────────────────
BASE = "https://你的接口域名"
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"token": TOKEN}
# ── 目标群列表 ────────────────────────────────────────────────
TARGET_ROOMS = [
"xxxxxxxx@chatroom", # 群 wxid,从联系人列表接口获取
"yyyyyyyy@chatroom",
]
DAILY_CONTENT = "大家早上好!今日日报已更新,请查阅。"
# ── 群发函数(群消息不需要 ats 字段时可省略)──────────────────
def send_daily_notice():
results = []
for room_id in TARGET_ROOMS:
url = f"{BASE}/message/postText"
body = {"appId": APPID, "toWxid": room_id, "content": DAILY_CONTENT}
try:
r = requests.post(url, json=body, headers=HEADERS, timeout=10)
data = r.json()
results.append({"room": room_id, "ok": data.get("ret") == 200})
except Exception as e:
results.append({"room": room_id, "ok": False, "err": str(e)})
# 群之间也要间隔,防止瞬间批量
time.sleep(random.uniform(5, 12))
return results
# ── 注册定时任务:每天 09:00 执行 ────────────────────────────
add_cron_task(
cron_expr="0 9 * * *",
wxids=TARGET_ROOMS, # 此处示意,实际 send_daily_notice 自己持有目标
content=DAILY_CONTENT,
task_id="daily_morning_notice"
)
# 注意:实际使用时 func 应直接传 send_daily_notice,args 可为空
# scheduler.add_job(send_daily_notice, CronTrigger(hour=9, minute=0), id="daily_morning_notice")
六、任务状态持久化与重试
定时任务在生产环境一定要记录每次的执行结果,便于排查问题和补发失败消息。发送日志至少应包含:任务 ID、目标对象、消息内容摘要、发送状态、失败原因、发送时间。有了这些信息,运营人员才能在后台一眼看出哪条消息漏发,而不是靠肉眼比对用户反馈。
pythonimport sqlite3
from datetime import datetime
def init_log_db():
conn = sqlite3.connect("send_log.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS send_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
wxid TEXT,
content TEXT,
status TEXT, -- 'success' / 'fail'
err_msg TEXT,
send_at TEXT
)
""")
conn.commit()
conn.close()
def log_send_result(task_id: str, wxid: str, content: str,
status: str, err_msg: str = ""):
conn = sqlite3.connect("send_log.db")
conn.execute(
"INSERT INTO send_log (task_id, wxid, content, status, err_msg, send_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(task_id, wxid, content, status, err_msg,
datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
)
conn.commit()
conn.close()
def retry_failed(task_id: str):
"""查出失败记录,重新发送"""
conn = sqlite3.connect("send_log.db")
cursor = conn.execute(
"SELECT wxid, content FROM send_log WHERE task_id=? AND status='fail'",
(task_id,)
)
rows = cursor.fetchall()
conn.close()
for wxid, content in rows:
ok = send_text(wxid, content)
new_status = "success" if ok else "fail"
log_send_result(task_id + "_retry", wxid, content, new_status)
time.sleep(random.uniform(5, 10))
有了发送日志,就可以做"失败自动重试"或"手动补发",大幅提升群发的到达率。重试时同样要加间隔,不能因为是补发就忽视频率控制,否则集中重试反而更容易触发风控。
七、常见问题排查
| 现象 | 排查方向 |
|---|---|
| 消息发出但对方收不到 | 检查目标 wxid 是否正确;检查账号是否被对方屏蔽 |
| 接口返回 ret != 200 | 查看 msg 字段,可能是频率超限或账号风控 |
| 消息发送后账号异常 | 降低发送频率,检查单日总量是否超标 |
| 定时任务不触发 | 确认 scheduler 已 start;确认服务器时区设置正确 |
| 重启后任务丢失 | 确认使用了 SQLAlchemy JobStore 而非内存 JobStore |
| 群消息发不出去 | 确认 toWxid 是群的 chatroom id(后缀 @chatroom) |
服务器时区是一个容易被忽略的坑:部署在海外或使用 UTC 时区的服务器上,Cron 触发时间会与预期相差若干小时。建议在初始化调度器时显式指定时区:BackgroundScheduler(timezone='Asia/Shanghai'),避免节假日营销任务在凌晨才推出去。
总结
定时群发的难点不在于"定时"本身,而在于如何在自动化效率与微信风控之间找到平衡点:APScheduler 解决了任务调度的问题,随机间隔和令牌桶解决了频率控制的问题,SQLite 日志和重试机制解决了可靠性的问题。把这三个维度都做扎实,才算是真正能在生产跑得住的微信定时群发系统。实际落地时还需结合账号的实际状态(号龄、活跃度、历史风险记录)动态调整发送策略,切忌照搬数字,以上参数仅供参考,具体边界以官方文档为准。
