diff --git a/build.gradle b/build.gradle index edabb70..274040c 100644 --- a/build.gradle +++ b/build.gradle @@ -103,6 +103,7 @@ dependencies { testImplementation "io.grpc:grpc-testing:${grpcVersion}" testImplementation "org.junit.jupiter:junit-jupiter-api:${jUnitVersion}" testImplementation "org.mockito:mockito-core:3.4.0" + testImplementation "org.assertj:assertj-core:3.25.3" testImplementation "org.slf4j:slf4j-nop:${slf4jVersion}" testImplementation "org.testcontainers:qdrant:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" diff --git a/src/main/java/io/qdrant/client/QdrantClient.java b/src/main/java/io/qdrant/client/QdrantClient.java index b311843..31a6e71 100644 --- a/src/main/java/io/qdrant/client/QdrantClient.java +++ b/src/main/java/io/qdrant/client/QdrantClient.java @@ -1,125 +1,32 @@ package io.qdrant.client; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; - -import io.qdrant.client.grpc.Collections.AliasDescription; -import io.qdrant.client.grpc.Collections.AliasOperations; -import io.qdrant.client.grpc.Collections.ChangeAliases; -import io.qdrant.client.grpc.Collections.CollectionDescription; -import io.qdrant.client.grpc.Collections.CollectionExistsRequest; -import io.qdrant.client.grpc.Collections.CollectionExistsResponse; -import io.qdrant.client.grpc.Collections.CollectionInfo; -import io.qdrant.client.grpc.Collections.CollectionOperationResponse; -import io.qdrant.client.grpc.Collections.CreateAlias; -import io.qdrant.client.grpc.Collections.CreateCollection; -import io.qdrant.client.grpc.Collections.CreateShardKeyRequest; -import io.qdrant.client.grpc.Collections.CreateShardKeyResponse; -import io.qdrant.client.grpc.Collections.DeleteAlias; -import io.qdrant.client.grpc.Collections.DeleteCollection; -import io.qdrant.client.grpc.Collections.DeleteShardKeyRequest; -import io.qdrant.client.grpc.Collections.DeleteShardKeyResponse; -import io.qdrant.client.grpc.Collections.GetCollectionInfoRequest; -import io.qdrant.client.grpc.Collections.GetCollectionInfoResponse; -import io.qdrant.client.grpc.Collections.ListAliasesRequest; -import io.qdrant.client.grpc.Collections.ListAliasesResponse; -import io.qdrant.client.grpc.Collections.ListCollectionAliasesRequest; -import io.qdrant.client.grpc.Collections.ListCollectionsRequest; -import io.qdrant.client.grpc.Collections.ListCollectionsResponse; -import io.qdrant.client.grpc.Collections.PayloadIndexParams; -import io.qdrant.client.grpc.Collections.PayloadSchemaType; -import io.qdrant.client.grpc.Collections.RenameAlias; -import io.qdrant.client.grpc.Collections.ShardKey; -import io.qdrant.client.grpc.Collections.UpdateCollection; -import io.qdrant.client.grpc.Collections.VectorParams; -import io.qdrant.client.grpc.Collections.VectorParamsMap; -import io.qdrant.client.grpc.Collections.VectorsConfig; +import io.qdrant.client.grpc.Collections.*; import io.qdrant.client.grpc.CollectionsGrpc; import io.qdrant.client.grpc.JsonWithInt.Value; -import io.qdrant.client.grpc.Points.BatchResult; -import io.qdrant.client.grpc.Points.ClearPayloadPoints; -import io.qdrant.client.grpc.Points.CountPoints; -import io.qdrant.client.grpc.Points.CountResponse; -import io.qdrant.client.grpc.Points.CreateFieldIndexCollection; -import io.qdrant.client.grpc.Points.DeleteFieldIndexCollection; -import io.qdrant.client.grpc.Points.DeletePayloadPoints; -import io.qdrant.client.grpc.Points.DeletePointVectors; -import io.qdrant.client.grpc.Points.DeletePoints; -import io.qdrant.client.grpc.Points.DiscoverBatchPoints; -import io.qdrant.client.grpc.Points.DiscoverBatchResponse; -import io.qdrant.client.grpc.Points.DiscoverPoints; -import io.qdrant.client.grpc.Points.DiscoverResponse; -import io.qdrant.client.grpc.Points.FieldType; -import io.qdrant.client.grpc.Points.Filter; -import io.qdrant.client.grpc.Points.GetPoints; -import io.qdrant.client.grpc.Points.GetResponse; -import io.qdrant.client.grpc.Points.PointGroup; -import io.qdrant.client.grpc.Points.PointId; -import io.qdrant.client.grpc.Points.PointStruct; -import io.qdrant.client.grpc.Points.PointVectors; -import io.qdrant.client.grpc.Points.PointsIdsList; -import io.qdrant.client.grpc.Points.PointsOperationResponse; -import io.qdrant.client.grpc.Points.PointsSelector; -import io.qdrant.client.grpc.Points.PointsUpdateOperation; -import io.qdrant.client.grpc.Points.ReadConsistency; -import io.qdrant.client.grpc.Points.RecommendBatchPoints; -import io.qdrant.client.grpc.Points.RecommendBatchResponse; -import io.qdrant.client.grpc.Points.RecommendGroupsResponse; -import io.qdrant.client.grpc.Points.RecommendPointGroups; -import io.qdrant.client.grpc.Points.RecommendPoints; -import io.qdrant.client.grpc.Points.RecommendResponse; -import io.qdrant.client.grpc.Points.RetrievedPoint; -import io.qdrant.client.grpc.Points.ScoredPoint; -import io.qdrant.client.grpc.Points.ScrollPoints; -import io.qdrant.client.grpc.Points.ScrollResponse; -import io.qdrant.client.grpc.Points.SearchBatchPoints; -import io.qdrant.client.grpc.Points.SearchBatchResponse; -import io.qdrant.client.grpc.Points.SearchGroupsResponse; -import io.qdrant.client.grpc.Points.SearchPointGroups; -import io.qdrant.client.grpc.Points.SearchPoints; -import io.qdrant.client.grpc.Points.SearchResponse; -import io.qdrant.client.grpc.Points.SetPayloadPoints; -import io.qdrant.client.grpc.Points.UpdateBatchPoints; -import io.qdrant.client.grpc.Points.UpdateBatchResponse; -import io.qdrant.client.grpc.Points.UpdatePointVectors; -import io.qdrant.client.grpc.Points.UpdateResult; -import io.qdrant.client.grpc.Points.UpsertPoints; -import io.qdrant.client.grpc.Points.VectorsSelector; -import io.qdrant.client.grpc.Points.WithPayloadSelector; -import io.qdrant.client.grpc.Points.WithVectorsSelector; -import io.qdrant.client.grpc.Points.WriteOrdering; -import io.qdrant.client.grpc.Points.WriteOrderingType; +import io.qdrant.client.grpc.Points.*; import io.qdrant.client.grpc.PointsGrpc; import io.qdrant.client.grpc.QdrantGrpc.QdrantFutureStub; import io.qdrant.client.grpc.QdrantOuterClass.HealthCheckReply; import io.qdrant.client.grpc.QdrantOuterClass.HealthCheckRequest; import io.qdrant.client.grpc.SnapshotsGrpc; -import io.qdrant.client.grpc.SnapshotsService.CreateFullSnapshotRequest; -import io.qdrant.client.grpc.SnapshotsService.CreateSnapshotRequest; -import io.qdrant.client.grpc.SnapshotsService.CreateSnapshotResponse; -import io.qdrant.client.grpc.SnapshotsService.DeleteFullSnapshotRequest; -import io.qdrant.client.grpc.SnapshotsService.DeleteSnapshotRequest; -import io.qdrant.client.grpc.SnapshotsService.DeleteSnapshotResponse; -import io.qdrant.client.grpc.SnapshotsService.ListFullSnapshotsRequest; -import io.qdrant.client.grpc.SnapshotsService.ListSnapshotsRequest; -import io.qdrant.client.grpc.SnapshotsService.ListSnapshotsResponse; -import io.qdrant.client.grpc.SnapshotsService.SnapshotDescription; +import io.qdrant.client.grpc.SnapshotsService.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static io.qdrant.client.futureconverter.FutureConverter.toCompletableFuture; /** * Client for the Qdrant vector database. @@ -153,9 +60,9 @@ public QdrantGrpcClient grpcClient() { /** * Gets detailed information about the qdrant cluster. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture healthCheckAsync() { + public CompletableFuture healthCheckAsync() { return healthCheckAsync(null); } @@ -163,13 +70,13 @@ public ListenableFuture healthCheckAsync() { * Gets detailed information about the qdrant cluster. * * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture healthCheckAsync(@Nullable Duration timeout) { + public CompletableFuture healthCheckAsync(@Nullable Duration timeout) { QdrantFutureStub qdrant = timeout != null ? this.grpcClient.qdrant().withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS) : this.grpcClient.qdrant(); - return qdrant.healthCheck(HealthCheckRequest.getDefaultInstance()); + return toCompletableFuture(qdrant.healthCheck(HealthCheckRequest.getDefaultInstance())); } //region Collections @@ -179,9 +86,9 @@ public ListenableFuture healthCheckAsync(@Nullable Duration ti * * @param collectionName The name of the collection. * @param vectorParams The vector parameters - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createCollectionAsync( + public CompletableFuture createCollectionAsync( String collectionName, VectorParams vectorParams) { return createCollectionAsync(collectionName, vectorParams, null); @@ -193,9 +100,9 @@ public ListenableFuture createCollectionAsync( * @param collectionName The name of the collection. * @param vectorParams The vector parameters * @param timeout The timeout for the call - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createCollectionAsync( + public CompletableFuture createCollectionAsync( String collectionName, VectorParams vectorParams, @Nullable Duration timeout) { @@ -213,9 +120,9 @@ public ListenableFuture createCollectionAsync( * * @param collectionName The name of the collection. * @param namedVectorParams The named vector parameters - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createCollectionAsync( + public CompletableFuture createCollectionAsync( String collectionName, Map namedVectorParams) { return createCollectionAsync(collectionName, namedVectorParams, null); @@ -227,9 +134,9 @@ public ListenableFuture createCollectionAsync( * @param collectionName The name of the collection. * @param namedVectorParams The named vector parameters * @param timeout The timeout for the call - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createCollectionAsync( + public CompletableFuture createCollectionAsync( String collectionName, Map namedVectorParams, @Nullable Duration timeout) { @@ -246,9 +153,9 @@ public ListenableFuture createCollectionAsync( * Creates a new collection with the given parameters * * @param createCollection The collection creation parameters - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createCollectionAsync(CreateCollection createCollection) { + public CompletableFuture createCollectionAsync(CreateCollection createCollection) { return createCollectionAsync(createCollection, null); } @@ -257,15 +164,15 @@ public ListenableFuture createCollectionAsync(Creat * * @param createCollection The collection creation parameters * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture } */ - public ListenableFuture createCollectionAsync(CreateCollection createCollection, @Nullable Duration timeout) { + public CompletableFuture createCollectionAsync(CreateCollection createCollection, @Nullable Duration timeout) { String collectionName = createCollection.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Create collection '{}'", collectionName); - ListenableFuture future = getCollections(timeout).create(createCollection); + CompletableFuture future = toCompletableFuture(getCollections(timeout).create(createCollection)); addLogFailureCallback(future, "Create collection"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Collection '{}' could not be created", collectionName); throw new QdrantException("Collection '" + collectionName + "' could not be created"); @@ -279,9 +186,9 @@ public ListenableFuture createCollectionAsync(Creat * * @param collectionName The name of the collection. * @param vectorParams The vector parameters - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture recreateCollectionAsync( + public CompletableFuture recreateCollectionAsync( String collectionName, VectorParams vectorParams) { return recreateCollectionAsync(collectionName, vectorParams, null); @@ -293,9 +200,9 @@ public ListenableFuture recreateCollectionAsync( * @param collectionName The name of the collection. * @param vectorParams The vector parameters * @param timeout The timeout for the call - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture recreateCollectionAsync( + public CompletableFuture recreateCollectionAsync( String collectionName, VectorParams vectorParams, @Nullable Duration timeout) { @@ -313,9 +220,9 @@ public ListenableFuture recreateCollectionAsync( * * @param collectionName The name of the collection. * @param namedVectorParams The named vector parameters - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture recreateCollectionAsync( + public CompletableFuture recreateCollectionAsync( String collectionName, Map namedVectorParams) { return recreateCollectionAsync(collectionName, namedVectorParams, null); @@ -327,9 +234,9 @@ public ListenableFuture recreateCollectionAsync( * @param collectionName The name of the collection. * @param namedVectorParams The named vector parameters * @param timeout The timeout for the call - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture recreateCollectionAsync( + public CompletableFuture recreateCollectionAsync( String collectionName, Map namedVectorParams, @Nullable Duration timeout) { @@ -348,7 +255,7 @@ public ListenableFuture recreateCollectionAsync( * @param createCollection The collection creation parameters * @return a new instance of {@link CollectionOperationResponse} */ - public ListenableFuture recreateCollectionAsync(CreateCollection createCollection) { + public CompletableFuture recreateCollectionAsync(CreateCollection createCollection) { return recreateCollectionAsync(createCollection, null); } @@ -359,9 +266,8 @@ public ListenableFuture recreateCollectionAsync(Cre * @param timeout The timeout for the call. * @return a new instance of {@link CollectionOperationResponse} */ - public ListenableFuture recreateCollectionAsync(CreateCollection createCollection, @Nullable Duration timeout) { - return Futures.transformAsync( - deleteCollectionAsync(createCollection.getCollectionName(), timeout), + public CompletableFuture recreateCollectionAsync(CreateCollection createCollection, @Nullable Duration timeout) { + return deleteCollectionAsync(createCollection.getCollectionName(), timeout).thenComposeAsync( input -> createCollectionAsync(createCollection, timeout), MoreExecutors.directExecutor()); } @@ -370,9 +276,9 @@ public ListenableFuture recreateCollectionAsync(Cre * Gets detailed information about an existing collection. * * @param collectionName The name of the collection. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture getCollectionInfoAsync(String collectionName) { + public CompletableFuture getCollectionInfoAsync(String collectionName) { return getCollectionInfoAsync(collectionName, null); } @@ -381,25 +287,25 @@ public ListenableFuture getCollectionInfoAsync(String collection * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture getCollectionInfoAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture getCollectionInfoAsync(String collectionName, @Nullable Duration timeout) { logger.debug("Get collection info for '{}'", collectionName); GetCollectionInfoRequest request = GetCollectionInfoRequest.newBuilder() .setCollectionName(collectionName) .build(); - ListenableFuture future = getCollections(timeout).get(request); + CompletableFuture future = toCompletableFuture(getCollections(timeout).get(request)); addLogFailureCallback(future, "Get collection info"); - return Futures.transform(future, GetCollectionInfoResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(GetCollectionInfoResponse::getResult, MoreExecutors.directExecutor()); } /** * Deletes a collection and all its associated data. * * @param collectionName The name of the collection - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteCollectionAsync(String collectionName) { + public CompletableFuture deleteCollectionAsync(String collectionName) { return deleteCollectionAsync(collectionName, null); } @@ -408,19 +314,19 @@ public ListenableFuture deleteCollectionAsync(Strin * * @param collectionName The name of the collection * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteCollectionAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture deleteCollectionAsync(String collectionName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Delete collection '{}'", collectionName); DeleteCollection deleteCollection = DeleteCollection.newBuilder() .setCollectionName(collectionName) .build(); - ListenableFuture future = getCollections(timeout).delete(deleteCollection); + CompletableFuture future = toCompletableFuture(getCollections(timeout).delete(deleteCollection)); addLogFailureCallback(future, "Delete collection"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Collection '{}' could not be deleted", collectionName); throw new QdrantException("Collection '" + collectionName + "' could not be deleted"); @@ -432,9 +338,9 @@ public ListenableFuture deleteCollectionAsync(Strin /** * Gets the names of all existing collections * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listCollectionsAsync() { + public CompletableFuture> listCollectionsAsync() { return listCollectionsAsync(null); } @@ -442,16 +348,16 @@ public ListenableFuture> listCollectionsAsync() { * Gets the names of all existing collections * * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listCollectionsAsync(@Nullable Duration timeout) { + public CompletableFuture> listCollectionsAsync(@Nullable Duration timeout) { logger.debug("List collections"); - ListenableFuture future = - getCollections(timeout).list(ListCollectionsRequest.getDefaultInstance()); + CompletableFuture future = + toCompletableFuture(getCollections(timeout).list(ListCollectionsRequest.getDefaultInstance())); addLogFailureCallback(future, "List collection"); - return Futures.transform(future, response -> + return future.thenApplyAsync(response -> response.getCollectionsList() .stream() .map(CollectionDescription::getName) @@ -462,9 +368,9 @@ public ListenableFuture> listCollectionsAsync(@Nullable Duration ti * Update parameters of the collection * * @param updateCollection The update parameters. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateCollectionAsync(UpdateCollection updateCollection) { + public CompletableFuture updateCollectionAsync(UpdateCollection updateCollection) { return updateCollectionAsync(updateCollection, null); } @@ -473,16 +379,16 @@ public ListenableFuture updateCollectionAsync(Updat * * @param updateCollection The update parameters. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateCollectionAsync(UpdateCollection updateCollection, @Nullable Duration timeout) { + public CompletableFuture updateCollectionAsync(UpdateCollection updateCollection, @Nullable Duration timeout) { String collectionName = updateCollection.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Update collection '{}'", collectionName); - ListenableFuture future = getCollections(timeout).update(updateCollection); + CompletableFuture future = toCompletableFuture(getCollections(timeout).update(updateCollection)); addLogFailureCallback(future, "Update collection"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Collection '{}' could not be updated", collectionName); throw new QdrantException("Collection '" + collectionName + "' could not be updated"); @@ -495,9 +401,9 @@ public ListenableFuture updateCollectionAsync(Updat * Check if a collection exists * * @param collectionName The name of the collection. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture collectionExistsAsync(String collectionName) { + public CompletableFuture collectionExistsAsync(String collectionName) { return collectionExistsAsync(collectionName, null); } @@ -506,16 +412,16 @@ public ListenableFuture collectionExistsAsync(String collectionName) { * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture collectionExistsAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture collectionExistsAsync(String collectionName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Collection exists '{}'", collectionName); - ListenableFuture future = getCollections(timeout) - .collectionExists(CollectionExistsRequest.newBuilder().setCollectionName(collectionName).build()); + CompletableFuture future = toCompletableFuture(getCollections(timeout) + .collectionExists(CollectionExistsRequest.newBuilder().setCollectionName(collectionName).build())); addLogFailureCallback(future, "Collection exists"); - return Futures.transform(future, response -> response.getResult().getExists(), MoreExecutors.directExecutor()); + return future.thenApplyAsync(response -> response.getResult().getExists(), MoreExecutors.directExecutor()); } //endregion @@ -527,9 +433,9 @@ public ListenableFuture collectionExistsAsync(String collectionName, @N * * @param aliasName The alias to be created. * @param collectionName The collection for which the alias is to be created. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createAliasAsync(String aliasName, String collectionName) { + public CompletableFuture createAliasAsync(String aliasName, String collectionName) { return createAliasAsync(aliasName, collectionName, null); } @@ -539,9 +445,9 @@ public ListenableFuture createAliasAsync(String ali * @param aliasName The alias to be created. * @param collectionName The collection for which the alias is to be created. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createAliasAsync(String aliasName, String collectionName, @Nullable Duration timeout) { + public CompletableFuture createAliasAsync(String aliasName, String collectionName, @Nullable Duration timeout) { return updateAliasesAsync(ImmutableList.of(AliasOperations.newBuilder() .setCreateAlias(CreateAlias.newBuilder() .setAliasName(aliasName) @@ -556,9 +462,9 @@ public ListenableFuture createAliasAsync(String ali * * @param oldAliasName The old alias name. * @param newAliasName The new alias name. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture renameAliasAsync(String oldAliasName, String newAliasName) { + public CompletableFuture renameAliasAsync(String oldAliasName, String newAliasName) { return renameAliasAsync(oldAliasName, newAliasName, null); } @@ -568,9 +474,9 @@ public ListenableFuture renameAliasAsync(String old * @param oldAliasName The old alias name. * @param newAliasName The new alias name. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture renameAliasAsync(String oldAliasName, String newAliasName, @Nullable Duration timeout) { + public CompletableFuture renameAliasAsync(String oldAliasName, String newAliasName, @Nullable Duration timeout) { return updateAliasesAsync(ImmutableList.of(AliasOperations.newBuilder() .setRenameAlias(RenameAlias.newBuilder() .setOldAliasName(oldAliasName) @@ -584,9 +490,9 @@ public ListenableFuture renameAliasAsync(String old * Deletes an alias. * * @param aliasName The alias to be deleted. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAliasAsync(String aliasName) { + public CompletableFuture deleteAliasAsync(String aliasName) { return deleteAliasAsync(aliasName, null); } @@ -595,9 +501,9 @@ public ListenableFuture deleteAliasAsync(String ali * * @param aliasName The alias to be deleted. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAliasAsync(String aliasName, @Nullable Duration timeout) { + public CompletableFuture deleteAliasAsync(String aliasName, @Nullable Duration timeout) { return updateAliasesAsync(ImmutableList.of(AliasOperations.newBuilder() .setDeleteAlias(DeleteAlias.newBuilder() .setAliasName(aliasName) @@ -610,9 +516,9 @@ public ListenableFuture deleteAliasAsync(String ali * Update the aliases of existing collections. * * @param aliasOperations The list of operations to perform. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateAliasesAsync(List aliasOperations) { + public CompletableFuture updateAliasesAsync(List aliasOperations) { return updateAliasesAsync(aliasOperations, null); } @@ -621,9 +527,9 @@ public ListenableFuture updateAliasesAsync(List updateAliasesAsync(List aliasOperations, @Nullable Duration timeout) { + public CompletableFuture updateAliasesAsync(List aliasOperations, @Nullable Duration timeout) { ChangeAliases request = ChangeAliases.newBuilder() .addAllActions(aliasOperations) .build(); @@ -653,9 +559,9 @@ public ListenableFuture updateAliasesAsync(List future = getCollections(timeout).updateAliases(request); + CompletableFuture future = toCompletableFuture(getCollections(timeout).updateAliases(request)); addLogFailureCallback(future, "Update aliases"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Alias update operation could not be performed"); throw new QdrantException("Alias update could not be performed"); @@ -668,9 +574,9 @@ public ListenableFuture updateAliasesAsync(List> listCollectionAliasesAsync(String collectionName) { + public CompletableFuture> listCollectionAliasesAsync(String collectionName) { return listCollectionAliasesAsync(collectionName, null); } @@ -679,9 +585,9 @@ public ListenableFuture> listCollectionAliasesAsync(String collecti * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listCollectionAliasesAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture> listCollectionAliasesAsync(String collectionName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("List aliases for collection '{}'", collectionName); @@ -689,9 +595,9 @@ public ListenableFuture> listCollectionAliasesAsync(String collecti .setCollectionName(collectionName) .build(); - ListenableFuture future = getCollections(timeout).listCollectionAliases(request); + CompletableFuture future = toCompletableFuture(getCollections(timeout).listCollectionAliases(request)); addLogFailureCallback(future, "List collection aliases"); - return Futures.transform(future, response -> response.getAliasesList() + return future.thenApplyAsync(response -> response.getAliasesList() .stream() .map(AliasDescription::getAliasName) .collect(Collectors.toList()), MoreExecutors.directExecutor()); @@ -700,9 +606,9 @@ public ListenableFuture> listCollectionAliasesAsync(String collecti /** * Gets a list of all aliases for all existing collections. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listAliasesAsync() { + public CompletableFuture> listAliasesAsync() { return listAliasesAsync(null); } @@ -710,13 +616,13 @@ public ListenableFuture> listAliasesAsync() { * Gets a list of all aliases for all existing collections. * * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listAliasesAsync(@Nullable Duration timeout) { + public CompletableFuture> listAliasesAsync(@Nullable Duration timeout) { logger.debug("List all aliases"); - ListenableFuture future = getCollections(timeout).listAliases(ListAliasesRequest.getDefaultInstance()); + CompletableFuture future = toCompletableFuture(getCollections(timeout).listAliases(ListAliasesRequest.getDefaultInstance())); addLogFailureCallback(future, "List aliases"); - return Futures.transform(future, ListAliasesResponse::getAliasesList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(ListAliasesResponse::getAliasesList, MoreExecutors.directExecutor()); } //endregion @@ -727,9 +633,9 @@ public ListenableFuture> listAliasesAsync(@Nullable Durat * Creates a shard key for a collection. * * @param createShardKey The request object for the operation. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createShardKeyAsync(CreateShardKeyRequest createShardKey) { + public CompletableFuture createShardKeyAsync(CreateShardKeyRequest createShardKey) { return createShardKeyAsync(createShardKey, null); } @@ -738,17 +644,17 @@ public ListenableFuture createShardKeyAsync(CreateShardK * * @param createShardKey The request object for the operation. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createShardKeyAsync(CreateShardKeyRequest createShardKey, @Nullable Duration timeout) { + public CompletableFuture createShardKeyAsync(CreateShardKeyRequest createShardKey, @Nullable Duration timeout) { String collectionName = createShardKey.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); ShardKey shardKey = createShardKey.getRequest().getShardKey(); logger.debug("Create shard key '{}' for '{}'", shardKey, collectionName); - ListenableFuture future = getCollections(timeout).createShardKey(createShardKey); + CompletableFuture future = toCompletableFuture(getCollections(timeout).createShardKey(createShardKey)); addLogFailureCallback(future, "Create shard key"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Shard key could not be created for '{}'", collectionName); throw new QdrantException("Shard key " + shardKey + " could not be created for " + collectionName); @@ -761,9 +667,9 @@ public ListenableFuture createShardKeyAsync(CreateShardK * Deletes a shard key for a collection. * * @param deleteShardKey The request object for the operation. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteShardKeyAsync(DeleteShardKeyRequest deleteShardKey) { + public CompletableFuture deleteShardKeyAsync(DeleteShardKeyRequest deleteShardKey) { return deleteShardKeyAsync(deleteShardKey, null); } @@ -772,17 +678,17 @@ public ListenableFuture deleteShardKeyAsync(DeleteShardK * * @param deleteShardKey The request object for the operation. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteShardKeyAsync(DeleteShardKeyRequest deleteShardKey, @Nullable Duration timeout) { + public CompletableFuture deleteShardKeyAsync(DeleteShardKeyRequest deleteShardKey, @Nullable Duration timeout) { String collectionName = deleteShardKey.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); ShardKey shardKey = deleteShardKey.getRequest().getShardKey(); logger.debug("Delete shard key '{}' for '{}'", shardKey, collectionName); - ListenableFuture future = getCollections(timeout).deleteShardKey(deleteShardKey); + CompletableFuture future = toCompletableFuture(getCollections(timeout).deleteShardKey(deleteShardKey)); addLogFailureCallback(future, "Delete shard key"); - return Futures.transform(future, response -> { + return future.thenApplyAsync(response -> { if (!response.getResult()) { logger.error("Shard key '{}' could not be deleted for '{}'", shardKey, collectionName); throw new QdrantException("Shard key " + shardKey + " could not be created for " + collectionName); @@ -800,9 +706,9 @@ public ListenableFuture deleteShardKeyAsync(DeleteShardK * * @param collectionName The name of the collection. * @param points The points to be upserted - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture upsertAsync( + public CompletableFuture upsertAsync( String collectionName, List points) { return upsertAsync(collectionName, points, null); @@ -815,9 +721,9 @@ public ListenableFuture upsertAsync( * @param collectionName The name of the collection. * @param points The points to be upserted * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture upsertAsync( + public CompletableFuture upsertAsync( String collectionName, List points, @Nullable Duration timeout) { @@ -834,9 +740,9 @@ public ListenableFuture upsertAsync( * Perform insert and updates on points. If a point with a given ID already exists, it will be overwritten. * * @param request The upsert points request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture upsertAsync(UpsertPoints request) { + public CompletableFuture upsertAsync(UpsertPoints request) { return upsertAsync(request, null); } @@ -845,17 +751,17 @@ public ListenableFuture upsertAsync(UpsertPoints request) { * * @param request The upsert points request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture upsertAsync( + public CompletableFuture upsertAsync( UpsertPoints request, @Nullable Duration timeout) { String collectionName = request.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Upsert {} points into '{}'", request.getPointsList().size(), collectionName); - ListenableFuture future = getPoints(timeout).upsert(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).upsert(request)); addLogFailureCallback(future, "Upsert"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } /** @@ -864,9 +770,9 @@ public ListenableFuture upsertAsync( * * @param collectionName The name of the collection. * @param ids The ids of points to delete. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync( + public CompletableFuture deleteAsync( String collectionName, List ids) { return deleteAsync(collectionName, ids, null); @@ -879,9 +785,9 @@ public ListenableFuture deleteAsync( * @param collectionName The name of the collection. * @param ids The ids of points to delete. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync( + public CompletableFuture deleteAsync( String collectionName, List ids, @Nullable Duration timeout) { @@ -899,9 +805,9 @@ public ListenableFuture deleteAsync( * * @param collectionName The name of the collection. * @param filter A filter selecting the points to be deleted. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync( + public CompletableFuture deleteAsync( String collectionName, Filter filter) { return deleteAsync(collectionName, filter, null); @@ -913,9 +819,9 @@ public ListenableFuture deleteAsync( * @param collectionName The name of the collection. * @param filter A filter selecting the points to be deleted. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync( + public CompletableFuture deleteAsync( String collectionName, Filter filter, @Nullable Duration timeout) { @@ -931,9 +837,9 @@ public ListenableFuture deleteAsync( * Deletes points. * * @param request The delete points request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync(DeletePoints request) { + public CompletableFuture deleteAsync(DeletePoints request) { return deleteAsync(request, null); } @@ -942,17 +848,17 @@ public ListenableFuture deleteAsync(DeletePoints request) { * * @param request The delete points request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteAsync( + public CompletableFuture deleteAsync( DeletePoints request, @Nullable Duration timeout) { String collectionName = request.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Delete from '{}'", collectionName); - ListenableFuture future = getPoints(timeout).delete(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).delete(request)); addLogFailureCallback(future, "Delete"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } /** @@ -961,9 +867,9 @@ public ListenableFuture deleteAsync( * @param collectionName The name of the collection. * @param id The id of a point to retrieve * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, PointId id, @Nullable ReadConsistency readConsistency @@ -985,9 +891,9 @@ public ListenableFuture> retrieveAsync( * @param withPayload Whether to include the payload or not. * @param withVectors Whether to include the vectors or not. * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, PointId id, boolean withPayload, @@ -1009,9 +915,9 @@ public ListenableFuture> retrieveAsync( * @param collectionName The name of the collection. * @param ids The list of ids of points to retrieve * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, List ids, @Nullable ReadConsistency readConsistency @@ -1033,9 +939,9 @@ public ListenableFuture> retrieveAsync( * @param withPayload Whether to include the payload or not. * @param withVectors Whether to include the vectors or not. * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, List ids, boolean withPayload, @@ -1059,9 +965,9 @@ public ListenableFuture> retrieveAsync( * @param payloadSelector Options for specifying which payload to include or not. * @param vectorsSelector Options for specifying which vectors to include into response. * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, List ids, WithPayloadSelector payloadSelector, @@ -1080,9 +986,9 @@ public ListenableFuture> retrieveAsync( * @param vectorsSelector Options for specifying which vectors to include into response. * @param readConsistency Options for specifying read consistency guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( String collectionName, List ids, WithPayloadSelector payloadSelector, @@ -1109,17 +1015,17 @@ public ListenableFuture> retrieveAsync( * * @param request The get points request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> retrieveAsync( + public CompletableFuture> retrieveAsync( GetPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Retrieve points from '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).get(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).get(request)); addLogFailureCallback(future, "Retrieve"); - return Futures.transform(future, GetResponse::getResultList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(GetResponse::getResultList, MoreExecutors.directExecutor()); } //region Update Vectors @@ -1129,9 +1035,9 @@ public ListenableFuture> retrieveAsync( * * @param collectionName The name of the collection. * @param points The list of points and vectors to update. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateVectorsAsync( + public CompletableFuture updateVectorsAsync( String collectionName, List points ) { @@ -1144,9 +1050,9 @@ public ListenableFuture updateVectorsAsync( * @param collectionName The name of the collection. * @param points The list of points and vectors to update. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateVectorsAsync( + public CompletableFuture updateVectorsAsync( String collectionName, List points, @Nullable Duration timeout @@ -1162,9 +1068,9 @@ public ListenableFuture updateVectorsAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateVectorsAsync( + public CompletableFuture updateVectorsAsync( String collectionName, List points, @Nullable Boolean wait, @@ -1189,16 +1095,16 @@ public ListenableFuture updateVectorsAsync( * * @param request The update point vectors request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture updateVectorsAsync( + public CompletableFuture updateVectorsAsync( UpdatePointVectors request, @Nullable Duration timeout) { Preconditions.checkArgument(!request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Update vectors in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).updateVectors(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).updateVectors(request)); addLogFailureCallback(future, "Update vectors"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -1211,9 +1117,9 @@ public ListenableFuture updateVectorsAsync( * @param collectionName The name of the collection. * @param vectors The list of vector names to delete. * @param filter A filter selecting the points to be deleted. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, Filter filter @@ -1235,9 +1141,9 @@ public ListenableFuture deleteVectorsAsync( * @param vectors The list of vector names to delete. * @param filter A filter selecting the points to be deleted. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, Filter filter, @@ -1262,9 +1168,9 @@ public ListenableFuture deleteVectorsAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, Filter filter, @@ -1288,9 +1194,9 @@ public ListenableFuture deleteVectorsAsync( * @param collectionName The name of the collection. * @param vectors The list of vector names to delete. * @param ids The list of ids to delete. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, List ids @@ -1314,9 +1220,9 @@ public ListenableFuture deleteVectorsAsync( * @param vectors The list of vector names to delete. * @param ids The list of ids to delete. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, List ids, @@ -1343,9 +1249,9 @@ public ListenableFuture deleteVectorsAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, List ids, @@ -1374,9 +1280,9 @@ public ListenableFuture deleteVectorsAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( String collectionName, List vectors, PointsSelector pointsSelector, @@ -1405,18 +1311,18 @@ public ListenableFuture deleteVectorsAsync( * * @param request The delete point vectors request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteVectorsAsync( + public CompletableFuture deleteVectorsAsync( DeletePointVectors request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Delete vectors in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).deleteVectors(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).deleteVectors(request)); addLogFailureCallback(future, "Delete vectors"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -1433,9 +1339,9 @@ public ListenableFuture deleteVectorsAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, @Nullable Boolean wait, @@ -1461,9 +1367,9 @@ public ListenableFuture setPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, PointId id, @@ -1490,9 +1396,9 @@ public ListenableFuture setPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, List ids, @@ -1519,9 +1425,9 @@ public ListenableFuture setPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, Filter filter, @@ -1548,9 +1454,9 @@ public ListenableFuture setPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, @Nullable PointsSelector pointsSelector, @@ -1579,9 +1485,9 @@ public ListenableFuture setPayloadAsync( * @param key The key for which to set the payload if nested * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( String collectionName, Map payload, @Nullable PointsSelector pointsSelector, @@ -1615,18 +1521,18 @@ public ListenableFuture setPayloadAsync( * * @param request The set payload request. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture setPayloadAsync( + public CompletableFuture setPayloadAsync( SetPayloadPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Set payload in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).setPayload(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).setPayload(request)); addLogFailureCallback(future, "Set payload"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -1641,9 +1547,9 @@ public ListenableFuture setPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, @Nullable Boolean wait, @@ -1669,9 +1575,9 @@ public ListenableFuture overwritePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, PointId id, @@ -1698,9 +1604,9 @@ public ListenableFuture overwritePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, List ids, @@ -1727,9 +1633,9 @@ public ListenableFuture overwritePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, Filter filter, @@ -1756,9 +1662,9 @@ public ListenableFuture overwritePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, @Nullable PointsSelector pointsSelector, @@ -1787,9 +1693,9 @@ public ListenableFuture overwritePayloadAsync( * @param key The key for which to overwrite the payload if nested * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( String collectionName, Map payload, @Nullable PointsSelector pointsSelector, @@ -1822,18 +1728,18 @@ public ListenableFuture overwritePayloadAsync( * * @param request The overwrite payload request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture overwritePayloadAsync( + public CompletableFuture overwritePayloadAsync( SetPayloadPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Set payload in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).overwritePayload(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).overwritePayload(request)); addLogFailureCallback(future, "Overwrite payload"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -1848,9 +1754,9 @@ public ListenableFuture overwritePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( String collectionName, List keys, @Nullable Boolean wait, @@ -1876,9 +1782,9 @@ public ListenableFuture deletePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( String collectionName, List keys, PointId id, @@ -1905,9 +1811,9 @@ public ListenableFuture deletePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( String collectionName, List keys, List ids, @@ -1934,9 +1840,9 @@ public ListenableFuture deletePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( String collectionName, List keys, Filter filter, @@ -1963,9 +1869,9 @@ public ListenableFuture deletePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( String collectionName, List keys, @Nullable PointsSelector pointsSelector, @@ -1994,18 +1900,18 @@ public ListenableFuture deletePayloadAsync( * * @param request The delete payload request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadAsync( + public CompletableFuture deletePayloadAsync( DeletePayloadPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Delete payload in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).deletePayload(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).deletePayload(request)); addLogFailureCallback(future, "Delete payload"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -2019,9 +1925,9 @@ public ListenableFuture deletePayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( String collectionName, @Nullable Boolean wait, @Nullable WriteOrderingType ordering, @@ -2044,9 +1950,9 @@ public ListenableFuture clearPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( String collectionName, PointId id, @Nullable Boolean wait, @@ -2070,9 +1976,9 @@ public ListenableFuture clearPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( String collectionName, List ids, @Nullable Boolean wait, @@ -2096,9 +2002,9 @@ public ListenableFuture clearPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( String collectionName, Filter filter, @Nullable Boolean wait, @@ -2122,9 +2028,9 @@ public ListenableFuture clearPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( String collectionName, @Nullable PointsSelector pointsSelector, @Nullable Boolean wait, @@ -2151,18 +2057,18 @@ public ListenableFuture clearPayloadAsync( * * @param request The clear payload request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture clearPayloadAsync( + public CompletableFuture clearPayloadAsync( ClearPayloadPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Clear payload in '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).clearPayload(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).clearPayload(request)); addLogFailureCallback(future, "Clear payload"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } //endregion @@ -2177,9 +2083,9 @@ public ListenableFuture clearPayloadAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createPayloadIndexAsync( + public CompletableFuture createPayloadIndexAsync( String collectionName, String field, PayloadSchemaType schemaType, @@ -2225,9 +2131,9 @@ public ListenableFuture createPayloadIndexAsync( } logger.debug("Create payload field index for '{}' in '{}'", field, collectionName); - ListenableFuture future = getPoints(timeout).createFieldIndex(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).createFieldIndex(requestBuilder.build())); addLogFailureCallback(future, "Create payload field index"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } /** @@ -2238,9 +2144,9 @@ public ListenableFuture createPayloadIndexAsync( * @param wait Whether to wait until the changes have been applied. Defaults to true. * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deletePayloadIndexAsync( + public CompletableFuture deletePayloadIndexAsync( String collectionName, String field, @Nullable Boolean wait, @@ -2257,18 +2163,18 @@ public ListenableFuture deletePayloadIndexAsync( } logger.debug("Delete payload field index for '{}' in '{}'", field, collectionName); - ListenableFuture future = getPoints(timeout).deleteFieldIndex(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).deleteFieldIndex(requestBuilder.build())); addLogFailureCallback(future, "Delete payload field index"); - return Futures.transform(future, PointsOperationResponse::getResult, MoreExecutors.directExecutor()); + return future.thenApplyAsync(PointsOperationResponse::getResult, MoreExecutors.directExecutor()); } /** * Retrieves closest points based on vector similarity and the given filtering conditions. * * @param request the search request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchAsync(SearchPoints request) { + public CompletableFuture> searchAsync(SearchPoints request) { return searchAsync(request, null); } @@ -2277,9 +2183,9 @@ public ListenableFuture> searchAsync(SearchPoints request) { * * @param request the search request * @param timeout the timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchAsync(SearchPoints request, @Nullable Duration timeout) { + public CompletableFuture> searchAsync(SearchPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); @@ -2288,9 +2194,9 @@ public ListenableFuture> searchAsync(SearchPoints request, @Nu "Vector must not be empty"); logger.debug("Search on '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).search(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).search(request)); addLogFailureCallback(future, "Search"); - return Futures.transform(future, SearchResponse::getResultList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(SearchResponse::getResultList, MoreExecutors.directExecutor()); } /** @@ -2299,9 +2205,9 @@ public ListenableFuture> searchAsync(SearchPoints request, @Nu * @param collectionName The name of the collection * @param searches The searches to be performed in the batch. * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchBatchAsync( + public CompletableFuture> searchBatchAsync( String collectionName, List searches, @Nullable ReadConsistency readConsistency @@ -2316,9 +2222,9 @@ public ListenableFuture> searchBatchAsync( * @param searches The searches to be performed in the batch. * @param readConsistency Options for specifying read consistency guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchBatchAsync( + public CompletableFuture> searchBatchAsync( String collectionName, List searches, @Nullable ReadConsistency readConsistency, @@ -2338,18 +2244,18 @@ public ListenableFuture> searchBatchAsync( } logger.debug("Search batch on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).searchBatch(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).searchBatch(requestBuilder.build())); addLogFailureCallback(future, "Search batch"); - return Futures.transform(future, SearchBatchResponse::getResultList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(SearchBatchResponse::getResultList, MoreExecutors.directExecutor()); } /** * Retrieves closest points based on vector similarity and the given filtering conditions, grouped by a given field. * * @param request The search group request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchGroupsAsync(SearchPointGroups request) { + public CompletableFuture> searchGroupsAsync(SearchPointGroups request) { return searchGroupsAsync(request, null); } @@ -2358,17 +2264,16 @@ public ListenableFuture> searchGroupsAsync(SearchPointGroups re * * @param request The search group request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> searchGroupsAsync(SearchPointGroups request, @Nullable Duration timeout) { + public CompletableFuture> searchGroupsAsync(SearchPointGroups request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Search groups on '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).searchGroups(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).searchGroups(request)); addLogFailureCallback(future, "Search groups"); - return Futures.transform( - future, + return future.thenApplyAsync( response -> response.getResult().getGroupsList(), MoreExecutors.directExecutor()); } @@ -2377,9 +2282,9 @@ public ListenableFuture> searchGroupsAsync(SearchPointGroups re * Iterates over all or filtered points. * * @param request The scroll request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture scrollAsync(ScrollPoints request) { + public CompletableFuture scrollAsync(ScrollPoints request) { return scrollAsync(request, null); } @@ -2388,14 +2293,14 @@ public ListenableFuture scrollAsync(ScrollPoints request) { * * @param request The scroll request. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture scrollAsync(ScrollPoints request, @Nullable Duration timeout) { + public CompletableFuture scrollAsync(ScrollPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Scroll on '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).scroll(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).scroll(request)); addLogFailureCallback(future, "Scroll"); return future; } @@ -2405,9 +2310,9 @@ public ListenableFuture scrollAsync(ScrollPoints request, @Nulla * examples. * * @param request The recommend request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendAsync(RecommendPoints request) { + public CompletableFuture> recommendAsync(RecommendPoints request) { return recommendAsync(request, null); } @@ -2417,17 +2322,16 @@ public ListenableFuture> recommendAsync(RecommendPoints reques * * @param request The recommend request. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendAsync(RecommendPoints request, @Nullable Duration timeout) { + public CompletableFuture> recommendAsync(RecommendPoints request, @Nullable Duration timeout) { Preconditions.checkArgument( !request.getCollectionName().isEmpty(), "Collection name must not be empty"); logger.debug("Recommend on '{}'", request.getCollectionName()); - ListenableFuture future = getPoints(timeout).recommend(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).recommend(request)); addLogFailureCallback(future, "Recommend"); - return Futures.transform( - future, + return future.thenApplyAsync( RecommendResponse::getResultList, MoreExecutors.directExecutor()); } @@ -2439,9 +2343,9 @@ public ListenableFuture> recommendAsync(RecommendPoints reques * @param collectionName The name of the collection. * @param recommendSearches The list of recommendation searches. * @param readConsistency Options for specifying read consistency guarantees. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendBatchAsync( + public CompletableFuture> recommendBatchAsync( String collectionName, List recommendSearches, @Nullable ReadConsistency readConsistency) { @@ -2456,9 +2360,9 @@ public ListenableFuture> recommendBatchAsync( * @param recommendSearches The list of recommendation searches. * @param readConsistency Options for specifying read consistency guarantees. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendBatchAsync( + public CompletableFuture> recommendBatchAsync( String collectionName, List recommendSearches, @Nullable ReadConsistency readConsistency, @@ -2479,10 +2383,9 @@ public ListenableFuture> recommendBatchAsync( } logger.debug("Recommend batch on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).recommendBatch(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).recommendBatch(requestBuilder.build())); addLogFailureCallback(future, "Recommend batch"); - return Futures.transform( - future, + return future.thenApplyAsync( RecommendBatchResponse::getResultList, MoreExecutors.directExecutor()); } @@ -2493,9 +2396,9 @@ public ListenableFuture> recommendBatchAsync( * @param collectionName The name of the collection. * @param operations The list of point update operations. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> batchUpdateAsync(String collectionName, List operations) { + public CompletableFuture> batchUpdateAsync(String collectionName, List operations) { return batchUpdateAsync(collectionName, operations, null, null, null); } @@ -2508,9 +2411,9 @@ public ListenableFuture> batchUpdateAsync(String collectionNa * @param ordering Write ordering guarantees. * @param timeout The timeout for the call. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> batchUpdateAsync( + public CompletableFuture> batchUpdateAsync( String collectionName, List operations, @Nullable Boolean wait, @@ -2535,16 +2438,15 @@ public ListenableFuture> batchUpdateAsync( * @param request The update batch request. * @param timeout The timeout for the call. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> batchUpdateAsync(UpdateBatchPoints request, @Nullable Duration timeout) { + public CompletableFuture> batchUpdateAsync(UpdateBatchPoints request, @Nullable Duration timeout) { String collectionName = request.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Batch update points on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).updateBatch(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).updateBatch(request)); addLogFailureCallback(future, "Batch update points"); - return Futures.transform( - future, + return future.thenApplyAsync( UpdateBatchResponse::getResultList, MoreExecutors.directExecutor()); } @@ -2554,9 +2456,9 @@ public ListenableFuture> batchUpdateAsync(UpdateBatchPoints r * examples, grouped by a given field * * @param request The recommend groups request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendGroupsAsync(RecommendPointGroups request) { + public CompletableFuture> recommendGroupsAsync(RecommendPointGroups request) { return recommendGroupsAsync(request, null); } @@ -2566,16 +2468,15 @@ public ListenableFuture> recommendGroupsAsync(RecommendPointGro * * @param request The recommend groups request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> recommendGroupsAsync(RecommendPointGroups request, @Nullable Duration timeout) { + public CompletableFuture> recommendGroupsAsync(RecommendPointGroups request, @Nullable Duration timeout) { String collectionName = request.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Recommend groups on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).recommendGroups(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).recommendGroups(request)); addLogFailureCallback(future, "Recommend groups"); - return Futures.transform( - future, + return future.thenApplyAsync( response -> response.getResult().getGroupsList(), MoreExecutors.directExecutor()); } @@ -2585,9 +2486,9 @@ public ListenableFuture> recommendGroupsAsync(RecommendPointGro * Constraints by the context. * * @param request The discover points request - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> discoverAsync(DiscoverPoints request) { + public CompletableFuture> discoverAsync(DiscoverPoints request) { return discoverAsync(request, null); } @@ -2597,16 +2498,15 @@ public ListenableFuture> discoverAsync(DiscoverPoints request) * * @param request The discover points request * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> discoverAsync(DiscoverPoints request, @Nullable Duration timeout) { + public CompletableFuture> discoverAsync(DiscoverPoints request, @Nullable Duration timeout) { String collectionName = request.getCollectionName(); Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Discover on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).discover(request); + CompletableFuture future = toCompletableFuture(getPoints(timeout).discover(request)); addLogFailureCallback(future, "Discover"); - return Futures.transform( - future, + return future.thenApplyAsync( DiscoverResponse::getResultList, MoreExecutors.directExecutor()); } @@ -2619,9 +2519,9 @@ public ListenableFuture> discoverAsync(DiscoverPoints request, * @param collectionName The name of the collection * @param discoverSearches The list for discover point searches * @param readConsistency Options for specifying read consistency guarantees - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> discoverBatchAsync( + public CompletableFuture> discoverBatchAsync( String collectionName, List discoverSearches, @Nullable ReadConsistency readConsistency) { @@ -2637,9 +2537,9 @@ public ListenableFuture> discoverBatchAsync( * @param discoverSearches The list for discover point searches * @param readConsistency Options for specifying read consistency guarantees * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> discoverBatchAsync( + public CompletableFuture> discoverBatchAsync( String collectionName, List discoverSearches, @Nullable ReadConsistency readConsistency, @@ -2654,10 +2554,9 @@ public ListenableFuture> discoverBatchAsync( requestBuilder.setReadConsistency(readConsistency); } logger.debug("Discover batch on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).discoverBatch(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).discoverBatch(requestBuilder.build())); addLogFailureCallback(future, "Discover batch"); - return Futures.transform( - future, + return future.thenApplyAsync( DiscoverBatchResponse::getResultList, MoreExecutors.directExecutor()); } @@ -2666,9 +2565,9 @@ public ListenableFuture> discoverBatchAsync( * Count the points in a collection. The count is exact * * @param collectionName The name of the collection. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture countAsync(String collectionName) { + public CompletableFuture countAsync(String collectionName) { return countAsync(collectionName, null, null, null); } @@ -2677,9 +2576,9 @@ public ListenableFuture countAsync(String collectionName) { * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture countAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture countAsync(String collectionName, @Nullable Duration timeout) { return countAsync(collectionName, null, null, timeout); } @@ -2690,9 +2589,9 @@ public ListenableFuture countAsync(String collectionName, @Nullable Durati * @param filter Filter conditions - return only those points that satisfy the specified conditions. * @param exact If true, returns the exact count, * if false, returns an approximate count. Defaults to true. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture countAsync( + public CompletableFuture countAsync( String collectionName, @Nullable Filter filter, @Nullable Boolean exact) { @@ -2707,9 +2606,9 @@ public ListenableFuture countAsync( * @param exact If true, returns the exact count, * if false, returns an approximate count. Defaults to true. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture countAsync( + public CompletableFuture countAsync( String collectionName, @Nullable Filter filter, @Nullable Boolean exact, @@ -2724,9 +2623,9 @@ public ListenableFuture countAsync( } logger.debug("Count on '{}'", collectionName); - ListenableFuture future = getPoints(timeout).count(requestBuilder.build()); + CompletableFuture future = toCompletableFuture(getPoints(timeout).count(requestBuilder.build())); addLogFailureCallback(future, "Count"); - return Futures.transform(future, response -> response.getResult().getCount(), MoreExecutors.directExecutor()); + return future.thenApplyAsync(response -> response.getResult().getCount(), MoreExecutors.directExecutor()); } //region Snapshot Management @@ -2735,9 +2634,9 @@ public ListenableFuture countAsync( * Create snapshot for a given collection. * * @param collectionName The name of the collection. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createSnapshotAsync(String collectionName) { + public CompletableFuture createSnapshotAsync(String collectionName) { return createSnapshotAsync(collectionName, null); } @@ -2746,26 +2645,26 @@ public ListenableFuture createSnapshotAsync(String collecti * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createSnapshotAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture createSnapshotAsync(String collectionName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("Create snapshot of '{}'", collectionName); - ListenableFuture future = getSnapshots(timeout).create( + CompletableFuture future = toCompletableFuture(getSnapshots(timeout).create( CreateSnapshotRequest.newBuilder() .setCollectionName(collectionName) - .build()); + .build())); addLogFailureCallback(future, "Create snapshot"); - return Futures.transform(future, CreateSnapshotResponse::getSnapshotDescription, MoreExecutors.directExecutor()); + return future.thenApplyAsync(CreateSnapshotResponse::getSnapshotDescription, MoreExecutors.directExecutor()); } /** * Get list of snapshots for a collection. * * @param collectionName The name of the collection. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listSnapshotAsync(String collectionName) { + public CompletableFuture> listSnapshotAsync(String collectionName) { return listSnapshotAsync(collectionName, null); } @@ -2774,16 +2673,16 @@ public ListenableFuture> listSnapshotAsync(String coll * * @param collectionName The name of the collection. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listSnapshotAsync(String collectionName, @Nullable Duration timeout) { + public CompletableFuture> listSnapshotAsync(String collectionName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); logger.debug("List snapshots of '{}'", collectionName); - ListenableFuture future = getSnapshots(timeout).list(ListSnapshotsRequest.newBuilder() + CompletableFuture future = toCompletableFuture(getSnapshots(timeout).list(ListSnapshotsRequest.newBuilder() .setCollectionName(collectionName) - .build()); + .build())); addLogFailureCallback(future, "List snapshots"); - return Futures.transform(future, ListSnapshotsResponse::getSnapshotDescriptionsList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(ListSnapshotsResponse::getSnapshotDescriptionsList, MoreExecutors.directExecutor()); } /** @@ -2791,9 +2690,9 @@ public ListenableFuture> listSnapshotAsync(String coll * * @param collectionName The name of the collection. * @param snapshotName The name of the snapshot. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteSnapshotAsync(String collectionName, String snapshotName) { + public CompletableFuture deleteSnapshotAsync(String collectionName, String snapshotName) { return deleteSnapshotAsync(collectionName, snapshotName, null); } @@ -2803,16 +2702,16 @@ public ListenableFuture deleteSnapshotAsync(String colle * @param collectionName The name of the collection. * @param snapshotName The name of the snapshot. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteSnapshotAsync(String collectionName, String snapshotName, @Nullable Duration timeout) { + public CompletableFuture deleteSnapshotAsync(String collectionName, String snapshotName, @Nullable Duration timeout) { Preconditions.checkArgument(!collectionName.isEmpty(), "Collection name must not be empty"); Preconditions.checkArgument(!snapshotName.isEmpty(), "Snapshot name must not be empty"); logger.debug("Delete snapshot '{}' of '{}'", snapshotName, collectionName); - ListenableFuture future = getSnapshots(timeout).delete(DeleteSnapshotRequest.newBuilder() + CompletableFuture future = toCompletableFuture(getSnapshots(timeout).delete(DeleteSnapshotRequest.newBuilder() .setCollectionName(collectionName) .setSnapshotName(snapshotName) - .build()); + .build())); addLogFailureCallback(future, "Delete snapshot"); return future; } @@ -2820,9 +2719,9 @@ public ListenableFuture deleteSnapshotAsync(String colle /** * Create snapshot for a whole storage. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createFullSnapshotAsync() { + public CompletableFuture createFullSnapshotAsync() { return createFullSnapshotAsync(null); } @@ -2830,22 +2729,22 @@ public ListenableFuture createFullSnapshotAsync() { * Create snapshot for a whole storage. * * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture createFullSnapshotAsync(@Nullable Duration timeout) { + public CompletableFuture createFullSnapshotAsync(@Nullable Duration timeout) { logger.debug("Create full snapshot for a whole storage"); - ListenableFuture future = - getSnapshots(timeout).createFull(CreateFullSnapshotRequest.getDefaultInstance()); + CompletableFuture future = + toCompletableFuture(getSnapshots(timeout).createFull(CreateFullSnapshotRequest.getDefaultInstance())); addLogFailureCallback(future, "Create full snapshot"); - return Futures.transform(future, CreateSnapshotResponse::getSnapshotDescription, MoreExecutors.directExecutor()); + return future.thenApplyAsync(CreateSnapshotResponse::getSnapshotDescription, MoreExecutors.directExecutor()); } /** * Get list of snapshots for a whole storage. * - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listFullSnapshotAsync() { + public CompletableFuture> listFullSnapshotAsync() { return listFullSnapshotAsync(null); } @@ -2853,23 +2752,23 @@ public ListenableFuture> listFullSnapshotAsync() { * Get list of snapshots for a whole storage. * * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture> listFullSnapshotAsync(@Nullable Duration timeout) { + public CompletableFuture> listFullSnapshotAsync(@Nullable Duration timeout) { logger.debug("List full snapshots for a whole storage"); - ListenableFuture future = - getSnapshots(timeout).listFull(ListFullSnapshotsRequest.getDefaultInstance()); + CompletableFuture future = + toCompletableFuture(getSnapshots(timeout).listFull(ListFullSnapshotsRequest.getDefaultInstance())); addLogFailureCallback(future, "List full snapshots"); - return Futures.transform(future, ListSnapshotsResponse::getSnapshotDescriptionsList, MoreExecutors.directExecutor()); + return future.thenApplyAsync(ListSnapshotsResponse::getSnapshotDescriptionsList, MoreExecutors.directExecutor()); } /** * Delete snapshot for a whole storage. * * @param snapshotName The name of the snapshot. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteFullSnapshotAsync(String snapshotName) { + public CompletableFuture deleteFullSnapshotAsync(String snapshotName) { return deleteFullSnapshotAsync(snapshotName, null); } @@ -2878,14 +2777,14 @@ public ListenableFuture deleteFullSnapshotAsync(String s * * @param snapshotName The name of the snapshot. * @param timeout The timeout for the call. - * @return a new instance of {@link ListenableFuture} + * @return a new instance of {@link CompletableFuture} */ - public ListenableFuture deleteFullSnapshotAsync(String snapshotName, @Nullable Duration timeout) { + public CompletableFuture deleteFullSnapshotAsync(String snapshotName, @Nullable Duration timeout) { Preconditions.checkArgument(!snapshotName.isEmpty(), "Snapshot name must not be empty"); logger.debug("Delete full snapshot '{}'", snapshotName); - ListenableFuture future = getSnapshots(timeout).deleteFull(DeleteFullSnapshotRequest.newBuilder() + CompletableFuture future = toCompletableFuture(getSnapshots(timeout).deleteFull(DeleteFullSnapshotRequest.newBuilder() .setSnapshotName(snapshotName) - .build()); + .build())); addLogFailureCallback(future, "Delete full snapshot"); return future; } @@ -2897,14 +2796,9 @@ public void close() { grpcClient.close(); } - private void addLogFailureCallback(ListenableFuture future, String message) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(V result) { - } - - @Override - public void onFailure(Throwable t) { + private void addLogFailureCallback(CompletionStage future, String message) { + future.whenCompleteAsync((result, t) -> { + if (t != null) { logger.error(message + " operation failed", t); } }, MoreExecutors.directExecutor()); diff --git a/src/main/java/io/qdrant/client/futureconverter/FutureConverter.java b/src/main/java/io/qdrant/client/futureconverter/FutureConverter.java new file mode 100644 index 0000000..4ee4082 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/FutureConverter.java @@ -0,0 +1,41 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.CompletableFuture; + +/** + * Converts between {@link java.util.concurrent.CompletableFuture} and Guava {@link com.google.common.util.concurrent.ListenableFuture}. + */ +public class FutureConverter { + + /** + * Converts {@link java.util.concurrent.CompletableFuture} to {@link com.google.common.util.concurrent.ListenableFuture}. + */ + public static ListenableFuture toListenableFuture(CompletableFuture completableFuture) { + return GuavaFutureUtils.createListenableFuture(Java8FutureUtils.createValueSourceFuture(completableFuture)); + } + + /** + * Converts {@link com.google.common.util.concurrent.ListenableFuture} to {@link java.util.concurrent.CompletableFuture}. + */ + public static CompletableFuture toCompletableFuture(ListenableFuture listenableFuture) { + return Java8FutureUtils.createCompletableFuture(GuavaFutureUtils.createValueSourceFuture(listenableFuture)); + } +} \ No newline at end of file diff --git a/src/main/java/io/qdrant/client/futureconverter/FutureWrapper.java b/src/main/java/io/qdrant/client/futureconverter/FutureWrapper.java new file mode 100644 index 0000000..83769a0 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/FutureWrapper.java @@ -0,0 +1,64 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Wraps future and delegates to wrapped. + */ +public class FutureWrapper implements Future { + private final Future wrappedFuture; + + protected FutureWrapper(Future wrappedFuture) { + if (wrappedFuture == null) { + throw new NullPointerException("Wrapped future can not be null"); + } + this.wrappedFuture = wrappedFuture; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return wrappedFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return wrappedFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return wrappedFuture.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return wrappedFuture.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return wrappedFuture.get(timeout, unit); + } + + protected Future getWrappedFuture() { + return wrappedFuture; + } +} \ No newline at end of file diff --git a/src/main/java/io/qdrant/client/futureconverter/GuavaFutureUtils.java b/src/main/java/io/qdrant/client/futureconverter/GuavaFutureUtils.java new file mode 100644 index 0000000..9e93a35 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/GuavaFutureUtils.java @@ -0,0 +1,153 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import java.util.concurrent.Executor; +import java.util.function.Consumer; + + +public class GuavaFutureUtils { + // *************************************** Converting to ListenableFuture ****************************************** + + /** + * Creates listenable future from ValueSourceFuture. We have to send all Future API calls to ValueSourceFuture. + */ + public static ListenableFuture createListenableFuture(ValueSourceFuture valueSourceFuture) { + if (valueSourceFuture instanceof ListenableFutureBackedValueSourceFuture) { + return ((ListenableFutureBackedValueSourceFuture) valueSourceFuture).getWrappedFuture(); + } else { + return new ValueSourceFutureBackedListenableFuture<>(valueSourceFuture); + } + } + + public static ListenableFuture createListenableFuture(ValueSource valueSource) { + if (valueSource instanceof ListenableFutureBackedValueSourceFuture) { + return ((ListenableFutureBackedValueSourceFuture) valueSource).getWrappedFuture(); + } else { + return new ValueSourceBackedListenableFuture<>(valueSource); + } + } + + /** + * If we have ValueSourceFuture, we can use it as the implementation and this class only converts + * listener registration. + */ + private static class ValueSourceFutureBackedListenableFuture extends FutureWrapper implements ListenableFuture { + ValueSourceFutureBackedListenableFuture(ValueSourceFuture valueSourceFuture) { + super(valueSourceFuture); + } + + @Override + protected ValueSourceFuture getWrappedFuture() { + return (ValueSourceFuture) super.getWrappedFuture(); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + getWrappedFuture().addCallbacks(value -> executor.execute(listener), ex -> executor.execute(listener)); + } + } + + + /** + * If we only get ValueSource we have to create a ValueSourceFuture. Here we wrap Guavas SettableFuture + * and use it for listener handling and value storage. + */ + private static class ValueSourceBackedListenableFuture extends FutureWrapper implements ListenableFuture { + private final ValueSource valueSource; + + private ValueSourceBackedListenableFuture(ValueSource valueSource) { + super(com.google.common.util.concurrent.SettableFuture.create()); + this.valueSource = valueSource; + valueSource.addCallbacks(value -> getWrappedFuture().set(value), ex -> getWrappedFuture().setException(ex)); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + getWrappedFuture().addListener(listener, executor); + } + + @Override + protected com.google.common.util.concurrent.SettableFuture getWrappedFuture() { + return (com.google.common.util.concurrent.SettableFuture) super.getWrappedFuture(); + } + + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + valueSource.cancel(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + + private ValueSource getValueSource() { + return valueSource; + } + } + + + // *************************************** Converting from ListenableFuture ****************************************** + public static ValueSourceFuture createValueSourceFuture(ListenableFuture listenableFuture) { + if (listenableFuture instanceof ValueSourceFutureBackedListenableFuture) { + return ((ValueSourceFutureBackedListenableFuture) listenableFuture).getWrappedFuture(); + } else { + return new ListenableFutureBackedValueSourceFuture<>(listenableFuture); + } + } + + public static ValueSource createValueSource(ListenableFuture listenableFuture) { + if (listenableFuture instanceof ValueSourceBackedListenableFuture) { + return ((ValueSourceBackedListenableFuture) listenableFuture).getValueSource(); + } else { + return new ListenableFutureBackedValueSourceFuture<>(listenableFuture); + } + } + + /** + * Wraps ListenableFuture and exposes it as ValueSourceFuture. + */ + private static class ListenableFutureBackedValueSourceFuture extends ValueSourceFuture { + private ListenableFutureBackedValueSourceFuture(ListenableFuture wrappedFuture) { + super(wrappedFuture); + } + + @Override + public void addCallbacks(Consumer successCallback, Consumer failureCallback) { + Futures.addCallback(getWrappedFuture(), new FutureCallback() { + @Override + public void onSuccess(T result) { + successCallback.accept(result); + } + + @Override + public void onFailure(Throwable t) { + failureCallback.accept(t); + + } + }, MoreExecutors.directExecutor()); + } + + + @Override + protected ListenableFuture getWrappedFuture() { + return (ListenableFuture) super.getWrappedFuture(); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/qdrant/client/futureconverter/Java8FutureUtils.java b/src/main/java/io/qdrant/client/futureconverter/Java8FutureUtils.java new file mode 100644 index 0000000..7f898d7 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/Java8FutureUtils.java @@ -0,0 +1,97 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +public class Java8FutureUtils { + + public static CompletableFuture createCompletableFuture(ValueSource valueSource) { + if (valueSource instanceof CompletableFuturebackedValueSource) { + return ((CompletableFuturebackedValueSource) valueSource).getWrappedFuture(); + } else { + return new ValueSourcebackedCompletableFuture(valueSource); + } + } + + public static ValueSourceFuture createValueSourceFuture(CompletableFuture completableFuture) { + if (completableFuture instanceof ValueSourcebackedCompletableFuture && + ((ValueSourcebackedCompletableFuture) completableFuture).getValueSource() instanceof ValueSourceFuture) { + return (ValueSourceFuture) ((ValueSourcebackedCompletableFuture) completableFuture).getValueSource(); + } else { + return new CompletableFuturebackedValueSource<>(completableFuture); + } + } + + public static ValueSource createValueSource(CompletableFuture completableFuture) { + if (completableFuture instanceof ValueSourcebackedCompletableFuture) { + return ((ValueSourcebackedCompletableFuture) completableFuture).getValueSource(); + } else { + return new CompletableFuturebackedValueSource<>(completableFuture); + } + } + + /** + * CompletableFuture that takes values from the ValueSource. CompletableFuture is a class, not + * an interface so we can not just forward events from the ValueSource, we to always instantiate the class. + */ + private static final class ValueSourcebackedCompletableFuture extends CompletableFuture { + private final ValueSource valueSource; + + private ValueSourcebackedCompletableFuture(ValueSource valueSource) { + this.valueSource = valueSource; + valueSource.addCallbacks(this::complete, this::completeExceptionally); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (isDone()) { + return false; + } + boolean result = valueSource.cancel(mayInterruptIfRunning); + super.cancel(mayInterruptIfRunning); + return result; + } + + private ValueSource getValueSource() { + return valueSource; + } + } + + private static final class CompletableFuturebackedValueSource extends ValueSourceFuture { + private CompletableFuturebackedValueSource(CompletableFuture completableFuture) { + super(completableFuture); + } + + + @Override + public void addCallbacks(Consumer successCallback, Consumer failureCallback) { + getWrappedFuture().whenComplete((v, t) -> { + if (t == null) { + successCallback.accept(v); + } else { + failureCallback.accept(t); + } + }); + } + + @Override + protected CompletableFuture getWrappedFuture() { + return (CompletableFuture) super.getWrappedFuture(); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/qdrant/client/futureconverter/ValueSource.java b/src/main/java/io/qdrant/client/futureconverter/ValueSource.java new file mode 100644 index 0000000..999cac1 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/ValueSource.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + +import java.util.function.Consumer; + +/** + * Source of values. Let's say we are converting from RxJava Single to CompletableFuture. In such case Single is + * value source (the original object) and the library registers CompletableFuture (target object) to listen on Singles + * events. When someone calls cancel on the CompletableFuture, we want to unsubscribe from the Single, that's why we do + * have cancel method here. + */ +public interface ValueSource { + /** + * Used to notify target object about changes in the original object. + */ + void addCallbacks(Consumer successCallback, Consumer failureCallback); + + /** + * Cancels execution of the original object if cancel is called on the target object + */ + boolean cancel(boolean mayInterruptIfRunning); +} \ No newline at end of file diff --git a/src/main/java/io/qdrant/client/futureconverter/ValueSourceFuture.java b/src/main/java/io/qdrant/client/futureconverter/ValueSourceFuture.java new file mode 100644 index 0000000..4d2a498 --- /dev/null +++ b/src/main/java/io/qdrant/client/futureconverter/ValueSourceFuture.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2014-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.qdrant.client.futureconverter; + +import java.util.concurrent.Future; + +/** + * Some ValueSources are already futures, let's wrap it and use its implementation. + */ +public abstract class ValueSourceFuture extends FutureWrapper implements ValueSource { + protected ValueSourceFuture(Future wrappedFuture) { + super(wrappedFuture); + } +} \ No newline at end of file diff --git a/src/test/java/io/qdrant/client/QdrantClientTest.java b/src/test/java/io/qdrant/client/QdrantClientTest.java index 94ec215..4d6d2f7 100644 --- a/src/test/java/io/qdrant/client/QdrantClientTest.java +++ b/src/test/java/io/qdrant/client/QdrantClientTest.java @@ -3,6 +3,9 @@ import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.qdrant.client.grpc.Collections; +import io.qdrant.client.grpc.Points.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -11,6 +14,19 @@ import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.qdrant.QdrantContainer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static io.qdrant.client.ConditionFactory.matchKeyword; +import static io.qdrant.client.PointIdFactory.id; +import static io.qdrant.client.ValueFactory.value; +import static io.qdrant.client.VectorsFactory.vectors; + +import static io.qdrant.client.WithPayloadSelectorFactory.enable; +import static org.assertj.core.api.Assertions.assertThat; + @Testcontainers class QdrantClientTest { @@ -37,4 +53,78 @@ public void teardown() { void canAccessChannelOnGrpcClient() { Assertions.assertTrue(client.grpcClient().channel().authority().startsWith("localhost")); } + + @Test + void createListDeleteCollectionAsyncReturnsExpectedResponse() { + // When + CompletableFuture future = client.createCollectionAsync("test_collection", + Collections.VectorParams.newBuilder().setDistance(Collections.Distance.Dot).setSize(4).build()); + + // Then + // creation + assertThat(future).succeedsWithin(Duration.ofMillis(500)); + assertThat(future).isCompletedWithValueMatching(Collections.CollectionOperationResponse::getResult); + + // list + client.listCollectionsAsync().thenCompose(collections -> { + assertThat(collections).contains("test_collection"); + + // delete + return client.deleteCollectionAsync("test_collection"); + }).thenAccept(response -> { + assertThat(response.getResult()).isTrue(); + }); + } + + @Test + void upsertListDeleteVectorAsyncReturnsExpectedResponse() { + // When + CompletableFuture collectionFuture = client.createCollectionAsync("test_collection", + Collections.VectorParams.newBuilder().setDistance(Collections.Distance.Dot).setSize(4).build()); + + // Then + assertThat(collectionFuture).succeedsWithin(Duration.ofMillis(500)); + assertThat(collectionFuture).isCompletedWithValueMatching(Collections.CollectionOperationResponse::getResult); + + // When + CompletableFuture vectorFuture = client.upsertAsync("test_collection", List.of( + PointStruct.newBuilder().setId(id(1)) + .setVectors(vectors(0.05f, 0.61f, 0.76f, 0.74f)) + .putAllPayload(Map.of("city", value("Berlin"))) + .build(), + PointStruct.newBuilder() + .setId(id(2)) + .setVectors(vectors(0.19f, 0.81f, 0.75f, 0.11f)) + .putAllPayload(Map.of("city", value("London"))) + .build(), + PointStruct.newBuilder() + .setId(id(3)) + .setVectors(vectors(0.36f, 0.55f, 0.47f, 0.94f)) + .putAllPayload(Map.of("city", value("Moscow"))) + .build())); + + // Then + assertThat(vectorFuture).succeedsWithin(Duration.ofMillis(500)); + assertThat(vectorFuture).isCompletedWithValueMatching(response -> response.getStatus().equals(UpdateStatus.Completed)); + + // When + CompletableFuture> searchFuture = client.searchAsync(SearchPoints.newBuilder() + .setCollectionName("test_collection") + .setLimit(3) + .setFilter(Filter.newBuilder().addMust(matchKeyword("city", "London"))) + .addAllVector(List.of(0.2f, 0.1f, 0.9f, 0.7f)) + .setWithPayload(enable(true)) + .build()); + + // Then + assertThat(searchFuture).succeedsWithin(Duration.ofMillis(500)); + assertThat(searchFuture).isCompletedWithValueMatching(list -> list.size() == 1 && list.get(0).getId().getNum() == 2); + + // When + CompletableFuture deleteFuture = client.deleteAsync("test_collection", List.of(id(1), id(2), id(3))); + + // Then + assertThat(deleteFuture).succeedsWithin(Duration.ofMillis(500)); + assertThat(deleteFuture).isCompletedWithValueMatching(response -> response.getStatus().equals(UpdateStatus.Completed)); + } }