diff --git a/api/events/event_handlers/clean_when_document_deleted.py b/api/events/event_handlers/clean_when_document_deleted.py index d0bec667a9..24022da15f 100644 --- a/api/events/event_handlers/clean_when_document_deleted.py +++ b/api/events/event_handlers/clean_when_document_deleted.py @@ -7,4 +7,5 @@ def handle(sender, **kwargs): document_id = sender dataset_id = kwargs.get('dataset_id') doc_form = kwargs.get('doc_form') - clean_document_task.delay(document_id, dataset_id, doc_form) + file_id = kwargs.get('file_id') + clean_document_task.delay(document_id, dataset_id, doc_form, file_id) diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index fbaf44c9a4..3d9f1851b7 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -524,7 +524,14 @@ class DocumentService: @staticmethod def delete_document(document): # trigger document_was_deleted signal - document_was_deleted.send(document.id, dataset_id=document.dataset_id, doc_form=document.doc_form) + file_id = None + if document.data_source_type == 'upload_file': + if document.data_source_info: + data_source_info = document.data_source_info_dict + if data_source_info and 'upload_file_id' in data_source_info: + file_id = data_source_info['upload_file_id'] + document_was_deleted.send(document.id, dataset_id=document.dataset_id, + doc_form=document.doc_form, file_id=file_id) db.session.delete(document) db.session.commit() diff --git a/api/tasks/clean_dataset_task.py b/api/tasks/clean_dataset_task.py index 4de587d26a..1f26c966c4 100644 --- a/api/tasks/clean_dataset_task.py +++ b/api/tasks/clean_dataset_task.py @@ -6,6 +6,7 @@ from celery import shared_task from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db +from extensions.ext_storage import storage from models.dataset import ( AppDatasetJoin, Dataset, @@ -14,6 +15,7 @@ from models.dataset import ( Document, DocumentSegment, ) +from models.model import UploadFile # Add import statement for ValueError @@ -65,8 +67,27 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, db.session.query(DatasetQuery).filter(DatasetQuery.dataset_id == dataset_id).delete() db.session.query(AppDatasetJoin).filter(AppDatasetJoin.dataset_id == dataset_id).delete() - db.session.commit() + # delete files + if documents: + for document in documents: + try: + if document.data_source_type == 'upload_file': + if document.data_source_info: + data_source_info = document.data_source_info_dict + if data_source_info and 'upload_file_id' in data_source_info: + file_id = data_source_info['upload_file_id'] + file = db.session.query(UploadFile).filter( + UploadFile.tenant_id == document.tenant_id, + UploadFile.id == file_id + ).first() + if not file: + continue + storage.delete(file.key) + db.session.delete(file) + except Exception: + continue + db.session.commit() end_at = time.perf_counter() logging.info( click.style('Cleaned dataset when dataset deleted: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) diff --git a/api/tasks/clean_document_task.py b/api/tasks/clean_document_task.py index 71ebad1da4..0fd05615b6 100644 --- a/api/tasks/clean_document_task.py +++ b/api/tasks/clean_document_task.py @@ -1,21 +1,25 @@ import logging import time +from typing import Optional import click from celery import shared_task from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db +from extensions.ext_storage import storage from models.dataset import Dataset, DocumentSegment +from models.model import UploadFile @shared_task(queue='dataset') -def clean_document_task(document_id: str, dataset_id: str, doc_form: str): +def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_id: Optional[str]): """ Clean document when document deleted. :param document_id: document id :param dataset_id: dataset id :param doc_form: doc_form + :param file_id: file id Usage: clean_document_task.delay(document_id, dataset_id) """ @@ -39,8 +43,20 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str): db.session.delete(segment) db.session.commit() - end_at = time.perf_counter() - logging.info( - click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) + if file_id: + file = db.session.query(UploadFile).filter( + UploadFile.id == file_id + ).first() + if file: + try: + storage.delete(file.key) + except Exception: + logging.exception("Delete file failed when document deleted, file_id: {}".format(file_id)) + db.session.delete(file) + db.session.commit() + + end_at = time.perf_counter() + logging.info( + click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) except Exception: logging.exception("Cleaned document when document deleted failed")