首页 / 博客 / API·多语言·接口

Python异步并发收发微信消息(asyncio实战)

分类:API·多语言·接口 · 标签:Python微信API、微信消息异步、asyncio

前言

在需要同时管理数十乃至数百个微信账号的业务场景下——比如客服系统、营销推送、自动化回复机器人——传统同步阻塞式发送很快会成为性能瓶颈。Python 的 asyncio + aiohttp 组合天然适合这类 I/O 密集型任务:单线程事件循环即可驱动上百路并发请求,不依赖多线程带来的锁竞争。本文以 WechatApi 提供的个人微信 HTTP API 为接口后端,完整演示如何用异步并发方式高效收发消息、控制速率以及处理重试逻辑。


同步 vs 异步:先量化差距

在深入代码之前,先看看两种模式的核心差异,帮助你判断是否值得投入改造成本。

维度同步(requests)异步(asyncio + aiohttp)
并发模型一次一请求,阻塞等待响应事件循环,挂起等待 I/O,切换执行其他任务
线程/进程开销并发需多线程/多进程单线程,无上下文切换开销
适合场景脚本、单账号、低频操作多账号批量推送、实时消息轮询
典型吞吐(100账号)~10-20 msg/s(受线程池限制)~200-500 msg/s(受网络和服务端限速)
编码复杂度中(需理解 async/await、事件循环)
异常隔离单次失败影响同批次每个协程独立,失败可单独重试

结论很清楚:账号数量或消息量一旦上去,异步方案的优势是数量级的。WechatApi 的个人微信接口 支持多 appId 独立鉴权,天然契合这种"多账号并发"的异步架构。


环境准备与项目结构

bashpip install aiohttp asyncio

建议的目录结构:

wechat_async/
├── client.py        # 封装单个 API 调用
├── scheduler.py     # 并发调度与信号量控制
├── retry.py         # 指数退避重试装饰器
└── main.py          # 入口

基础配置常量(不要把 token 硬编码进代码,用环境变量或配置文件):

python# config.py
import os

API_BASE = "https://api.wechatapi.net"   # 示意域名,以实际文档为准
VIDEOS_API_TOKEN = os.environ["VIDEOS_API_TOKEN"]  # VideosApi-token 鉴权头

# 每个账号都有独立 appId,从控制台获取
# https://newmanager.wechatapi.net/dashboard/
APP_IDS = [
    "app_xxxx_01",
    "app_xxxx_02",
    "app_xxxx_03",
    # ... 按需填入
]

# 并发控制
MAX_CONCURRENT = 20   # 同一时刻最多 20 路并发请求
RETRY_TIMES = 3
RETRY_BACKOFF = 1.5   # 指数退避基数(秒)

封装异步 HTTP 客户端

WechatApi 的接口风格是标准的 HTTP POST + JSON 请求体,鉴权通过请求头 VideosApi-token 传递,响应格式统一为:

json{"ret": 200, "msg": "success", "data": {...}}

我们先封装一个带会话复用的异步客户端:

python# client.py
import asyncio
import aiohttp
import logging
from config import API_BASE, VIDEOS_API_TOKEN, RETRY_TIMES, RETRY_BACKOFF

logger = logging.getLogger(__name__)


async def _request_with_retry(
    session: aiohttp.ClientSession,
    endpoint: str,
    payload: dict,
    retries: int = RETRY_TIMES,
) -> dict:
    """
    带指数退避重试的单次 POST 请求。
    endpoint: 相对路径,如 "/message/send"
    """
    url = f"{API_BASE}{endpoint}"
    headers = {
        "VideosApi-token": VIDEOS_API_TOKEN,
        "Content-Type": "application/json",
    }
    last_exc = None
    for attempt in range(retries):
        try:
            async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                result = await resp.json()
                if result.get("ret") == 200:
                    return result
                # 业务层错误(如账号异常、频率超限)不重试,直接返回
                logger.warning("API 业务错误 attempt=%d: %s", attempt, result)
                return result
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            last_exc = e
            wait = RETRY_BACKOFF ** attempt
            logger.warning("请求异常 attempt=%d, 等待 %.1fs 后重试: %s", attempt, wait, e)
            await asyncio.sleep(wait)

    logger.error("重试耗尽,最后异常: %s", last_exc)
    return {"ret": -1, "msg": str(last_exc), "data": {}}


async def send_text_message(
    session: aiohttp.ClientSession,
    app_id: str,
    to_wxid: str,
    content: str,
) -> dict:
    """发送文本消息"""
    payload = {
        "appId": app_id,
        "toWxid": to_wxid,
        "content": content,
    }
    return await _request_with_retry(session, "/message/sendText", payload)


async def get_message_list(
    session: aiohttp.ClientSession,
    app_id: str,
    page: int = 1,
) -> dict:
    """拉取消息列表(轮询收消息)"""
    payload = {
        "appId": app_id,
        "page": page,
    }
    return await _request_with_retry(session, "/message/list", payload)

几个设计要点:


信号量限速:保护服务端与自身账号

并发太高容易触发 WechatApi 服务端的频率限制,也可能导致微信账号被风控。asyncio.Semaphore 是控制最大并发数的最简洁方案:

python# scheduler.py
import asyncio
import aiohttp
from typing import List, Tuple
from client import send_text_message
from config import MAX_CONCURRENT

async def batch_send(
    tasks: List[Tuple[str, str, str]],   # [(app_id, to_wxid, content), ...]
) -> List[dict]:
    """
    并发发送批量消息,最大并发受 MAX_CONCURRENT 信号量限制。
    返回每条任务的 API 响应列表,顺序与输入对应。
    """
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    results = [None] * len(tasks)

    async def _worker(idx: int, app_id: str, to_wxid: str, content: str, session: aiohttp.ClientSession):
        async with semaphore:                          # 获取信号量槽位
            result = await send_text_message(session, app_id, to_wxid, content)
            results[idx] = result

    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT + 5)  # 连接池略大于并发数
    async with aiohttp.ClientSession(connector=connector) as session:
        coroutines = [
            _worker(i, app_id, to_wxid, content, session)
            for i, (app_id, to_wxid, content) in enumerate(tasks)
        ]
        await asyncio.gather(*coroutines, return_exceptions=False)

    return results

信号量工作原理asyncio.Semaphore(20) 相当于一个计数锁,同一时刻最多允许 20 个协程进入 async with semaphore 块,其余的在事件循环中挂起等待,不会阻塞线程。这比线程池的 ThreadPoolExecutor(max_workers=20) 内存占用低一个数量级。


异常重试与超时处理

网络环境不稳定时,一次请求失败不代表整个任务失败。优秀的异步程序必须区分"可重试异常"和"不可重试异常",并配合合理的退避策略,避免在服务端压力大时雪上加霜。

指数退避重试装饰器

在前面的 _request_with_retry 中已经内嵌了重试逻辑,但对于更通用的场景,可以把它封装成装饰器,让任意异步函数都能获得重试能力:

python# retry.py
import asyncio
import functools
import logging
from typing import Tuple, Type

logger = logging.getLogger(__name__)


