From 595e9b25baa1b967800a6462ba7eb4986890819e Mon Sep 17 00:00:00 2001 From: Jyong <76649700+JohnJyong@users.noreply.github.com> Date: Tue, 2 Jan 2024 15:29:18 +0800 Subject: [PATCH] Add data clean schedule (#1859) Co-authored-by: jyong --- api/docker/entrypoint.sh | 2 + api/extensions/ext_celery.py | 23 ++++++++ api/requirements.txt | 2 +- api/schedule/clean_embedding_cache_task.py | 29 +++++++++ api/schedule/clean_unused_datasets_task.py | 69 ++++++++++++++++++++++ 5 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 api/schedule/clean_embedding_cache_task.py create mode 100644 api/schedule/clean_unused_datasets_task.py diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index 6d60e3d792..2dad52f8f6 100644 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -10,6 +10,8 @@ fi if [[ "${MODE}" == "worker" ]]; then celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \ -Q ${CELERY_QUEUES:-dataset,generation,mail} +elif [[ "${MODE}" == "beat" ]]; then + celery -A app.celery beat --loglevel INFO else if [[ "${DEBUG}" == "true" ]]; then flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 5750d77dba..b2ff70518e 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -1,3 +1,5 @@ +from datetime import timedelta + from celery import Task, Celery from flask import Flask @@ -35,4 +37,25 @@ def init_app(app: Flask) -> Celery: celery_app.set_default() app.extensions["celery"] = celery_app + + imports = [ + "schedule.clean_embedding_cache_task", + "schedule.clean_unused_datasets_task", + ] + + beat_schedule = { + 'clean_embedding_cache_task': { + 'task': 'schedule.clean_embedding_cache_task.clean_embedding_cache_task', + 'schedule': timedelta(minutes=1), + }, + 'clean_unused_datasets_task': { + 'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task', + 'schedule': timedelta(minutes=10), + } + } + celery_app.conf.update( + beat_schedule=beat_schedule, + imports=imports + ) + return celery_app diff --git a/api/requirements.txt b/api/requirements.txt index b7f5b16a9e..f4dff1a5cf 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -57,4 +57,4 @@ cohere~=4.32 unstructured~=0.10.27 unstructured[docx,pptx,msg,md,ppt]~=0.10.27 bs4~=0.0.1 -markdown~=3.5.1 \ No newline at end of file +markdown~=3.5.1 diff --git a/api/schedule/clean_embedding_cache_task.py b/api/schedule/clean_embedding_cache_task.py new file mode 100644 index 0000000000..1caed9e02e --- /dev/null +++ b/api/schedule/clean_embedding_cache_task.py @@ -0,0 +1,29 @@ +import app +import datetime +import time +import click +from flask import current_app +from werkzeug.exceptions import NotFound +from extensions.ext_database import db +from models.dataset import Embedding + + +@app.celery.task(queue='dataset') +def clean_embedding_cache_task(): + click.echo(click.style('Start clean embedding cache.', fg='green')) + clean_days = int(current_app.config.get('CLEAN_DAY_SETTING')) + start_at = time.perf_counter() + thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) + page = 1 + while True: + try: + embeddings = db.session.query(Embedding).filter(Embedding.created_at < thirty_days_ago) \ + .order_by(Embedding.created_at.desc()).paginate(page=page, per_page=100) + except NotFound: + break + for embedding in embeddings: + db.session.delete(embedding) + db.session.commit() + page += 1 + end_at = time.perf_counter() + click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green')) diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py new file mode 100644 index 0000000000..eb95cc5da2 --- /dev/null +++ b/api/schedule/clean_unused_datasets_task.py @@ -0,0 +1,69 @@ +import logging +import app +import datetime +import time +import click +from flask import current_app +from werkzeug.exceptions import NotFound +from core.index.index import IndexBuilder +from extensions.ext_database import db +from models.dataset import Dataset, DatasetQuery, Document, DatasetCollectionBinding + + +@app.celery.task(queue='dataset') +def clean_unused_datasets_task(): + click.echo(click.style('Start clean unused datasets indexes.', fg='green')) + clean_days = int(current_app.config.get('CLEAN_DAY_SETTING')) + start_at = time.perf_counter() + thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) + page = 1 + while True: + try: + datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \ + .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50) + except NotFound: + break + page += 1 + for dataset in datasets: + dataset_query = db.session.query(DatasetQuery).filter( + DatasetQuery.created_at > thirty_days_ago, + DatasetQuery.dataset_id == dataset.id + ).all() + if not dataset_query or len(dataset_query) == 0: + documents = db.session.query(Document).filter( + Document.dataset_id == dataset.id, + Document.indexing_status == 'completed', + Document.enabled == True, + Document.archived == False, + Document.updated_at > thirty_days_ago + ).all() + if not documents or len(documents) == 0: + try: + # remove index + vector_index = IndexBuilder.get_index(dataset, 'high_quality') + kw_index = IndexBuilder.get_index(dataset, 'economy') + # delete from vector index + if vector_index: + if dataset.collection_binding_id: + vector_index.delete_by_group_id(dataset.id) + else: + if dataset.collection_binding_id: + vector_index.delete_by_group_id(dataset.id) + else: + vector_index.delete() + kw_index.delete() + # update document + update_params = { + Document.enabled: False + } + + Document.query.filter_by(dataset_id=dataset.id).update(update_params) + db.session.commit() + click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id), + fg='green')) + except Exception as e: + click.echo( + click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)), + fg='red')) + end_at = time.perf_counter() + click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))