import json
import logging
import os
import queue
import threading
import time
from datetime import timedelta
from enum import Enum
from typing import Any, Optional, Union
from uuid import UUID

from flask import current_app

from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
from core.ops.entities.config_entity import (
    LangfuseConfig,
    LangSmithConfig,
    TracingProviderEnum,
)
from core.ops.entities.trace_entity import (
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    WorkflowTraceInfo,
)
from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
from core.ops.utils import get_message_data
from extensions.ext_database import db
from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
from models.workflow import WorkflowAppLog, WorkflowRun
from tasks.ops_trace_task import process_trace_tasks

# Per-provider wiring: the config class to instantiate, which config keys are
# secrets (encrypted/decrypted/obfuscated), which are copied through verbatim,
# and the trace client class built from the config.
provider_config_map = {
    TracingProviderEnum.LANGFUSE.value: {
        'config_class': LangfuseConfig,
        'secret_keys': ['public_key', 'secret_key'],
        'other_keys': ['host'],
        'trace_instance': LangFuseDataTrace
    },
    TracingProviderEnum.LANGSMITH.value: {
        'config_class': LangSmithConfig,
        'secret_keys': ['api_key'],
        'other_keys': ['project', 'endpoint'],
        'trace_instance': LangSmithDataTrace
    }
}


