From 76052b6cf9a9dbb8ce5eb07bc84dcb9cd8e731a8 Mon Sep 17 00:00:00 2001
From: Eduard van Valkenburg
Date: Fri, 15 Nov 2024 16:33:36 +0100
Subject: [PATCH] Python: Introducing search for CosmosDB NoSQL Collections (#9698)

### Motivation and Context

Adds the search functions for Cosmos DB NoSQL. Closes #6835

### Description

Adds the search pieces. It also turns out that upsert cannot work across partition keys with `execute_item_batch`, so the upsert path was refactored to use `asyncio.gather` over individual `upsert_item` calls.

### Contribution Checklist

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone :smile:

---------

Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
---
 python/samples/concepts/memory/new_memory.py  |   3 +-
 .../azure_cosmos_db_no_sql_base.py            |   4 +-
 .../azure_cosmos_db_no_sql_collection.py      | 127 ++++++++++++++++--
 .../azure_cosmos_db_no_sql_settings.py        |   2 +-
 .../memory/azure_cosmos_db/utils.py           |  37 -----
 .../test_azure_cosmos_db_no_sql_collection.py |  14 +-
 6 files changed, 125 insertions(+), 62 deletions(-)

diff --git a/python/samples/concepts/memory/new_memory.py b/python/samples/concepts/memory/new_memory.py
index 783650b92219..11f8d3b20b51 100644
--- a/python/samples/concepts/memory/new_memory.py
+++ b/python/samples/concepts/memory/new_memory.py
@@ -105,7 +105,7 @@ class DataModelList:
 collection_name = "test"
 # Depending on the vector database, the index kind and distance function may need to be adjusted,
 # since not all combinations are supported by all databases.
-DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE_DISTANCE)
+DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE_SIMILARITY)
 
 # A list of VectorStoreRecordCollection that can be used.
 # Available collections are:
@@ -159,7 +159,6 @@ class DataModelList:
     ),
     "azure_cosmos_nosql": lambda: AzureCosmosDBNoSQLCollection(
         data_model_type=DataModel,
-        database_name="sample_database",
         collection_name=collection_name,
         create_database=True,
     ),
diff --git a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_base.py b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_base.py
index 153fbf6cba1e..61688a462d7e 100644
--- a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_base.py
+++ b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_base.py
@@ -91,7 +91,9 @@ async def _does_database_exist(self) -> bool:
         except CosmosResourceNotFoundError:
             return False
         except Exception as e:
-            raise MemoryConnectorResourceNotFound(f"Failed to check if database '{self.database_name}' exists.") from e
+            raise MemoryConnectorResourceNotFound(
+                f"Failed to check if database '{self.database_name}' exists, with message {e}"
+            ) from e
 
     async def _get_database_proxy(self, **kwargs) -> DatabaseProxy:
         """Gets the database proxy."""
diff --git a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_collection.py b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_collection.py
index 780d874929d8..00a6813c1802 100644
--- a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_collection.py
+++ b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_collection.py
@@ -20,14 +20,22 @@
 )
 from semantic_kernel.connectors.memory.azure_cosmos_db.const import COSMOS_ITEM_ID_PROPERTY_NAME
 from semantic_kernel.connectors.memory.azure_cosmos_db.utils import (
-    build_query_parameters,
     create_default_indexing_policy,
     create_default_vector_embedding_policy,
     get_key,
     get_partition_key,
 )
+from semantic_kernel.data.filter_clauses.any_tags_equal_to_filter_clause import AnyTagsEqualTo
+from semantic_kernel.data.filter_clauses.equal_to_filter_clause import EqualTo
+from semantic_kernel.data.kernel_search_results import KernelSearchResults
 from semantic_kernel.data.record_definition.vector_store_model_definition import VectorStoreRecordDefinition