def async_retry(
    retries: int = 3,
    backoff: float = 1.5,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
    """
    异步函数指数退避重试装饰器。
    :param retries:    最大重试次数(不含首次调用)
    :param backoff:    退避基数,第 n 次重试等待 backoff^n 秒
    :param exceptions: 触发重试的异常类型元组
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt == retries:
                        break
                    wait = backoff ** attempt
                    logger.warning(
                        "[retry] %s attempt=%d/%d, 等待 %.2fs: %s",
                        func.__name__, attempt + 1, retries, wait, e,
                    )
                    await asyncio.sleep(wait)
            logger.error("[retry] %s 重试耗尽: %s", func.__name__, last_exc)
            raise last_exc
        return wrapper
    return decorator

使用示例——对 WechatApi 发送接口加装饰器:

pythonimport aiohttp
import asyncio
from retry import async_retry
from config import API_BASE, VIDEOS_API_TOKEN


@async_retry(retries=4, backoff=2.0, exceptions=(aiohttp.ClientError, asyncio.TimeoutError))
async def safe_send_message(session: aiohttp.ClientSession, app_id: str, to_wxid: str, content: str) -> dict:
    """带重试保障的文本消息发送,仅对网络异常重试,业务错误直接返回。"""
    url = f"{API_BASE}/message/sendText"
    headers = {
        "VideosApi-token": VIDEOS_API_TOKEN,
        "Content-Type": "application/json",
    }
    payload = {"appId": app_id, "toWxid": to_wxid, "content": content}
    async with session.post(url, json=payload, headers=headers,
                            timeout=aiohttp.ClientTimeout(total=8)) as resp:
        result = await resp.json()
        # 业务层错误不触发重试,直接返回给上层判断
        return result

超时分级设置

aiohttp 的 ClientTimeout 支持多层超时,细粒度控制比单一 total 更合理:

pythontimeout = aiohttp.ClientTimeout(
    total=15,        # 整个请求(包含重定向)的最大时长
    connect=3,       # TCP 连接建立超时
    sock_connect=3,  # socket 级别连接超时
    sock_read=10,    # 读取响应体超时
)

对 WechatApi 这类接口服务,建议 connect=3(连接快则好,超时说明网络有问题),sock_read=10(服务端处理消息发送有时需要稍长时间)。如果遇到批量发送时偶发 asyncio.TimeoutError,优先检查是否触发了服务端限流,而非盲目调大超时。


实战:批量给好友群发的异步任务编排

一个典型的营销场景是:从数据库取出一批好友 wxid,对每人发送个性化文案,完成后统计成功/失败数量。下面用 asyncio.gather 完整演示这一编排流程。

数据准备与任务构造

python# batch_push.py
import asyncio
import aiohttp
import logging
from client import send_text_message
from config import APP_IDS, MAX_CONCURRENT, VIDEOS_API_TOKEN

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# 模拟从数据库读出的目标用户列表
# 实际场景替换为 SELECT wxid, nickname FROM contacts
TARGET_USERS = [
    {"wxid": "wxid_aaa001", "nickname": "张三"},
    {"wxid": "wxid_bbb002", "nickname": "李四"},
    {"wxid": "wxid_ccc003", "nickname": "王五"},
    # ... 实际可能有几百上千条
]

PUSH_APP_ID = APP_IDS[0]   # 使用第一个账号执行群发


def build_message(nickname: str) -> str:
    """构造个性化文案,实际可接模板引擎"""
    return (
        f"Hi {nickname},我们新上线了专属优惠活动,"
        f"欢迎访问 https://wechatapi.net 了解更多详情,"
        f"有问题随时回复本条消息~"
    )

带信号量的并发群发

pythonasync def push_one(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    app_id: str,
    user: dict,
) -> dict:
    """向单个用户发送消息,受信号量限流。"""
    async with semaphore:
        content = build_message(user["nickname"])
        result = await send_text_message(session, app_id, user["wxid"], content)
        status = "OK" if result.get("ret") == 200 else "FAIL"
        logger.info("%s → %s wxid=%s", status, user["nickname"], user["wxid"])
        return {"user": user, "result": result}


async def batch_push_to_friends(users: list, app_id: str, max_concurrent: int = MAX_CONCURRENT):
    """
    批量群发入口:asyncio.gather 并发调度所有任务,
    信号量确保同一时刻不超过 max_concurrent 路请求。
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    connector = aiohttp.TCPConnector(limit=max_concurrent + 5)

    async with aiohttp.ClientSession(connector=connector) as session:
        # 为每个用户创建协程,全部提交给 gather
        coros = [
            push_one(session, semaphore, app_id, user)
            for user in users
        ]
        # return_exceptions=True 让单条失败不中断整批
        raw_results = await asyncio.gather(*coros, return_exceptions=True)

    # 统计结果
    success, fail, error = 0, 0, 0
    for item in raw_results:
        if isinstance(item, Exception):
            error += 1
        elif item["result"].get("ret") == 200:
            success += 1
        else:
            fail += 1

    logger.info("群发完成:成功=%d 业务失败=%d 异常=%d 共=%d",
                success, fail, error, len(users))
    return raw_results


if __name__ == "__main__":
    asyncio.run(batch_push_to_friends(TARGET_USERS, PUSH_APP_ID))

分批发送(避免瞬间压力)

当目标列表非常大(如 5000 人)时,即使有信号量,瞬间创建 5000 个协程对象也会消耗不少内存。建议分批处理,每批之间加短暂间隔:

pythonasync def chunked_batch_push(users: list, app_id: str, chunk_size: int = 200, interval: float = 2.0):
    """
    将用户列表切片,每次处理 chunk_size 条,批次间等待 interval 秒。
    适合超大规模群发场景,兼顾内存和服务端压力。
    """
    for i in range(0, len(users), chunk_size):
        chunk = users[i: i + chunk_size]
        logger.info("处理第 %d 批,共 %d 条", i // chunk_size + 1, len(chunk))
        await batch_push_to_friends(chunk, app_id)
        if i + chunk_size < len(users):
            logger.info("批次间等待 %.1fs ...", interval)
            await asyncio.sleep(interval)

这套编排方式的优势在于:每批内部仍是全并发(受信号量限制),批次之间的短暂停顿给 WechatApi 服务端留出喘息空间,同时让微信账号的发送频率更接近人类操作节奏,降低风控风险。实际项目中可以把 chunk_sizeinterval 做成配置项,根据账号权重和套餐等级动态调整——具体限速阈值可参考 WechatApi 开发文档


实战:多账号并发消息轮询(收消息)

发消息是典型的"写"操作,收消息则需要轮询——用 asyncio 可以轻松实现多账号同时轮询,互不阻塞:

python# main.py
import asyncio
import aiohttp
import logging
from config import APP_IDS, MAX_CONCURRENT
from client import get_message_list, send_text_message

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

POLL_INTERVAL = 3   # 轮询间隔(秒),根据业务需求调整


async def handle_messages(session: aiohttp.ClientSession, app_id: str, messages: list):
    """处理收到的消息,示例:自动回复"""
    for msg in messages:
        from_wxid = msg.get("fromWxid", "")
        text = msg.get("content", "")
        if not from_wxid or not text:
            continue
        # 简单关键词回复示例
        if "价格" in text:
            reply = "您好,详情请访问 https://wechatapi.net 查看套餐。"
        elif "你好" in text or "hello" in text.lower():
            reply = "您好!有什么可以帮您?"
        else:
            reply = None

        if reply:
            result = await send_text_message(session, app_id, from_wxid, reply)
            logger.info("回复 %s → ret=%s", from_wxid, result.get("ret"))


async def poll_account(session: aiohttp.ClientSession, app_id: str, semaphore: asyncio.Semaphore):
    """单账号无限轮询协程"""
    logger.info("开始轮询账号: %s", app_id)
    while True:
        async with semaphore:
            resp = await get_message_list(session, app_id)
        if resp.get("ret") == 200:
            messages = resp.get("data", {}).get("list", [])
            if messages:
                await handle_messages(session, app_id, messages)
        else:
            logger.warning("账号 %s 拉取消息失败: %s", app_id, resp.get("msg"))
        await asyncio.sleep(POLL_INTERVAL)


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT + 5)
    async with aiohttp.ClientSession(connector=connector) as session:
        # 为每个 appId 创建独立的轮询协程,全部并发运行
        poll_tasks = [
            poll_account(session, app_id, semaphore)
            for app_id in APP_IDS
        ]
        await asyncio.gather(*poll_tasks)


if __name__ == "__main__":
    asyncio.run(main())

运行效果:50 个账号同时轮询,实际并发出 I/O 的协程受信号量控制不超过 20 路,其余挂起等待,CPU 占用极低,一台普通 2核 云服务器完全可以支撑。


生产级补充:日志、优雅退出与监控

几个让代码从"能用"变成"好用"的细节:

1. 优雅退出:接收 SIGINT/SIGTERM 时取消所有协程,避免消息处理中途截断。

pythonimport signal

def _shutdown(loop: asyncio.AbstractEventLoop):
    for task in asyncio.all_tasks(loop):
        task.cancel()

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, _shutdown, loop)
loop.add_signal_handler(signal.SIGTERM, _shutdown, loop)

2. 结构化日志:生产环境建议换成 structlog 或输出 JSON 格式,便于接入 ELK/Loki 等日志系统。

3. 速率指标:用一个简单的原子计数器统计每分钟发送量,配合 Prometheus / 自建告警,当某账号失败率超阈值时自动隔离。

pythonfrom collections import defaultdict
import time

_counters = defaultdict(int)

async def counted_send(session, app_id, to_wxid, content):
    result = await send_text_message(session, app_id, to_wxid, content)
    minute_key = int(time.time()) // 60
    _counters[f"{app_id}:{minute_key}"] += 1
    return result

常见问题与调优建议

Q:并发数设多少合适? 从 10 开始逐步调高,观察 WechatApi 服务端返回的限流错误(ret 非 200)。一般单账号建议不超过 5 QPS,多账号总并发可以高一些但要控制每账号频率。具体限制参考 WechatApi 开发文档

Q:aiohttp 报 "too many open files"? 调高操作系统文件描述符上限:ulimit -n 65535,同时确保 TCPConnector(limit=...) 不要超过系统限制。

Q:消息有重复拉取怎么办? 在本地维护一个已处理消息 ID 的 set(或 Redis SET),拉取到后先去重再处理。set 可定期清理,只保留最近 N 分钟的 ID。

Q:多进程 vs 单进程多协程? 账号数在 200 以内,单进程 asyncio 完全够用。超过 200 个账号建议按账号分组,用 multiprocessing 或多容器水平扩展,每个进程内再跑 asyncio 事件循环。


小结

本文完整演示了用 Python asyncio + aiohttp 对接 WechatApi 微信机器人接口 的异步并发方案,核心要点:

WechatApi 基于 iPad 协议,稳定性优于网页 Hook 方案,HTTP 接口风格对 Python 异步生态极其友好,是做多账号自动化场景的推荐选型。注册与控制台地址:newmanager.wechatapi.net/dashboard/,接口文档在 post.wechatapi.net,欢迎按需取用。

想动手试试?

WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,注册后几分钟跑通。

立即免费注册查看开发文档

相关产品页

🔗 微信机器人开发(产品页)🔗 微信客服机器人(产品页)🔗 微信群管理机器人(产品页)

相关文章

微信API接口返回失败/收不到消息?完整排查清单微信 API 怎么对接?Python 发出第一条消息实战Node.js 微信机器人开发教程(发消息 + 收回调)个人微信API能力清单:消息/好友/群/朋友圈接口一览
© 2025 WechatApi · 企业级微信智能机器人接入平台
官网价格帮助文档博客
苏ICP备2024128799号 · 苏ICP备2023038368号