class OpsTraceManager:
    """Helpers for storing, loading and instantiating per-app tracing configuration."""

    @classmethod
    def encrypt_tracing_config(
        cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
    ):
        """
        Encrypt tracing config.

        :param tenant_id: tenant id
        :param tracing_provider: tracing provider (a key of ``provider_config_map``)
        :param tracing_config: tracing config dictionary to be encrypted
        :param current_trace_config: current tracing configuration for keeping existing values
        :return: encrypted tracing configuration
        """
        provider = provider_config_map[tracing_provider]
        config_class = provider['config_class']
        # current_trace_config defaults to None; fall back to an empty dict so a
        # masked value without a stored counterpart does not raise AttributeError.
        existing_config = current_trace_config if current_trace_config is not None else {}

        new_config = {}
        for key in provider['secret_keys']:
            if key not in tracing_config:
                continue
            value = tracing_config[key]
            if '*' in value:
                # A value containing '*' is treated as masked: keep the value
                # from the current config (or the submitted one if none exists).
                new_config[key] = existing_config.get(key, value)
            else:
                new_config[key] = encrypt_token(tenant_id, value)

        for key in provider['other_keys']:
            new_config[key] = tracing_config.get(key, "")

        # Round-trip through the config class before dumping back to a dict.
        return config_class(**new_config).model_dump()

    @classmethod
    def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
        """
        Decrypt tracing config

        :param tenant_id: tenant id
        :param tracing_provider: tracing provider (a key of ``provider_config_map``)
        :param tracing_config: tracing config whose secret keys are encrypted
        :return: config dict with secret keys passed through ``decrypt_token``
        """
        provider = provider_config_map[tracing_provider]
        config_class = provider['config_class']

        new_config = {}
        for key in provider['secret_keys']:
            if key in tracing_config:
                new_config[key] = decrypt_token(tenant_id, tracing_config[key])

        for key in provider['other_keys']:
            new_config[key] = tracing_config.get(key, "")

        return config_class(**new_config).model_dump()

    @classmethod
    def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
        """
        Obfuscate the secret values of a decrypted tracing config

        :param tracing_provider: tracing provider (a key of ``provider_config_map``)
        :param decrypt_tracing_config: decrypted tracing config
        :return: config dict with secret keys passed through ``obfuscated_token``
        """
        provider = provider_config_map[tracing_provider]
        config_class = provider['config_class']

        new_config = {}
        for key in provider['secret_keys']:
            if key in decrypt_tracing_config:
                new_config[key] = obfuscated_token(decrypt_tracing_config[key])

        for key in provider['other_keys']:
            new_config[key] = decrypt_tracing_config.get(key, "")

        return config_class(**new_config).model_dump()

    @classmethod
    def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
        """
        Get decrypted tracing config

        :param app_id: app id
        :param tracing_provider: tracing provider
        :return: decrypted config dict, or None when no config is stored for the app/provider
        :raises ValueError: when a config exists but the app itself cannot be found
        """
        trace_config_data: TraceAppConfig = db.session.query(TraceAppConfig).filter(
            TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider
        ).first()

        if not trace_config_data:
            return None

        # The tenant id is needed to decrypt the stored secrets.
        app = db.session.query(App).filter(App.id == app_id).first()
        if app is None:
            raise ValueError("App not found")

        return cls.decrypt_tracing_config(
            app.tenant_id, tracing_provider, trace_config_data.tracing_config
        )

    @classmethod
    def get_ops_trace_instance(
        cls,
        app_id: Optional[Union[UUID, str]] = None,
        message_id: Optional[str] = None,
        conversation_id: Optional[str] = None
    ):
        """
        Get ops trace through model config

        The app id is resolved from the conversation and/or message when those
        ids are given (a found message takes precedence over a found conversation,
        which takes precedence over the explicit ``app_id``).

        :param app_id: app_id
        :param message_id: message_id
        :param conversation_id: conversation_id
        :return: a trace client instance, or None when the app cannot be resolved,
            tracing is disabled, the provider is unknown, or no config is stored
        """
        if conversation_id is not None:
            conversation_data: Conversation = db.session.query(Conversation).filter(
                Conversation.id == conversation_id
            ).first()
            if conversation_data:
                app_id = conversation_data.app_id

        if message_id is not None:
            record: Message = db.session.query(Message).filter(Message.id == message_id).first()
            # Guard against an unknown message id instead of raising AttributeError.
            if record:
                app_id = record.app_id

        if isinstance(app_id, UUID):
            app_id = str(app_id)

        if app_id is None:
            return None

        app: App = db.session.query(App).filter(
            App.id == app_id
        ).first()
        if app is None or not app.tracing:
            return None

        app_ops_trace_config = json.loads(app.tracing)
        if app_ops_trace_config is None:
            return None

        # Check the cheap conditions first so a disabled app costs no extra
        # query and no secret decryption.
        if not app_ops_trace_config.get('enabled'):
            return None

        tracing_provider = app_ops_trace_config.get('tracing_provider')
        if tracing_provider not in provider_config_map:
            # Covers None (which update_app_tracing_config allows) and unknown names.
            return None

        decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
        if not decrypt_trace_config:
            # Tracing is enabled but no provider config has been stored.
            return None

        provider = provider_config_map[tracing_provider]
        return provider['trace_instance'](provider['config_class'](**decrypt_trace_config))

    @classmethod
    def get_app_config_through_message_id(cls, message_id: str):
        """
        Resolve the app model config used by the conversation a message belongs to

        :param message_id: message id
        :return: the ``AppModelConfig`` row referenced by the conversation, or the
            conversation's ``override_model_configs`` when it has no config id,
            or None when nothing can be resolved
        """
        message_data = db.session.query(Message).filter(Message.id == message_id).first()
        if not message_data:
            return None

        conversation_data = db.session.query(Conversation).filter(
            Conversation.id == message_data.conversation_id
        ).first()
        if not conversation_data:
            return None

        if conversation_data.app_model_config_id:
            return db.session.query(AppModelConfig).filter(
                AppModelConfig.id == conversation_data.app_model_config_id
            ).first()

        if conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
            return conversation_data.override_model_configs

        return None

    @classmethod
    def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
        """
        Update app tracing config

        :param app_id: app id
        :param enabled: enabled
        :param tracing_provider: tracing provider; None is accepted and stored as-is
        :return:
        :raises ValueError: for an unknown provider or an unknown app
        """
        # auth check
        if tracing_provider is not None and tracing_provider not in provider_config_map:
            raise ValueError(f"Invalid tracing provider: {tracing_provider}")

        app_config: App = db.session.query(App).filter(App.id == app_id).first()
        if app_config is None:
            raise ValueError("App not found")

        app_config.tracing = json.dumps(
            {
                "enabled": enabled,
                "tracing_provider": tracing_provider,
            }
        )
        db.session.commit()

    @classmethod
    def get_app_tracing_config(cls, app_id: str):
        """
        Get app tracing config

        :param app_id: app id
        :return: the parsed ``tracing`` JSON of the app, or a disabled default
            when the app has no tracing value
        :raises ValueError: when the app cannot be found
        """
        app: App = db.session.query(App).filter(App.id == app_id).first()
        if app is None:
            raise ValueError("App not found")

        if not app.tracing:
            return {
                "enabled": False,
                "tracing_provider": None
            }
        return json.loads(app.tracing)

    @staticmethod
    def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
        """
        Check trace config is effective

        :param tracing_config: tracing config
        :param tracing_provider: tracing provider (a key of ``provider_config_map``)
        :return: the result of the provider client's ``api_check()``
        """
        provider = provider_config_map[tracing_provider]
        config = provider['config_class'](**tracing_config)
        return provider['trace_instance'](config).api_check()


class TraceTaskName(str, Enum):
    CONVERSATION_TRACE = 'conversation_trace'
    WORKFLOW_TRACE = 'workflow_trace'
    MESSAGE_TRACE = 'message_trace'
    MODERATION_TRACE = 'moderation_trace'
    SUGGESTED_QUESTION_TRACE = 'suggested_question_trace'
    DATASET_RETRIEVAL_TRACE = 'dataset_retrieval_trace'
    TOOL_TRACE = 'tool_trace'
    GENERATE_NAME_TRACE = 'generate_name_trace'


class TraceTask:
    """A deferred unit of tracing work; ``execute`` builds the trace info for its type."""

    def __init__(
        self,
        trace_type: Any,
        message_id: Optional[str] = None,
        workflow_run: Optional[WorkflowRun] = None,
        conversation_id: Optional[str] = None,
        timer: Optional[Any] = None,
        **kwargs
    ):
        self.trace_type = trace_type
        self.message_id = message_id
        self.workflow_run = workflow_run
        self.conversation_id = conversation_id
        self.timer = timer
        self.kwargs = kwargs
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
        # Filled in by TraceQueueManager.add_trace_task with the ids of the
        # manager that enqueued this task; None until the task is enqueued.
        self.manager_context: Optional[dict] = None

    def execute(self):
        """Build and return the trace info for this task (the task name is discarded)."""
        _, trace_info = self.preprocess()
        return trace_info

    def preprocess(self):
        """
        Dispatch on ``trace_type`` and build the matching trace info.

        :return: ``(task_name, trace_info)``; ``('', {})`` for an unknown trace type
        """
        if self.trace_type == TraceTaskName.CONVERSATION_TRACE:
            return TraceTaskName.CONVERSATION_TRACE, self.conversation_trace(**self.kwargs)
        elif self.trace_type == TraceTaskName.WORKFLOW_TRACE:
            return TraceTaskName.WORKFLOW_TRACE, self.workflow_trace(self.workflow_run, self.conversation_id)
        elif self.trace_type == TraceTaskName.MESSAGE_TRACE:
            return TraceTaskName.MESSAGE_TRACE, self.message_trace(self.message_id)
        elif self.trace_type == TraceTaskName.MODERATION_TRACE:
            return TraceTaskName.MODERATION_TRACE, self.moderation_trace(self.message_id, self.timer, **self.kwargs)
        elif self.trace_type == TraceTaskName.SUGGESTED_QUESTION_TRACE:
            return TraceTaskName.SUGGESTED_QUESTION_TRACE, self.suggested_question_trace(
                self.message_id, self.timer, **self.kwargs
            )
        elif self.trace_type == TraceTaskName.DATASET_RETRIEVAL_TRACE:
            return TraceTaskName.DATASET_RETRIEVAL_TRACE, self.dataset_retrieval_trace(
                self.message_id, self.timer, **self.kwargs
            )
        elif self.trace_type == TraceTaskName.TOOL_TRACE:
            return TraceTaskName.TOOL_TRACE, self.tool_trace(self.message_id, self.timer, **self.kwargs)
        elif self.trace_type == TraceTaskName.GENERATE_NAME_TRACE:
            return TraceTaskName.GENERATE_NAME_TRACE, self.generate_name_trace(
                self.conversation_id, self.timer, **self.kwargs
            )
        else:
            return '', {}

    # process methods for different trace types
    def conversation_trace(self, **kwargs):
        """Return the keyword arguments unchanged as a plain dict."""
        return kwargs

    def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
        """
        Build a ``WorkflowTraceInfo`` from a workflow run.

        :param workflow_run: the workflow run to describe
        :param conversation_id: conversation the run belongs to (used to look up the message)
        :return: WorkflowTraceInfo
        """
        workflow_id = workflow_run.workflow_id
        tenant_id = workflow_run.tenant_id
        workflow_run_id = workflow_run.id
        workflow_run_elapsed_time = workflow_run.elapsed_time
        workflow_run_status = workflow_run.status
        # inputs/outputs are stored as JSON text; empty values become {}.
        workflow_run_inputs = (
            json.loads(workflow_run.inputs) if workflow_run.inputs else {}
        )
        workflow_run_outputs = (
            json.loads(workflow_run.outputs) if workflow_run.outputs else {}
        )
        workflow_run_version = workflow_run.version
        error = workflow_run.error if workflow_run.error else ""

        total_tokens = workflow_run.total_tokens

        file_list = workflow_run_inputs.get("sys.file") or []
        query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""

        # get workflow_app_log_id
        workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
            tenant_id=tenant_id,
            app_id=workflow_run.app_id,
            workflow_run_id=workflow_run.id
        ).first()
        workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None

        # get message_id
        message_data = db.session.query(Message.id).filter_by(
            conversation_id=conversation_id,
            workflow_run_id=workflow_run_id
        ).first()
        message_id = str(message_data.id) if message_data else None

        metadata = {
            "workflow_id": workflow_id,
            "conversation_id": conversation_id,
            "workflow_run_id": workflow_run_id,
            "tenant_id": tenant_id,
            "elapsed_time": workflow_run_elapsed_time,
            "status": workflow_run_status,
            "version": workflow_run_version,
            "total_tokens": total_tokens,
            "file_list": file_list,
            "triggered_form": workflow_run.triggered_from,
        }

        workflow_trace_info = WorkflowTraceInfo(
            workflow_data=workflow_run.to_dict(),
            conversation_id=conversation_id,
            workflow_id=workflow_id,
            tenant_id=tenant_id,
            workflow_run_id=workflow_run_id,
            workflow_run_elapsed_time=workflow_run_elapsed_time,
            workflow_run_status=workflow_run_status,
            workflow_run_inputs=workflow_run_inputs,
            workflow_run_outputs=workflow_run_outputs,
            workflow_run_version=workflow_run_version,
            error=error,
            total_tokens=total_tokens,
            file_list=file_list,
            query=query,
            metadata=metadata,
            workflow_app_log_id=workflow_app_log_id,
            message_id=message_id,
            start_time=workflow_run.created_at,
            end_time=workflow_run.finished_at,
        )

        return workflow_trace_info

    def message_trace(self, message_id):
        """
        Build a ``MessageTraceInfo`` for a message.

        :param message_id: message id
        :return: MessageTraceInfo, or ``{}`` when the message cannot be loaded
        """
        message_data = get_message_data(message_id)
        if not message_data:
            return {}
        conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
        conversation_mode = conversation_mode[0]
        created_at = message_data.created_at
        inputs = message_data.message

        # get message file data
        message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
        file_list = []
        if message_file_data and message_file_data.url is not None:
            file_list.append(f"{self.file_base_url}/{message_file_data.url}")

        metadata = {
            "conversation_id": message_data.conversation_id,
            "ls_provider": message_data.model_provider,
            "ls_model_name": message_data.model_id,
            "status": message_data.status,
            # NOTE(review): this mirrors from_account_id; looks like a copy-paste —
            # confirm whether the message's end-user id was intended here.
            "from_end_user_id": message_data.from_account_id,
            "from_account_id": message_data.from_account_id,
            "agent_based": message_data.agent_based,
            "workflow_run_id": message_data.workflow_run_id,
            "from_source": message_data.from_source,
            "message_id": message_id,
        }

        message_tokens = message_data.message_tokens

        message_trace_info = MessageTraceInfo(
            message_id=message_id,
            message_data=message_data.to_dict(),
            conversation_model=conversation_mode,
            message_tokens=message_tokens,
            answer_tokens=message_data.answer_tokens,
            total_tokens=message_tokens + message_data.answer_tokens,
            error=message_data.error if message_data.error else "",
            inputs=inputs,
            outputs=message_data.answer,
            file_list=file_list,
            start_time=created_at,
            # End time is derived from the recorded provider latency.
            end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
            metadata=metadata,
            message_file_data=message_file_data,
            conversation_mode=conversation_mode,
        )

        return message_trace_info

    def moderation_trace(self, message_id, timer, **kwargs):
        """
        Build a ``ModerationTraceInfo`` for a moderation check on a message.

        :param message_id: message id
        :param timer: mapping read via ``.get("start")`` / ``.get("end")``
        :param kwargs: expects ``moderation_result`` and ``inputs``
        :return: ModerationTraceInfo, or ``{}`` when the message cannot be loaded
        """
        moderation_result = kwargs.get("moderation_result")
        inputs = kwargs.get("inputs")
        message_data = get_message_data(message_id)
        if not message_data:
            return {}
        metadata = {
            "message_id": message_id,
            "action": moderation_result.action,
            "preset_response": moderation_result.preset_response,
            "query": moderation_result.query,
        }

        # get workflow_app_log_id
        workflow_app_log_id = None
        if message_data.workflow_run_id:
            workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
                workflow_run_id=message_data.workflow_run_id
            ).first()
            workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None

        moderation_trace_info = ModerationTraceInfo(
            # The workflow app log id, when one exists, is reported in place of the message id.
            message_id=workflow_app_log_id if workflow_app_log_id else message_id,
            inputs=inputs,
            message_data=message_data.to_dict(),
            flagged=moderation_result.flagged,
            action=moderation_result.action,
            preset_response=moderation_result.preset_response,
            query=moderation_result.query,
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
        )

        return moderation_trace_info

    def suggested_question_trace(self, message_id, timer, **kwargs):
        """
        Build a ``SuggestedQuestionTraceInfo`` for a message.

        :param message_id: message id
        :param timer: mapping read via ``.get("start")`` / ``.get("end")``
        :param kwargs: expects ``suggested_question``
        :return: SuggestedQuestionTraceInfo, or ``{}`` when the message cannot be loaded
        """
        suggested_question = kwargs.get("suggested_question")
        message_data = get_message_data(message_id)
        if not message_data:
            return {}
        metadata = {
            "message_id": message_id,
            "ls_provider": message_data.model_provider,
            "ls_model_name": message_data.model_id,
            "status": message_data.status,
            # NOTE(review): this mirrors from_account_id; looks like a copy-paste —
            # confirm whether the message's end-user id was intended here.
            "from_end_user_id": message_data.from_account_id,
            "from_account_id": message_data.from_account_id,
            "agent_based": message_data.agent_based,
            "workflow_run_id": message_data.workflow_run_id,
            "from_source": message_data.from_source,
        }

        # get workflow_app_log_id
        workflow_app_log_id = None
        if message_data.workflow_run_id:
            workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
                workflow_run_id=message_data.workflow_run_id
            ).first()
            workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None

        suggested_question_trace_info = SuggestedQuestionTraceInfo(
            # The workflow app log id, when one exists, is reported in place of the message id.
            message_id=workflow_app_log_id if workflow_app_log_id else message_id,
            message_data=message_data.to_dict(),
            inputs=message_data.message,
            outputs=message_data.answer,
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
            total_tokens=message_data.message_tokens + message_data.answer_tokens,
            status=message_data.status,
            error=message_data.error,
            from_account_id=message_data.from_account_id,
            agent_based=message_data.agent_based,
            from_source=message_data.from_source,
            model_provider=message_data.model_provider,
            model_id=message_data.model_id,
            suggested_question=suggested_question,
            level=message_data.status,
            status_message=message_data.error,
        )

        return suggested_question_trace_info

    def dataset_retrieval_trace(self, message_id, timer, **kwargs):
        """
        Build a ``DatasetRetrievalTraceInfo`` for a message.

        :param message_id: message id
        :param timer: mapping read via ``.get("start")`` / ``.get("end")``
        :param kwargs: expects ``documents``, each exposing ``model_dump()``
        :return: DatasetRetrievalTraceInfo, or ``{}`` when the message cannot be loaded
        """
        documents = kwargs.get("documents")
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        metadata = {
            "message_id": message_id,
            "ls_provider": message_data.model_provider,
            "ls_model_name": message_data.model_id,
            "status": message_data.status,
            # NOTE(review): this mirrors from_account_id; looks like a copy-paste —
            # confirm whether the message's end-user id was intended here.
            "from_end_user_id": message_data.from_account_id,
            "from_account_id": message_data.from_account_id,
            "agent_based": message_data.agent_based,
            "workflow_run_id": message_data.workflow_run_id,
            "from_source": message_data.from_source,
        }

        dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
            message_id=message_id,
            inputs=message_data.query if message_data.query else message_data.inputs,
            documents=[doc.model_dump() for doc in documents],
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
            message_data=message_data.to_dict(),
        )

        return dataset_retrieval_trace_info

    def tool_trace(self, message_id, timer, **kwargs):
        """
        Build a ``ToolTraceInfo`` for a tool invocation recorded on a message.

        :param message_id: message id
        :param timer: optional mapping with ``start``/``end``; when falsy the times
            are taken from the message / matching agent thought instead
        :param kwargs: expects ``tool_name``, ``tool_inputs`` and ``tool_outputs``
        :return: ToolTraceInfo, or ``{}`` when the message cannot be loaded
        """
        tool_name = kwargs.get('tool_name')
        tool_inputs = kwargs.get('tool_inputs')
        tool_outputs = kwargs.get('tool_outputs')
        message_data = get_message_data(message_id)
        if not message_data:
            return {}
        tool_config = {}
        time_cost = 0
        error = None
        tool_parameters = {}
        created_time = message_data.created_at
        end_time = message_data.updated_at

        # Every agent thought that used this tool overwrites the values, so the
        # last matching thought wins.
        agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
        for agent_thought in agent_thoughts:
            if tool_name in agent_thought.tools:
                created_time = agent_thought.created_at
                tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
                tool_config = tool_meta_data.get('tool_config', {})
                time_cost = tool_meta_data.get('time_cost', 0)
                end_time = created_time + timedelta(seconds=time_cost)
                error = tool_meta_data.get('error', "")
                tool_parameters = tool_meta_data.get('tool_parameters', {})

        metadata = {
            "message_id": message_id,
            "tool_name": tool_name,
            "tool_inputs": tool_inputs,
            "tool_outputs": tool_outputs,
            "tool_config": tool_config,
            "time_cost": time_cost,
            "error": error,
            "tool_parameters": tool_parameters,
        }

        file_url = ""
        message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
        if message_file_data:
            file_url = f"{self.file_base_url}/{message_file_data.url}"
            metadata.update(
                {
                    "message_file_id": message_file_data.id,
                    "created_by_role": message_file_data.created_by_role,
                    "created_user_id": message_file_data.created_by,
                    "type": message_file_data.type,
                }
            )

        tool_trace_info = ToolTraceInfo(
            message_id=message_id,
            message_data=message_data.to_dict(),
            tool_name=tool_name,
            start_time=timer.get("start") if timer else created_time,
            end_time=timer.get("end") if timer else end_time,
            tool_inputs=tool_inputs,
            tool_outputs=tool_outputs,
            metadata=metadata,
            message_file_data=message_file_data,
            error=error,
            inputs=message_data.message,
            outputs=message_data.answer,
            tool_config=tool_config,
            time_cost=time_cost,
            tool_parameters=tool_parameters,
            file_url=file_url,
        )

        return tool_trace_info

    def generate_name_trace(self, conversation_id, timer, **kwargs):
        """
        Build a ``GenerateNameTraceInfo`` for a conversation-name generation.

        :param conversation_id: conversation id
        :param timer: mapping read via ``.get("start")`` / ``.get("end")``
        :param kwargs: expects ``generate_conversation_name``, ``inputs`` and ``tenant_id``
        :return: GenerateNameTraceInfo
        """
        generate_conversation_name = kwargs.get("generate_conversation_name")
        inputs = kwargs.get("inputs")
        tenant_id = kwargs.get("tenant_id")
        start_time = timer.get("start")
        end_time = timer.get("end")

        metadata = {
            "conversation_id": conversation_id,
            "tenant_id": tenant_id,
        }

        generate_name_trace_info = GenerateNameTraceInfo(
            conversation_id=conversation_id,
            inputs=inputs,
            outputs=generate_conversation_name,
            start_time=start_time,
            end_time=end_time,
            metadata=metadata,
            tenant_id=tenant_id,
        )

        return generate_name_trace_info