-from semantic_kernel.data.vector_storage.vector_store_record_collection import VectorStoreRecordCollection
+from semantic_kernel.data.record_definition.vector_store_record_fields import VectorStoreRecordDataField
+from semantic_kernel.data.vector_search.vector_search import VectorSearchBase
+from semantic_kernel.data.vector_search.vector_search_filter import VectorSearchFilter
+from semantic_kernel.data.vector_search.vector_search_options import VectorSearchOptions
+from semantic_kernel.data.vector_search.vector_search_result import VectorSearchResult
+from semantic_kernel.data.vector_search.vector_text_search import VectorTextSearchMixin
+from semantic_kernel.data.vector_search.vectorized_search import VectorizedSearchMixin
 from semantic_kernel.exceptions.memory_connector_exceptions import (
     MemoryConnectorException,
     MemoryConnectorResourceNotFound,
@@ -41,7 +49,12 @@
 
 
 @experimental_class
-class AzureCosmosDBNoSQLCollection(AzureCosmosDBNoSQLBase, VectorStoreRecordCollection[TKey, TModel]):
+class AzureCosmosDBNoSQLCollection(
+    AzureCosmosDBNoSQLBase,
+    VectorSearchBase[TKey, TModel],
+    VectorizedSearchMixin[TModel],
+    VectorTextSearchMixin[TModel],
+):
     """An Azure Cosmos DB NoSQL collection stores documents in a Azure Cosmos DB NoSQL account."""
 
     partition_key: PartitionKey
@@ -106,12 +119,10 @@ async def _inner_upsert(
         records: Sequence[Any],
         **kwargs: Any,
     ) -> Sequence[TKey]:
-        batch_operations = [("upsert", (record,)) for record in records]
-        partition_key = [record[self.partition_key.path.strip("/")] for record in records]
+        container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
         try:
-            container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
-            results = await container_proxy.execute_item_batch(batch_operations, partition_key, **kwargs)
-            return [result["resourceBody"][COSMOS_ITEM_ID_PROPERTY_NAME] for result in results]
+            results = await asyncio.gather(*(container_proxy.upsert_item(record) for record in records))
+            return [result[COSMOS_ITEM_ID_PROPERTY_NAME] for result in results]
         except CosmosResourceNotFoundError as e:
             raise MemoryConnectorResourceNotFound(
                 "The collection does not exist yet. Create the collection first."
@@ -122,12 +133,15 @@ async def _inner_upsert(
 
     @override
     async def _inner_get(self, keys: Sequence[TKey], **kwargs: Any) -> OneOrMany[Any] | None:
         include_vectors = kwargs.pop("include_vectors", False)
-        query, parameters = build_query_parameters(self.data_model_definition, keys, include_vectors)
+        query = (
+            f"SELECT {self._build_select_clause(include_vectors)} FROM c WHERE "  # nosec: B608
+            f"c.id IN ({', '.join([f'@id{i}' for i in range(len(keys))])})"  # nosec: B608
+        )  # nosec: B608
+        parameters: list[dict[str, Any]] = [{"name": f"@id{i}", "value": get_key(key)} for i, key in enumerate(keys)]
+        container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
         try:
-            container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
-            results = container_proxy.query_items(query=query, parameters=parameters)
-            return [item async for item in results]
+            return [item async for item in container_proxy.query_items(query=query, parameters=parameters)]
         except CosmosResourceNotFoundError as e:
             raise MemoryConnectorResourceNotFound(
                 "The collection does not exist yet. Create the collection first."
@@ -146,6 +160,95 @@ async def _inner_delete(self, keys: Sequence[TKey], **kwargs: Any) -> None:
         if exceptions:
             raise MemoryConnectorException("Failed to delete item(s).", exceptions)
 
+    @override
+    async def _inner_search(
+        self,
+        options: VectorSearchOptions,
+        search_text: str | None = None,
+        vectorizable_text: str | None = None,
+        vector: list[float | int] | None = None,
+        **kwargs: Any,
+    ) -> KernelSearchResults[VectorSearchResult[TModel]]:
+        params = [{"name": "@top", "value": options.top}]
+        if search_text is not None:
+            query = self._build_search_text_query(options)
+            params.append({"name": "@search_text", "value": search_text})
+        elif vector is not None:
+            query = self._build_vector_query(options)
+            params.append({"name": "@vector", "value": vector})
+        else:
+            raise ValueError("Either search_text or vector must be provided.")
+        container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
+        try:
+            results = container_proxy.query_items(query, parameters=params)
+        except Exception as e:
+            raise MemoryConnectorException("Failed to search items.") from e
+        return KernelSearchResults(
+            results=self._get_vector_search_results_from_results(results, options),
+            total_count=None,
+        )
+
+    def _build_search_text_query(self, options: VectorSearchOptions) -> str:
+        where_clauses = self._build_where_clauses_from_filter(options.filter)
+        contains_clauses = " OR ".join(
+            f"CONTAINS(c.{field}, @search_text)"
+            for field in self.data_model_definition.fields
+            if isinstance(field, VectorStoreRecordDataField) and field.is_full_text_searchable
+        )
+        return (
+            f"SELECT TOP @top {self._build_select_clause(options.include_vectors)} "  # nosec: B608
+            f"FROM c WHERE ({contains_clauses}) AND {where_clauses}"  # nosec: B608
+        )
+
+    def _build_vector_query(self, options: VectorSearchOptions) -> str:
+        where_clauses = self._build_where_clauses_from_filter(options.filter)
+        if where_clauses:
+            where_clauses = f"WHERE {where_clauses}"
+        vector_field_name: str = self.data_model_definition.try_get_vector_field(options.vector_field_name).name  # type: ignore
+        return (
+            f"SELECT TOP @top {self._build_select_clause(options.include_vectors)},"  # nosec: B608
+            f" VectorDistance(c.{vector_field_name}, @vector) AS distance FROM c ORDER "  # nosec: B608
+            f"BY VectorDistance(c.{vector_field_name}, @vector) {where_clauses}"  # nosec: B608
+        )
+
+    def _build_select_clause(self, include_vectors: bool) -> str:
+        """Create the select clause for a CosmosDB query."""
+        included_fields = [
+            field
+            for field in self.data_model_definition.field_names
+            if include_vectors or field not in self.data_model_definition.vector_field_names
+        ]
+        if self.data_model_definition.key_field_name != COSMOS_ITEM_ID_PROPERTY_NAME:
+            # Replace the key field name with the Cosmos item id property name
+            included_fields = [
+                field if field != self.data_model_definition.key_field_name else COSMOS_ITEM_ID_PROPERTY_NAME
+                for field in included_fields
+            ]
+
+        return ", ".join(f"c.{field}" for field in included_fields)
+
+    def _build_where_clauses_from_filter(self, filters: VectorSearchFilter | None) -> str:
+        if filters is None:
+            return ""
+        clauses = []
+        for filter in filters.filters:
+            match filter:
+                case EqualTo():
+                    clauses.append(f"c.{filter.field_name} = {filter.value}")
+                case AnyTagsEqualTo():
+                    clauses.append(f"{filter.value} IN c.{filter.field_name}")
+                case _:
+                    raise ValueError(f"Unsupported filter: {filter}")
+        return " AND ".join(clauses)
+
+    @override
+    def _get_record_from_result(self, result: dict[str, Any]) -> dict[str, Any]:
+        return result
+
+    @override
+    def _get_score_from_result(self, result: dict[str, Any]) -> float | None:
+        return result.get("distance")
+
     @override
     def _serialize_dicts_to_store_models(self, records: Sequence[dict[str, Any]], **kwargs: Any) -> Sequence[Any]:
         serialized_records = []
diff --git a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_settings.py b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_settings.py
index dc098fc7735d..cbdac0036d13 100644
--- a/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_settings.py
+++ b/python/semantic_kernel/connectors/memory/azure_cosmos_db/azure_cosmos_db_no_sql_settings.py
@@ -13,7 +13,7 @@ class AzureCosmosDBNoSQLSettings(KernelBaseSettings):
     """Azure CosmosDB NoSQL settings.
 
     The settings are first loaded from environment variables with
-    the prefix 'COSMOS_DB_NOSQL_'.
+    the prefix 'AZURE_COSMOS_DB_NO_SQL_'.
     If the environment variables are not found, the settings
     can be loaded from a .env file with the encoding 'utf-8'.
     If the settings are not found in the .env file, the settings
diff --git a/python/semantic_kernel/connectors/memory/azure_cosmos_db/utils.py b/python/semantic_kernel/connectors/memory/azure_cosmos_db/utils.py
index 0ab95951db4d..18b8250a5cc7 100644
--- a/python/semantic_kernel/connectors/memory/azure_cosmos_db/utils.py
+++ b/python/semantic_kernel/connectors/memory/azure_cosmos_db/utils.py
@@ -2,7 +2,6 @@
 
 import asyncio
 import contextlib
-from collections.abc import Sequence
 from typing import Any
 
 from azure.cosmos.aio import CosmosClient
@@ -12,7 +11,6 @@
     AzureCosmosDBNoSQLCompositeKey,
 )
 from semantic_kernel.connectors.memory.azure_cosmos_db.const import (
-    COSMOS_ITEM_ID_PROPERTY_NAME,
     DATATYPES_MAPPING,
     DISTANCE_FUNCTION_MAPPING,
     INDEX_KIND_MAPPING,
@@ -174,41 +172,6 @@ def get_partition_key(key: str | AzureCosmosDBNoSQLCompositeKey) -> str:
     return key
 
 
-def build_query_parameters(
-    data_model_definition: VectorStoreRecordDefinition,
-    keys: Sequence[str | AzureCosmosDBNoSQLCompositeKey],
-    include_vectors: bool,
-) -> tuple[str, list[dict[str, Any]]]:
-    """Builds the query and parameters for the Azure Cosmos DB NoSQL query item operation.
-
-    Args:
-        data_model_definition (VectorStoreRecordDefinition): The definition of the data model.
-        keys (Sequence[str | AzureCosmosDBNoSQLCompositeKey]): The keys.
-        include_vectors (bool): Whether to include the vectors in the query.
-
-    Returns:
-        tuple[str, list[dict[str, str]]]: The query and parameters.
- """ - included_fields = [ - field - for field in data_model_definition.field_names - if include_vectors or field not in data_model_definition.vector_field_names - ] - if data_model_definition.key_field_name != COSMOS_ITEM_ID_PROPERTY_NAME: - # Replace the key field name with the Cosmos item id property name - included_fields = [ - field if field != data_model_definition.key_field_name else COSMOS_ITEM_ID_PROPERTY_NAME - for field in included_fields - ] - - select_clause = ", ".join(f"c.{field}" for field in included_fields) - - return ( - f"SELECT {select_clause} FROM c WHERE c.id IN ({', '.join([f'@id{i}' for i in range(len(keys))])})", # nosec: B608 - [{"name": f"@id{i}", "value": get_key(key)} for i, key in enumerate(keys)], - ) - - class CosmosClientWrapper(CosmosClient): """Wrapper to make sure the CosmosClient is closed properly.""" diff --git a/python/tests/unit/connectors/memory/azure_cosmos_db/test_azure_cosmos_db_no_sql_collection.py b/python/tests/unit/connectors/memory/azure_cosmos_db/test_azure_cosmos_db_no_sql_collection.py index d796f2150d00..e0ea3e09be05 100644 --- a/python/tests/unit/connectors/memory/azure_cosmos_db/test_azure_cosmos_db_no_sql_collection.py +++ b/python/tests/unit/connectors/memory/azure_cosmos_db/test_azure_cosmos_db_no_sql_collection.py @@ -9,8 +9,8 @@ from semantic_kernel.connectors.memory.azure_cosmos_db.azure_cosmos_db_no_sql_collection import ( AzureCosmosDBNoSQLCollection, ) +from semantic_kernel.connectors.memory.azure_cosmos_db.const import COSMOS_ITEM_ID_PROPERTY_NAME from semantic_kernel.connectors.memory.azure_cosmos_db.utils import ( - COSMOS_ITEM_ID_PROPERTY_NAME, CosmosClientWrapper, create_default_indexing_policy, create_default_vector_embedding_policy, @@ -397,13 +397,11 @@ async def test_azure_cosmos_db_no_sql_upsert( vector_collection._get_container_proxy = AsyncMock(return_value=mock_container_proxy) - mock_container_proxy.execute_item_batch = AsyncMock( - return_value=[{"resourceBody": {COSMOS_ITEM_ID_PROPERTY_NAME: item["id"]}}] - ) + mock_container_proxy.upsert_item = AsyncMock(return_value={COSMOS_ITEM_ID_PROPERTY_NAME: item["id"]}) result = await vector_collection.upsert(item) - mock_container_proxy.execute_item_batch.assert_called_once_with([("upsert", (item,))], [item["id"]]) + mock_container_proxy.upsert_item.assert_called_once_with(item) assert result == item["id"] @@ -426,13 +424,11 @@ async def test_azure_cosmos_db_no_sql_upsert_without_id( vector_collection._get_container_proxy = AsyncMock(return_value=mock_container_proxy) - mock_container_proxy.execute_item_batch = AsyncMock( - return_value=[{"resourceBody": {COSMOS_ITEM_ID_PROPERTY_NAME: item["key"]}}] - ) + mock_container_proxy.upsert_item = AsyncMock(return_value={COSMOS_ITEM_ID_PROPERTY_NAME: item["key"]}) result = await vector_collection.upsert(item) - mock_container_proxy.execute_item_batch.assert_called_once_with([("upsert", (item_with_id,))], [item["key"]]) + mock_container_proxy.upsert_item.assert_called_once_with(item_with_id) assert result == item["key"]