From b0b0cc045f637893629efded805d465277ff1b8a Mon Sep 17 00:00:00 2001
From: Jyong <76649700+JohnJyong@users.noreply.github.com>
Date: Thu, 28 Mar 2024 17:02:35 +0800
Subject: [PATCH] add multi-thread document embedding (#3016)

Co-authored-by: jyong
---
 api/core/indexing_runner.py                 | 49 +++++++++++++------
 .../unstructured_doc_extractor.py           |  2 +-
 .../unstructured_eml_extractor.py           |  2 +-
 .../unstructured_markdown_extractor.py      |  2 +-
 .../unstructured_msg_extractor.py           |  2 +-
 .../unstructured_text_extractor.py          |  2 +-
 .../unstructured_xml_extractor.py           |  2 +-
 7 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index dd46aa27dc..94c7d18c55 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -1,3 +1,4 @@
+import concurrent.futures
 import datetime
 import json
 import logging
@@ -650,17 +651,44 @@ class IndexingRunner:
         # chunk nodes by chunk size
         indexing_start_at = time.perf_counter()
         tokens = 0
-        chunk_size = 100
+        chunk_size = 10
         embedding_model_type_instance = None
         if embedding_model_instance:
             embedding_model_type_instance = embedding_model_instance.model_type_instance
             embedding_model_type_instance = cast(TextEmbeddingModel, embedding_model_type_instance)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
+            futures = []
+            for i in range(0, len(documents), chunk_size):
+                chunk_documents = documents[i:i + chunk_size]
+                futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
+                                               chunk_documents, dataset,
+                                               dataset_document, embedding_model_instance,
+                                               embedding_model_type_instance))
 
-        for i in range(0, len(documents), chunk_size):
+            for future in futures:
+                tokens += future.result()
+
+        indexing_end_at = time.perf_counter()
+
+        # update document status to completed
+        self._update_document_index_status(
+            document_id=dataset_document.id,
+            after_indexing_status="completed",
+            extra_update_params={
+                DatasetDocument.tokens: tokens,
+                DatasetDocument.completed_at: datetime.datetime.utcnow(),
+                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
+            }
+        )
+
+    def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
+                       embedding_model_instance, embedding_model_type_instance):
+        with flask_app.app_context():
             # check document is paused
             self._check_document_paused_status(dataset_document.id)
-            chunk_documents = documents[i:i + chunk_size]
+
+            tokens = 0
             if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
                 tokens += sum(
                     embedding_model_type_instance.get_num_tokens(
@@ -670,9 +698,9 @@ class IndexingRunner:
                     )
                     for document in chunk_documents
                 )
+
             # load index
             index_processor.load(dataset, chunk_documents)
-            db.session.add(dataset)
 
             document_ids = [document.metadata['doc_id'] for document in chunk_documents]
             db.session.query(DocumentSegment).filter(
@@ -687,18 +715,7 @@ class IndexingRunner:
 
             db.session.commit()
 
-        indexing_end_at = time.perf_counter()
-
-        # update document status to completed
-        self._update_document_index_status(
-            document_id=dataset_document.id,
-            after_indexing_status="completed",
-            extra_update_params={
-                DatasetDocument.tokens: tokens,
-                DatasetDocument.completed_at: datetime.datetime.utcnow(),
-                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
-            }
-        )
+            return tokens
 
     def _check_document_paused_status(self, document_id: str):
         indexing_cache_key = 'document_{}_is_paused'.format(document_id)
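The indexing_runner.py diff above replaces the sequential per-batch loop with a thread pool: slices of 10 documents are submitted to a ThreadPoolExecutor, each worker re-enters the Flask application context (the patch passes current_app._get_current_object() rather than current_app itself, because the proxy is thread-local and would not resolve on worker threads), and the caller sums the token counts the workers return. Below is a minimal, self-contained sketch of this fan-out/fan-in pattern under simplified assumptions: process_chunk and its whitespace-based token count are illustrative stand-ins for Dify's _process_chunk, with the app-context and database work reduced to comments.

```python
import concurrent.futures

# Illustrative stand-ins: in the patch, `documents` are parsed document
# chunks and the token count comes from the embedding model.
documents = [f"document number {i}" for i in range(95)]


def process_chunk(chunk: list[str]) -> int:
    # In the patch this body runs inside `with flask_app.app_context():`
    # so that app-scoped objects such as db.session work on this thread.
    return sum(len(doc.split()) for doc in chunk)  # fake token count


chunk_size = 10  # mirrors the new value in the diff
tokens = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Fan out: one future per batch of `chunk_size` documents.
    futures = [
        executor.submit(process_chunk, documents[i:i + chunk_size])
        for i in range(0, len(documents), chunk_size)
    ]
    # Fan in: result() blocks until the worker finishes and re-raises
    # any exception the worker hit, so errors are not silently dropped.
    for future in futures:
        tokens += future.result()

print(f"indexed {len(documents)} documents, {tokens} tokens")
```

One behavioral consequence worth noting: in the old loop, _check_document_paused_status ran between batches and could stop indexing early, whereas the pool submits every batch up front, so a pause is only observed when each already-scheduled worker begins its chunk.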
diff --git a/api/core/rag/extractor/unstructured/unstructured_doc_extractor.py b/api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
index b37981a30d..34a4e85e97 100644
--- a/api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
@@ -53,7 +53,7 @@ class UnstructuredWordExtractor(BaseExtractor):
             elements = partition_docx(filename=self._file_path)
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
diff --git a/api/core/rag/extractor/unstructured/unstructured_eml_extractor.py b/api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
index 1d92bbbee6..f6ae8fad53 100644
--- a/api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
@@ -43,7 +43,7 @@ class UnstructuredEmailExtractor(BaseExtractor):
             pass
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
diff --git a/api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py b/api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
index 3ac04ddc17..3d63446fef 100644
--- a/api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
@@ -38,7 +38,7 @@ class UnstructuredMarkdownExtractor(BaseExtractor):
         elements = partition_md(filename=self._file_path, api_url=self._api_url)
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
diff --git a/api/core/rag/extractor/unstructured/unstructured_msg_extractor.py b/api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
index d4b72e37eb..34d3e8021a 100644
--- a/api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
@@ -28,7 +28,7 @@ class UnstructuredMsgExtractor(BaseExtractor):
         elements = partition_msg(filename=self._file_path, api_url=self._api_url)
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
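The extractor diffs all make the same one-line change: combine_text_under_n_chars rises from 0 to 2000 in chunk_by_title, so sections shorter than that limit are merged with their neighbors instead of each becoming its own tiny chunk. Below is a small sketch of the effect using the unstructured library's chunk_by_title directly; the hand-built Title/NarrativeText elements are illustrative test data, not Dify code.

```python
# Requires the `unstructured` package (pip install unstructured).
from unstructured.chunking.title import chunk_by_title
from unstructured.documents.elements import NarrativeText, Title

# Two short titled sections, as a title-based partitioner might emit them.
elements = [
    Title("Section 1"),
    NarrativeText("A short paragraph."),
    Title("Section 2"),
    NarrativeText("Another short paragraph."),
]

# Old value: combine_text_under_n_chars=0 never merges sections, so each
# titled section becomes its own (here, tiny) chunk.
fragments = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)

# New value: sections under 2000 characters are combined with their
# neighbors until a chunk approaches max_characters.
merged = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)

print(len(fragments), len(merged))  # expect fewer chunks after the change
```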
diff --git a/api/core/rag/extractor/unstructured/unstructured_text_extractor.py b/api/core/rag/extractor/unstructured/unstructured_text_extractor.py
index 5af21b2b1d..cc67f2b866 100644
--- a/api/core/rag/extractor/unstructured/unstructured_text_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_text_extractor.py
@@ -28,7 +28,7 @@ class UnstructuredTextExtractor(BaseExtractor):
         elements = partition_text(filename=self._file_path, api_url=self._api_url)
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
diff --git a/api/core/rag/extractor/unstructured/unstructured_xml_extractor.py b/api/core/rag/extractor/unstructured/unstructured_xml_extractor.py
index b08ff63a1c..5600fb075d 100644
--- a/api/core/rag/extractor/unstructured/unstructured_xml_extractor.py
+++ b/api/core/rag/extractor/unstructured/unstructured_xml_extractor.py
@@ -28,7 +28,7 @@ class UnstructuredXmlExtractor(BaseExtractor):
         elements = partition_xml(filename=self._file_path, xml_keep_tags=True, api_url=self._api_url)
 
         from unstructured.chunking.title import chunk_by_title
-        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=0)
+        chunks = chunk_by_title(elements, max_characters=2000, combine_text_under_n_chars=2000)
         documents = []
         for chunk in chunks:
             text = chunk.text.strip()
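Setting combine_text_under_n_chars equal to max_characters makes chunk_by_title pack adjacent small sections together until a chunk approaches 2000 characters, so short sections, emails, and messages no longer surface as near-empty chunks. Fewer, denser chunks also mean fewer embedding calls, which complements the multi-threaded embedding above; the patch applies the same value uniformly across the doc, eml, markdown, msg, text, and xml extractors.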