feat: 添加任务管理模块

This commit is contained in:
Junyan Qin 2024-10-22 18:09:18 +08:00
parent 26770439bb
commit c151665419
No known key found for this signature in database
GPG Key ID: 8AC0BEFE1743A015
10 changed files with 121 additions and 36 deletions

View File

@ -30,12 +30,18 @@ class HTTPController:
while True:
await asyncio.sleep(1)
task = asyncio.create_task(self.quart_app.run_task(
# task = asyncio.create_task(self.quart_app.run_task(
# host=self.ap.system_cfg.data['http-api']['host'],
# port=self.ap.system_cfg.data['http-api']['port'],
# shutdown_trigger=shutdown_trigger_placeholder
# ))
# self.ap.asyncio_tasks.append(task)
self.ap.task_mgr.create_task(self.quart_app.run_task(
host=self.ap.system_cfg.data['http-api']['host'],
port=self.ap.system_cfg.data['http-api']['port'],
shutdown_trigger=shutdown_trigger_placeholder
))
self.ap.asyncio_tasks.append(task)
async def register_routes(self) -> None:

View File

@ -69,11 +69,12 @@ class APIGroup(metaclass=abc.ABCMeta):
**kwargs
) -> asyncio.Task:
"""执行请求"""
task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs))
# task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs))
self.ap.asyncio_tasks.append(task)
# self.ap.asyncio_tasks.append(task)
return task
return self.ap.task_mgr.create_task(self._do(method, path, data, params, headers, **kwargs)).task
def gen_rid(
self

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import logging
import asyncio
import threading
import traceback
from ..platform import manager as im_mgr
@ -21,6 +22,7 @@ from ..utils import version as version_mgr, proxy as proxy_mgr, announce as anno
from ..persistence import mgr as persistencemgr
from ..api.http.controller import main as http_controller
from ..utils import logcache
from . import taskmgr
class Application:
@ -28,7 +30,8 @@ class Application:
event_loop: asyncio.AbstractEventLoop = None
asyncio_tasks: list[asyncio.Task] = []
# asyncio_tasks: list[asyncio.Task] = []
task_mgr: taskmgr.AsyncTaskManager = None
platform_mgr: im_mgr.PlatformManager = None
@ -103,8 +106,6 @@ class Application:
async def run(self):
await self.plugin_mgr.initialize_plugins()
tasks = []
try:
# 后续可能会允许动态重启其他任务
@ -113,28 +114,19 @@ class Application:
while True:
await asyncio.sleep(1)
tasks = [
asyncio.create_task(self.platform_mgr.run()), # 消息平台
asyncio.create_task(self.ctrl.run()), # 消息处理循环
asyncio.create_task(self.http_ctrl.run()), # http 接口服务
asyncio.create_task(never_ending())
]
self.asyncio_tasks.extend(tasks)
# tasks = [
# asyncio.create_task(self.platform_mgr.run()), # 消息平台
# asyncio.create_task(self.ctrl.run()), # 消息处理循环
# asyncio.create_task(self.http_ctrl.run()), # http 接口服务
# asyncio.create_task(never_ending())
# ]
# self.asyncio_tasks.extend(tasks)
self.task_mgr.create_task(self.platform_mgr.run())
self.task_mgr.create_task(self.ctrl.run())
self.task_mgr.create_task(self.http_ctrl.run())
self.task_mgr.create_task(never_ending())
# 挂系统信号处理
import signal
def signal_handler(sig, frame):
for task in self.asyncio_tasks:
task.cancel()
self.logger.info("程序退出.")
# 结束当前事件循环
self.event_loop.stop()
exit(0)
signal.signal(signal.SIGINT, signal_handler)
await asyncio.gather(*tasks, return_exceptions=True)
await self.task_mgr.wait_all()
except asyncio.CancelledError:
pass
except Exception as e:

View File

@ -49,6 +49,16 @@ async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application:
async def main(loop: asyncio.AbstractEventLoop):
try:
# 挂系统信号处理
import signal
def signal_handler(sig, frame):
print("[Signal] 程序退出.")
os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
app_inst = await make_app(loop)
await app_inst.run()
except Exception as e:

View File

@ -27,8 +27,8 @@ async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging.
if constants.debug_mode:
level = logging.DEBUG
log_file_name = "data/logs/qcg-%s.log" % time.strftime(
"%Y-%m-%d-%H-%M-%S", time.localtime()
log_file_name = "data/logs/langbot-%s.log" % time.strftime(
"%Y-%m-%d", time.localtime()
)
qcg_logger = logging.getLogger("qcg")

View File

@ -18,6 +18,7 @@ from ...platform import manager as im_mgr
from ...persistence import mgr as persistencemgr
from ...api.http.controller import main as http_controller
from ...utils import logcache
from .. import taskmgr
@stage.stage_class("BuildAppStage")
@ -28,6 +29,7 @@ class BuildAppStage(stage.BootingStage):
async def run(self, ap: app.Application):
"""构建app对象的各个组件对象并初始化
"""
ap.task_mgr = taskmgr.AsyncTaskManager(ap)
proxy_mgr = proxy.ProxyManager(ap)
await proxy_mgr.initialize()

73
pkg/core/taskmgr.py Normal file
View File

@ -0,0 +1,73 @@
from __future__ import annotations
import asyncio
import typing
from . import app
class TaskContext:
"""任务跟踪上下文"""
current_action: str
"""当前正在执行的动作"""
log: str
"""记录日志"""
def __init__(self):
self.current_action = ""
self.log = ""
def log(self, msg: str):
self.log += msg + "\n"
def set_current_action(self, action: str):
self.current_action = action
class TaskWrapper:
"""任务包装器"""
task_type: str = "system" # 任务类型: system 或 user
"""任务类型"""
task_context: TaskContext
"""任务上下文"""
task: asyncio.Task
"""任务"""
ap: app.Application
"""应用实例"""
def __init__(self, ap: app.Application, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None):
self.ap = ap
self.task_context = context or TaskContext()
self.task = self.ap.event_loop.create_task(coro)
self.task_type = task_type
class AsyncTaskManager:
"""保存app中的所有异步任务
包含系统级的和用户级插件安装更新等由用户直接发起的"""
ap: app.Application
tasks: list[TaskWrapper]
"""所有任务"""
def __init__(self, ap: app.Application):
self.ap = ap
self.tasks = []
def create_task(self, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None) -> TaskWrapper:
wrapper = TaskWrapper(self.ap, coro, task_type, context)
self.tasks.append(wrapper)
return wrapper
async def wait_all(self):
await asyncio.gather(*[t.task for t in self.tasks], return_exceptions=True)
def get_all_tasks(self) -> list[TaskWrapper]:
return self.tasks

View File

@ -60,8 +60,9 @@ class Controller:
# 通知其他协程,有新的请求可以处理了
self.ap.query_pool.condition.notify_all()
task = asyncio.create_task(_process_query(selected_query))
self.ap.asyncio_tasks.append(task)
# task = asyncio.create_task(_process_query(selected_query))
# self.ap.asyncio_tasks.append(task)
self.ap.task_mgr.create_task(_process_query(selected_query))
except Exception as e:
# traceback.print_exc()

View File

@ -184,10 +184,10 @@ class PlatformManager:
tasks.append(exception_wrapper(adapter))
for task in tasks:
async_task = asyncio.create_task(task)
self.ap.asyncio_tasks.append(async_task)
# async_task = asyncio.create_task(task)
# self.ap.asyncio_tasks.append(async_task)
self.ap.task_mgr.create_task(task)
except Exception as e:
self.ap.logger.error('平台适配器运行出错: ' + str(e))
self.ap.logger.debug(f"Traceback: {traceback.format_exc()}")