You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Tested the following code with
python 3.11,
langchain 0.1.13
langchain-community 0.0.29
opensearch-py 2.4.2
from time import time
from typing import List, Optional
import json
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
from opensearchpy import OpenSearch
from logs.logger import Log #my custom logger class
class OpenSearchChatMessageHistory(Log, BaseChatMessageHistory):
"""Chat message history that stores history in OpenSearch.
Args:
index (str): Name of the index to use.
session_id (str): Arbitrary key that is used to store the messages
of a single chat session.
opensearch_url (Optional[str]): URL of the OpenSearch instance to connect to.
Defaults to "http://localhost:9200".
ensure_ascii (Optional[bool]): Used to escape ASCII symbols in json.dumps.
Defaults to True.
"""
def __init__(
self,
index: str,
session_id: str,
opensearch_url: Optional[str] = "http://localhost:9200",
ensure_ascii: Optional[bool] = True,
) -> None:
super().__init__()
self.log_info("Initializing the OpenSearchChatMessageHistory class.")
self.index: str = index
self.session_id: str = session_id
self.ensure_ascii: bool = ensure_ascii
self.client: OpenSearch = OpenSearch([opensearch_url])
if self.client.indices.exists(index=index):
self.log_info(
f"Chat history index '{index}' already exists, skipping creation."
)
else:
self.log_info(f"Creating index '{index}' for storing chat history.")
self.client.indices.create(
index=index,
body={
"mappings": {
"properties": {
"session_id": {"type": "keyword"},
"created_at": {"type": "date"},
"history": {"type": "text"},
}
}
},
)
self.log_info("OpenSearchChatMessageHistory class initialized successfully.")
@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from OpenSearch."""
self.log_info("Loading messages from OpenSearch to buffer.")
result = self.client.search(
index=self.index,
body={
"query": {
"term": {
"session_id": self.session_id
}
}
},
sort="created_at:asc",
)
items = [
json.loads(document["_source"]["history"])
for document in result.get("hits", {}).get("hits", [])
] if result else []
self.log_info("Messages loaded from OpenSearch to buffer.")
return [messages_from_dict(item) for item in items]
def add_message(self, message: BaseMessage) -> None:
"""Add a message to the chat session in OpenSearch."""
self.log_info("Adding messages to OpenSearch.")
self.client.index(
index=self.index,
body={
"session_id": self.session_id,
"created_at": round(time() * 1000),
"history": json.dumps(
message_to_dict(message),
ensure_ascii=self.ensure_ascii,
),
},
refresh=True,
)
self.log_info("Messages added to OpenSearch.")
def clear(self) -> None:
"""Clear session memory in OpenSearch."""
self.log_info("Purging data in OpenSearch started.")
self.client.delete_by_query(
index=self.index,
body={
"query": {
"term": {
"session_id": self.session_id
}
}
},
refresh=True,
)
self.log_info("OpenSearch data purged.")
please add this or similar feature to langchain. It is useful when performing Chat conversations ConversationSummaryBufferMemory or ConversationalRetrievalChain from the langchain implementations.
The text was updated successfully, but these errors were encountered:
Tested the following code with
python 3.11,
langchain 0.1.13
langchain-community 0.0.29
opensearch-py 2.4.2
please add this or similar feature to langchain. It is useful when performing Chat conversations ConversationSummaryBufferMemory or ConversationalRetrievalChain from the langchain implementations.
The text was updated successfully, but these errors were encountered: