前言
在需要同时管理数十乃至数百个微信账号的业务场景下——比如客服系统、营销推送、自动化回复机器人——传统同步阻塞式发送很快会成为性能瓶颈。Python 的 asyncio + aiohttp 组合天然适合这类 I/O 密集型任务:单线程事件循环即可驱动上百路并发请求,不依赖多线程带来的锁竞争。本文以 WechatApi 提供的个人微信 HTTP API 为接口后端,完整演示如何用异步并发方式高效收发消息、控制速率以及处理重试逻辑。
同步 vs 异步:先量化差距
在深入代码之前,先看看两种模式的核心差异,帮助你判断是否值得投入改造成本。
| 维度 | 同步(requests) | 异步(asyncio + aiohttp) |
|---|---|---|
| 并发模型 | 一次一请求,阻塞等待响应 | 事件循环,挂起等待 I/O,切换执行其他任务 |
| 线程/进程开销 | 并发需多线程/多进程 | 单线程,无上下文切换开销 |
| 适合场景 | 脚本、单账号、低频操作 | 多账号批量推送、实时消息轮询 |
| 典型吞吐(100账号) | ~10-20 msg/s(受线程池限制) | ~200-500 msg/s(受网络和服务端限速) |
| 编码复杂度 | 低 | 中(需理解 async/await、事件循环) |
| 异常隔离 | 单次失败影响同批次 | 每个协程独立,失败可单独重试 |
结论很清楚:账号数量或消息量一旦上去,异步方案的优势是数量级的。WechatApi 的个人微信接口 支持多 appId 独立鉴权,天然契合这种"多账号并发"的异步架构。
环境准备与项目结构
bashpip install aiohttp asyncio
建议的目录结构:
wechat_async/
├── client.py # 封装单个 API 调用
├── scheduler.py # 并发调度与信号量控制
├── retry.py # 指数退避重试装饰器
└── main.py # 入口
基础配置常量(不要把 token 硬编码进代码,用环境变量或配置文件):
python# config.py
import os
API_BASE = "https://api.wechatapi.net" # 示意域名,以实际文档为准
VIDEOS_API_TOKEN = os.environ["VIDEOS_API_TOKEN"] # VideosApi-token 鉴权头
# 每个账号都有独立 appId,从控制台获取
# https://newmanager.wechatapi.net/dashboard/
APP_IDS = [
"app_xxxx_01",
"app_xxxx_02",
"app_xxxx_03",
# ... 按需填入
]
# 并发控制
MAX_CONCURRENT = 20 # 同一时刻最多 20 路并发请求
RETRY_TIMES = 3
RETRY_BACKOFF = 1.5 # 指数退避基数(秒)
封装异步 HTTP 客户端
WechatApi 的接口风格是标准的 HTTP POST + JSON 请求体,鉴权通过请求头 VideosApi-token 传递,响应格式统一为:
json{"ret": 200, "msg": "success", "data": {...}}
我们先封装一个带会话复用的异步客户端:
python# client.py
import asyncio
import aiohttp
import logging
from config import API_BASE, VIDEOS_API_TOKEN, RETRY_TIMES, RETRY_BACKOFF
logger = logging.getLogger(__name__)
async def _request_with_retry(
session: aiohttp.ClientSession,
endpoint: str,
payload: dict,
retries: int = RETRY_TIMES,
) -> dict:
"""
带指数退避重试的单次 POST 请求。
endpoint: 相对路径,如 "/message/send"
"""
url = f"{API_BASE}{endpoint}"
headers = {
"VideosApi-token": VIDEOS_API_TOKEN,
"Content-Type": "application/json",
}
last_exc = None
for attempt in range(retries):
try:
async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
result = await resp.json()
if result.get("ret") == 200:
return result
# 业务层错误(如账号异常、频率超限)不重试,直接返回
logger.warning("API 业务错误 attempt=%d: %s", attempt, result)
return result
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
last_exc = e
wait = RETRY_BACKOFF ** attempt
logger.warning("请求异常 attempt=%d, 等待 %.1fs 后重试: %s", attempt, wait, e)
await asyncio.sleep(wait)
logger.error("重试耗尽,最后异常: %s", last_exc)
return {"ret": -1, "msg": str(last_exc), "data": {}}
async def send_text_message(
session: aiohttp.ClientSession,
app_id: str,
to_wxid: str,
content: str,
) -> dict:
"""发送文本消息"""
payload = {
"appId": app_id,
"toWxid": to_wxid,
"content": content,
}
return await _request_with_retry(session, "/message/sendText", payload)
async def get_message_list(
session: aiohttp.ClientSession,
app_id: str,
page: int = 1,
) -> dict:
"""拉取消息列表(轮询收消息)"""
payload = {
"appId": app_id,
"page": page,
}
return await _request_with_retry(session, "/message/list", payload)
几个设计要点:
- 会话复用:
aiohttp.ClientSession在整个程序生命周期内只创建一次,底层连接池被所有协程共享,避免每次请求的 TCP 握手开销。 - 超时设置:
ClientTimeout(total=10)避免因单个请求挂死而耗尽事件循环资源。 - 业务错误 vs 网络错误:业务层返回的错误码(如账号被踢、参数非法)通常不适合重试,需要区分处理。
信号量限速:保护服务端与自身账号
并发太高容易触发 WechatApi 服务端的频率限制,也可能导致微信账号被风控。asyncio.Semaphore 是控制最大并发数的最简洁方案:
python# scheduler.py
import asyncio
import aiohttp
from typing import List, Tuple
from client import send_text_message
from config import MAX_CONCURRENT
async def batch_send(
tasks: List[Tuple[str, str, str]], # [(app_id, to_wxid, content), ...]
) -> List[dict]:
"""
并发发送批量消息,最大并发受 MAX_CONCURRENT 信号量限制。
返回每条任务的 API 响应列表,顺序与输入对应。
"""
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
results = [None] * len(tasks)
async def _worker(idx: int, app_id: str, to_wxid: str, content: str, session: aiohttp.ClientSession):
async with semaphore: # 获取信号量槽位
result = await send_text_message(session, app_id, to_wxid, content)
results[idx] = result
connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT + 5) # 连接池略大于并发数
async with aiohttp.ClientSession(connector=connector) as session:
coroutines = [
_worker(i, app_id, to_wxid, content, session)
for i, (app_id, to_wxid, content) in enumerate(tasks)
]
await asyncio.gather(*coroutines, return_exceptions=False)
return results
信号量工作原理:asyncio.Semaphore(20) 相当于一个计数锁,同一时刻最多允许 20 个协程进入 async with semaphore 块,其余的在事件循环中挂起等待,不会阻塞线程。这比线程池的 ThreadPoolExecutor(max_workers=20) 内存占用低一个数量级。
异常重试与超时处理
网络环境不稳定时,一次请求失败不代表整个任务失败。优秀的异步程序必须区分"可重试异常"和"不可重试异常",并配合合理的退避策略,避免在服务端压力大时雪上加霜。
指数退避重试装饰器
在前面的 _request_with_retry 中已经内嵌了重试逻辑,但对于更通用的场景,可以把它封装成装饰器,让任意异步函数都能获得重试能力:
python# retry.py
import asyncio
import functools
import logging
from typing import Tuple, Type
logger = logging.getLogger(__name__)
def async_retry(
retries: int = 3,
backoff: float = 1.5,
exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
"""
异步函数指数退避重试装饰器。
:param retries: 最大重试次数(不含首次调用)
:param backoff: 退避基数,第 n 次重试等待 backoff^n 秒
:param exceptions: 触发重试的异常类型元组
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exc = e
if attempt == retries:
break
wait = backoff ** attempt
logger.warning(
"[retry] %s attempt=%d/%d, 等待 %.2fs: %s",
func.__name__, attempt + 1, retries, wait, e,
)
await asyncio.sleep(wait)
logger.error("[retry] %s 重试耗尽: %s", func.__name__, last_exc)
raise last_exc
return wrapper
return decorator
使用示例——对 WechatApi 发送接口加装饰器:
pythonimport aiohttp
import asyncio
from retry import async_retry
from config import API_BASE, VIDEOS_API_TOKEN
@async_retry(retries=4, backoff=2.0, exceptions=(aiohttp.ClientError, asyncio.TimeoutError))
async def safe_send_message(session: aiohttp.ClientSession, app_id: str, to_wxid: str, content: str) -> dict:
"""带重试保障的文本消息发送,仅对网络异常重试,业务错误直接返回。"""
url = f"{API_BASE}/message/sendText"
headers = {
"VideosApi-token": VIDEOS_API_TOKEN,
"Content-Type": "application/json",
}
payload = {"appId": app_id, "toWxid": to_wxid, "content": content}
async with session.post(url, json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=8)) as resp:
result = await resp.json()
# 业务层错误不触发重试,直接返回给上层判断
return result
超时分级设置
aiohttp 的 ClientTimeout 支持多层超时,细粒度控制比单一 total 更合理:
pythontimeout = aiohttp.ClientTimeout(
total=15, # 整个请求(包含重定向)的最大时长
connect=3, # TCP 连接建立超时
sock_connect=3, # socket 级别连接超时
sock_read=10, # 读取响应体超时
)
对 WechatApi 这类接口服务,建议 connect=3(连接快则好,超时说明网络有问题),sock_read=10(服务端处理消息发送有时需要稍长时间)。如果遇到批量发送时偶发 asyncio.TimeoutError,优先检查是否触发了服务端限流,而非盲目调大超时。
实战:批量给好友群发的异步任务编排
一个典型的营销场景是:从数据库取出一批好友 wxid,对每人发送个性化文案,完成后统计成功/失败数量。下面用 asyncio.gather 完整演示这一编排流程。
数据准备与任务构造
python# batch_push.py
import asyncio
import aiohttp
import logging
from client import send_text_message
from config import APP_IDS, MAX_CONCURRENT, VIDEOS_API_TOKEN
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# 模拟从数据库读出的目标用户列表
# 实际场景替换为 SELECT wxid, nickname FROM contacts
TARGET_USERS = [
{"wxid": "wxid_aaa001", "nickname": "张三"},
{"wxid": "wxid_bbb002", "nickname": "李四"},
{"wxid": "wxid_ccc003", "nickname": "王五"},
# ... 实际可能有几百上千条
]
PUSH_APP_ID = APP_IDS[0] # 使用第一个账号执行群发
def build_message(nickname: str) -> str:
"""构造个性化文案,实际可接模板引擎"""
return (
f"Hi {nickname},我们新上线了专属优惠活动,"
f"欢迎访问 https://wechatapi.net 了解更多详情,"
f"有问题随时回复本条消息~"
)
带信号量的并发群发
pythonasync def push_one(
session: aiohttp.ClientSession,
semaphore: asyncio.Semaphore,
app_id: str,
user: dict,
) -> dict:
"""向单个用户发送消息,受信号量限流。"""
async with semaphore:
content = build_message(user["nickname"])
result = await send_text_message(session, app_id, user["wxid"], content)
status = "OK" if result.get("ret") == 200 else "FAIL"
logger.info("%s → %s wxid=%s", status, user["nickname"], user["wxid"])
return {"user": user, "result": result}
async def batch_push_to_friends(users: list, app_id: str, max_concurrent: int = MAX_CONCURRENT):
"""
批量群发入口:asyncio.gather 并发调度所有任务,
信号量确保同一时刻不超过 max_concurrent 路请求。
"""
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(limit=max_concurrent + 5)
async with aiohttp.ClientSession(connector=connector) as session:
# 为每个用户创建协程,全部提交给 gather
coros = [
push_one(session, semaphore, app_id, user)
for user in users
]
# return_exceptions=True 让单条失败不中断整批
raw_results = await asyncio.gather(*coros, return_exceptions=True)
# 统计结果
success, fail, error = 0, 0, 0
for item in raw_results:
if isinstance(item, Exception):
error += 1
elif item["result"].get("ret") == 200:
success += 1
else:
fail += 1
logger.info("群发完成:成功=%d 业务失败=%d 异常=%d 共=%d",
success, fail, error, len(users))
return raw_results
if __name__ == "__main__":
asyncio.run(batch_push_to_friends(TARGET_USERS, PUSH_APP_ID))
分批发送(避免瞬间压力)
当目标列表非常大(如 5000 人)时,即使有信号量,瞬间创建 5000 个协程对象也会消耗不少内存。建议分批处理,每批之间加短暂间隔:
pythonasync def chunked_batch_push(users: list, app_id: str, chunk_size: int = 200, interval: float = 2.0):
"""
将用户列表切片,每次处理 chunk_size 条,批次间等待 interval 秒。
适合超大规模群发场景,兼顾内存和服务端压力。
"""
for i in range(0, len(users), chunk_size):
chunk = users[i: i + chunk_size]
logger.info("处理第 %d 批,共 %d 条", i // chunk_size + 1, len(chunk))
await batch_push_to_friends(chunk, app_id)
if i + chunk_size < len(users):
logger.info("批次间等待 %.1fs ...", interval)
await asyncio.sleep(interval)
这套编排方式的优势在于:每批内部仍是全并发(受信号量限制),批次之间的短暂停顿给 WechatApi 服务端留出喘息空间,同时让微信账号的发送频率更接近人类操作节奏,降低风控风险。实际项目中可以把 chunk_size 和 interval 做成配置项,根据账号权重和套餐等级动态调整——具体限速阈值可参考 WechatApi 开发文档。
实战:多账号并发消息轮询(收消息)
发消息是典型的"写"操作,收消息则需要轮询——用 asyncio 可以轻松实现多账号同时轮询,互不阻塞:
python# main.py
import asyncio
import aiohttp
import logging
from config import APP_IDS, MAX_CONCURRENT
from client import get_message_list, send_text_message
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
POLL_INTERVAL = 3 # 轮询间隔(秒),根据业务需求调整
async def handle_messages(session: aiohttp.ClientSession, app_id: str, messages: list):
"""处理收到的消息,示例:自动回复"""
for msg in messages:
from_wxid = msg.get("fromWxid", "")
text = msg.get("content", "")
if not from_wxid or not text:
continue
# 简单关键词回复示例
if "价格" in text:
reply = "您好,详情请访问 https://wechatapi.net 查看套餐。"
elif "你好" in text or "hello" in text.lower():
reply = "您好!有什么可以帮您?"
else:
reply = None
if reply:
result = await send_text_message(session, app_id, from_wxid, reply)
logger.info("回复 %s → ret=%s", from_wxid, result.get("ret"))
async def poll_account(session: aiohttp.ClientSession, app_id: str, semaphore: asyncio.Semaphore):
"""单账号无限轮询协程"""
logger.info("开始轮询账号: %s", app_id)
while True:
async with semaphore:
resp = await get_message_list(session, app_id)
if resp.get("ret") == 200:
messages = resp.get("data", {}).get("list", [])
if messages:
await handle_messages(session, app_id, messages)
else:
logger.warning("账号 %s 拉取消息失败: %s", app_id, resp.get("msg"))
await asyncio.sleep(POLL_INTERVAL)
async def main():
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT + 5)
async with aiohttp.ClientSession(connector=connector) as session:
# 为每个 appId 创建独立的轮询协程,全部并发运行
poll_tasks = [
poll_account(session, app_id, semaphore)
for app_id in APP_IDS
]
await asyncio.gather(*poll_tasks)
if __name__ == "__main__":
asyncio.run(main())
运行效果:50 个账号同时轮询,实际并发出 I/O 的协程受信号量控制不超过 20 路,其余挂起等待,CPU 占用极低,一台普通 2核 云服务器完全可以支撑。
生产级补充:日志、优雅退出与监控
几个让代码从"能用"变成"好用"的细节:
1. 优雅退出:接收 SIGINT/SIGTERM 时取消所有协程,避免消息处理中途截断。
pythonimport signal
def _shutdown(loop: asyncio.AbstractEventLoop):
for task in asyncio.all_tasks(loop):
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, _shutdown, loop)
loop.add_signal_handler(signal.SIGTERM, _shutdown, loop)
2. 结构化日志:生产环境建议换成 structlog 或输出 JSON 格式,便于接入 ELK/Loki 等日志系统。
3. 速率指标:用一个简单的原子计数器统计每分钟发送量,配合 Prometheus / 自建告警,当某账号失败率超阈值时自动隔离。
pythonfrom collections import defaultdict
import time
_counters = defaultdict(int)
async def counted_send(session, app_id, to_wxid, content):
result = await send_text_message(session, app_id, to_wxid, content)
minute_key = int(time.time()) // 60
_counters[f"{app_id}:{minute_key}"] += 1
return result
常见问题与调优建议
Q:并发数设多少合适? 从 10 开始逐步调高,观察 WechatApi 服务端返回的限流错误(ret 非 200)。一般单账号建议不超过 5 QPS,多账号总并发可以高一些但要控制每账号频率。具体限制参考 WechatApi 开发文档。
Q:aiohttp 报 "too many open files"? 调高操作系统文件描述符上限:ulimit -n 65535,同时确保 TCPConnector(limit=...) 不要超过系统限制。
Q:消息有重复拉取怎么办? 在本地维护一个已处理消息 ID 的 set(或 Redis SET),拉取到后先去重再处理。set 可定期清理,只保留最近 N 分钟的 ID。
Q:多进程 vs 单进程多协程? 账号数在 200 以内,单进程 asyncio 完全够用。超过 200 个账号建议按账号分组,用 multiprocessing 或多容器水平扩展,每个进程内再跑 asyncio 事件循环。
小结
本文完整演示了用 Python asyncio + aiohttp 对接 WechatApi 微信机器人接口 的异步并发方案,核心要点:
- aiohttp.ClientSession 全局复用,避免连接重建开销;
- asyncio.Semaphore 控制最大并发,防止触发服务端限流或账号风控;
- 指数退避重试区分网络异常与业务异常,提升稳定性;
- asyncio.gather 统一编排批量群发任务,配合分批切片应对超大规模推送;
- 多账号同时轮询收消息,单线程驱动,资源占用极低。
WechatApi 基于 iPad 协议,稳定性优于网页 Hook 方案,HTTP 接口风格对 Python 异步生态极其友好,是做多账号自动化场景的推荐选型。注册与控制台地址:newmanager.wechatapi.net/dashboard/,接口文档在 post.wechatapi.net,欢迎按需取用。
