mirror of
https://github.com/RockChinQ/QChatGPT.git
synced 2024-11-16 19:57:04 +08:00
commit
f6ec0fda7a
12
main.py
12
main.py
|
@ -33,7 +33,7 @@ log_colors_config = {
|
||||||
'INFO': 'white',
|
'INFO': 'white',
|
||||||
'WARNING': 'yellow',
|
'WARNING': 'yellow',
|
||||||
'ERROR': 'red',
|
'ERROR': 'red',
|
||||||
'CRITICAL': 'bold_red',
|
'CRITICAL': 'cyan',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -307,13 +307,21 @@ def main(first_time_init=False):
|
||||||
import pkg.utils.updater
|
import pkg.utils.updater
|
||||||
try:
|
try:
|
||||||
if pkg.utils.updater.is_new_version_available():
|
if pkg.utils.updater.is_new_version_available():
|
||||||
pkg.utils.context.get_qqbot_manager().notify_admin("新版本可用,请发送 !update 进行自动更新\n更新日志:\n{}".format("\n".join(pkg.utils.updater.get_rls_notes())))
|
logging.info("新版本可用,请发送 !update 进行自动更新\n更新日志:\n{}".format("\n".join(pkg.utils.updater.get_rls_notes())))
|
||||||
else:
|
else:
|
||||||
logging.info("当前已是最新版本")
|
logging.info("当前已是最新版本")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.warning("检查更新失败:{}".format(e))
|
logging.warning("检查更新失败:{}".format(e))
|
||||||
|
|
||||||
|
try:
|
||||||
|
import pkg.utils.announcement as announcement
|
||||||
|
new_announcement = announcement.fetch_new()
|
||||||
|
if new_announcement != "":
|
||||||
|
logging.critical("[公告] {}".format(new_announcement))
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning("获取公告失败:{}".format(e))
|
||||||
|
|
||||||
return qqbot
|
return qqbot
|
||||||
|
|
||||||
|
|
||||||
|
|
44
pkg/utils/announcement.py
Normal file
44
pkg/utils/announcement.py
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
def read_latest() -> str:
|
||||||
|
resp = requests.get(
|
||||||
|
url="https://api.github.com/repos/RockChinQ/QChatGPT/contents/res/announcement",
|
||||||
|
)
|
||||||
|
obj_json = resp.json()
|
||||||
|
b64_content = obj_json["content"]
|
||||||
|
# 解码
|
||||||
|
content = base64.b64decode(b64_content).decode("utf-8")
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
def read_saved() -> str:
|
||||||
|
# 已保存的在res/announcement_saved
|
||||||
|
# 检查是否存在
|
||||||
|
if not os.path.exists("res/announcement_saved"):
|
||||||
|
with open("res/announcement_saved", "w") as f:
|
||||||
|
f.write("")
|
||||||
|
|
||||||
|
with open("res/announcement_saved", "r") as f:
|
||||||
|
content = f.read()
|
||||||
|
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
def write_saved(content: str):
|
||||||
|
# 已保存的在res/announcement_saved
|
||||||
|
with open("res/announcement_saved", "w") as f:
|
||||||
|
f.write(content)
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_new() -> str:
|
||||||
|
latest = read_latest()
|
||||||
|
saved = read_saved()
|
||||||
|
if latest.replace(saved, "").strip() == "":
|
||||||
|
return ""
|
||||||
|
else:
|
||||||
|
write_saved(latest)
|
||||||
|
return latest.replace(saved, "").strip()
|
Loading…
Reference in New Issue
Block a user