首页 / 博客 / API·多语言·接口

Python定时任务调度微信群发(APScheduler)

分类:API·多语言·接口 · 标签:Python定时群发、APScheduler微信、微信群发自动化

前言

做私域运营的团队都遇到过同一个痛点:每天要在固定时间向多个微信群推送早报、活动通知、促销信息,全靠人工操作既耗时又容易遗漏。Python 的 APScheduler 库提供了成熟的定时调度能力,结合支持 个人微信API 的 WechatApi 服务,可以把这套重复劳动完全自动化,让微信群发变得像写一个 cron 任务一样简单。

APScheduler 基础与选型

APScheduler(Advanced Python Scheduler)是 Python 生态中使用最广泛的任务调度库之一,支持四种调度方式:

调度方式触发器典型场景
dateDateTrigger只执行一次的延迟任务
intervalIntervalTrigger每隔 N 秒/分/时重复执行
cronCronTrigger类 Unix cron 表达式,精确到秒
calendarintervalCalendarIntervalTrigger按日历周期(天/周/月/年)

针对微信群发场景,最常用的是 CronTriggerIntervalTrigger

APScheduler 支持三种执行器(executor):

群发 HTTP 请求属于 I/O 等待,直接用默认线程池即可,无需引入异步复杂度。

安装依赖:

bashpip install apscheduler requests

WechatApi 接口鉴权与调用范式

WechatApi 基于 iPad 协议实现个人微信的 HTTP API 能力,不依赖网页或 PC 客户端,稳定性更高,支持发送文字、图片、文件、小程序卡片等多种消息类型。详见 微信iPad协议 说明页。

鉴权方式:所有请求需在 HTTP Header 中携带 VideosApi-token,值为控制台分配的 API Token。

业务参数约定

以发送群消息为例,接口调用范式如下:

pythonimport requests

BASE_URL = "https://your-api-endpoint.wechatapi.net"
TOKEN = "your_videos_api_token"   # 控制台获取,勿硬编码到代码中
APP_ID = "your_device_app_id"     # 设备ID

def send_group_text(room_id: str, content: str) -> dict:
    """向指定微信群发送文字消息"""
    url = f"{BASE_URL}/api/message/sendText"
    headers = {
        "VideosApi-token": TOKEN,
        "Content-Type": "application/json"
    }
    payload = {
        "appId": APP_ID,
        "toWxid": room_id,       # 微信群的 roomId,以 @chatroom 结尾
        "content": content
    }
    resp = requests.post(url, json=payload, headers=headers, timeout=10)
    resp.raise_for_status()
    return resp.json()

返回体示例:

json{
    "ret": 200,
    "msg": "success",
    "data": {
        "msgId": "1234567890",
        "clientMsgId": "abc123"
    }
}

ret 不等于 200 时,应记录日志并按策略重试,不能直接 pass 掉错误。

构建定时群发调度器

下面给出一个完整的可用结构,将消息内容、目标群列表、定时规则三者解耦,方便维护和扩展。

消息内容管理

不要把消息文案硬编码到调度函数里。推荐将内容放在独立配置文件(JSON/YAML)或数据库中,调度任务执行时动态加载,这样运营人员可以随时修改文案而不用动代码。

pythonimport json
import os
from datetime import datetime

def load_message_template(template_name: str) -> str:
    """从模板文件加载消息内容,支持日期变量替换"""
    template_dir = "./templates"
    filepath = os.path.join(template_dir, f"{template_name}.txt")
    with open(filepath, encoding="utf-8") as f:
        content = f.read()
    # 替换日期占位符
    today = datetime.now().strftime("%Y年%m月%d日")
    content = content.replace("{{date}}", today)
    return content


def get_target_groups() -> list[str]:
    """从配置文件获取需要群发的群 roomId 列表"""
    with open("./config/groups.json", encoding="utf-8") as f:
        config = json.load(f)
    return config.get("morning_report_groups", [])

定时任务注册

pythonfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
import time

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

scheduler = BlockingScheduler(timezone="Asia/Shanghai")


def job_morning_report():
    """早报群发任务"""
    content = load_message_template("morning_report")
    groups = get_target_groups()
    success, fail = 0, 0
    for room_id in groups:
        try:
            result = send_group_text(room_id, content)
            if result.get("ret") == 200:
                success += 1
                logger.info(f"发送成功: {room_id}")
            else:
                fail += 1
                logger.warning(f"发送失败: {room_id}, 原因: {result.get('msg')}")
        except Exception as e:
            fail += 1
            logger.error(f"异常: {room_id} - {e}")
        time.sleep(1)   # 批量发送时加间隔,避免频率过高

    logger.info(f"早报群发完成,成功 {success} 群,失败 {fail} 群")


# 每天 08:30 执行早报群发
scheduler.add_job(
    func=job_morning_report,
    trigger=CronTrigger(hour=8, minute=30, second=0),
    id="morning_report",
    name="早报群发",
    misfire_grace_time=300,   # 允许延迟执行的宽限期(秒)
    coalesce=True             # 若积压多次触发,只执行一次
)

if __name__ == "__main__":
    logger.info("调度器启动")
    scheduler.start()

misfire_grace_timecoalesce 这两个参数对生产部署非常重要:

多时段、多群组的复杂调度

实际私域运营中往往需要多种时段的群发任务并存,比如:早报、午间活动提醒、晚间复盘。可以通过循环批量注册任务:

pythonSCHEDULE_CONFIG = [
    {
        "id": "morning_report",
        "name": "早报群发",
        "template": "morning_report",
        "group_key": "morning_groups",
        "cron": {"hour": 8, "minute": 30}
    },
    {
        "id": "noon_promo",
        "name": "午间促销",
        "template": "noon_promo",
        "group_key": "promo_groups",
        "cron": {"hour": 12, "minute": 0}
    },
    {
        "id": "evening_summary",
        "name": "晚间复盘",
        "template": "evening_summary",
        "group_key": "all_groups",
        "cron": {"hour": 20, "minute": 30}
    }
]

for item in SCHEDULE_CONFIG:
    def make_job(cfg):
        def job():
            content = load_message_template(cfg["template"])
            groups = get_groups_by_key(cfg["group_key"])
            batch_send(groups, content)
        job.__name__ = cfg["id"]
        return job

    scheduler.add_job(
        func=make_job(item),
        trigger=CronTrigger(**item["cron"]),
        id=item["id"],
        name=item["name"],
        misfire_grace_time=300,
        coalesce=True
    )

注意这里用 make_job(cfg) 闭包包一层,避免 Python 闭包的经典陷阱——直接在 lambda 或内嵌函数里引用循环变量 item,在循环结束后所有任务都会用最后一个值。

如果群组数量很多,单线程串行发送会占用较长时间。可以将 job_morning_report 内部改为线程池并发:

pythonfrom concurrent.futures import ThreadPoolExecutor, as_completed

def batch_send(groups: list[str], content: str):
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = {pool.submit(send_group_text, gid, content): gid for gid in groups}
        for future in as_completed(futures):
            gid = futures[future]
            try:
                res = future.result()
                if res.get("ret") != 200:
                    logger.warning(f"群 {gid} 发送失败: {res.get('msg')}")
            except Exception as e:
                logger.error(f"群 {gid} 异常: {e}")

并发数 max_workers 建议不超过 5,避免短时间内产生太多并发请求触发接口限流。WechatApi 的接口文档中会标注各接口的 QPS 限制,调度前务必确认。

异常处理与重试机制

生产级群发系统必须有健壮的异常处理,至少覆盖以下几类错误:

  1. 网络超时requests.exceptions.Timeout,设置合理的 timeout 参数(建议 10-15 秒)
  2. 接口返回非 200:业务失败,记录并告警,不要静默吞掉
  3. Token 失效(ret=401):需要立即停止群发并通知运维,不能继续重试
  4. 设备掉线(ret=4xx):微信账号可能被踢出或需要重新登录,需要人工介入

