From a4ab8f225cb3070b2bda2d19cda6d59ff3ebc482 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Tue, 12 Nov 2024 17:21:44 +0800
Subject: [PATCH 1/2] add message clean task

---
 api/configs/feature/__init__.py            |  5 ++
 api/extensions/ext_celery.py               |  5 ++
 api/models/model.py                        |  1 +
 api/schedule/clean_messages.py             | 64 ++++++++++++++++++++++
 api/schedule/clean_unused_datasets_task.py | 14 ++---
 5 files changed, 81 insertions(+), 8 deletions(-)
 create mode 100644 api/schedule/clean_messages.py

diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py
index f368a19469..99f86be12e 100644
--- a/api/configs/feature/__init__.py
+++ b/api/configs/feature/__init__.py
@@ -616,6 +616,11 @@ class DataSetConfig(BaseSettings):
         default=False,
     )
 
+    PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING: PositiveInt = Field(
+        description="Retention period in days for messages under the sandbox plan; older messages are cleaned up",
+        default=30,
+    )
+
 
 class WorkspaceConfig(BaseSettings):
     """
diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py
index 42012eee8e..1b78e36a57 100644
--- a/api/extensions/ext_celery.py
+++ b/api/extensions/ext_celery.py
@@ -68,6 +68,7 @@ def init_app(app: Flask) -> Celery:
         "schedule.clean_unused_datasets_task",
         "schedule.create_tidb_serverless_task",
         "schedule.update_tidb_serverless_status_task",
+        "schedule.clean_messages",
     ]
     day = dify_config.CELERY_BEAT_SCHEDULER_TIME
     beat_schedule = {
@@ -87,6 +88,10 @@ def init_app(app: Flask) -> Celery:
             "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
             "schedule": crontab(minute="30", hour="*"),
         },
+        "clean_messages": {
+            "task": "schedule.clean_messages.clean_messages",
+            "schedule": timedelta(days=day),
+        },
     }
     celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
 
diff --git a/api/models/model.py b/api/models/model.py
index e909d53e3e..b7c89ce97c 100644
--- a/api/models/model.py
+++ b/api/models/model.py
@@ -719,6 +719,7 @@ class Message(db.Model):
         db.Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
         db.Index("message_account_idx", "app_id", "from_source", "from_account_id"),
         db.Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
+        db.Index("message_created_at_idx", "created_at"),
     )
 
     id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
diff --git a/api/schedule/clean_messages.py b/api/schedule/clean_messages.py
new file mode 100644
index 0000000000..60de154b41
--- /dev/null
+++ b/api/schedule/clean_messages.py
@@ -0,0 +1,64 @@
+import datetime
+import time
+
+import click
+
+import app
+from configs import dify_config
+from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
+from extensions.ext_database import db
+from models.account import Tenant
+from models.model import App, Message, MessageAgentThought, MessageAnnotation, MessageChain, MessageFeedback, MessageFile
+from models.web import SavedMessage
+from services.feature_service import FeatureService
+from extensions.ext_redis import redis_client
+from werkzeug.exceptions import NotFound
+
+@app.celery.task(queue="dataset")
+def clean_messages():
+    click.echo(click.style("Start cleaning messages.", fg="green"))
+    start_at = time.perf_counter()
+    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING)
+    page = 1
+    while True:
+        try:
+            # Fetch a batch of messages older than the retention threshold
+            messages = (
+                db.session.query(Message)
+                .filter(Message.created_at < plan_sandbox_clean_message_day)
+                .order_by(Message.created_at.desc())
+                .paginate(page=page, per_page=100)
+            )
+
+        except NotFound:
+            break
+        if messages.items is None or len(messages.items) == 0:
+            break
+        for message in messages.items:
+            app = App.query.filter_by(id=message.app_id).first()
+            features_cache_key = f"features:{app.tenant_id}"
+            plan_cache = redis_client.get(features_cache_key)
+            if plan_cache is None:
+                features = FeatureService.get_features(app.tenant_id)
+                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
+                plan = features.billing.subscription.plan
+            else:
+                plan = plan_cache.decode()
+            if plan == "sandbox":
+                # clean related message
+                db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(synchronize_session=False)
+                db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(synchronize_session=False)
+                db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(synchronize_session=False)
+                db.session.query(Message).filter(Message.id == message.id).delete()
+                db.session.commit()
+    end_at = time.perf_counter()
+    click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))
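
A note on the pagination in clean_messages above: the page counter is never advanced, so paginate(page=page, per_page=100) always re-reads page 1. That terminates only when every fetched row is deleted before the next pass; messages belonging to non-sandbox tenants survive and are fetched again on every iteration. A keyset cursor avoids re-reading survivors. The following is a minimal sketch, not part of the patch — Item, should_delete, and purge_older_than are hypothetical stand-ins for Message, the plan check, and the task body:

    from datetime import datetime

    from sqlalchemy import Column, DateTime, Integer
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class Item(Base):  # hypothetical stand-in for the Message model
        __tablename__ = "items"
        id = Column(Integer, primary_key=True)
        created_at = Column(DateTime)


    def should_delete(item: Item) -> bool:  # stand-in for the sandbox-plan check
        return True


    def purge_older_than(session: Session, cutoff: datetime, batch_size: int = 100) -> None:
        cursor = cutoff
        while True:
            batch = (
                session.query(Item)
                .filter(Item.created_at < cursor)
                .order_by(Item.created_at.desc())
                .limit(batch_size)
                .all()
            )
            if not batch:
                break
            for item in batch:
                if should_delete(item):
                    session.delete(item)
            session.commit()
            # Move the cursor past this batch whether or not anything was
            # deleted, so surviving rows are never re-fetched. Rows sharing
            # the exact cursor timestamp are skipped by this simple sketch.
            cursor = batch[-1].created_at

Ordering descending and advancing the cursor to the last row's timestamp keeps every query bounded by the created_at index added in the second patch.
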
diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py
index 100fd8dfab..e12be649e4 100644
--- a/api/schedule/clean_unused_datasets_task.py
+++ b/api/schedule/clean_unused_datasets_task.py
@@ -22,7 +22,6 @@ def clean_unused_datasets_task():
     start_at = time.perf_counter()
     plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
     plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
-    page = 1
     while True:
         try:
             # Subquery for counting new documents
@@ -62,14 +61,13 @@ def clean_unused_datasets_task():
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
                 .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
             )
 
         except NotFound:
             break
         if datasets.items is None or len(datasets.items) == 0:
             break
-        page += 1
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
@@ -92,7 +90,6 @@ def clean_unused_datasets_task():
                 click.echo(
                     click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                 )
-    page = 1
     while True:
         try:
             # Subquery for counting new documents
@@ -132,14 +129,13 @@ def clean_unused_datasets_task():
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
                 .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
             )
 
         except NotFound:
             break
         if datasets.items is None or len(datasets.items) == 0:
             break
-        page += 1
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
@@ -149,11 +145,13 @@ def clean_unused_datasets_task():
             if not dataset_query or len(dataset_query) == 0:
                 try:
                     features_cache_key = f"features:{dataset.tenant_id}"
-                    plan = redis_client.get(features_cache_key)
-                    if plan is None:
+                    plan_cache = redis_client.get(features_cache_key)
+                    if plan_cache is None:
                         features = FeatureService.get_features(dataset.tenant_id)
                         redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                         plan = features.billing.subscription.plan
+                    else:
+                        plan = plan_cache.decode()
                     if plan == "sandbox":
                         # remove index
                         index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
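
The hunk above gives clean_unused_datasets_task the same plan-cache pattern clean_messages uses: read features:{tenant_id} from Redis, decode on a hit, otherwise ask FeatureService and setex the plan for 600 seconds. Since the dance now appears in two tasks, a shared helper is a natural follow-up; a sketch, with fetch_plan standing in for FeatureService.get_features(tenant_id).billing.subscription.plan:

    from typing import Callable


    def get_cached_plan(
        redis_client,
        tenant_id: str,
        fetch_plan: Callable[[str], str],
        ttl_seconds: int = 600,
    ) -> str:
        """Return the tenant's plan, caching it under features:{tenant_id}."""
        key = f"features:{tenant_id}"
        cached = redis_client.get(key)
        if cached is not None:
            # redis-py returns bytes by default
            return cached.decode()
        plan = fetch_plan(tenant_id)  # the expensive billing lookup
        redis_client.setex(key, ttl_seconds, plan)
        return plan

With that, both loop bodies reduce to plan = get_cached_plan(redis_client, tenant_id, ...), keeping the key format and TTL in one place.
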
From 4f389dab9d9ef60e494725e57f2e72d5f2a035d3 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Tue, 12 Nov 2024 17:27:47 +0800
Subject: [PATCH 2/2] add created_at index for messages and tidy message clean
 task

---
 ...832f7_add_created_at_index_for_messages.py | 31 ++++++++++++++++
 api/schedule/clean_messages.py                | 35 +++++++++++++------
 2 files changed, 56 insertions(+), 10 deletions(-)
 create mode 100644 api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py

diff --git a/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py b/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py
new file mode 100644
index 0000000000..d94508edcf
--- /dev/null
+++ b/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py
@@ -0,0 +1,31 @@
+"""add_created_at_index_for_messages
+
+Revision ID: 01d6889832f7
+Revises: 09a8d1878d9b
+Create Date: 2024-11-12 09:25:05.527827
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '01d6889832f7'
+down_revision = '09a8d1878d9b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.drop_index('message_created_at_idx')
+    # ### end Alembic commands ###
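
The b-tree index matches the cleanup query shape: the created_at < cutoff range predicate and the ORDER BY created_at DESC can both be served by message_created_at_idx, so each 100-row batch becomes a bounded index scan instead of a full-table sort. After running the migration (flask db upgrade in the api service), the plan can be spot-checked from a Flask shell; a sketch — the exact EXPLAIN output depends on table statistics:

    from sqlalchemy import text

    from extensions.ext_database import db

    with db.engine.connect() as conn:
        rows = conn.execute(text(
            "EXPLAIN SELECT id FROM messages "
            "WHERE created_at < now() - interval '30 days' "
            "ORDER BY created_at DESC LIMIT 100"
        ))
        for row in rows:
            # With the index in place, expect something like:
            #   Index Scan Backward using message_created_at_idx on messages
            print(row[0])
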
diff --git a/api/schedule/clean_messages.py b/api/schedule/clean_messages.py
index 60de154b41..72ee2a8901 100644
--- a/api/schedule/clean_messages.py
+++ b/api/schedule/clean_messages.py
@@ -2,23 +2,32 @@ import datetime
 import time
 
 import click
+from werkzeug.exceptions import NotFound
 
 import app
 from configs import dify_config
-from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
 from extensions.ext_database import db
-from models.account import Tenant
-from models.model import App, Message, MessageAgentThought, MessageAnnotation, MessageChain, MessageFeedback, MessageFile
+from extensions.ext_redis import redis_client
+from models.model import (
+    App,
+    Message,
+    MessageAgentThought,
+    MessageAnnotation,
+    MessageChain,
+    MessageFeedback,
+    MessageFile,
+)
 from models.web import SavedMessage
 from services.feature_service import FeatureService
-from extensions.ext_redis import redis_client
-from werkzeug.exceptions import NotFound
+
 
 @app.celery.task(queue="dataset")
 def clean_messages():
     click.echo(click.style("Start cleaning messages.", fg="green"))
     start_at = time.perf_counter()
-    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING)
+    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
+        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
+    )
     page = 1
     while True:
         try:
@@ -35,7 +44,7 @@
         if messages.items is None or len(messages.items) == 0:
             break
         for message in messages.items:
-            app = App.query.filter_by(id=message.app_id).first()
+            app = App.query.filter_by(id=message.app_id).first()
             features_cache_key = f"features:{app.tenant_id}"
             plan_cache = redis_client.get(features_cache_key)
             if plan_cache is None:
@@ -52,12 +61,18 @@
             db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
                 synchronize_session=False
             )
-            db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(synchronize_session=False)
+            db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
+                synchronize_session=False
+            )
             db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
                 synchronize_session=False
             )
-            db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(synchronize_session=False)
+            db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
+                synchronize_session=False
+            )
-            db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(synchronize_session=False)
+            db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
+                synchronize_session=False
+            )
             db.session.query(Message).filter(Message.id == message.id).delete()
             db.session.commit()
         end_at = time.perf_counter()
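
With the beat entry from the first patch, clean_messages fires every CELERY_BEAT_SCHEDULER_TIME days on the dataset queue. For a one-off run while testing, the task can also be enqueued or executed in-process; a sketch, assuming the api service environment (database, Redis, broker) is configured:

    from schedule.clean_messages import clean_messages

    # Enqueue onto the queue the task decorator declares:
    clean_messages.apply_async(queue="dataset")

    # Or execute synchronously in the current process, raising on failure:
    clean_messages.apply(throw=True)
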