前言
在企业内部、学习小组、健身打卡群等场景里,每天手动统计成员签到是一件重复又繁琐的事。管理员需要不断翻看群消息、记录谁发了打卡、谁还没打、到月底再汇总——这套流程稳定占据大量精力,却几乎没有技术含量。
微信群签到打卡机器人可以彻底替代这个人工流程:机器人实时监听群内消息,识别符合格式的打卡内容,自动计数并写入数据库,随时响应查询指令,定时推送签到排行或提醒未打卡成员。本文从需求拆解开始,完整演示一套可上线运行的实现方案,涵盖消息接收与解析、打卡状态存储、自动回复、定时任务等核心环节,所有代码均可直接参考改造。
一、需求拆解与整体架构
1.1 典型业务场景
| 场景 | 触发方式 | 机器人动作 |
|---|---|---|
| 成员发打卡消息 | 群内含关键词(如"打卡""签到") | 自动回复"已记录",写入数据库 |
| 成员查询自己的进度 | 私聊或群内发"我的签到" | 返回本月已签天数与连续天数 |
| 管理员查询排行 | 群内发"签到排行" | 返回当日或本月 TOP10 |
| 每晚提醒 | 定时任务(如 22:00) | 向群发送当日未签到成员名单 |
| 月末汇总 | 定时任务(每月最后一天) | 发送月度打卡报告 |
1.2 整体技术架构
微信客户端(手机/Hook 进程)
│ 群消息到达
▼
消息回调服务(FastAPI) ←── 平台主动 POST 到你的公网地址
│
├── 消息解析模块:判断是否为打卡内容
├── 打卡记录模块:写 SQLite / MySQL
├── 回复模块:调用发消息接口
└── 定时任务模块:APScheduler 定时推送
消息回调是整套方案的入口:平台在收到微信消息后,将消息体 POST 到开发者预先设置的公网 URL,开发者服务返回 HTTP 200 即表示已接收。后续所有逻辑都从这个回调展开。
实际运行时,回调地址需要能被平台服务器访问到,因此必须具备公网可达性。开发阶段可通过 frp 或 ngrok 做内网穿透临时测试;正式上线应部署到有固定公网 IP 的云服务器,并配置 HTTPS,否则部分平台会拒绝回调注册。
二、环境准备与项目结构
2.1 依赖安装
bashpip install fastapi uvicorn apscheduler requests python-dotenv
SQLite 为 Python 内置,无需额外安装;若数据量较大可替换为 MySQL(使用 pymysql 或 sqlalchemy)。
2.2 项目目录结构
checkin_bot/
├── main.py # FastAPI 入口,注册回调路由和定时任务
├── config.py # 配置项(占位符,从环境变量读取)
├── db.py # 数据库初始化与 CRUD
├── parser.py # 消息解析,判断是否打卡
├── sender.py # 封装发消息接口
├── scheduler.py # 定时任务(提醒、汇总)
└── .env # 本地环境变量(不提交 Git)
各模块职责清晰,修改任一功能只需改对应文件,不影响整体入口逻辑。.env 文件用于在本地存放 Token、AppId 等敏感信息,务必加入 .gitignore,不要提交到代码仓库。
2.3 配置文件(config.py)
pythonimport os
from dotenv import load_dotenv
load_dotenv()
# 以下均为占位符,注册后在官方文档获取真实值
BASE = "https://你的接口域名" # 注册后在官方文档获取
TOKEN = os.getenv("API_TOKEN", "你的Token")
APPID = os.getenv("APP_ID", "你的appId")
HEADERS = {"token": TOKEN} # 鉴权字段名以官方文档为准
# 签到群的 wxid,多个群用列表
TARGET_GROUPS = os.getenv("TARGET_GROUPS", "").split(",")
# 签到关键词(命中其一即视为打卡)
CHECKIN_KEYWORDS = ["打卡", "签到", "✅", "完成"]
代码为示例,具体接口地址与字段以官方文档为准。
群的 wxid 可以通过接口查询群列表获取,也可以在回调日志中打印到达的 toWxid 字段来确认。建议先让目标群发一条测试消息,通过日志确认 wxid 后再填入配置。
三、数据库设计(db.py)
3.1 表结构
pythonimport sqlite3
from datetime import date
DB_PATH = "checkin.db"
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS checkins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id TEXT NOT NULL, -- 群 wxid
user_id TEXT NOT NULL, -- 成员 wxid
nickname TEXT, -- 群内昵称(快照)
check_date TEXT NOT NULL, -- 日期 YYYY-MM-DD
content TEXT, -- 原始打卡消息内容
created_at TEXT DEFAULT (datetime('now','localtime'))
)
""")
# 防止同一用户同一天重复计数:唯一索引
c.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS uq_daily
ON checkins(group_id, user_id, check_date)
""")
conn.commit()
conn.close()
def record_checkin(group_id, user_id, nickname, content):
"""
插入打卡记录;若当天已打过则静默忽略(IGNORE 策略)。
返回 True=本次有效打卡,False=今天已打过。
"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
try:
c.execute(
"INSERT OR IGNORE INTO checkins(group_id,user_id,nickname,check_date,content) "
"VALUES(?,?,?,?,?)",
(group_id, user_id, nickname, today, content)
)
inserted = c.rowcount == 1
conn.commit()
return inserted
finally:
conn.close()
def get_user_monthly(group_id, user_id, year_month: str):
"""统计某用户本月已签天数,year_month='2025-06'"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"SELECT COUNT(*) FROM checkins "
"WHERE group_id=? AND user_id=? AND check_date LIKE ?",
(group_id, user_id, f"{year_month}%")
)
count = c.fetchone()[0]
conn.close()
return count
def get_today_checkin_users(group_id):
"""返回今日已打卡 user_id 集合"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"SELECT user_id FROM checkins WHERE group_id=? AND check_date=?",
(group_id, today)
)
result = {row[0] for row in c.fetchall()}
conn.close()
return result
def get_monthly_rank(group_id, year_month: str, top_n=10):
"""本月打卡天数排行"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"SELECT user_id, nickname, COUNT(*) AS cnt "
"FROM checkins "
"WHERE group_id=? AND check_date LIKE ? "
"GROUP BY user_id ORDER BY cnt DESC LIMIT ?",
(group_id, f"{year_month}%", top_n)
)
rows = c.fetchall()
conn.close()
return rows # [(user_id, nickname, cnt), ...]
唯一索引保证同一用户同一天只计一次,即便消息多次触发也不会重复写入。
3.2 数据库选型说明
对于日活成员在 500 人以内的单群场景,SQLite 完全够用,文件读写性能满足需求且零运维成本。若同时管理 10 个以上的打卡群,或者需要跨服务共享数据,建议迁移到 MySQL 或 PostgreSQL。迁移时只需将 sqlite3 换成 pymysql 或 psycopg2,SQL 语句基本兼容,唯一需要调整的是 INSERT OR IGNORE 改为 INSERT IGNORE(MySQL)或 INSERT ... ON CONFLICT DO NOTHING(PostgreSQL)。
四、消息解析与打卡判断(parser.py)
pythonimport re
from config import CHECKIN_KEYWORDS
def is_checkin(content: str) -> bool:
"""
判断消息内容是否为打卡:
命中任意关键词,且消息长度不超过 200 字(过滤长篇讨论文字中偶然出现关键词的情况)。
"""
if not content or len(content) > 200:
return False
for kw in CHECKIN_KEYWORDS:
if kw in content:
return True
return False
def extract_date_tag(content: str):
"""
可选:从消息中提取用户自标注的日期(如"2025-06-01打卡")。
返回 None 则使用服务器当天日期。
"""
match = re.search(r"(\d{4}-\d{2}-\d{2})", content)
return match.group(1) if match else None
关键词列表建议在实际部署时根据群规约定调整,过于宽泛的关键词(如单独的"完成")可能误触发,可改为正则前缀匹配(如 ^打卡)。
实操时还需注意以下几个边界情况:
- 引用回复:有些成员会引用别人的消息后附加"打卡",回调中引用消息的内容和当前发言者的内容往往混在同一个
content字段里,建议先检查平台是否提供quoteMsg字段来做区分,避免把别人的引用内容误算作本人打卡。 - 撤回消息:成员打卡后撤回,目前大多数回调方案不会自动删除已写入的打卡记录,如果对数据精度要求高,需要监听撤回事件消息类型,在接收到撤回通知时到数据库中将对应记录删除。
- 机器人自身消息:机器人发的回复消息也会触发回调,需要通过判断
fromWxid是否等于机器人自身的 wxid 来过滤掉自己发的消息,否则会出现"机器人回复被自己触发打卡"的死循环。
五、发消息封装(sender.py)
WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,详见官方文档。
下面以发送群文本消息为例封装调用函数:
pythonimport requests
from config import BASE, HEADERS, APPID
def send_group_text(group_wxid: str, content: str):
"""
向指定群发送文本消息。
接口路径及请求体字段以官方文档为准,此处为示例结构。
"""
url = f"{BASE}/message/postText"
body = {
"appId": APPID,
"toWxid": group_wxid,
"content": content
}
try:
resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
data = resp.json()
if data.get("ret") != 200:
print(f"[send_group_text] 失败: {data}")
except Exception as e:
print(f"[send_group_text] 请求异常: {e}")
def send_private_text(user_wxid: str, content: str):
"""向成员发送私聊消息(用于私下通知未打卡者)"""
url = f"{BASE}/message/postText"
body = {
"appId": APPID,
"toWxid": user_wxid,
"content": content
}
try:
resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
data = resp.json()
if data.get("ret") != 200:
print(f"[send_private_text] 失败: {data}")
except Exception as e:
print(f"[send_private_text] 请求异常: {e}")
返回体格式为{"ret":200,"msg":"操作成功","data":{...}},ret==200表示成功。
六、回调服务主逻辑(main.py)
pythonfrom fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from datetime import date
import requests
from config import BASE, HEADERS, APPID, TARGET_GROUPS
from db import init_db, record_checkin, get_user_monthly, get_monthly_rank
from parser import is_checkin
from sender import send_group_text
from scheduler import setup_scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
setup_scheduler()
# 启动时注册回调地址(以官方文档为准,此处仅示意)
# requests.post(f"{BASE}/setCallback", json={"appId": APPID, "callbackUrl": "https://你的公网地址/callback"}, headers=HEADERS)
yield
app = FastAPI(lifespan=lifespan)
@app.post("/callback")
async def callback(request: Request):
"""
平台将微信消息 POST 到此地址。
必须返回 HTTP 200,否则平台会认为推送失败并重试。
字段名以官方文档为准,下面使用示例字段名。
"""
payload = await request.json()
msg_type = payload.get("type") # 消息类型,文本一般为 1 或 "text"
from_wxid = payload.get("fromWxid") # 发送者 wxid
to_wxid = payload.get("toWxid") # 接收者(群 wxid 或个人)
content = payload.get("content", "")
# 群消息一般 toWxid 以 @ 或特定后缀区分,具体以文档为准
# 此处简单判断 toWxid 是否在目标群列表
group_id = to_wxid if to_wxid in TARGET_GROUPS else None
# ── 群内打卡消息 ──
if group_id and is_checkin(content):
nickname = payload.get("fromNickname", from_wxid)
inserted = record_checkin(group_id, from_wxid, nickname, content)
if inserted:
ym = date.today().strftime("%Y-%m")
total = get_user_monthly(group_id, from_wxid, ym)
send_group_text(group_id, f"✅ {nickname} 打卡成功!本月已累计 {total} 天。")
else:
send_group_text(group_id, f"@{from_wxid} 今天已经打过卡啦,明天继续加油!")
# ── 查询指令:签到排行 ──
elif group_id and "签到排行" in content:
ym = date.today().strftime("%Y-%m")
rows = get_monthly_rank(group_id, ym, top_n=10)
if not rows:
send_group_text(group_id, "本月暂无打卡记录。")
else:
lines = [f"📊 {ym} 本月打卡排行:"]
for i, (uid, nick, cnt) in enumerate(rows, 1):
lines.append(f"{i}. {nick or uid} {cnt} 天")
send_group_text(group_id, "\n".join(lines))
return {"code": 200}
6.1 本地启动
bashuvicorn main:app --host 0.0.0.0 --port 8000
公网穿透可用 frp 或 ngrok 做开发测试;生产环境建议部署到有公网 IP 的云服务器,并配置 HTTPS(可用 Nginx + Let's Encrypt)。
6.2 回调注册时机
服务启动后,需要主动调用平台的"注册回调"接口,将你的公网地址告知平台,平台才会开始向该地址推送消息。部分平台支持在管理后台填写回调地址,无需代码调用;部分平台需要 API 动态注册。建议在 lifespan 的启动阶段完成注册,并在注册成功后打印日志确认,以便排查问题。
七、定时任务:每晚提醒与月末汇总(scheduler.py)
pythonfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import date
import calendar
import requests
from config import BASE, HEADERS, APPID, TARGET_GROUPS
from db import get_today_checkin_users, get_monthly_rank
from sender import send_group_text
def get_all_group_members(group_wxid: str) -> list:
"""
获取群成员列表。
接口路径及返回字段以官方文档为准,此处为示例结构。
返回 [(wxid, nickname), ...]
"""
url = f"{BASE}/getChatroomMemberList"
body = {"appId": APPID, "chatroomId": group_wxid}
try:
resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
data = resp.json()
if data.get("ret") == 200:
members = data.get("data", {}).get("members", [])
return [(m.get("wxid"), m.get("nickName")) for m in members]
except Exception as e:
print(f"[get_all_group_members] 异常: {e}")
return []
def daily_remind():
"""每晚 22:00 提醒未打卡成员"""
today = date.today().isoformat()
for group_id in TARGET_GROUPS:
if not group_id:
continue
checked_users = get_today_checkin_users(group_id)
all_members = get_all_group_members(group_id)
# 过滤出未打卡成员(排除机器人自身 wxid 可在 config 中配置)
not_checked = [(wxid, nick) for wxid, nick in all_members if wxid not in checked_users]
if not not_checked:
send_group_text(group_id, f"🎉 {today} 全员打卡完成,太棒了!")
else:
names = "、".join(nick or wxid for wxid, nick in not_checked[:20])
send_group_text(group_id, f"⏰ {today} 打卡提醒\n以下成员还未打卡:\n{names}\n\n请在今晚 23:59 前完成!")
def monthly_summary():
"""每月最后一天 23:30 发月度汇总"""
today = date.today()
last_day = calendar.monthrange(today.year, today.month)[1]
if today.day != last_day:
return # 非最后一天则跳过
ym = today.strftime("%Y-%m")
for group_id in TARGET_GROUPS:
if not group_id:
continue
rows = get_monthly_rank(group_id, ym, top_n=10)
if not rows:
continue
lines = [f"📅 {ym} 月度打卡报告 TOP10:"]
medal = ["🥇", "🥈", "🥉"]
for i, (uid, nick, cnt) in enumerate(rows, 1):
prefix = medal[i - 1] if i <= 3 else f"{i}."
lines.append(f"{prefix} {nick or uid} {cnt} 天")
send_group_text(group_id, "\n".join(lines))
def setup_scheduler():
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
# 每天 22:00 提醒
scheduler.add_job(daily_remind, CronTrigger(hour=22, minute=0))
# 每天 23:30 尝试月末汇总(函数内部判断是否最后一天)
scheduler.add_job(monthly_summary, CronTrigger(hour=23, minute=30))
scheduler.start()
print("定时任务已启动")
定时任务运行在后台线程中,与 FastAPI 的请求处理线程互不干扰。需要注意的是,APScheduler 的 BackgroundScheduler 在主进程退出时会自动停止,因此无需手动清理;但如果使用 gunicorn 多 worker 部署,每个 worker 都会启动自己的调度器,导致同一时刻多次发送提醒消息,需改用 GeventScheduler 或将定时任务单独拆分到独立进程中运行。
八、防封与稳定性注意事项
批量操作微信接口时,频率控制是保障账号安全的关键。以下几点在实际部署中需要重点关注:
消息发送频率:机器人每次回复都会产生一条消息,若群活跃度较高(打卡消息密集),建议对回复做队列缓冲,而不是收到即同步发送。可用 queue.Queue + 独立线程控制每条消息间隔 2-5 秒。
定时推送时间:每晚提醒建议安排在 21:00-22:00,不要在深夜发送以免被成员标记骚扰。群发消息每次只发一条,不要在短时间内连续向同一群发多条。
获取群成员列表:该接口调用不宜过频,建议每天只在定时提醒前调用一次,将结果缓存复用,不要每次收到消息都实时获取。
错误重试:网络抖动时接口可能返回非 200,建议加简单的指数退避重试(最多 3 次),而不是无限循环重试。
账号在线时长:新登录的账号建议在线稳定 3 天后再开启群消息自动回复功能,降低风控触发概率。
回调幂等:平台在推送失败时可能重发同一条消息,数据库的唯一索引(uq_daily)已保证打卡记录幂等,但发送回复时需注意不要因重复处理而发送多次"打卡成功"提示——可在内存中缓存最近处理过的 msgId(示例中未展示,实际按需加)。
服务进程守护:生产环境建议使用 supervisor 或 systemd 对 uvicorn 进程做守护,服务异常退出后自动重启。同时开启日志持久化,将 stdout 重定向到文件,方便事后排查打卡记录丢失或提醒未发送等问题。
九、功能扩展思路
完成基础版本后,还可以按需扩展以下能力:
| 扩展功能 | 实现思路 |
|---|---|
| 连续打卡天数统计 | 查询该用户近 N 天是否每天均有记录,逐天回溯计数 |
| 补卡申请流程 | 成员私聊发"补卡 2025-06-01",机器人 @ 管理员审批后写入 |
| 图片打卡 | 回调中判断消息类型为图片,下载图片后存储路径;可结合 OCR 验证图片内容 |
| 多维度打卡(早/午/晚) | 在唯一索引中加入"打卡时间段"维度,每天允许多次不同时段打卡 |
| 打卡积分兑换 | 打卡天数累计积分,单独建积分表,提供兑换指令 |
| Web 管理后台 | 用 Flask/FastAPI 额外提供一个 HTML 页面,展示打卡日历和导出 Excel |
总结
微信群签到打卡机器人的核心在于三件事:稳定接收消息回调、准确识别打卡意图、可靠地写入与查询记录。本文给出了从项目结构、数据库设计、消息解析、接口封装到定时任务的完整实现路径,稍作配置即可适配不同群场景。在此基础上叠加连续签到、补卡审批、图片验证等扩展,便能满足绝大多数打卡管理需求。
实际部署时,建议先在测试群验证关键词匹配和回调幂等逻辑,再切换到正式群;同时做好数据库定期备份,避免因服务器故障导致历史打卡数据丢失。本文代码均为示例框架,接口地址、鉴权字段等具体细节以官方文档为准。
