diff --git a/data/bson_utils.go b/data/bson_utils.go new file mode 100644 index 00000000000..4428e1df2af --- /dev/null +++ b/data/bson_utils.go @@ -0,0 +1,50 @@ +package data + +import ( + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "google.golang.org/protobuf/types/known/structpb" +) + +// pbStructToBSON converts a structpb.Struct to a bson.M. +func pbStructToBSON(s *structpb.Struct) (bson.M, error) { + bsonMap := make(bson.M) + for k, v := range s.Fields { + bsonValue, err := convertPBStructValueToBSON(v) + if err != nil { + return nil, err + } + bsonMap[k] = bsonValue + } + return bsonMap, nil +} + +func convertPBStructValueToBSON(v *structpb.Value) (interface{}, error) { + switch v.Kind.(type) { + case *structpb.Value_NullValue: + var ret interface{} + return ret, nil + case *structpb.Value_NumberValue: + return v.GetNumberValue(), nil + case *structpb.Value_StringValue: + return v.GetStringValue(), nil + case *structpb.Value_BoolValue: + return v.GetBoolValue(), nil + case *structpb.Value_StructValue: + return pbStructToBSON(v.GetStructValue()) + case *structpb.Value_ListValue: + list := v.GetListValue() + var slice bson.A + for _, item := range list.Values { + bsonValue, err := convertPBStructValueToBSON(item) + if err != nil { + return nil, err + } + slice = append(slice, bsonValue) + } + return slice, nil + default: + return nil, fmt.Errorf("unsupported value type: %T", v.Kind) + } +} diff --git a/data/bson_utils_test.go b/data/bson_utils_test.go new file mode 100644 index 00000000000..a43b5fc9b30 --- /dev/null +++ b/data/bson_utils_test.go @@ -0,0 +1,186 @@ +package data + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.viam.com/test" + "google.golang.org/protobuf/types/known/structpb" +) + +// bsonToStructPB converts a bson.M to a structpb.Struct. 
+func bsonToStructPB(bsonMap bson.M) (*structpb.Struct, error) { + s := &structpb.Struct{ + Fields: make(map[string]*structpb.Value), + } + for k, v := range bsonMap { + value, err := convertBSONValueToStructPBValue(v) + if err != nil { + return nil, err + } + s.Fields[k] = value + } + return s, nil +} + +func convertBSONValueToStructPBValue(v interface{}) (*structpb.Value, error) { + switch val := v.(type) { + case nil, primitive.Undefined: + return &structpb.Value{Kind: &structpb.Value_NullValue{}}, nil + case float64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: val}}, nil + case int64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(val)}}, nil + case int32: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(val)}}, nil + case string: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: val}}, nil + case bool: + return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: val}}, nil + case bson.M: + s, err := bsonToStructPB(val) + if err != nil { + return nil, err + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: s}}, nil + case bson.A: + list := &structpb.ListValue{} + for _, item := range val { + value, err := convertBSONValueToStructPBValue(item) + if err != nil { + return nil, err + } + list.Values = append(list.Values, value) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: list}}, nil + case primitive.DateTime: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: val.Time().String()}}, nil + case primitive.Timestamp: + jsonStr, err := json.Marshal(val) + if err != nil { + return nil, err + } + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: string(jsonStr)}}, nil + case primitive.JavaScript: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: string(val)}}, nil + case primitive.Symbol: + return &structpb.Value{Kind: 
&structpb.Value_StringValue{StringValue: string(val)}}, nil + case primitive.DBPointer, primitive.CodeWithScope, primitive.Decimal128, primitive.Regex, primitive.ObjectID: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: val.(fmt.Stringer).String()}}, nil + case primitive.MinKey: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "MinKey"}}, nil + case primitive.MaxKey: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "MaxKey"}}, nil + case primitive.Binary: + // If it's a UUID, return the UUID as a hex string. + if val.Subtype == bson.TypeBinaryUUID { + data, err := uuid.FromBytes(val.Data) + if err != nil { + return nil, err + } + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: data.String()}}, nil + } + + // Otherwise return a list of the raw bytes. + list := make([]*structpb.Value, len(val.Data)) + for i, b := range val.Data { + list[i] = &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(b)}} + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: list}}}, nil + default: + return nil, fmt.Errorf("unsupported BSON type: %T", v) + } +} + +func TestBSONToStructPBAndBack(t *testing.T) { + tests := []struct { + name string + input *structpb.Struct + expectedBSON primitive.M + }{ + { + name: "Primitive fields are properly converted between structpb.Struct <-> BSON.", + input: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": {Kind: &structpb.Value_StringValue{StringValue: "John"}}, + "age": {Kind: &structpb.Value_NumberValue{NumberValue: 30}}, + "alive": {Kind: &structpb.Value_BoolValue{BoolValue: true}}, + "nullable": {Kind: &structpb.Value_NullValue{}}, + }, + }, + expectedBSON: bson.M{ + "name": "John", + "age": 30.0, + "alive": true, + "nullable": nil, + }, + }, + { + name: "Nested struct fields are properly converted between structpb.Struct <-> BSON.", + input: &structpb.Struct{ + Fields: 
map[string]*structpb.Value{ + "person": { + Kind: &structpb.Value_StructValue{ + StructValue: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": {Kind: &structpb.Value_StringValue{StringValue: "Alice"}}, + "age": {Kind: &structpb.Value_NumberValue{NumberValue: 25}}, + "alive": {Kind: &structpb.Value_BoolValue{BoolValue: true}}, + }, + }, + }, + }, + }, + }, + expectedBSON: bson.M{ + "person": bson.M{ + "name": "Alice", + "age": float64(25), + "alive": true, + }, + }, + }, + { + name: "List fields are properly converted between structpb.Struct <-> BSON.", + input: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "names": { + Kind: &structpb.Value_ListValue{ + ListValue: &structpb.ListValue{ + Values: []*structpb.Value{ + {Kind: &structpb.Value_StringValue{StringValue: "Bob"}}, + {Kind: &structpb.Value_StringValue{StringValue: "Charlie"}}, + }, + }, + }, + }, + }, + }, + expectedBSON: bson.M{ + "names": bson.A{"Bob", "Charlie"}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Convert StructPB to BSON + bsonMap, err := pbStructToBSON(tc.input) + test.That(t, err, test.ShouldBeNil) + + // Validate the BSON is structured as expected. + test.That(t, bsonMap, test.ShouldResemble, tc.expectedBSON) + + // Convert BSON back to StructPB + result, err := bsonToStructPB(bsonMap) + test.That(t, err, test.ShouldBeNil) + + // Check if the result matches the original input + test.That(t, result, test.ShouldResemble, tc.input) + }) + } +} diff --git a/data/capture_buffer.go b/data/capture_buffer.go index 0989efcae57..93edc43025e 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -6,6 +6,8 @@ import ( v1 "go.viam.com/api/app/datasync/v1" ) +const captureAllFromCamera = "CaptureAllFromCamera" + // CaptureBufferedWriter is a buffered, persistent queue of SensorData. 
type CaptureBufferedWriter interface { Write(item *v1.SensorData) error @@ -63,7 +65,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { // We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images // and their corresponding annotations. We want each image and its annotations to be stored in a // separate file. - } else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" { + } else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == captureAllFromCamera { if err := b.nextFile.Close(); err != nil { return err } diff --git a/data/collector.go b/data/collector.go index 948d74768b1..7070141147e 100644 --- a/data/collector.go +++ b/data/collector.go @@ -11,6 +11,8 @@ import ( "github.com/benbjohnson/clock" "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" "go.opencensus.io/trace" v1 "go.viam.com/api/app/datasync/v1" pb "go.viam.com/api/common/v1" @@ -58,9 +60,14 @@ type Collector interface { type collector struct { clock clock.Clock captureResults chan *v1.SensorData - captureErrors chan error - interval time.Duration - params map[string]*anypb.Any + + mongoCollection *mongo.Collection + componentName string + componentType string + methodName string + captureErrors chan error + interval time.Duration + params map[string]*anypb.Any // `lock` serializes calls to `Flush` and `Close`. 
lock sync.Mutex logger logging.Logger @@ -257,7 +264,11 @@ func NewCollector(captureFunc CaptureFunc, params CollectorParams) (Collector, e c = params.Clock } return &collector{ + componentName: params.ComponentName, + componentType: params.ComponentType, + methodName: params.MethodName, captureResults: make(chan *v1.SensorData, params.QueueSize), + mongoCollection: params.MongoCollection, captureErrors: make(chan error, params.QueueSize), interval: params.Interval, params: params.MethodParams, @@ -285,10 +296,64 @@ func (c *collector) writeCaptureResults() { c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write to collector %s", c.target.Path())).Error()) return } + + c.maybeWriteToMongo(msg) } } } +// TabularData is a denormalized sensor reading. +type TabularData struct { + TimeRequested time.Time `bson:"time_requested"` + TimeReceived time.Time `bson:"time_received"` + ComponentName string `bson:"component_name"` + ComponentType string `bson:"component_type"` + MethodName string `bson:"method_name"` + Data bson.M `bson:"data"` +} + +// maybeWriteToMongo will write to the mongoCollection +// if it is non-nil and the msg is tabular data +// logs errors on failure. 
+func (c *collector) maybeWriteToMongo(msg *v1.SensorData) { + if c.mongoCollection == nil { + return + } + + // DATA-3338: + // currently vision.CaptureAllFromCamera and camera.GetImages are stored in .capture files as VERY LARGE + // tabular sensor data + // That is a mistake which we are rectifying but in the meantime we don't want data captured from those methods to be synced + // to mongo + if getDataType(c.methodName) == v1.DataType_DATA_TYPE_BINARY_SENSOR || c.methodName == captureAllFromCamera { + return + } + + s := msg.GetStruct() + if s == nil { + return + } + + data, err := pbStructToBSON(s) + if err != nil { + c.logger.Error(errors.Wrap(err, "failed to convert sensor data into bson")) + return + } + + td := TabularData{ + TimeRequested: msg.Metadata.TimeRequested.AsTime(), + TimeReceived: msg.Metadata.TimeReceived.AsTime(), + ComponentName: c.componentName, + ComponentType: c.componentType, + MethodName: c.methodName, + Data: data, + } + + if _, err := c.mongoCollection.InsertOne(c.cancelCtx, td); err != nil { + c.logger.Error(errors.Wrap(err, "failed to write to mongo")) + } +} + func (c *collector) logCaptureErrs() { for err := range c.captureErrors { now := c.clock.Now().Unix() diff --git a/data/registry.go b/data/registry.go index d534e1b09c3..d48997893ed 100644 --- a/data/registry.go +++ b/data/registry.go @@ -7,6 +7,7 @@ import ( "github.com/benbjohnson/clock" "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/mongo" "google.golang.org/protobuf/types/known/anypb" "go.viam.com/rdk/logging" @@ -18,14 +19,17 @@ type CollectorConstructor func(resource interface{}, params CollectorParams) (Co // CollectorParams contain the parameters needed to construct a Collector. 
type CollectorParams struct { - ComponentName string - Interval time.Duration - MethodParams map[string]*anypb.Any - Target CaptureBufferedWriter - QueueSize int - BufferSize int - Logger logging.Logger - Clock clock.Clock + MongoCollection *mongo.Collection + ComponentName string + ComponentType string + MethodName string + Interval time.Duration + MethodParams map[string]*anypb.Any + Target CaptureBufferedWriter + QueueSize int + BufferSize int + Logger logging.Logger + Clock clock.Clock } // Validate validates that p contains all required parameters. diff --git a/etc/analyzecoverage/main.go b/etc/analyzecoverage/main.go index 8d10e919234..53806c19209 100644 --- a/etc/analyzecoverage/main.go +++ b/etc/analyzecoverage/main.go @@ -88,13 +88,10 @@ func mainWithArgs(ctx context.Context, _ []string, logger logging.Logger) error ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI)) + client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI)) if err != nil { return err } - if err := client.Connect(ctx); err != nil { - return err - } if err := client.Ping(ctx, readpref.Primary()); err != nil { return multierr.Combine(err, client.Disconnect(ctx)) } diff --git a/etc/analyzetests/main.go b/etc/analyzetests/main.go index 178665e5a99..3c8fb991efb 100644 --- a/etc/analyzetests/main.go +++ b/etc/analyzetests/main.go @@ -39,13 +39,10 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI)) + client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI)) if err != nil { return err } - if err := client.Connect(ctx); err != nil { - return err - } if err := client.Ping(ctx, readpref.Primary()); err != nil { return multierr.Combine(err, client.Disconnect(ctx)) } diff --git a/go.mod b/go.mod index 
0258bd23bf7..b97ebffb1b2 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/viamrobotics/webrtc/v3 v3.99.10 github.com/xfmoulet/qoi v0.2.0 go-hep.org/x/hep v0.32.1 - go.mongodb.org/mongo-driver v1.11.6 + go.mongodb.org/mongo-driver v1.12.2 go.opencensus.io v0.24.0 go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 diff --git a/go.sum b/go.sum index 21bb07927fa..2b1bd15d364 100644 --- a/go.sum +++ b/go.sum @@ -1373,9 +1373,6 @@ github.com/tetafro/godot v1.4.17 h1:pGzu+Ye7ZUEFx7LHU0dAKmCOXWsPjl7qA6iMGndsjPs= github.com/tetafro/godot v1.4.17/go.mod h1:2oVxTBSftRTh4+MVfUaUXR6bn2GDXCaMcOG4Dk3rfio= github.com/tetratelabs/wazero v1.2.0 h1:I/8LMf4YkCZ3r2XaL9whhA0VMyAvF6QE+O7rco0DCeQ= github.com/tetratelabs/wazero v1.2.0/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= @@ -1429,10 +1426,8 @@ github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod 
h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xen0n/gosmopolitan v1.2.2 h1:/p2KTnMzwRexIW8GlKawsTWOxn7UHA+jCMF/V8HHtvU= @@ -1477,8 +1472,8 @@ go-simpler.org/sloglint v0.7.2/go.mod h1:US+9C80ppl7VsThQclkM7BkCHQAzuz8kHLsW3pp go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= -go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K+o= -go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY= +go.mongodb.org/mongo-driver v1.12.2 h1:gbWY1bJkkmUB9jjZzcdhOL8O85N9H+Vvsf2yFN0RDws= +go.mongodb.org/mongo-driver v1.12.2/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index f04ef2ec2dc..eff4c215aee 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -129,13 +129,13 @@ func New( } // Close releases all resources managed by data_manager. 
-func (b *builtIn) Close(_ context.Context) error { +func (b *builtIn) Close(ctx context.Context) error { b.logger.Info("Close START") defer b.logger.Info("Close END") b.mu.Lock() defer b.mu.Unlock() b.diskSummaryLogger.close() - b.capture.Close() + b.capture.Close(ctx) b.sync.Close() return nil } diff --git a/services/datamanager/builtin/builtin_test.go b/services/datamanager/builtin/builtin_test.go index 274f0da8ea4..91bf269e592 100644 --- a/services/datamanager/builtin/builtin_test.go +++ b/services/datamanager/builtin/builtin_test.go @@ -324,7 +324,7 @@ func TestFileDeletion(t *testing.T) { // flush and close collectors to ensure we have exactly 4 files // close capture to stop it from writing more files - b.capture.Close() + b.capture.Close(ctx) // number of capture files is based on the number of unique // collectors in the robot config used in this test diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index ebb5038a54e..75112cba583 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -10,6 +10,8 @@ import ( "github.com/benbjohnson/clock" "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" goutils "go.viam.com/utils" "go.viam.com/rdk/data" @@ -26,8 +28,12 @@ import ( // writes this would be performing. const defaultCaptureQueueSize = 250 -// Default bufio.Writer buffer size in bytes. -const defaultCaptureBufferSize = 4096 +const ( + // Default bufio.Writer buffer size in bytes. 
+ defaultCaptureBufferSize = 4096 + defaultMongoDatabaseName = "sensorData" + defaultMongoCollectionName = "readings" +) func generateMetadataKey(component, method string) string { return fmt.Sprintf("%s/%s", component, method) @@ -52,11 +58,20 @@ type Capture struct { collectorsMu sync.Mutex collectors collectors - // captureDir is only stored on Capture so that we can detect when it changs captureDir string // maxCaptureFileSize is only stored on Capture so that we can detect when it changs maxCaptureFileSize int64 + mongoMU sync.Mutex + mongo captureMongo +} + +type captureMongo struct { + // the struct members are protected by + // mu and are either all nil or all non nil + client *mongo.Client + collection *mongo.Collection + config *MongoConfig } type ( @@ -93,7 +108,11 @@ func format(c datamanager.DataCaptureConfig) string { c.Name, c.Method, c.CaptureFrequencyHz, c.CaptureQueueSize, c.AdditionalParams, c.Disabled, c.Tags, c.CaptureDirectory) } -func (c *Capture) newCollectors(collectorConfigsByResource CollectorConfigsByResource, config Config) collectors { +func (c *Capture) newCollectors( + collectorConfigsByResource CollectorConfigsByResource, + config Config, + collection *mongo.Collection, +) collectors { // Initialize or add collectors based on changes to the component configurations. 
newCollectors := make(map[collectorMetadata]*collectorAndConfig) for res, cfgs := range collectorConfigsByResource { @@ -112,7 +131,7 @@ func (c *Capture) newCollectors(collectorConfigsByResource CollectorConfigsByRes continue } - newCollectorAndConfig, err := c.initializeOrUpdateCollector(res, md, cfg, config) + newCollectorAndConfig, err := c.initializeOrUpdateCollector(res, md, cfg, config, collection) if err != nil { c.logger.Warnw("failed to initialize or update collector", "error", err, "resource_name", res.Name(), "metadata", md, "data capture config", format(cfg)) @@ -124,6 +143,65 @@ func (c *Capture) newCollectors(collectorConfigsByResource CollectorConfigsByRes return newCollectors } +func (c *Capture) mongoSetup(ctx context.Context, newConfig MongoConfig) *mongo.Collection { + oldConfig := c.mongo.config + if oldConfig != nil && oldConfig.Equal(newConfig) && c.mongo.client != nil { + // if we have a client & the configs are equal, reuse the existing collection + return c.mongo.collection + } + + // We now know we want a mongo connection and that we either don't have one, or we have one + // but the config has changed. + // In either case we need to close all collectors and the client connection (if one exists), + // create a new client & return the configured collection. 
+ c.closeNoMongoMutex(ctx) + // Use the SetServerAPIOptions() method to set the Stable API version to 1 + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + // Create a new client and connect to the server + client, err := mongo.Connect(ctx, options.Client().ApplyURI(newConfig.URI).SetServerAPIOptions(serverAPI)) + if err != nil { + c.logger.Warn("failed to create mongo connection with mongo_capture_config.uri") + return nil + } + database := defaultIfZeroVal(newConfig.Database, defaultMongoDatabaseName) + collection := defaultIfZeroVal(newConfig.Collection, defaultMongoCollectionName) + c.mongo = captureMongo{ + client: client, + collection: client.Database(database).Collection(collection), + config: &newConfig, + } + c.logger.Info("mongo client created") + return c.mongo.collection +} + +// mongoReconfigure shuts down the collectors when the mongo client is no longer +// valid based on the new config and attempts to create a new mongo client when the new +// config prescribes one. +// Returns a *mongo.Collection when the new client is valid and nil when it is not. +func (c *Capture) mongoReconfigure(ctx context.Context, newConfig *MongoConfig) *mongo.Collection { + c.mongoMU.Lock() + defer c.mongoMU.Unlock() + noClient := c.mongo.client == nil + disabled := newConfig == nil || newConfig.URI == "" + + if noClient && disabled { + // if we don't have a client and the new config + // isn't asking for a mongo connection, no-op + return nil + } + + if disabled { + // if we currently have a client, and the new config is disabled + // call close to disconnect from mongo and close the collectors. + // They will be recreated later during Reconfigure without a collection. + c.closeNoMongoMutex(ctx) + return nil + } + + // If the config is enabled, setup mongo + return c.mongoSetup(ctx, *newConfig) +} + // Reconfigure reconfigures Capture. // It is only called by the builtin data manager. 
func (c *Capture) Reconfigure( @@ -136,7 +214,7 @@ func (c *Capture) Reconfigure( // Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service. if config.CaptureDisabled { c.logger.Info("Capture Disabled") - c.Close() + c.Close(ctx) return } @@ -148,7 +226,8 @@ func (c *Capture) Reconfigure( c.logger.Infof("maximum_capture_file_size_bytes old: %d, new: %d", c.maxCaptureFileSize, config.MaximumCaptureFileSizeBytes) } - newCollectors := c.newCollectors(collectorConfigsByResource, config) + collection := c.mongoReconfigure(ctx, config.MongoConfig) + newCollectors := c.newCollectors(collectorConfigsByResource, config, collection) // If a component/method has been removed from the config, close the collector. c.collectorsMu.Lock() for md, collAndConfig := range c.collectors { @@ -164,9 +243,28 @@ func (c *Capture) Reconfigure( } // Close closes the capture manager. -func (c *Capture) Close() { +func (c *Capture) Close(ctx context.Context) { c.FlushCollectors() c.closeCollectors() + c.mongoMU.Lock() + defer c.mongoMU.Unlock() + if c.mongo.client != nil { + c.logger.Info("closing mongo connection") + goutils.UncheckedError(c.mongo.client.Disconnect(ctx)) + c.mongo = captureMongo{} + } +} + +// closeNoMongoMutex exists for cases when we need to perform close actions in a function +// which is already holding the mongoMu. +func (c *Capture) closeNoMongoMutex(ctx context.Context) { + c.FlushCollectors() + c.closeCollectors() + if c.mongo.client != nil { + c.logger.Info("closing mongo connection") + goutils.UncheckedError(c.mongo.client.Disconnect(ctx)) + c.mongo = captureMongo{} + } } // Initialize a collector for the component/method or update it if it has previously been created. 
@@ -176,6 +274,7 @@ func (c *Capture) initializeOrUpdateCollector( md collectorMetadata, collectorConfig datamanager.DataCaptureConfig, config Config, + collection *mongo.Collection, ) (*collectorAndConfig, error) { // TODO(DATA-451): validate method params methodParams, err := protoutils.ConvertStringMapToAnyPBMap(collectorConfig.AdditionalParams) @@ -237,10 +336,13 @@ func (c *Capture) initializeOrUpdateCollector( queueSize := defaultIfZeroVal(collectorConfig.CaptureQueueSize, defaultCaptureQueueSize) bufferSize := defaultIfZeroVal(collectorConfig.CaptureBufferSize, defaultCaptureBufferSize) collector, err := collectorConstructor(res, data.CollectorParams{ - ComponentName: collectorConfig.Name.ShortName(), - Interval: data.GetDurationFromHz(collectorConfig.CaptureFrequencyHz), - MethodParams: methodParams, - Target: data.NewCaptureBuffer(targetDir, captureMetadata, config.MaximumCaptureFileSizeBytes), + MongoCollection: collection, + ComponentName: collectorConfig.Name.ShortName(), + ComponentType: collectorConfig.Name.API.String(), + MethodName: collectorConfig.Method, + Interval: data.GetDurationFromHz(collectorConfig.CaptureFrequencyHz), + MethodParams: methodParams, + Target: data.NewCaptureBuffer(targetDir, captureMetadata, config.MaximumCaptureFileSizeBytes), // Set queue size to defaultCaptureQueueSize if it was not set in the config. QueueSize: queueSize, BufferSize: bufferSize, diff --git a/services/datamanager/builtin/capture/config.go b/services/datamanager/builtin/capture/config.go index fe8b99edefe..50836dc2acf 100644 --- a/services/datamanager/builtin/capture/config.go +++ b/services/datamanager/builtin/capture/config.go @@ -1,5 +1,17 @@ package capture +// MongoConfig is the optional data capture mongo config. +type MongoConfig struct { + URI string `json:"uri"` + Database string `json:"database"` + Collection string `json:"collection"` +} + +// Equal returns true when both MongoConfigs are equal. 
+func (mc MongoConfig) Equal(o MongoConfig) bool { + return mc.URI == o.URI && mc.Database == o.Database && mc.Collection == o.Collection +} + // Config is the capture config. type Config struct { // CaptureDisabled if set to true disables all data capture collectors @@ -12,4 +24,6 @@ type Config struct { // (.prog) files should be allowed to grow to before they are convered into .capture // files MaximumCaptureFileSizeBytes int64 + + MongoConfig *MongoConfig } diff --git a/services/datamanager/builtin/config.go b/services/datamanager/builtin/config.go index 526429d9cdb..1f8b9ff6777 100644 --- a/services/datamanager/builtin/config.go +++ b/services/datamanager/builtin/config.go @@ -39,9 +39,10 @@ type Config struct { CaptureDir string `json:"capture_dir"` Tags []string `json:"tags"` // Capture - CaptureDisabled bool `json:"capture_disabled"` - DeleteEveryNthWhenDiskFull int `json:"delete_every_nth_when_disk_full"` - MaximumCaptureFileSizeBytes int64 `json:"maximum_capture_file_size_bytes"` + CaptureDisabled bool `json:"capture_disabled"` + DeleteEveryNthWhenDiskFull int `json:"delete_every_nth_when_disk_full"` + MaximumCaptureFileSizeBytes int64 `json:"maximum_capture_file_size_bytes"` + MongoCaptureConfig *capture.MongoConfig `json:"mongo_capture_config"` // Sync AdditionalSyncPaths []string `json:"additional_sync_paths"` FileLastModifiedMillis int `json:"file_last_modified_millis"` @@ -89,6 +90,7 @@ func (c *Config) captureConfig() capture.Config { CaptureDir: c.getCaptureDir(), Tags: c.Tags, MaximumCaptureFileSizeBytes: maximumCaptureFileSizeBytes, + MongoConfig: c.MongoCaptureConfig, } } diff --git a/vision/training.go b/vision/training.go index 0d6d8b56777..aebf4a8cc26 100644 --- a/vision/training.go +++ b/vision/training.go @@ -31,12 +31,7 @@ type ImageTrainingStore struct { // NewImageTrainingStore TODO. 
func NewImageTrainingStore(ctx context.Context, mongoURI, db, collection string) (*ImageTrainingStore, error) { - client, err := mongo.NewClient(options.Client().ApplyURI(mongoURI)) - if err != nil { - return nil, err - } - - err = client.Connect(ctx) + client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI)) if err != nil { return nil, err }