Fix/sigkill temp fix #46

Open · wants to merge 4 commits into main
47 changes: 40 additions & 7 deletions app/celery.py
@@ -18,9 +18,36 @@
)


@celery.task(name="index_full_file_content")
def index_full_file_content(content_base):
from app.main import main_app
print(f"[+ Index Full File Content: {content_base} +]")

file_downloader = S3FileDownloader(
os.environ.get("AWS_STORAGE_ACCESS_KEY"),
os.environ.get("AWS_STORAGE_SECRET_KEY")
)
content_base_indexer = main_app.content_base_indexer
text_splitter = TextSplitter(character_text_splitter())
manager = IndexerFileManager(
file_downloader,
content_base_indexer,
text_splitter,
)
index_result: bool = manager.index_full_text(content_base)
print(f"[+ Index Full File Result: {index_result} +]")
# TODO: retry indexing full text or delete embeddings
NexusRESTClient().index_succedded(
task_succeded=index_result,
nexus_task_uuid=content_base.get("task_uuid"),
file_type=content_base.get("extension_file")
)


@celery.task(name="index_file")
def index_file_data(content_base: Dict) -> bool:
from app.main import main_app
print(f"[+ Index File Data: {content_base} +]")

file_downloader = S3FileDownloader(
os.environ.get("AWS_STORAGE_ACCESS_KEY"),
@@ -33,16 +60,22 @@ def index_file_data(content_base: Dict) -> bool:
content_base_indexer,
text_splitter,
)
index_result: bool = manager.index_file_url(content_base)
embbed_result: bool = content_base_indexer.check_if_doc_was_embedded_document(
file_uuid=content_base.get("file_uuid"),
content_base_uuid=str(content_base.get('content_base')),
)
index_result: bool = manager.index(content_base)

index_result = index_result and embbed_result
print(f"[+ Index File URL result: {index_result} +]")

if index_result:
embbed_result: bool = content_base_indexer.check_if_doc_was_embedded_document(
file_uuid=content_base.get("file_uuid"),
content_base_uuid=str(content_base.get('content_base')),
)
print(f"[+ File was embbeded: {embbed_result} +]")
if embbed_result:
index_full_file_content.delay(content_base)
return

NexusRESTClient().index_succedded(
task_succeded=index_result,
task_succeded=False,
nexus_task_uuid=content_base.get("task_uuid"),
file_type=content_base.get("extension_file")
)
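Net effect in this file: indexing is now a two-stage task chain. index_file_data indexes the chunked documents and, only when the chunks were verifiably embedded, enqueues index_full_file_content, which stores the raw full text and reports the outcome to Nexus; the task_succeded=False callback at the end of index_file_data now fires only on failure. A minimal sketch of kicking off the chain, assuming the payload keys read via content_base.get(...) in this PR (all example values are placeholders):

    from app.celery import index_file_data

    # Hypothetical payload; keys mirror the content_base.get(...) calls above.
    content_base = {
        "file": "https://example-bucket.s3.amazonaws.com/report.pdf",  # placeholder URL
        "extension_file": "pdf",
        "filename": "report.pdf",
        "file_uuid": "00000000-0000-0000-0000-000000000001",     # placeholder
        "content_base": "00000000-0000-0000-0000-000000000002",  # placeholder
        "task_uuid": "00000000-0000-0000-0000-000000000003",     # placeholder
        "load_type": None,
    }

    # Stage 1: chunk the file, index the chunks, verify they were embedded.
    # On success the task itself enqueues stage 2, index_full_file_content.delay(content_base),
    # which indexes the full raw text and calls NexusRESTClient().index_succedded(...).
    index_file_data.delay(content_base)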
17 changes: 15 additions & 2 deletions app/indexer/content_bases.py
@@ -1,17 +1,23 @@
import os

from langchain.docstore.document import Document

from app.handlers.products import Product
from app.indexer import IDocumentIndexer
from app.store import IStorage
from typing import List
from uuid import UUID
from app.rerank import rerank_chunks


class ContentBaseIndexer(IDocumentIndexer):
def __init__(self, storage: IStorage):
self.storage = storage

def index_documents(self, docs: List[Document]):
DOCUMENTS_BATCH_SIZE: int = int(os.environ.get("DOCUMENTS_BATCH_SIZE", 500))
docs_size: int = len(docs)

file_uuid = docs[0].metadata["file_uuid"]
content_base_uuid = docs[0].metadata["content_base_uuid"]

@@ -24,7 +30,10 @@ def index_documents(self, docs: List[Document]):
ids = [item["_id"] for item in results]
self.storage.delete(ids=ids)

return self.storage.save(docs)
for i in range(0, docs_size, DOCUMENTS_BATCH_SIZE):
self.storage.save(docs[i:DOCUMENTS_BATCH_SIZE + i])

return

def index(self, texts: List, metadatas: dict):
results = self._search_docs_by_content_base_uuid(
@@ -53,15 +62,19 @@ def search(self, search, filter=None, threshold=0.1) -> list[Product]:

for doc in matched_responses:
full_page = doc.metadata.get("full_page")

if full_page not in seen:
seen.add(full_page)
return_list.append({
"text": full_page,
"full_page": full_page,
"filename": doc.metadata.get("filename"),
"file_uuid": doc.metadata.get("file_uuid"),
})

return return_list
chunks = rerank_chunks(search, return_list, 0.4, 5)

return chunks

def _search_docs_by_content_base_uuid(
self,
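Two behavioural changes land in this file: index_documents now persists embeddings in slices of DOCUMENTS_BATCH_SIZE instead of a single bulk storage.save, and search passes the deduplicated pages through rerank_chunks before returning. A standalone sketch of the batching pattern, assuming storage.save accepts any list slice (the cast to int matters because environment variables arrive as strings):

    import os
    from typing import List

    from langchain.docstore.document import Document


    def save_in_batches(storage, docs: List[Document]) -> None:
        # Write in fixed-size slices so a single oversized save cannot exhaust
        # memory (presumably the OOM/SIGKILL this PR works around).
        batch_size = int(os.environ.get("DOCUMENTS_BATCH_SIZE", 500))
        for start in range(0, len(docs), batch_size):
            storage.save(docs[start:start + batch_size])

The trailing rerank_chunks(search, return_list, 0.4, 5) call appears to keep at most five chunks scoring above 0.4 for the query; the exact signature and scoring live in app.rerank, so treat those numbers as an assumption about threshold and top-k.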
67 changes: 55 additions & 12 deletions app/indexer/indexer_file_manager.py
@@ -1,7 +1,8 @@
from app.loaders import (
load_file_and_get_raw_text,
load_file_url_and_get_raw_text,
load_file_url_and_split_text
load_file_url_and_split_text,
load_file_and_get_chunks
)
from app.text_splitters import get_split_text
from typing import Dict, List
@@ -14,8 +15,9 @@

def get_file_metadata(content_base: Dict) -> Dict[str, str]:
return {
'source': content_base.get("filename"),
"content_base_uuid": str(content_base.get('content_base'))
"content_base_uuid": str(content_base.get('content_base')),
"filename": content_base.get("filename"),
"file_uuid": content_base.get("file_uuid")
}


@@ -48,13 +50,36 @@ def __init__(self,
self.content_base_indexer = content_base_indexer
self.text_splitter = text_splitter

def index(self, content_base, **kwargs) -> bool:
print(f"[+ File {content_base.get('file_uuid')} +]")
try:
load_type = content_base.get("load_type")
metadata = get_file_metadata(content_base)
docs: List[Document]
docs, _ = load_file_and_get_chunks(
content_base.get("file"),
content_base.get('extension_file'),
metadata,
load_type=load_type
)

try:
self.content_base_indexer.index_documents(docs)
return True
except Exception as e:
logger.exception(e)
return False
except Exception as e: # TODO: handle exceptions
logger.exception(e)
return False

def index_file_url(self, content_base, **kwargs) -> bool:
print(f"[+ File {content_base.get('file_uuid')} +]")

load_type = content_base.get("load_type")

docs: List[Document]
full_content: str

docs, full_content = load_file_url_and_split_text(
docs, _ = load_file_url_and_split_text(
content_base.get("file"),
content_base.get('extension_file'),
self.text_splitter,
@@ -63,12 +88,6 @@ def index_file_url(self, content_base, **kwargs) -> bool:
document_pages: List[Document] = add_file_metadata(docs, content_base)
try:
self.content_base_indexer.index_documents(document_pages)
self.content_base_indexer.index_doc_content(
full_content=full_content,
content_base_uuid=str(content_base.get('content_base')),
filename=content_base.get("filename"),
file_uuid=content_base.get("file_uuid"),
)
return True
except Exception as e: # TODO: handle exceptions
logger.exception(e)
@@ -109,3 +128,27 @@ def index_file(self, content_base):
except Exception as e: # TODO: handle exceptions
logger.exception(e)
return False

def index_full_text(self, content_base, **kwargs):
full_content: str
load_type = content_base.get("load_type")

try:
_, full_content = load_file_url_and_split_text(
content_base.get("file"),
content_base.get('extension_file'),
self.text_splitter,
load_type=load_type,
return_split_text=False,
return_full_content=True,
)
self.content_base_indexer.index_doc_content(
full_content=full_content,
content_base_uuid=str(content_base.get('content_base')),
filename=content_base.get("filename"),
file_uuid=content_base.get("file_uuid"),
)
return True
except Exception as e:
logger.exception(e)
return False
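The manager now exposes two narrower entry points instead of one: index (and the slimmed-down index_file_url) persists only the per-chunk documents, while the new index_full_text re-loads the file solely to store the raw content via index_doc_content. A rough usage sketch, assuming IndexerFileManager is the class defined in this module and content_base is the same payload dict the Celery tasks receive (the helper name is illustrative, not part of the PR):

    from typing import Dict

    from app.indexer.indexer_file_manager import IndexerFileManager


    def run_indexing(manager: IndexerFileManager, content_base: Dict) -> bool:
        # Sketch of the two-step flow: chunk-level indexing first, then the
        # full-text pass only if the chunks made it into storage.
        chunks_ok = manager.index(content_base)        # new chunk-based entry point
        if not chunks_ok:
            return False
        return manager.index_full_text(content_base)   # new full-text entry point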
41 changes: 38 additions & 3 deletions app/loaders/__init__.py
@@ -16,7 +16,7 @@
URLsLoader,
)
from langchain.schema.document import Document
from typing import List
from typing import List, Dict
from app.text_splitters import ITextSplitter
from typing import Tuple

@@ -73,9 +73,10 @@ def load_file_url_and_split_text(
file_url: str,
file_type: str,
text_splitter: ITextSplitter,
return_split_text: bool = True,
return_full_content: bool = False,
**kwargs
) -> Tuple[List[Document], str]:

load_type = kwargs.get("load_type", None)

loader = supported_loaders_cls.get(file_type)
@@ -84,4 +85,38 @@
file=file_url,
load_type=load_type
)
return data_loader.load_and_split_text(text_splitter)
return data_loader.load_and_split_text(
text_splitter,
return_split_text,
return_full_content,
)


def load_file_and_get_chunks(
file_url: str,
file_type: str,
metadata: Dict,
return_split_text: bool = True,
return_full_content: bool = False,
**kwargs
) -> Tuple[List[Document], str]:

print("=================================================")
print("[+ load_file_and_get_chunks + ]")
print(f"[+ return_split_text: {return_split_text} +]")
print(f"[+ return_full_content: {return_full_content} +]")
print("=================================================")

load_type = kwargs.get("load_type", None)
loader = supported_loaders_cls.get(file_type)

data_loader = DataLoaderCls(
loader=loader,
file=file_url,
load_type=load_type
)
return data_loader.load_and_get_chunks(
metadata,
return_split_text,
return_full_content
)
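The new keyword flags let callers ask a loader for the split documents, the raw full content, or both. A sketch of the two call patterns this PR relies on, wrapped in a helper so it is self-contained (the helper name is illustrative, not part of the PR):

    from typing import List, Tuple

    from langchain.schema.document import Document

    from app.loaders import load_file_url_and_split_text
    from app.text_splitters import ITextSplitter


    def load_chunks_then_full_text(
        file_url: str, file_type: str, splitter: ITextSplitter
    ) -> Tuple[List[Document], str]:
        # Chunk-level load (stage 1): only the split documents are needed.
        docs, _ = load_file_url_and_split_text(
            file_url, file_type, splitter,
            return_split_text=True,
            return_full_content=False,
        )
        # Full-text load (stage 2): skip splitting and return the raw content.
        _, full_content = load_file_url_and_split_text(
            file_url, file_type, splitter,
            return_split_text=False,
            return_full_content=True,
        )
        return docs, full_content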