diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index ae9a075340..adfb94408c 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -51,7 +51,7 @@ def init_app(app: Flask) -> Celery: }, 'clean_unused_datasets_task': { 'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task', - 'schedule': timedelta(days=day), + 'schedule': timedelta(minutes=1), } } celery_app.conf.update( diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py index 2033791ace..f2c75bc825 100644 --- a/api/schedule/clean_unused_datasets_task.py +++ b/api/schedule/clean_unused_datasets_task.py @@ -9,7 +9,7 @@ from configs import dify_config from core.rag.index_processor.index_processor_factory import IndexProcessorFactory from extensions.ext_database import db from models.dataset import Dataset, DatasetQuery, Document - +from sqlalchemy import func @app.celery.task(queue='dataset') def clean_unused_datasets_task(): @@ -20,10 +20,46 @@ def clean_unused_datasets_task(): page = 1 while True: try: - datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \ - .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50) + # Subquery for counting new documents + document_subquery_new = db.session.query( + Document.dataset_id, + func.count(Document.id).label('document_count') + ).filter( + Document.indexing_status == 'completed', + Document.enabled == True, + Document.archived == False, + Document.updated_at > thirty_days_ago + ).group_by(Document.dataset_id).subquery() + + # Subquery for counting old documents + document_subquery_old = db.session.query( + Document.dataset_id, + func.count(Document.id).label('document_count') + ).filter( + Document.indexing_status == 'completed', + Document.enabled == True, + Document.archived == False, + Document.updated_at < thirty_days_ago + ).group_by(Document.dataset_id).subquery() + + # Main query with join and filter + datasets = (db.session.query(Dataset) + .outerjoin( + document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id + ).outerjoin( + document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id + ).filter( + Dataset.created_at < thirty_days_ago, + func.coalesce(document_subquery_new.c.document_count, 0) == 0, + func.coalesce(document_subquery_old.c.document_count, 0) > 0 + ).order_by( + Dataset.created_at.desc() + ).paginate(page=page, per_page=50)) + except NotFound: break + if datasets.items is None or len(datasets.items) == 0: + break page += 1 for dataset in datasets: dataset_query = db.session.query(DatasetQuery).filter( @@ -31,31 +67,23 @@ def clean_unused_datasets_task(): DatasetQuery.dataset_id == dataset.id ).all() if not dataset_query or len(dataset_query) == 0: - documents = db.session.query(Document).filter( - Document.dataset_id == dataset.id, - Document.indexing_status == 'completed', - Document.enabled == True, - Document.archived == False, - Document.updated_at > thirty_days_ago - ).all() - if not documents or len(documents) == 0: - try: - # remove index - index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor() - index_processor.clean(dataset, None) + try: + # remove index + index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor() + index_processor.clean(dataset, None) - # update document - update_params = { - Document.enabled: False - } + # update document + update_params = { + Document.enabled: False + } - Document.query.filter_by(dataset_id=dataset.id).update(update_params) - db.session.commit() - click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id), - fg='green')) - except Exception as e: - click.echo( - click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)), - fg='red')) + Document.query.filter_by(dataset_id=dataset.id).update(update_params) + db.session.commit() + click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id), + fg='green')) + except Exception as e: + click.echo( + click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)), + fg='red')) end_at = time.perf_counter() click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))