# Module-wide state shared by every TraceQueueManager instance: one timer and
# one queue per process.
trace_manager_timer = None
trace_manager_queue = queue.Queue()
trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))


class TraceQueueManager:
    """Queues trace tasks and periodically hands them to the Celery trace worker."""

    def __init__(self, app_id=None, conversation_id=None, message_id=None):
        self.app_id = app_id
        self.conversation_id = conversation_id
        self.message_id = message_id
        # Keyword arguments are required here: get_ops_trace_instance takes
        # (app_id, message_id, conversation_id), so passing these positionally
        # would swap the message and conversation ids.
        self.trace_instance = OpsTraceManager.get_ops_trace_instance(
            app_id=app_id,
            message_id=message_id,
            conversation_id=conversation_id,
        )
        # Keep the concrete app object so the timer thread can push an app context.
        self.flask_app = current_app._get_current_object()
        if trace_manager_timer is None:
            self.start_timer()

    def add_trace_task(self, trace_task: TraceTask):
        """
        Enqueue a trace task if tracing is active for this manager.

        Errors are logged at debug level and never propagated; the flush timer
        is (re)started in every case.

        :param trace_task: task to enqueue
        """
        try:
            if self.trace_instance:
                # The queue is shared by all managers and is flushed by whichever
                # manager's timer fires, so remember who enqueued the task.
                trace_task.manager_context = {
                    "app_id": self.app_id,
                    "conversation_id": self.conversation_id,
                    "message_id": self.message_id,
                }
                trace_manager_queue.put(trace_task)
        except Exception as e:
            logging.debug(f"Error adding trace task: {e}")
        finally:
            self.start_timer()

    def collect_tasks(self):
        """
        Drain up to ``trace_manager_batch_size`` tasks from the shared queue.

        :return: list of dequeued tasks (possibly empty)
        """
        tasks = []
        while len(tasks) < trace_manager_batch_size:
            try:
                # get_nowait + queue.Empty avoids the empty()/get race when
                # several threads drain the queue.
                task = trace_manager_queue.get_nowait()
            except queue.Empty:
                break
            tasks.append(task)
            trace_manager_queue.task_done()
        return tasks

    def run(self):
        """Timer callback: collect one batch and send it; errors are logged at debug level."""
        try:
            tasks = self.collect_tasks()
            if tasks:
                self.send_to_celery(tasks)
        except Exception as e:
            logging.debug(f"Error processing trace tasks: {e}")

    def start_timer(self):
        """Start the shared flush timer unless one is already alive."""
        global trace_manager_timer
        if trace_manager_timer is None or not trace_manager_timer.is_alive():
            trace_manager_timer = threading.Timer(
                trace_manager_interval, self.run
            )
            trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
            trace_manager_timer.daemon = False
            trace_manager_timer.start()

    def send_to_celery(self, tasks: list[TraceTask]):
        """
        Execute each task and dispatch its trace info to ``process_trace_tasks``.

        A failure in one task is logged and does not prevent the remaining
        tasks of the batch from being sent.

        :param tasks: tasks previously dequeued by ``collect_tasks``
        """
        with self.flask_app.app_context():
            for task in tasks:
                try:
                    trace_info = task.execute()

                    # Prefer the ids of the manager that enqueued the task; fall
                    # back to this manager for tasks enqueued without a context.
                    context = getattr(task, "manager_context", None) or {
                        "app_id": self.app_id,
                        "conversation_id": self.conversation_id,
                        "message_id": self.message_id,
                    }

                    if not trace_info:
                        payload = {}
                    elif hasattr(trace_info, "model_dump"):
                        payload = trace_info.model_dump()
                    else:
                        # Plain dicts (e.g. from conversation_trace) have no model_dump.
                        payload = trace_info

                    task_data = {
                        "app_id": context["app_id"],
                        "conversation_id": context["conversation_id"],
                        "message_id": context["message_id"],
                        "trace_info_type": type(trace_info).__name__,
                        "trace_info": payload,
                    }
                    process_trace_tasks.delay(task_data)
                except Exception as e:
                    logging.debug(f"Error processing trace tasks: {e}")