建议引入指数退避重试:

pythonimport time
from functools import wraps

def retry_on_failure(max_retries=3, backoff_base=2):
    """指数退避重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    if result.get("ret") == 200:
                        return result
                    # 401/设备异常不重试
                    if result.get("ret") in (401, 403):
                        logger.critical(f"鉴权失败或设备异常,停止重试: {result}")
                        raise RuntimeError("不可恢复的接口错误")
                    raise ValueError(f"业务失败: {result}")
                except RuntimeError:
                    raise
                except Exception as e:
                    if attempt < max_retries - 1:
                        wait = backoff_base ** attempt
                        logger.warning(f"第 {attempt+1} 次失败,{wait}s 后重试: {e}")
                        time.sleep(wait)
                    else:
                        logger.error(f"重试耗尽,放弃: {e}")
                        raise
        return wrapper
    return decorator


@retry_on_failure(max_retries=3, backoff_base=2)
def send_group_text_with_retry(room_id: str, content: str) -> dict:
    return send_group_text(room_id, content)

部署与运维注意事项

进程守护

调度器是长驻进程,必须用进程管理工具守护,推荐 systemd(Linux 服务器)或 Supervisor

bash# 用 systemd 注册服务(/etc/systemd/system/wechat-scheduler.service)
[Unit]
Description=WechatApi Group Broadcast Scheduler
After=network.target

[Service]
Type=simple
User=deploy
WorkingDirectory=/opt/wechat-scheduler
ExecStart=/opt/wechat-scheduler/venv/bin/python main.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
bashsystemctl daemon-reload
systemctl enable wechat-scheduler
systemctl start wechat-scheduler
journalctl -u wechat-scheduler -f   # 实时查看日志

Token 安全管理

VideosApi-token 是高权限凭证,不能出现在代码仓库中。正确做法:

时区问题

APScheduler 在创建 BlockingScheduler 时指定 timezone="Asia/Shanghai" 是关键步骤。若服务器时区与预期不一致,定时将出现 8 小时偏移。可通过 date 命令确认服务器时区,或在代码启动时打印 datetime.now()datetime.utcnow() 做核对。

消息频率与账号安全

微信二次开发 场景中,账号安全是首要考量。WechatApi 基于 iPad 协议,相比网页/PC 协议风险更低,但仍建议遵守以下原则:

pythonimport random

def humanize_delay(min_sec=0.5, max_sec=2.0):
    """发送间隔随机化,模拟人工节奏"""
    time.sleep(random.uniform(min_sec, max_sec))

小结

APScheduler + WechatApi 是目前 Python 技术栈下实现微信定时群发最直接的组合方案。核心要点:用 CronTrigger 精确控制触发时间,开启 coalesce 防止任务积压,用指数退避处理瞬时网络抖动,Token 走环境变量管理,进程用 systemd 守护。

WechatApi 提供了完整的 微信API对接 能力,不仅限于文字群发,图片、视频号卡片、小程序消息同样支持,可以在同一个调度框架里扩展更多消息类型,满足从简单通知到完整 微信SCRM 系统的多层次需求。官网:https://wechatapi.net ,开发文档:https://post.wechatapi.net 。

想动手试试?

WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,注册后几分钟跑通。

立即免费注册查看开发文档

相关产品页

🔗 个人微信API(产品页)🔗 微信iPad协议(产品页)🔗 微信二次开发(产品页)

相关文章

微信API接口返回失败/收不到消息?完整排查清单微信 API 怎么对接?Python 发出第一条消息实战Node.js 微信机器人开发教程(发消息 + 收回调)个人微信API能力清单:消息/好友/群/朋友圈接口一览
© 2025 WechatApi · 企业级微信智能机器人接入平台
官网价格帮助文档博客
苏ICP备2024128799号 · 苏ICP备2023038368号