首页 / 博客 / 机器人·功能实战

微信群签到打卡机器人开发实战

分类:机器人·功能实战 · 标签:微信签到打卡、微信群机器人、Python

前言

在企业内部、学习小组、健身打卡群等场景里,每天手动统计成员签到是一件重复又繁琐的事。管理员需要不断翻看群消息、记录谁发了打卡、谁还没打、到月底再汇总——这套流程稳定占据大量精力,却几乎没有技术含量。

微信群签到打卡机器人可以彻底替代这个人工流程:机器人实时监听群内消息,识别符合格式的打卡内容,自动计数并写入数据库,随时响应查询指令,定时推送签到排行或提醒未打卡成员。本文从需求拆解开始,完整演示一套可上线运行的实现方案,涵盖消息接收与解析、打卡状态存储、自动回复、定时任务等核心环节,所有代码均可直接参考改造。


一、需求拆解与整体架构

1.1 典型业务场景

场景触发方式机器人动作
成员发打卡消息群内含关键词(如"打卡""签到")自动回复"已记录",写入数据库
成员查询自己的进度私聊或群内发"我的签到"返回本月已签天数与连续天数
管理员查询排行群内发"签到排行"返回当日或本月 TOP10
每晚提醒定时任务(如 22:00)向群发送当日未签到成员名单
月末汇总定时任务(每月最后一天)发送月度打卡报告

1.2 整体技术架构

微信客户端(手机/Hook 进程)
        │ 群消息到达
        ▼
  消息回调服务(FastAPI)  ←── 平台主动 POST 到你的公网地址
        │
        ├── 消息解析模块:判断是否为打卡内容
        ├── 打卡记录模块:写 SQLite / MySQL
        ├── 回复模块:调用发消息接口
        └── 定时任务模块:APScheduler 定时推送

消息回调是整套方案的入口:平台在收到微信消息后,将消息体 POST 到开发者预先设置的公网 URL,开发者服务返回 HTTP 200 即表示已接收。后续所有逻辑都从这个回调展开。

实际运行时,回调地址需要能被平台服务器访问到,因此必须具备公网可达性。开发阶段可通过 frp 或 ngrok 做内网穿透临时测试;正式上线应部署到有固定公网 IP 的云服务器,并配置 HTTPS,否则部分平台会拒绝回调注册。


二、环境准备与项目结构

2.1 依赖安装

bashpip install fastapi uvicorn apscheduler requests python-dotenv

SQLite 为 Python 内置,无需额外安装;若数据量较大可替换为 MySQL(使用 pymysqlsqlalchemy)。

2.2 项目目录结构

checkin_bot/
├── main.py          # FastAPI 入口,注册回调路由和定时任务
├── config.py        # 配置项(占位符,从环境变量读取)
├── db.py            # 数据库初始化与 CRUD
├── parser.py        # 消息解析,判断是否打卡
├── sender.py        # 封装发消息接口
├── scheduler.py     # 定时任务(提醒、汇总)
└── .env             # 本地环境变量(不提交 Git)

各模块职责清晰,修改任一功能只需改对应文件,不影响整体入口逻辑。.env 文件用于在本地存放 Token、AppId 等敏感信息,务必加入 .gitignore,不要提交到代码仓库。

2.3 配置文件(config.py)

pythonimport os
from dotenv import load_dotenv

load_dotenv()

# 以下均为占位符,注册后在官方文档获取真实值
BASE  = "https://你的接口域名"   # 注册后在官方文档获取
TOKEN = os.getenv("API_TOKEN", "你的Token")
APPID = os.getenv("APP_ID", "你的appId")
HEADERS = {"token": TOKEN}       # 鉴权字段名以官方文档为准

# 签到群的 wxid,多个群用列表
TARGET_GROUPS = os.getenv("TARGET_GROUPS", "").split(",")

# 签到关键词(命中其一即视为打卡)
CHECKIN_KEYWORDS = ["打卡", "签到", "✅", "完成"]
代码为示例,具体接口地址与字段以官方文档为准。

群的 wxid 可以通过接口查询群列表获取,也可以在回调日志中打印到达的 toWxid 字段来确认。建议先让目标群发一条测试消息,通过日志确认 wxid 后再填入配置。


三、数据库设计(db.py)

3.1 表结构

pythonimport sqlite3
from datetime import date

DB_PATH = "checkin.db"

def init_db():
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS checkins (
            id        INTEGER PRIMARY KEY AUTOINCREMENT,
            group_id  TEXT NOT NULL,       -- 群 wxid
            user_id   TEXT NOT NULL,       -- 成员 wxid
            nickname  TEXT,                -- 群内昵称(快照)
            check_date TEXT NOT NULL,      -- 日期 YYYY-MM-DD
            content   TEXT,                -- 原始打卡消息内容
            created_at TEXT DEFAULT (datetime('now','localtime'))
        )
    """)
    # 防止同一用户同一天重复计数:唯一索引
    c.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_daily
        ON checkins(group_id, user_id, check_date)
    """)
    conn.commit()
    conn.close()

def record_checkin(group_id, user_id, nickname, content):
    """
    插入打卡记录;若当天已打过则静默忽略(IGNORE 策略)。
    返回 True=本次有效打卡,False=今天已打过。
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    try:
        c.execute(
            "INSERT OR IGNORE INTO checkins(group_id,user_id,nickname,check_date,content) "
            "VALUES(?,?,?,?,?)",
            (group_id, user_id, nickname, today, content)
        )
        inserted = c.rowcount == 1
        conn.commit()
        return inserted
    finally:
        conn.close()

def get_user_monthly(group_id, user_id, year_month: str):
    """统计某用户本月已签天数,year_month='2025-06'"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute(
        "SELECT COUNT(*) FROM checkins "
        "WHERE group_id=? AND user_id=? AND check_date LIKE ?",
        (group_id, user_id, f"{year_month}%")
    )
    count = c.fetchone()[0]
    conn.close()
    return count

def get_today_checkin_users(group_id):
    """返回今日已打卡 user_id 集合"""
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute(
        "SELECT user_id FROM checkins WHERE group_id=? AND check_date=?",
        (group_id, today)
    )
    result = {row[0] for row in c.fetchall()}
    conn.close()
    return result

def get_monthly_rank(group_id, year_month: str, top_n=10):
    """本月打卡天数排行"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute(
        "SELECT user_id, nickname, COUNT(*) AS cnt "
        "FROM checkins "
        "WHERE group_id=? AND check_date LIKE ? "
        "GROUP BY user_id ORDER BY cnt DESC LIMIT ?",
        (group_id, f"{year_month}%", top_n)
    )
    rows = c.fetchall()
    conn.close()
    return rows  # [(user_id, nickname, cnt), ...]

唯一索引保证同一用户同一天只计一次,即便消息多次触发也不会重复写入。

3.2 数据库选型说明

对于日活成员在 500 人以内的单群场景,SQLite 完全够用,文件读写性能满足需求且零运维成本。若同时管理 10 个以上的打卡群,或者需要跨服务共享数据,建议迁移到 MySQL 或 PostgreSQL。迁移时只需将 sqlite3 换成 pymysqlpsycopg2,SQL 语句基本兼容,唯一需要调整的是 INSERT OR IGNORE 改为 INSERT IGNORE(MySQL)或 INSERT ... ON CONFLICT DO NOTHING(PostgreSQL)。


四、消息解析与打卡判断(parser.py)

pythonimport re
from config import CHECKIN_KEYWORDS

def is_checkin(content: str) -> bool:
    """
    判断消息内容是否为打卡:
    命中任意关键词,且消息长度不超过 200 字(过滤长篇讨论文字中偶然出现关键词的情况)。
    """
    if not content or len(content) > 200:
        return False
    for kw in CHECKIN_KEYWORDS:
        if kw in content:
            return True
    return False

def extract_date_tag(content: str):
    """
    可选:从消息中提取用户自标注的日期(如"2025-06-01打卡")。
    返回 None 则使用服务器当天日期。
    """
    match = re.search(r"(\d{4}-\d{2}-\d{2})", content)
    return match.group(1) if match else None

关键词列表建议在实际部署时根据群规约定调整,过于宽泛的关键词(如单独的"完成")可能误触发,可改为正则前缀匹配(如 ^打卡)。

实操时还需注意以下几个边界情况:


五、发消息封装(sender.py)

WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,HTTP 调用即可,详见官方文档。

下面以发送群文本消息为例封装调用函数:

pythonimport requests
from config import BASE, HEADERS, APPID

def send_group_text(group_wxid: str, content: str):
    """
    向指定群发送文本消息。
    接口路径及请求体字段以官方文档为准,此处为示例结构。
    """
    url = f"{BASE}/message/postText"
    body = {
        "appId": APPID,
        "toWxid": group_wxid,
        "content": content
    }
    try:
        resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
        data = resp.json()
        if data.get("ret") != 200:
            print(f"[send_group_text] 失败: {data}")
    except Exception as e:
        print(f"[send_group_text] 请求异常: {e}")

def send_private_text(user_wxid: str, content: str):
    """向成员发送私聊消息(用于私下通知未打卡者)"""
    url = f"{BASE}/message/postText"
    body = {
        "appId": APPID,
        "toWxid": user_wxid,
        "content": content
    }
    try:
        resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
        data = resp.json()
        if data.get("ret") != 200:
            print(f"[send_private_text] 失败: {data}")
    except Exception as e:
        print(f"[send_private_text] 请求异常: {e}")
返回体格式为 {"ret":200,"msg":"操作成功","data":{...}}ret==200 表示成功。

六、回调服务主逻辑(main.py)

pythonfrom fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from datetime import date
import requests

from config import BASE, HEADERS, APPID, TARGET_GROUPS
from db import init_db, record_checkin, get_user_monthly, get_monthly_rank
from parser import is_checkin
from sender import send_group_text
from scheduler import setup_scheduler

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_db()
    setup_scheduler()
    # 启动时注册回调地址(以官方文档为准,此处仅示意)
    # requests.post(f"{BASE}/setCallback", json={"appId": APPID, "callbackUrl": "https://你的公网地址/callback"}, headers=HEADERS)
    yield

app = FastAPI(lifespan=lifespan)

@app.post("/callback")
async def callback(request: Request):
    """
    平台将微信消息 POST 到此地址。
    必须返回 HTTP 200,否则平台会认为推送失败并重试。
    字段名以官方文档为准,下面使用示例字段名。
    """
    payload = await request.json()

    msg_type  = payload.get("type")       # 消息类型,文本一般为 1 或 "text"
    from_wxid = payload.get("fromWxid")   # 发送者 wxid
    to_wxid   = payload.get("toWxid")     # 接收者(群 wxid 或个人)
    content   = payload.get("content", "")
    # 群消息一般 toWxid 以 @ 或特定后缀区分,具体以文档为准
    # 此处简单判断 toWxid 是否在目标群列表
    group_id = to_wxid if to_wxid in TARGET_GROUPS else None

    # ── 群内打卡消息 ──
    if group_id and is_checkin(content):
        nickname = payload.get("fromNickname", from_wxid)
        inserted = record_checkin(group_id, from_wxid, nickname, content)
        if inserted:
            ym = date.today().strftime("%Y-%m")
            total = get_user_monthly(group_id, from_wxid, ym)
            send_group_text(group_id, f"✅ {nickname} 打卡成功!本月已累计 {total} 天。")
        else:
            send_group_text(group_id, f"@{from_wxid} 今天已经打过卡啦,明天继续加油!")

    # ── 查询指令:签到排行 ──
    elif group_id and "签到排行" in content:
        ym = date.today().strftime("%Y-%m")
        rows = get_monthly_rank(group_id, ym, top_n=10)
        if not rows:
            send_group_text(group_id, "本月暂无打卡记录。")
        else:
            lines = [f"📊 {ym} 本月打卡排行:"]
            for i, (uid, nick, cnt) in enumerate(rows, 1):
                lines.append(f"{i}. {nick or uid}  {cnt} 天")
            send_group_text(group_id, "\n".join(lines))

    return {"code": 200}

6.1 本地启动

bashuvicorn main:app --host 0.0.0.0 --port 8000

公网穿透可用 frp 或 ngrok 做开发测试;生产环境建议部署到有公网 IP 的云服务器,并配置 HTTPS(可用 Nginx + Let's Encrypt)。

6.2 回调注册时机

服务启动后,需要主动调用平台的"注册回调"接口,将你的公网地址告知平台,平台才会开始向该地址推送消息。部分平台支持在管理后台填写回调地址,无需代码调用;部分平台需要 API 动态注册。建议在 lifespan 的启动阶段完成注册,并在注册成功后打印日志确认,以便排查问题。


七、定时任务:每晚提醒与月末汇总(scheduler.py)

pythonfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import date
import calendar
import requests

from config import BASE, HEADERS, APPID, TARGET_GROUPS
from db import get_today_checkin_users, get_monthly_rank
from sender import send_group_text

def get_all_group_members(group_wxid: str) -> list:
    """
    获取群成员列表。
    接口路径及返回字段以官方文档为准,此处为示例结构。
    返回 [(wxid, nickname), ...]
    """
    url = f"{BASE}/getChatroomMemberList"
    body = {"appId": APPID, "chatroomId": group_wxid}
    try:
        resp = requests.post(url, json=body, headers=HEADERS, timeout=10)
        data = resp.json()
        if data.get("ret") == 200:
            members = data.get("data", {}).get("members", [])
            return [(m.get("wxid"), m.get("nickName")) for m in members]
    except Exception as e:
        print(f"[get_all_group_members] 异常: {e}")
    return []

def daily_remind():
    """每晚 22:00 提醒未打卡成员"""
    today = date.today().isoformat()
    for group_id in TARGET_GROUPS:
        if not group_id:
            continue
        checked_users = get_today_checkin_users(group_id)
        all_members = get_all_group_members(group_id)
        # 过滤出未打卡成员(排除机器人自身 wxid 可在 config 中配置)
        not_checked = [(wxid, nick) for wxid, nick in all_members if wxid not in checked_users]
        if not not_checked:
            send_group_text(group_id, f"🎉 {today} 全员打卡完成,太棒了!")
        else:
            names = "、".join(nick or wxid for wxid, nick in not_checked[:20])
            send_group_text(group_id, f"⏰ {today} 打卡提醒\n以下成员还未打卡:\n{names}\n\n请在今晚 23:59 前完成!")

def monthly_summary():
    """每月最后一天 23:30 发月度汇总"""
    today = date.today()
    last_day = calendar.monthrange(today.year, today.month)[1]
    if today.day != last_day:
        return  # 非最后一天则跳过
    ym = today.strftime("%Y-%m")
    for group_id in TARGET_GROUPS:
        if not group_id:
            continue
        rows = get_monthly_rank(group_id, ym, top_n=10)
        if not rows:
            continue
        lines = [f"📅 {ym} 月度打卡报告 TOP10:"]
        medal = ["🥇", "🥈", "🥉"]
        for i, (uid, nick, cnt) in enumerate(rows, 1):
            prefix = medal[i - 1] if i <= 3 else f"{i}."
            lines.append(f"{prefix} {nick or uid}  {cnt} 天")
        send_group_text(group_id, "\n".join(lines))

def setup_scheduler():
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
    # 每天 22:00 提醒
    scheduler.add_job(daily_remind, CronTrigger(hour=22, minute=0))
    # 每天 23:30 尝试月末汇总(函数内部判断是否最后一天)
    scheduler.add_job(monthly_summary, CronTrigger(hour=23, minute=30))
    scheduler.start()
    print("定时任务已启动")

定时任务运行在后台线程中,与 FastAPI 的请求处理线程互不干扰。需要注意的是,APScheduler 的 BackgroundScheduler 在主进程退出时会自动停止,因此无需手动清理;但如果使用 gunicorn 多 worker 部署,每个 worker 都会启动自己的调度器,导致同一时刻多次发送提醒消息,需改用 GeventScheduler 或将定时任务单独拆分到独立进程中运行。


八、防封与稳定性注意事项

批量操作微信接口时,频率控制是保障账号安全的关键。以下几点在实际部署中需要重点关注:

消息发送频率:机器人每次回复都会产生一条消息,若群活跃度较高(打卡消息密集),建议对回复做队列缓冲,而不是收到即同步发送。可用 queue.Queue + 独立线程控制每条消息间隔 2-5 秒。

定时推送时间:每晚提醒建议安排在 21:00-22:00,不要在深夜发送以免被成员标记骚扰。群发消息每次只发一条,不要在短时间内连续向同一群发多条。

获取群成员列表:该接口调用不宜过频,建议每天只在定时提醒前调用一次,将结果缓存复用,不要每次收到消息都实时获取。

错误重试:网络抖动时接口可能返回非 200,建议加简单的指数退避重试(最多 3 次),而不是无限循环重试。

账号在线时长:新登录的账号建议在线稳定 3 天后再开启群消息自动回复功能,降低风控触发概率。

回调幂等:平台在推送失败时可能重发同一条消息,数据库的唯一索引(uq_daily)已保证打卡记录幂等,但发送回复时需注意不要因重复处理而发送多次"打卡成功"提示——可在内存中缓存最近处理过的 msgId(示例中未展示,实际按需加)。

服务进程守护:生产环境建议使用 supervisorsystemd 对 uvicorn 进程做守护,服务异常退出后自动重启。同时开启日志持久化,将 stdout 重定向到文件,方便事后排查打卡记录丢失或提醒未发送等问题。


九、功能扩展思路

完成基础版本后,还可以按需扩展以下能力:

扩展功能实现思路
连续打卡天数统计查询该用户近 N 天是否每天均有记录,逐天回溯计数
补卡申请流程成员私聊发"补卡 2025-06-01",机器人 @ 管理员审批后写入
图片打卡回调中判断消息类型为图片,下载图片后存储路径;可结合 OCR 验证图片内容
多维度打卡(早/午/晚)在唯一索引中加入"打卡时间段"维度,每天允许多次不同时段打卡
打卡积分兑换打卡天数累计积分,单独建积分表,提供兑换指令
Web 管理后台用 Flask/FastAPI 额外提供一个 HTML 页面,展示打卡日历和导出 Excel

总结

微信群签到打卡机器人的核心在于三件事:稳定接收消息回调、准确识别打卡意图、可靠地写入与查询记录。本文给出了从项目结构、数据库设计、消息解析、接口封装到定时任务的完整实现路径,稍作配置即可适配不同群场景。在此基础上叠加连续签到、补卡审批、图片验证等扩展,便能满足绝大多数打卡管理需求。

实际部署时,建议先在测试群验证关键词匹配和回调幂等逻辑,再切换到正式群;同时做好数据库定期备份,避免因服务器故障导致历史打卡数据丢失。本文代码均为示例框架,接口地址、鉴权字段等具体细节以官方文档为准。

想动手试试?

WechatApi 提供扫码登录、消息收发、好友与群管理等 REST 接口,注册后几分钟跑通。

立即免费注册查看开发文档

相关产品页

🔗 微信机器人开发(产品页)🔗 微信群管理机器人(产品页)🔗 微信Hook(产品页)

相关文章

30 分钟做一个微信自动回复机器人(完整实战)微信机器人接入 GPT,实现智能自动回复微信群管理机器人开发实战:自动迎新、答疑、踢人微信客服机器人怎么做?7×24自动应答+转人工方案
© 2025 WechatApi · 企业级微信智能机器人接入平台
官网价格帮助文档博客
苏ICP备2024128799号 · 苏ICP备2023038368号