From 350f4758963a4d6b0f09a431465a8ef1dbdc51e8 Mon Sep 17 00:00:00 2001 From: Joe <1264204425@qq.com> Date: Fri, 8 Nov 2024 10:47:39 +0800 Subject: [PATCH 1/3] fix: optimizing code --- api/core/ops/entities/config_entity.py | 1 + api/core/ops/entities/trace_entity.py | 11 +++++++++++ api/core/ops/ops_trace_manager.py | 17 ++++++++--------- api/tasks/ops_trace_task.py | 17 ++++++++++------- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/api/core/ops/entities/config_entity.py b/api/core/ops/entities/config_entity.py index 3454ab20e6..ef0f9c708f 100644 --- a/api/core/ops/entities/config_entity.py +++ b/api/core/ops/entities/config_entity.py @@ -57,3 +57,4 @@ class LangSmithConfig(BaseTracingConfig): OPS_FILE_PATH = "ops_trace/" +OPS_TRACE_FAILED_KEY = "FAILED_OPS_TRACE" diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index db6ce9d9c3..256595286f 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ -23,6 +23,11 @@ class BaseTraceInfo(BaseModel): return v return "" + class Config: + json_encoders = { + datetime: lambda v: v.isoformat(), + } + class WorkflowTraceInfo(BaseTraceInfo): workflow_data: Any @@ -100,6 +105,12 @@ class GenerateNameTraceInfo(BaseTraceInfo): tenant_id: str +class TaskData(BaseModel): + app_id: str + trace_info_type: str + trace_info: Any + + trace_info_info_map = { "WorkflowTraceInfo": WorkflowTraceInfo, "MessageTraceInfo": MessageTraceInfo, diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 1421c6ee61..9b476bc4a5 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -23,13 +23,14 @@ from core.ops.entities.trace_entity import ( MessageTraceInfo, ModerationTraceInfo, SuggestedQuestionTraceInfo, + TaskData, ToolTraceInfo, TraceTaskName, WorkflowTraceInfo, ) from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace -from core.ops.utils import convert_datetime_to_str, get_message_data +from core.ops.utils import get_message_data from extensions.ext_database import db from extensions.ext_storage import storage from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig @@ -744,15 +745,13 @@ class TraceQueueManager: for task in tasks: file_id = uuid4().hex trace_info = task.execute() - task_data = { - "app_id": task.app_id, - "trace_info_type": type(trace_info).__name__, - "trace_info": trace_info.model_dump() if trace_info else {}, - } - task_data = convert_datetime_to_str(task_data) - json_data = json.dumps(task_data, ensure_ascii=False).encode("utf-8") + task_data = TaskData( + app_id=task.app_id, + trace_info_type=type(trace_info).__name__, + trace_info=trace_info.model_dump() if trace_info else None, + ) file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json" - storage.save(file_path, json_data) + storage.save(file_path, task_data.model_dump_json().encode("utf-8")) file_info = { "file_id": file_id, "app_id": task.app_id, diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index 848bae25aa..b3963a7be4 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -4,9 +4,10 @@ import logging from celery import shared_task from flask import current_app -from core.ops.entities.config_entity import OPS_FILE_PATH +from core.ops.entities.config_entity import OPS_FILE_PATH, OPS_TRACE_FAILED_KEY from core.ops.entities.trace_entity import trace_info_info_map from core.rag.models.document import Document +from extensions.ext_redis import redis_client from extensions.ext_storage import storage from models.model import Message from models.workflow import WorkflowRun @@ -25,9 +26,9 @@ def process_trace_tasks(file_info): app_id = file_info.get("app_id") file_id = file_info.get("file_id") file_path = f"{OPS_FILE_PATH}{app_id}/{file_id}.json" - file_data = storage.load(file_path) - trace_info = json.loads(file_data).get("trace_info") - trace_info_type = json.loads(file_data).get("trace_info_type") + file_data = json.loads(storage.load(file_path)) + trace_info = file_data.get("trace_info") + trace_info_type = file_data.get("trace_info_type") trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) if trace_info.get("message_data"): @@ -44,7 +45,9 @@ def process_trace_tasks(file_info): if trace_type: trace_info = trace_type(**trace_info) trace_instance.trace(trace_info) - except Exception: - logging.exception(f"Processing trace tasks failed, app_id: {app_id}") - finally: storage.delete(file_path) + logging.info(f"Processing trace tasks success, app_id: {app_id}") + except Exception: + failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}" + redis_client.incr(failed_key) + logging.info(f"Processing trace tasks failed, app_id: {app_id}") From 532dfd6ae86d75bd890cb8d3809a449ca317cccb Mon Sep 17 00:00:00 2001 From: Joe <1264204425@qq.com> Date: Fri, 8 Nov 2024 10:49:53 +0800 Subject: [PATCH 2/3] chore: remove convert_datetime_to_str --- api/core/ops/utils.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/api/core/ops/utils.py b/api/core/ops/utils.py index 82401a9de8..3cd3fb5756 100644 --- a/api/core/ops/utils.py +++ b/api/core/ops/utils.py @@ -43,17 +43,3 @@ def replace_text_with_content(data): return [replace_text_with_content(item) for item in data] else: return data - - -def convert_datetime_to_str(data): - if isinstance(data, dict): - for key, value in data.items(): - if isinstance(value, datetime): - data[key] = value.isoformat() - elif isinstance(value, dict): - data[key] = convert_datetime_to_str(value) - elif isinstance(value, list): - data[key] = [convert_datetime_to_str(item) if isinstance(item, dict | list) else item for item in value] - elif isinstance(data, list): - data = [convert_datetime_to_str(item) if isinstance(item, dict | list) else item for item in data] - return data From 36cf784c9c686610c81d8e08f9b446a722bf995b Mon Sep 17 00:00:00 2001 From: Joe <1264204425@qq.com> Date: Fri, 8 Nov 2024 14:16:02 +0800 Subject: [PATCH 3/3] feat: add delete file finally --- api/tasks/ops_trace_task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index b3963a7be4..34c62dc923 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -45,9 +45,10 @@ def process_trace_tasks(file_info): if trace_type: trace_info = trace_type(**trace_info) trace_instance.trace(trace_info) - storage.delete(file_path) logging.info(f"Processing trace tasks success, app_id: {app_id}") except Exception: failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}" redis_client.incr(failed_key) logging.info(f"Processing trace tasks failed, app_id: {app_id}") + finally: + storage.delete(file_path)