diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py
index 826842e52b..3d799bfd4e 100644
--- a/api/schedule/clean_unused_datasets_task.py
+++ b/api/schedule/clean_unused_datasets_task.py
@@ -9,9 +9,7 @@ import app
 from configs import dify_config
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from extensions.ext_database import db
-from extensions.ext_redis import redis_client
 from models.dataset import Dataset, DatasetQuery, Document
-from services.feature_service import FeatureService
 
 
 @app.celery.task(queue="dataset")
@@ -20,7 +18,6 @@ def clean_unused_datasets_task():
     clean_days = dify_config.CLEAN_DAY_SETTING
     start_at = time.perf_counter()
     thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
-    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
     page = 1
     while True:
         try:
@@ -91,84 +88,5 @@ def clean_unused_datasets_task():
                 click.echo(
                     click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                 )
-    page = 1
-    while True:
-        try:
-            # Subquery for counting new documents
-            document_subquery_new = (
-                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
-                .filter(
-                    Document.indexing_status == "completed",
-                    Document.enabled == True,
-                    Document.archived == False,
-                    Document.updated_at > seven_days_ago,
-                )
-                .group_by(Document.dataset_id)
-                .subquery()
-            )
-
-            # Subquery for counting old documents
-            document_subquery_old = (
-                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
-                .filter(
-                    Document.indexing_status == "completed",
-                    Document.enabled == True,
-                    Document.archived == False,
-                    Document.updated_at < seven_days_ago,
-                )
-                .group_by(Document.dataset_id)
-                .subquery()
-            )
-
-            # Main query with join and filter
-            datasets = (
-                db.session.query(Dataset)
-                .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
-                .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
-                .filter(
-                    Dataset.created_at < seven_days_ago,
-                    func.coalesce(document_subquery_new.c.document_count, 0) == 0,
-                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
-                )
-                .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
-            )
-
-        except NotFound:
-            break
-        if datasets.items is None or len(datasets.items) == 0:
-            break
-        page += 1
-        for dataset in datasets:
-            dataset_query = (
-                db.session.query(DatasetQuery)
-                .filter(DatasetQuery.created_at > seven_days_ago, DatasetQuery.dataset_id == dataset.id)
-                .all()
-            )
-            if not dataset_query or len(dataset_query) == 0:
-                try:
-                    features_cache_key = f"features:{dataset.tenant_id}"
-                    plan = redis_client.get(features_cache_key)
-                    if plan is None:
-                        features = FeatureService.get_features(dataset.tenant_id)
-                        redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
-                        plan = features.billing.subscription.plan
-                    if plan == "sandbox":
-                        # remove index
-                        index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
-                        index_processor.clean(dataset, None)
-
-                        # update document
-                        update_params = {Document.enabled: False}
-
-                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
-                        db.session.commit()
-                        click.echo(
-                            click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")
-                        )
-                except Exception as e:
-                    click.echo(
-                        click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
-                    )
     end_at = time.perf_counter()
     click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))