QChatGPT/pkg/platform/sources/qqbotpy.py

395 lines
14 KiB
Python
Raw Normal View History

2024-02-07 23:21:32 +08:00
from __future__ import annotations
import logging
import typing
import datetime
import asyncio
import re
import traceback
import json
import threading
import mirai
import botpy
import botpy.message as botpy_message
import botpy.types.message as botpy_message_type
from .. import adapter as adapter_model
from ...pipeline.longtext.strategies import forward
from ...core import app
class OfficialGroupMessage(mirai.GroupMessage):
pass
event_handler_mapping = {
mirai.GroupMessage: ["on_at_message_create", "on_group_at_message_create"],
mirai.FriendMessage: ["on_direct_message_create"],
}
cached_message_ids = {}
"""由于QQ官方的消息id是字符串而YiriMirai的消息id是整数所以需要一个索引来进行转换"""
id_index = 0
def save_msg_id(message_id: str) -> int:
"""保存消息id"""
global id_index, cached_message_ids
crt_index = id_index
id_index += 1
cached_message_ids[str(crt_index)] = message_id
return crt_index
cached_member_openids = {}
"""QQ官方 用户的id是字符串而YiriMirai的用户id是整数所以需要一个索引来进行转换"""
member_openid_index = 100
def save_member_openid(member_openid: str) -> int:
"""保存用户id"""
global member_openid_index, cached_member_openids
if member_openid in cached_member_openids.values():
return list(cached_member_openids.keys())[list(cached_member_openids.values()).index(member_openid)]
crt_index = member_openid_index
member_openid_index += 1
cached_member_openids[str(crt_index)] = member_openid
return crt_index
cached_group_openids = {}
"""QQ官方 群组的id是字符串而YiriMirai的群组id是整数所以需要一个索引来进行转换"""
group_openid_index = 1000
def save_group_openid(group_openid: str) -> int:
"""保存群组id"""
global group_openid_index, cached_group_openids
if group_openid in cached_group_openids.values():
return list(cached_group_openids.keys())[list(cached_group_openids.values()).index(group_openid)]
crt_index = group_openid_index
group_openid_index += 1
cached_group_openids[str(crt_index)] = group_openid
return crt_index
class OfficialMessageConverter(adapter_model.MessageConverter):
"""QQ 官方消息转换器"""
@staticmethod
def yiri2target(message_chain: mirai.MessageChain):
"""将 YiriMirai 的消息链转换为 QQ 官方消息"""
msg_list = []
if type(message_chain) is mirai.MessageChain:
msg_list = message_chain.__root__
elif type(message_chain) is list:
msg_list = message_chain
else:
raise Exception("Unknown message type: " + str(message_chain) + str(type(message_chain)))
offcial_messages: list[dict] = []
"""
{
"type": "text",
"content": "Hello World!"
}
{
"type": "image",
"content": "https://example.com/example.jpg"
}
"""
# 遍历并转换
for component in msg_list:
if type(component) is mirai.Plain:
offcial_messages.append({
"type": "text",
"content": component.text
})
elif type(component) is mirai.Image:
if component.url is not None:
offcial_messages.append(
{
"type": "image",
"content": component.url
}
)
elif component.path is not None:
offcial_messages.append(
{
"type": "file_image",
"content": component.path
}
)
elif type(component) is mirai.At:
offcial_messages.append(
{
"type": "at",
"content": ""
}
)
elif type(component) is mirai.AtAll:
print("上层组件要求发送 AtAll 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。")
elif type(component) is mirai.Voice:
print("上层组件要求发送 Voice 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。")
elif type(component) is forward.Forward:
# 转发消息
yiri_forward_node_list = component.node_list
# 遍历并转换
for yiri_forward_node in yiri_forward_node_list:
try:
message_chain = yiri_forward_node.message_chain
# 平铺
offcial_messages.extend(OfficialMessageConverter.yiri2target(message_chain))
except Exception as e:
import traceback
traceback.print_exc()
return offcial_messages
@staticmethod
def extract_message_chain_from_obj(message: typing.Union[botpy_message.Message, botpy_message.DirectMessage], message_id: str = None, bot_account_id: int = 0) -> mirai.MessageChain:
yiri_msg_list = []
# 存id
yiri_msg_list.append(mirai.models.message.Source(id=save_msg_id(message_id), time=datetime.datetime.now()))
if type(message) is not botpy_message.DirectMessage:
yiri_msg_list.append(mirai.At(target=bot_account_id))
if hasattr(message, "mentions"):
for mention in message.mentions:
if mention.bot:
continue
yiri_msg_list.append(mirai.At(target=mention.id))
for attachment in message.attachments:
if attachment.content_type == "image":
yiri_msg_list.append(mirai.Image(url=attachment.url))
else:
logging.warning("不支持的附件类型:" + attachment.content_type + ",忽略此附件。")
content = re.sub(r"<@!\d+>", "", str(message.content))
if content.strip() != "":
yiri_msg_list.append(mirai.Plain(text=content))
chain = mirai.MessageChain(yiri_msg_list)
return chain
class OfficialEventConverter(adapter_model.EventConverter):
"""事件转换器"""
@staticmethod
def yiri2target(event: typing.Type[mirai.Event]):
if event == mirai.GroupMessage:
return botpy_message.Message
elif event == mirai.FriendMessage:
return botpy_message.DirectMessage
else:
raise Exception("未支持转换的事件类型(YiriMirai -> Official): " + str(event))
@staticmethod
def target2yiri(event: typing.Union[botpy_message.Message, botpy_message.DirectMessage]) -> mirai.Event:
import mirai.models.entities as mirai_entities
if type(event) == botpy_message.Message: # 频道内,转群聊事件
permission = "MEMBER"
if '2' in event.member.roles:
permission = "ADMINISTRATOR"
elif '4' in event.member.roles:
permission = "OWNER"
return mirai.GroupMessage(
sender=mirai_entities.GroupMember(
id=event.author.id,
member_name=event.author.username,
permission=permission,
group=mirai_entities.Group(
id=event.channel_id,
name=event.author.username,
permission=mirai_entities.Permission.Member
),
special_title='',
join_timestamp=int(datetime.datetime.strptime(event.member.joined_at, "%Y-%m-%dT%H:%M:%S%z").timestamp()),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(event, event.id),
time=int(datetime.datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S%z").timestamp()),
)
elif type(event) == botpy_message.DirectMessage: # 私聊,转私聊事件
return mirai.FriendMessage(
sender=mirai_entities.Friend(
id=event.guild_id,
nickname=event.author.username,
remark=event.author.username,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(event, event.id),
time=int(datetime.datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S%z").timestamp()),
)
elif type(event) == botpy_message.GroupMessage:
replacing_member_id = save_member_openid(event.author.member_openid)
return OfficialGroupMessage(
sender=mirai_entities.GroupMember(
id=replacing_member_id,
member_name=replacing_member_id,
permission="MEMBER",
group=mirai_entities.Group(
id=save_group_openid(event.group_openid),
name=replacing_member_id,
permission=mirai_entities.Permission.Member
),
special_title='',
join_timestamp=int(0),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(event, event.id),
time=int(datetime.datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S%z").timestamp()),
)
2024-02-08 13:12:33 +08:00
@adapter_model.adapter_class("qq-botpy")
2024-02-07 23:21:32 +08:00
class OfficialAdapter(adapter_model.MessageSourceAdapter):
"""QQ 官方消息适配器"""
bot: botpy.Client = None
bot_account_id: int = 0
message_converter: OfficialMessageConverter = OfficialMessageConverter()
# event_handler: adapter_model.EventHandler = adapter_model.EventHandler()
cfg: dict = None
cached_official_messages: dict = {}
"""缓存的 qq-botpy 框架消息对象
message_id: botpy_message.Message | botpy_message.DirectMessage
"""
ap: app.Application
def __init__(self, cfg: dict, ap: app.Application):
"""初始化适配器"""
self.cfg = cfg
self.ap = ap
switchs = {}
for intent in cfg['intents']:
switchs[intent] = True
del cfg['intents']
intents = botpy.Intents(**switchs)
self.bot = botpy.Client(intents=intents)
async def send_message(
self,
target_type: str,
target_id: str,
message: mirai.MessageChain
):
pass
async def reply_message(
self,
message_source: mirai.MessageEvent,
message: mirai.MessageChain,
quote_origin: bool = False
):
message_list = self.message_converter.yiri2target(message)
tasks = []
msg_seq = 1
for msg in message_list:
args = {}
if msg['type'] == 'text':
args['content'] = msg['content']
elif msg['type'] == 'image':
args['image'] = msg['content']
elif msg['type'] == 'file_image':
args['file_image'] = msg["content"]
else:
continue
if quote_origin:
args['message_reference'] = botpy_message_type.Reference(message_id=cached_message_ids[str(message_source.message_chain.message_id)])
if type(message_source) == mirai.GroupMessage:
args['channel_id'] = str(message_source.sender.group.id)
args['msg_id'] = cached_message_ids[str(message_source.message_chain.message_id)]
await self.bot.api.post_message(**args)
elif type(message_source) == mirai.FriendMessage:
args['guild_id'] = str(message_source.sender.id)
args['msg_id'] = cached_message_ids[str(message_source.message_chain.message_id)]
await self.bot.api.post_dms(**args)
elif type(message_source) == OfficialGroupMessage:
# args['guild_id'] = str(message_source.sender.group.id)
# args['msg_id'] = cached_message_ids[str(message_source.message_chain.message_id)]
# await self.bot.api.post_message(**args)
if 'image' in args or 'file_image' in args:
continue
args['group_openid'] = cached_group_openids[str(message_source.sender.group.id)]
args['msg_id'] = cached_message_ids[str(message_source.message_chain.message_id)]
args['msg_seq'] = msg_seq
msg_seq += 1
await self.bot.api.post_group_message(
**args
)
async def is_muted(self, group_id: int) -> bool:
return False
def register_listener(
self,
event_type: typing.Type[mirai.Event],
callback: typing.Callable[[mirai.Event], None]
):
try:
async def wrapper(message: typing.Union[botpy_message.Message, botpy_message.DirectMessage, botpy_message.GroupMessage]):
self.cached_official_messages[str(message.id)] = message
await callback(OfficialEventConverter.target2yiri(message))
for event_handler in event_handler_mapping[event_type]:
setattr(self.bot, event_handler, wrapper)
except Exception as e:
traceback.print_exc()
raise e
def unregister_listener(
self,
event_type: typing.Type[mirai.Event],
callback: typing.Callable[[mirai.Event], None]
):
delattr(self.bot, event_handler_mapping[event_type])
async def run_async(self):
self.ap.logger.info("运行 QQ 官方适配器")
await self.bot.start(
**self.cfg
)
def kill(self) -> bool:
return False