From dc23d9e693a187dd1dc9a463c5f3fca66fae6575 Mon Sep 17 00:00:00 2001
From: takatost
Date: Wed, 24 Jul 2024 18:53:29 +0800
Subject: [PATCH] chore: optimize asynchronous workflow deletion performance of app related data

---
 api/tasks/remove_app_and_related_data_task.py | 45 ++++++++++++++++---
 1 file changed, 40 insertions(+), 5 deletions(-)

diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py
index 5aea8552da..378756e68c 100644
--- a/api/tasks/remove_app_and_related_data_task.py
+++ b/api/tasks/remove_app_and_related_data_task.py
@@ -45,6 +45,9 @@ def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
         _delete_app_annotation_data(tenant_id, app_id)
         _delete_app_dataset_joins(tenant_id, app_id)
         _delete_app_workflows(tenant_id, app_id)
+        _delete_app_workflow_runs(tenant_id, app_id)
+        _delete_app_workflow_node_executions(tenant_id, app_id)
+        _delete_app_workflow_app_logs(tenant_id, app_id)
         _delete_app_conversations(tenant_id, app_id)
         _delete_app_messages(tenant_id, app_id)
         _delete_workflow_tool_providers(tenant_id, app_id)
@@ -162,11 +165,6 @@ def _delete_app_dataset_joins(tenant_id: str, app_id: str):
 
 def _delete_app_workflows(tenant_id: str, app_id: str):
     def del_workflow(workflow_id: str):
-        db.session.query(WorkflowRun).filter(WorkflowRun.workflow_id == workflow_id).delete(synchronize_session=False)
-        db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.workflow_id == workflow_id).delete(
-            synchronize_session=False)
-        db.session.query(WorkflowAppLog).filter(WorkflowAppLog.workflow_id == workflow_id).delete(
-            synchronize_session=False)
         db.session.query(Workflow).filter(Workflow.id == workflow_id).delete(synchronize_session=False)
 
     _delete_records(
@@ -177,6 +175,43 @@ def _delete_app_workflows(tenant_id: str, app_id: str):
     )
 
 
+def _delete_app_workflow_runs(tenant_id: str, app_id: str):
+    def del_workflow_run(workflow_run_id: str):
+        db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).delete(synchronize_session=False)
+
+    _delete_records(
+        """select id from workflow_runs where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
+        {"tenant_id": tenant_id, "app_id": app_id},
+        del_workflow_run,
+        "workflow run"
+    )
+
+
+def _delete_app_workflow_node_executions(tenant_id: str, app_id: str):
+    def del_workflow_node_execution(workflow_node_execution_id: str):
+        db.session.query(WorkflowNodeExecution).filter(
+            WorkflowNodeExecution.id == workflow_node_execution_id).delete(synchronize_session=False)
+
+    _delete_records(
+        """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
+        {"tenant_id": tenant_id, "app_id": app_id},
+        del_workflow_node_execution,
+        "workflow node execution"
+    )
+
+
+def _delete_app_workflow_app_logs(tenant_id: str, app_id: str):
+    def del_workflow_app_log(workflow_app_log_id: str):
+        db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete(synchronize_session=False)
+
+    _delete_records(
+        """select id from workflow_app_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
+        {"tenant_id": tenant_id, "app_id": app_id},
+        del_workflow_app_log,
+        "workflow app log"
+    )
+
+
 def _delete_app_conversations(tenant_id: str, app_id: str):
     def del_conversation(conversation_id: str):
         db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete(