关于OneBot的QQ机器人探索(第二期)
这是我去年编写的一个QQ聊天机器人,按照OneBot v11协议进行开发
当时刚入门,编写的代码耦合性较高,因此该代码仅能作为探索记录
代码太乱了,无法过多解释,有需要者自己看看吧(悲)
目前实现的主要功能有:
- echo 消息回显
- 搜索歌曲并发送音乐卡片
- AI聊天,概率性主动回复,支持在QQ中使用指令切换AI模型和提示词
- bing每日一图获取并发送
- 群聊管理之敏感词检测并禁言
入口函数:
import asyncio
from bot import Bot, Command
from msgProcFunctions import *async def main():cmd = Command("")# bot = Bot("ws://localhost:3001", cmd)# bot = Bot("ws://www.lyxyz5223.com:3001", cmd)bot = Bot("ws://localhost:5223", cmd)# bot = Bot("ws://localhost:3001", cmd)cmd.cmds.append(msgProc)await bot.connect()await bot.run()print("aa")if __name__=='__main__':asyncio.run(main())
聊天机器人聊天消息分发
- 由于代码开源方面问题,源OneBotv11接口的QQ消息数据处理相关类 暂不公开
wyyyyAPI = "https://docs-neteasecloudmusicapi.vercel.app/search?keywords="
wyyyyAPI = "https://music.163.com/api/search/get/web?csrf_token=hlpretag=&hlposttag=&type=1&offset=0&total=true&limit=10&s="
wyyyySearchList = []
wyyyySearch_Music_ID_List = []import base64
import os
import time
from bot import Bot,Message_Type
from event import Event
from message import MessageList
import requests as req
import asyncio
import threading
import AIAPI#,AIollama
import random
import threading# 全局读取敏感词列表
def is_base64(s):try:# 空字符串不是合法base64if not s or len(s) % 4 != 0:return Falsebase64.b64decode(s, validate=True)return Trueexcept Exception:return False# 1. 读取敏感词时记录来源文件
def load_badwords():badwords = []folder_path = os.path.join(os.path.dirname(__file__), "BadWords")if not os.path.exists(folder_path):print("敏感词文件夹不存在:", folder_path)return badwordsfor filename in os.listdir(folder_path):if filename.endswith(".txt"):file_path = os.path.join(folder_path, filename)try:with open(file_path, "r", encoding="utf-8") as f:for line in f:word = line.strip()if word and is_base64(word):try:decoded = base64.b64decode(word).decode("utf-8")badwords.append((decoded, filename))except Exception as e:print(f"敏感词解码失败(文件{filename}):", word, e)badwords.append((word, filename))elif word:badwords.append((word, filename))except Exception as e:print(f"敏感词文件读取失败:{file_path}", e)return badwordsBADWORDS = load_badwords()
print("敏感词列表加载完成,共", len(BADWORDS), "个敏感词")
print("敏感词列表:", BADWORDS[:10]) # 打印前10个敏感词# 2. 检查敏感词时返回命中词和文件名,并打印
def check_badwords(message: str):"""检查消息是否包含敏感词:param message: 消息内容:return: 命中的敏感词及其文件名列表"""hit_words = []for word, filename in BADWORDS:if word and word in message:print(f"命中敏感词:{word},文件:{filename}")hit_words.append((word, filename))return hit_wordsasync def echoProc(bot: Bot, m:Event, id:str, msg_type: Message_Type):msg = m.check().get_msg()f = 0send_msg = []for i in msg:if f == 0 and i["type"] == "at" and str(i["data"]["qq"]) == str(m.check().raw["self_id"]):f = 1continueelse:send_msg.append(i)if send_msg[0]["type"] == "text":send_msg[0]["data"]["text"] = str(send_msg[0]["data"]["text"]).strip()[6:]await bot.send_msg(id,msg_type,send_msg)async def bingProc(bot: Bot, zhiling:str, id:str,msg_type: Message_Type):zhiling = req.request("get","https://cn.bing.com/HPImageArchive.aspx",params={"format": "js", "idx": 0, "n": 1})print(zhiling.text)zhiling = zhiling.json()zhiling = "https://cn.bing.com" + zhiling["images"][0]["url"]#"https://cn.bing.com/HPImageArchive.aspx?format=js&idx=0&n=1"await bot.send_msg(id,msg_type,[MessageList.image(str(zhiling))])async def musicProc(bot: Bot, zhiling:str, id:str, msg_type: Message_Type):if(zhiling == "/music"):await bot.send_msg(id,msg_type,[MessageList.text("点歌格式(4种):\n首选:/music search 关键字,然后再/music select 列表数字序号 \n/music type id\n/music 1 title url\n/music 2 title openUrl audioUrl")])# await bot.send_msg(id,[MessageList.text("点歌格式:\n/music type id\n/music 1 title url\n/music 2 title openUrl audioUrl")])# await bot.send_msg(id,[MessageList.text("type:\n1:普通点歌\n2:普通点歌\n163:网易云点歌\nxm:虾米点歌\nqq:QQ音乐点歌")])# await bot.send_msg(id,[MessageList.text("id:\n1:普通点歌:歌曲名\n2:普通点歌:歌曲名\n163:网易云id\nxm:虾米id\nqq:QQ音乐id")])# await bot.send_msg(id,[MessageList.text("url:\n普通点歌:歌曲url")])# await bot.send_msg(id,[MessageList.text("openUrl:\n普通点歌:点击打开的url")])# await bot.send_msg(id,[MessageList.text("audioUrl:\n普通点歌:歌曲url")])# await bot.send_msg(id,[MessageList.text("示例:\n/music 1 歌曲名 歌曲url\n/music 2 歌曲名 点击打开的url 歌曲url\n/music 163 网易云id\n/music xm 虾米id\n/music qq QQ音乐id")])# await bot.send_msg(id,[MessageList.text("注意:\n/music 1 2 163 xm qq 后面必须跟空格")])await bot.send_msg(id,msg_type,[{"type": "music","data": {"type": "custom","url": "https://lw-sycdn.kuwo.cn/87898d2bfa406816e7a24dee4434e67c/66b3a70e/resource/30106/trackmedia/M5000004J5Dg2Aifkc.mp3?bitrate$128&from=vip","audio": "https://lw-sycdn.kuwo.cn/87898d2bfa406816e7a24dee4434e67c/66b3a70e/resource/30106/trackmedia/M5000004J5Dg2Aifkc.mp3?bitrate$128&from=vip","title": "Aurora - Hoaprox/Dang Minh"}}])passmusicNameAndUrl = zhiling.replace("/music", "").split()print("musicNameAndUrl",musicNameAndUrl)if musicNameAndUrl[0] == "163" or musicNameAndUrl[0] == "xm" or musicNameAndUrl[0] == "qq":await bot.send_msg(id,msg_type,[MessageList.Music(musicNameAndUrl[0], musicNameAndUrl[1])])# id点歌 /music type idmusicTitle = " ".join(musicNameAndUrl[1:len(musicNameAndUrl)-1])musicNameAndUrl[0] = musicNameAndUrl[0].lower()#指令小写字母化match musicNameAndUrl[0]:case "1":await bot.send_msg(id,msg_type,[MessageList.CustomMusic(url=musicNameAndUrl.pop(), audio=musicNameAndUrl.pop(),title=musicTitle)])# 普通点歌 /music 1 title Urlcase "2":await bot.send_msg(id,msg_type,[MessageList.CustomMusic(url=musicNameAndUrl[len(musicNameAndUrl)-2], audio=musicNameAndUrl.pop(),title=musicTitle)])# 普通点歌 /music 2 title OpenUrl AudioUrlcase "search":cont = req.request("get",wyyyyAPI + " ".join(musicNameAndUrl[1:])).json()#cont = json.dumps(cont)#print(cont)global wyyyySearchList,wyyyySearch_Music_ID_ListwyyyySearchList = []wyyyySearch_Music_ID_List = []countNum = 1for i in cont["result"]["songs"]:artists = ""for ii in i["artists"]:artists += ii["name"]artists += "/"artists = artists[:-1]wyyyySearchList.append(str(countNum) +"." + i["name"]+" - "+ artists)wyyyySearch_Music_ID_List.append(i["id"])countNum += 1print(i["name"])await bot.send_msg(id,msg_type,[MessageList.text("\n".join(wyyyySearchList))])passcase "select":Ordinal = int(musicNameAndUrl[1])-1await bot.send_msg(id,msg_type,[MessageList.text("已为您点歌:"+wyyyySearchList[Ordinal])])await bot.send_msg(id,msg_type,[MessageList.Music("163",wyyyySearch_Music_ID_List[Ordinal])])passasync def AIProc(bot: Bot, zhiling:str, id:str, msg_type: Message_Type, m: Event, api:str,model:str):chatText = zhiling.replace("/ai", "")sender_id = m.check().raw["sender"]["user_id"]# 发送者QQ号msg_id = m.check().raw["message_id"]# 消息ID# msg_list : list = []# if msg_type == Message_Type.group:# msg_list.append(MessageList.At(sender_id))# msg_list.append(MessageList.text(" "))# msg_list.append(MessageList.text("喵~"))# await bot.send_msg(id,msg_type,msg_list)def ai(text:str):text_AI = asyncio.run(AIAPI.chat(content=text,id=id,msg_type=msg_type,memory=True,api=api,model=model))msg_list = []msg_list.append(MessageList.reply(msg_id))if msg_type == Message_Type.group:msg_list.append(MessageList.At(sender_id))msg_list.append(MessageList.text("\n"))msg_list.append(MessageList.text(text_AI))asyncio.run(bot.send_msg(id,msg_type,msg_list))th = threading.Thread(target=ai,daemon=True,args=(chatText,))th.start()print("Thread Exit")# await bot.send_msg(id,msg_type,[MessageList.text(await AIollama.chat(chatText))])# class HashPairKey:#用于区分不同群聊的结构体
# def __init__(self):
# self.id = ""
# self.msg_type = Message_Type.group
# def __init__(self,id:str,msg_type:Message_Type):
# self.id = id
# self.msg_type = msg_typeasync def sendMsgProc(bot: Bot, zhiling:str, id:str,msg_type: Message_Type):text = zhiling.replace("/send_msg", "")text = text.strip()# /send_msg type id textz = text.split(" ")contents = " ".join(z[2:])if len(z) < 3:returnif z[0] == "group":await bot.send_msg(z[1],Message_Type.group,[MessageList.text(contents)])elif z[0] == "private":await bot.send_msg(z[1],Message_Type.private,[MessageList.text(contents)])async def sendPicProc(bot: Bot, zhiling:str, id:str,msg_type: Message_Type):text = zhiling.replace("/send_pic", "")text = text.strip()# /send_pic type id picUrl [picUrl] ...z = text.split(" ")if len(z) < 3:return# contents = " ".join(z[2:])imgUrls = z[2:]contents = []for i in imgUrls:contents.append(MessageList.image(i))if z[0] == "group":await bot.send_msg(z[1],Message_Type.group,contents)elif z[0] == "private":await bot.send_msg(z[1],Message_Type.private,contents)calcMsg : dict = {str : int
}
maxMsgThenReply : dict = {str : int
}
aiModel : dict = {str : [str,str]#id : [ai api,ai model]
}
badword_count = {} # 结构: {(group_id, user_id): [count, first_time]}
BADWORD_LIMIT = 3 # 1分钟内最多允许3次
BADWORD_PERIOD = 60 # 秒, 表示1分钟一次刷新计数defaultModel = ["deepseek",""]
async def msgProc(bot: Bot, m: Event):print(m.check().raw)zhiling = m.check().get_msg()print("消息内容:",zhiling)zhiling = m.check().get_raw_msg()at = m.check().get_the_first_at_object_qq_number_str()id : str = ""msg_type : Message_Typeself_id :str = str(m.check().raw["self_id"])sender_id = m.check().raw["sender"]["user_id"]# 发送者QQ号if m.check().raw["message_type"] == "private":id = m.check().raw["user_id"]msg_type = Message_Type.privateelif m.check().raw["message_type"] == "group":id = m.check().raw["group_id"]msg_type = Message_Type.groupid = str(id)memHashKey = id + ',' + msg_type.valueglobal calcMsg,maxMsgThenReply,aiModelif memHashKey not in calcMsg.keys():calcMsg[memHashKey] = 0if memHashKey not in maxMsgThenReply.keys():maxMsgThenReply[memHashKey] = 0calcMsg[memHashKey] += 1 # 消息计数+1,当达到一定值将重置为0,并回复消息if maxMsgThenReply[memHashKey] == 0:maxMsgThenReply[memHashKey] = random.randint(30, 50)# [30,50]区间,是闭区间# maxMsgThenReply[memHashKey] = random.randint(10, 20)if memHashKey not in aiModel.keys():aiModel[memHashKey] = defaultModelprint("calcMsg[memHashKey]:", calcMsg[memHashKey])print("maxMsgThenReply[memHashKey]:", maxMsgThenReply[memHashKey])print("aiModel[memHashKey]:", aiModel[memHashKey])match zhiling:case _ if msg_type == Message_Type.group: # 所有群聊消息检查msgText = zhiling.strip()# 智能禁言检测ban_duration = 60 # 禁言时长,单位秒key = (id, sender_id)now = time.time()if check_badwords(msgText):# 初始化或重置窗口if key not in badword_count or now - badword_count[key][1] > BADWORD_PERIOD:badword_count[key] = [1, now, [msgText]]else:badword_count[key][0] += 1badword_count[key][2].append(msgText)# 判断是否超限if badword_count[key][0] >= BADWORD_LIMIT:print(f"已禁言成员:{sender_id},1分钟内所有违规消息:{badword_count[key][2]}")await bot.send_msg(id, msg_type,[MessageList.At(sender_id), MessageList.text(f"1分钟内多次违规,已禁言!禁言成员:{sender_id}" # \n违规内容:\n" + "\n".join(badword_count[key][2]))])await bot.set_group_ban(group_id=id, user_id=sender_id, duration=ban_duration)badword_count[key] = [0, now, []] # 重置计数和消息列表# else:# await bot.send_msg(# id, msg_type,# [MessageList.text(f"检测到违规词,请注意言辞!({badword_count[key][0]}/{BADWORD_LIMIT})")]# )match zhiling:case _ if msg_type == Message_Type.group and at != self_id:zhiling = m.check().get_msg()msgText = ""for msgJson in zhiling:if msgJson["type"] != "at":find_it = Trueif msgJson["type"] == "text":msgText += msgJson["data"]["text"]#await bot.send_group_msg(id,[MessageList.text(msgText)])elif msgJson["type"] == "at":msgText += "@" + msgJson["data"]["qq"]elif msgJson["type"] == "image":passmsgText = msgText.strip()msgList = msgText.split()if msgList[0].lower() == "/changeai":if len(msgList) == 3:model = [msgList[1],msgList[2]]elif len(msgList) == 2:model = [msgList[1],""]else:model = defaultModelaiModel[memHashKey] = modelawait bot.send_msg(id,msg_type,[MessageList.text("已切换AI模型")])if msgText != "" and calcMsg[memHashKey] > maxMsgThenReply[memHashKey]:print("calcMsg[memHashKey] > maxMsgThenReply[memHashKey]", calcMsg[memHashKey],">",maxMsgThenReply[memHashKey])calcMsg[memHashKey] = 0maxMsgThenReply[memHashKey] = 0def sendAIResponse():AI_Response = []AI_Response.append(MessageList.text(asyncio.run(AIAPI.chat(content=msgText,id=id,msg_type=msg_type,memory=False,api=aiModel[memHashKey][0],model=aiModel[memHashKey][1]))))asyncio.run(bot.send_msg(id, msg_type, AI_Response))print("AI Intelligence Response Thread Start")threading.Thread(target=sendAIResponse,daemon=True).start()case _ if msg_type == Message_Type.private or (msg_type == Message_Type.group and at == self_id):# case _:zhiling = m.check().get_msg()find_it = FalsemsgText = ""for msgJson in zhiling:if msgJson["type"] != "at":find_it = Trueif msgJson["type"] == "text":msgText += msgJson["data"]["text"]#await bot.send_group_msg(id,[MessageList.text(msgText)])elif msgJson["type"] == "at":if msgJson["data"]["qq"] != self_id:msgText += "@" + msgJson["data"]["qq"]elif msgJson["type"] == "image":pass# msgText += msgJson["data"]["file"]if find_it == False:await bot.send_msg(id,msg_type,[MessageList.text("你干嘛!?")])else:zhiling = msgText.strip()z = zhiling.lower()match zhiling:case _ if "/echo" in zhiling and zhiling[:5] == "/echo":await echoProc(bot, m, id, msg_type)case "/bing":await bingProc(bot, zhiling, id, msg_type)case "/dev":await bot.send_msg(id,msg_type,[MessageList.text("调试信息:\n" + str(m.check().raw))])case "/random":await bot.send_msg(id,msg_type,[MessageList.image("https://moe.jitsu.top/img/")])case "/help":await bot.send_msg(id,msg_type,[MessageList.text("/bing——必应每日一图\n/dev——发送调试信息\n/random——随机二次元\n/help——帮助\n/echo——重复话语\n/music——点歌(签证以停止服务,将无法发送音乐卡片)\n/ai——与AI对话\n/changeAI——切换AI模型\n/ShowCurrentAI——显示当前AI模型\n/ChangePrompt——修改提示词\n")])case _ if "/music" in zhiling and zhiling[:6] == "/music":await musicProc(bot, zhiling, id, msg_type)case _ if "/changeai" in zhiling.lower() and zhiling[:9].lower() == "/changeai":msgList = zhiling.split()if msgList[0].lower() == "/changeai":if len(msgList) == 3:model = [msgList[1],msgList[2]]elif len(msgList) == 2:model = [msgList[1],""]else:model = defaultModelaiModel[memHashKey] = modelawait bot.send_msg(id,msg_type,[MessageList.text("已切换AI模型")])case _ if "/showcurrentai" in zhiling.lower() and zhiling[:14].lower() == "/showcurrentai":await bot.send_msg(id,msg_type,[MessageList.text("当前AI模型:\n" + aiModel[memHashKey][0] + "->" + (aiModel[memHashKey][1] if aiModel[memHashKey][1] != "" else "默认"))])case _ if "/changeprompt" in zhiling.lower() and zhiling[:13].lower() == "/changeprompt":await AIAPI.changePrompt(prompt=zhiling[14:],id=id,msg_type=msg_type)await bot.send_msg(id,msg_type,[MessageList.text("提示词已修改,已清空当前聊天记录")])case _ if "/send_msg" in zhiling.lower() and zhiling[:9].lower() == "/send_msg":await sendMsgProc(bot, zhiling, id, msg_type)case _ if "/send_pic" in zhiling.lower() and zhiling[:9].lower() == "/send_pic":await sendPicProc(bot, zhiling, id, msg_type)# case _:case _ if "/ai" in zhiling and zhiling[:3] == "/ai":await AIProc(bot, zhiling, id, msg_type, m,aiModel[memHashKey][0],aiModel[memHashKey][1])
下面是接入本地ollama服务的代码
import ollama
import json
from ollama import AsyncClient
#client = ollama.Client(host="localhost:11434")
client = AsyncClient(host="http://localhost:11434")ChatCount=0
messages = []async def chat(content:str) -> str:global ChatCount,messagesmessage = {'role': 'user','content': content}messages.append(message)msg_to_send = json.dumps(messages)response = await client.chat(model='qwen2:7b',messages=messages,stream=False)message = {"role": "assistant","content": response["message"]["content"]}messages.append(message)ChatCount += 1if(ChatCount > 10):messages = []return response["message"]["content"]
然后是发送AI回复的处理函数
# Please install OpenAI SDK first: `pip3 install openai`import os# 获取模块文件路径
from openai import OpenAI,AsyncOpenAI
import asyncio
import json
from enum import Enum
from bot import Message_Type
messages : dict = {str : list}# 定义一个异步函数chat,用于与AI进行对话
# DeepSeek的model有"deepseek-chat"和"deepseek-reasoner"
# kimi的model有moonshot-v1-8k moonshot-v1-32k moonshot-v1-128k moonshot-v1-8k-vision-preview moonshot-v1-32k-vision-preview moonshot-v1-128k-vision-preview,后者昂贵
async def chat(content:str, id:str="all", msg_type:Message_Type=Message_Type.group, memory:bool=True, api:str="deepseek", model:str="deepseek-chat", prompt:str="") -> str:model = model.strip()hashKey = id + ',' + msg_type.valueapis = {}with open(os.path.join(os.path.dirname(__file__),"ai_apis.json"), "r", encoding="utf-8") as api_file:apis = json.load(api_file)if api not in apis.keys() and model == "":raise Exception(f"API {api} not found")# if model not in apis[api]["models"]:# raise Exception(f"Model {model} not found in API {api}")if model == "":model = apis[api]["models"][0]# 创建一个AsyncOpenAI客户端client = AsyncOpenAI(api_key=apis[api]["key"], base_url=apis[api]["url"])# 定义一个全局变量messages,用于存储对话历史global messages# 定义一个字典message,用于存储用户发送的消息message = {'role': 'user','content': content}# 定义一个空字符串response,用于存储AI的回复response = ''try:# 打印正在获取AI回复print("getting AI response")# 如果group_id不在messages的键中,则将group_id添加到messages的键中,并将对应的值设为一个空列表if hashKey not in messages.keys():if prompt == "":with open(os.path.join(os.path.dirname(__file__),"AIPrompt.txt"), "r", encoding="utf-8") as prompt_file:prompt = prompt_file.read()messages[hashKey] = [# 此处可用于存放提示词{"role": "system",# "content": "请你扮演一个刚从美国留学回国的人,说话时候会故意中文夹杂部分英文单词,显得非常fancy,对话中总是带有很强的优越感。""content": prompt},]else:messages[hashKey] = []# 将message添加到messages[hashKey]中send_messages = messages[hashKey] + [message]# 调用client.chat.completions.create方法,获取AI的回复response = await client.chat.completions.with_raw_response.create(model=model,messages=send_messages,stream=False,n=1# 下文choices中元素的个数)# 打印获取到的AI回复print("Succeeded gotten AI response")if response.status_code != 200 or response.text == '':raise Exception(f"DeepSeek API response status code: {response.status_code}, response: {response.text if response.text != '' else '(null)'}")responseJson = json.loads(response.text)print("AI Response:"+responseJson["choices"][0]["message"]["content"])# 如果memory为True,则将message和AI的回复添加到messages[hashKey]中if memory:messages[hashKey].append(message)messages[hashKey].append(responseJson["choices"][0]["message"])# 关闭clientawait client.close()# 返回AI的回复return responseJson["choices"][0]["message"]["content"]except Exception as e:# 如果发生异常,则打印异常信息import traceback,sys# tb = traceback.extract_tb(sys.exc_info()[2])print(sys.exc_info())# 关闭clientawait client.close()# 返回异常信息和API的返回信息# return str(sys.exc_info()) + '\nException contents: ' + str(e) + "\nHttp Response: " + str(response)return ''async def changePrompt(prompt:str, id:str="all", msg_type:Message_Type=Message_Type.group):global messageshashKey = id + ',' + msg_type.valuemessages[hashKey] = [{"role": "system","content": prompt},]
if __name__ == "__main__":# path = os.path.dirname(__file__) + "\\"# with open(path + "AIPrompt2.txt", "r", encoding="utf-8") as prompt_file:# content = prompt_file.read()# print(content)async def main():print(await chat("写一个正则表达式,要求匹配一个字符串中包含另一个字符串(b),且b前面没有b的第一个字符,若有,则b前面的该字符前必须有该字符"))asyncio.run(main())
其中,AIAPI.py使用到的ai_apis.json格式如下
{"localhost":{"name": "localhost","url": "http://localhost:11434","key": "","models": ["wangshenzhi/gemma2-9b-chinese-chat:latest","qwen3:8b","qwen2:7b"]},"kimi":{"name": "kimi","url": "https://api.moonshot.cn/v1","key": "","models": ["moonshot-v1-8k","moonshot-v1-32k","moonshot-v1-128k","moonshot-v1-8k-vision-preview","moonshot-v1-32k-vision-preview","moonshot-v1-128k-vision-preview"]},"deepseek":{"name": "deepseek","url": "https://api.deepseek.com","key": "","models": ["deepseek-chat","deepseek-reasoner"]}
}