Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Nov 15, 2024
1 parent 4d88edf commit dbd44c9
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 119 deletions.
72 changes: 72 additions & 0 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
Expand Down Expand Up @@ -718,6 +719,77 @@ func TestClient(t *testing.T) {
})
}
})
mtBulkWriteOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).ClientType(mtest.Pinned)
mt.RunOpts("bulk write with nil filter", mtBulkWriteOpts, func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
models *mongo.ClientWriteModels
}{
{
name: "DeleteOne",
models: (&mongo.ClientWriteModels{}).AppendDeleteOne("foo", "bar", mongo.NewClientDeleteOneModel()),
},
{
name: "DeleteMany",
models: (&mongo.ClientWriteModels{}).AppendDeleteMany("foo", "bar", mongo.NewClientDeleteManyModel()),
},
{
name: "UpdateOne",
models: (&mongo.ClientWriteModels{}).AppendUpdateOne("foo", "bar", mongo.NewClientUpdateOneModel()),
},
{
name: "UpdateMany",
models: (&mongo.ClientWriteModels{}).AppendUpdateMany("foo", "bar", mongo.NewClientUpdateManyModel()),
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

_, err := mt.Client.BulkWrite(context.Background(), tc.models)
require.ErrorContains(mt, err, "filter is required")
})
}
})
mt.RunOpts("bulk write with write concern", mtBulkWriteOpts, func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
opts *options.ClientBulkWriteOptionsBuilder
want bool
}{
{
name: "unacknowledged",
opts: options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()).SetOrdered(false),
want: false,
},
{
name: "acknowledged",
want: true,
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

var models *mongo.ClientWriteModels

insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})
models = (&mongo.ClientWriteModels{}).AppendInsertOne("foo", "bar", insertOneModel)
res, err := mt.Client.BulkWrite(context.Background(), models, tc.opts)
require.NoError(mt, err, "BulkWrite error: %v", err)
require.NotNil(mt, res, "expected a ClientBulkWriteResult")
assert.Equal(mt, res.Acknowledged, tc.want, "expected Acknowledged: %v, got: %v", tc.want, res.Acknowledged)
})
}
})
}

func TestClient_BSONOptions(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,41 @@ func TestCollection(t *testing.T) {
})
}
})
mt.Run("error on nil filter", func(mt *mtest.T) {
mt.Parallel()

testCases := []struct {
name string
model mongo.WriteModel
}{
{
name: "DeleteOne",
model: mongo.NewDeleteOneModel(),
},
{
name: "DeleteMany",
model: mongo.NewDeleteManyModel(),
},
{
name: "UpdateOne",
model: mongo.NewUpdateOneModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
},
{
name: "UpdateMany",
model: mongo.NewUpdateManyModel().SetUpdate(bson.D{{"$set", bson.D{{"x", 1}}}}),
},
}
for _, tc := range testCases {
tc := tc

mt.Run(tc.name, func(mt *mtest.T) {
mt.Parallel()

_, err := mt.Coll.BulkWrite(context.Background(), []mongo.WriteModel{tc.model})
assert.ErrorContains(mt, err, "filter is required")
})
}
})
mt.Run("correct model in errors", func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewUpdateOneModel().SetFilter(bson.M{}).SetUpdate(bson.M{
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/crud_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ func TestClientBulkWrite(t *testing.T) {
}
result, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetOrdered(false).SetWriteConcern(writeconcern.Unacknowledged()))
require.NoError(mt, err, "BulkWrite error: %v", err)
assert.Nil(mt, result, "expected a nil result, got: %v", result)
assert.False(mt, result.Acknowledged)
require.Len(mt, bwCmd, 2, "expected %d bulkWrite calls, got: %d", 2, len(bwCmd))

assert.Len(mt, bwCmd[0].Ops, numModels-1, "expected %d ops, got: %d", numModels-1, len(bwCmd[0].Ops))
Expand Down
10 changes: 5 additions & 5 deletions internal/integration/unified/client_operation_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,13 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
}

res, err := client.BulkWrite(ctx, wirteModels, opts)
if res == nil {
var bwe mongo.ClientBulkWriteException
if !errors.As(err, &bwe) || bwe.PartialResult == nil {
return newDocumentResult(emptyCoreDocument, err), nil
}
var bwe mongo.ClientBulkWriteException
if errors.As(err, &bwe) {
res = bwe.PartialResult
}
if res == nil || !res.Acknowledged {
return newDocumentResult(emptyCoreDocument, err), nil
}
rawBuilder := bsoncore.NewDocumentBuilder().
AppendInt64("deletedCount", res.DeletedCount).
AppendInt64("insertedCount", res.InsertedCount).
Expand Down
7 changes: 7 additions & 0 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package mongo
import (
"context"
"errors"
"fmt"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/options"
Expand Down Expand Up @@ -296,6 +297,9 @@ func createDeleteDoc(
) (bsoncore.Document, error) {
f, err := marshal(filter, bsonOpts, registry)
if err != nil {
if filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}

Expand Down Expand Up @@ -428,6 +432,9 @@ type updateDoc struct {
func (doc updateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (bsoncore.Document, error) {
f, err := marshal(doc.filter, bsonOpts, registry)
if err != nil {
if doc.filter == nil {
return nil, fmt.Errorf("%w: filter is required", err)
}
return nil, err
}

Expand Down
12 changes: 12 additions & 0 deletions mongo/bulk_write_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type WriteModel interface {
}

// InsertOneModel is used to insert a single document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type InsertOneModel struct {
Document interface{}
}
Expand All @@ -40,6 +42,8 @@ func (iom *InsertOneModel) SetDocument(doc interface{}) *InsertOneModel {
func (*InsertOneModel) writeModel() {}

// DeleteOneModel is used to delete at most one document in a BulkWriteOperation.
//
// See corresponding setter methods for documentation.
type DeleteOneModel struct {
Filter interface{}
Collation *options.Collation
Expand Down Expand Up @@ -80,6 +84,8 @@ func (dom *DeleteOneModel) SetHint(hint interface{}) *DeleteOneModel {
func (*DeleteOneModel) writeModel() {}

// DeleteManyModel is used to delete multiple documents in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type DeleteManyModel struct {
Filter interface{}
Collation *options.Collation
Expand Down Expand Up @@ -119,6 +125,8 @@ func (dmm *DeleteManyModel) SetHint(hint interface{}) *DeleteManyModel {
func (*DeleteManyModel) writeModel() {}

// ReplaceOneModel is used to replace at most one document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type ReplaceOneModel struct {
Collation *options.Collation
Upsert *bool
Expand Down Expand Up @@ -176,6 +184,8 @@ func (rom *ReplaceOneModel) SetUpsert(upsert bool) *ReplaceOneModel {
func (*ReplaceOneModel) writeModel() {}

// UpdateOneModel is used to update at most one document in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type UpdateOneModel struct {
Collation *options.Collation
Upsert *bool
Expand Down Expand Up @@ -241,6 +251,8 @@ func (uom *UpdateOneModel) SetUpsert(upsert bool) *UpdateOneModel {
func (*UpdateOneModel) writeModel() {}

// UpdateManyModel is used to update multiple documents in a BulkWrite operation.
//
// See corresponding setter methods for documentation.
type UpdateManyModel struct {
Collation *options.Collation
Upsert *bool
Expand Down
17 changes: 8 additions & 9 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,10 +895,14 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
// BulkWrite performs a client-level bulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
opts ...options.Lister[options.ClientBulkWriteOptions]) (*ClientBulkWriteResult, error) {
// TODO: Remove once DRIVERS-2888 is implemented.
// TODO(GODRIVER-3403): Remove after support for QE with Client.bulkWrite.
if c.isAutoEncryptionSet {
return nil, errors.New("bulkWrite does not currently support automatic encryption")
}

if models == nil {
return nil, ErrNilValue
}
bwo, err := mongoutil.NewOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -962,14 +966,9 @@ func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
} else if !acknowledged {
return nil, errors.New("cannot request unacknowledged write concern and verbose results")
}
if err = op.execute(ctx); err != nil {
return nil, replaceErrors(err)
}
var results *ClientBulkWriteResult
if acknowledged {
results = &op.result
}
return results, nil
op.result.Acknowledged = acknowledged
err = op.execute(ctx)
return &op.result, replaceErrors(err)
}

// newLogger will use the LoggerOptions to create an internal logger and publish
Expand Down
Loading

0 comments on commit dbd44c9

Please sign in to comment.