diff --git a/datastore/postgres/deltaupdatevulnerabilities.go b/datastore/postgres/deltaupdatevulnerabilities.go deleted file mode 100644 index 649e5410c..000000000 --- a/datastore/postgres/deltaupdatevulnerabilities.go +++ /dev/null @@ -1,230 +0,0 @@ -package postgres - -import ( - "context" - "fmt" - "strconv" - "time" - - "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/quay/zlog" - - "github.com/quay/claircore" - "github.com/quay/claircore/libvuln/driver" - "github.com/quay/claircore/pkg/microbatch" -) - -var ( - deltaUpdateVulnerabilitiesCounter = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "claircore", - Subsystem: "vulnstore", - Name: "deltaupdatevulnerabilities_total", - Help: "Total number of database queries issued in the DeltaUpdateVulnerabilities method.", - }, - []string{"query"}, - ) - deltaUpdateVulnerabilitiesDuration = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "claircore", - Subsystem: "vulnstore", - Name: "deltaupdatevulnerabilities_duration_seconds", - Help: "The duration of all queries issued in the DeltaUpdateVulnerabilities method", - }, - []string{"query"}, - ) -) - -// DeltaUpdateVulnerabilities implements vulnstore.Updater. -// -// It creates a new UpdateOperation for this update call, inserts the -// provided vulnerabilities and computes a diff comprising the removed -// and added vulnerabilities for this UpdateOperation. -func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { - const ( - existingQuery = ` - SELECT - "name", - "vuln"."id" - FROM - "vuln" - INNER JOIN "uo_vuln" ON ("vuln"."id" = "uo_vuln"."vuln") - INNER JOIN "latest_update_operations" ON ( - "latest_update_operations"."id" = "uo_vuln"."uo" - ) - WHERE - ( - "latest_update_operations"."kind" = 'vulnerability' - ) - AND - ( - "vuln"."updater" = $1 - )` - // Create makes a new update operation and returns the reference and ID. - create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;` - // Insert attempts to create a new vulnerability. It fails silently. - insert = ` - INSERT INTO vuln ( - hash_kind, hash, - name, updater, description, issued, links, severity, normalized_severity, - package_name, package_version, package_module, package_arch, package_kind, - dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name, - repo_name, repo_key, repo_uri, - fixed_in_version, arch_operation, version_kind, vulnerable_range - ) VALUES ( - $1, $2, - $3, $4, $5, $6, $7, $8, $9, - $10, $11, $12, $13, $14, - $15, $16, $17, $18, $19, $20, $21, $22, - $23, $24, $25, - $26, $27, $28, VersionRange($29, $30) - ) - ON CONFLICT (hash_kind, hash) DO NOTHING;` - // Assoc associates an update operation and a vulnerability. It fails - // silently. - assoc = ` - INSERT INTO uo_vuln (uo, vuln) VALUES ( - $3, - (SELECT id FROM vuln WHERE hash_kind = $1 AND hash = $2)) - ON CONFLICT DO NOTHING;` - assocExisting = `INSERT INTO uo_vuln (uo, vuln) VALUES ($1, $2) ON CONFLICT DO NOTHING;` - undo = `DELETE FROM update_operation WHERE id = $1;` - refreshView = `REFRESH MATERIALIZED VIEW CONCURRENTLY latest_update_operations;` - ) - ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/updateVulnerabilities") - - // Get existing vulns - rows, err := s.pool.Query(ctx, existingQuery, updater) - if err != nil { - return uuid.Nil, fmt.Errorf("failed to get existing vulns: %w", err) - } - oldVulns := make(map[string]string) - for rows.Next() { - var tmpID int64 - var ID, name string - err := rows.Scan( - &name, - &tmpID, - ) - - ID = strconv.FormatInt(tmpID, 10) - if err != nil { - return uuid.Nil, fmt.Errorf("failed to scan vulnerability: %w", err) - } - oldVulns[name] = ID - } - - if len(oldVulns) > 0 { - for _, v := range vulns { - // If we have an existing vuln in the new batch - // Delete it from the oldVulns map so it doesn't - // get associated with the new uo. - delete(oldVulns, v.Name) - } - } - - var uoID uint64 - var ref uuid.UUID - - start := time.Now() - - if err := s.pool.QueryRow(ctx, create, updater, string(fingerprint)).Scan(&uoID, &ref); err != nil { - return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err) - } - var success bool - defer func() { - if !success { - if _, err := s.pool.Exec(ctx, undo, uoID); err != nil { - zlog.Error(ctx). - Err(err). - Stringer("ref", ref). - Msg("unable to remove update operation") - } - } - }() - - deltaUpdateVulnerabilitiesCounter.WithLabelValues("create").Add(1) - deltaUpdateVulnerabilitiesDuration.WithLabelValues("create").Observe(time.Since(start).Seconds()) - - tx, err := s.pool.Begin(ctx) - if err != nil { - return uuid.Nil, fmt.Errorf("unable to start transaction: %w", err) - } - defer tx.Rollback(ctx) - - zlog.Debug(ctx). - Str("ref", ref.String()). - Msg("update_operation created") - - // Update oldVuln's uos - for _, vID := range oldVulns { - _, err := tx.Exec(ctx, assocExisting, uoID, vID) - if err != nil { - return uuid.Nil, fmt.Errorf("could not update old vulnerability with new UO: %w", err) - } - } - - // batch insert vulnerabilities - skipCt := 0 - - start = time.Now() - - mBatcher := microbatch.NewInsert(tx, 2000, time.Minute) - for _, vuln := range vulns { - if vuln.Package == nil || vuln.Package.Name == "" { - skipCt++ - continue - } - - pkg := vuln.Package - dist := vuln.Dist - repo := vuln.Repo - if dist == nil { - dist = &zeroDist - } - if repo == nil { - repo = &zeroRepo - } - hashKind, hash := md5Vuln(vuln) - vKind, vrLower, vrUpper := rangefmt(vuln.Range) - - err := mBatcher.Queue(ctx, insert, - hashKind, hash, - vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity, - pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind, - dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName, - repo.Name, repo.Key, repo.URI, - vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper, - ) - if err != nil { - return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err) - } - - if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil { - return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) - } - } - if err := mBatcher.Done(ctx); err != nil { - return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err) - } - - updateVulnerabilitiesCounter.WithLabelValues("insert_batch").Add(1) - updateVulnerabilitiesDuration.WithLabelValues("insert_batch").Observe(time.Since(start).Seconds()) - - if err := tx.Commit(ctx); err != nil { - return uuid.Nil, fmt.Errorf("failed to commit transaction: %w", err) - } - if _, err = s.pool.Exec(ctx, refreshView); err != nil { - return uuid.Nil, fmt.Errorf("could not refresh latest_update_operations: %w", err) - } - - success = true - zlog.Debug(ctx). - Str("ref", ref.String()). - Int("skipped", skipCt). - Int("inserted", len(vulns)-skipCt). - Msg("update_operation committed") - return ref, nil -} diff --git a/datastore/postgres/deltaupdatevulnerabilities_test.go b/datastore/postgres/deltaupdatevulnerabilities_test.go deleted file mode 100644 index 1f8bb7ff3..000000000 --- a/datastore/postgres/deltaupdatevulnerabilities_test.go +++ /dev/null @@ -1,194 +0,0 @@ -package postgres - -import ( - "context" - "testing" - - "github.com/google/uuid" - "github.com/quay/zlog" - - "github.com/quay/claircore" - "github.com/quay/claircore/datastore" - "github.com/quay/claircore/libvuln/driver" - "github.com/quay/claircore/test/integration" - pgtest "github.com/quay/claircore/test/postgres" -) - -type latestVulnTestCase struct { - TestName string - Updater string - VulnCount int - FirstOp, SecondOp []*claircore.Vulnerability - Records []*claircore.IndexRecord -} - -func TestGetLatestVulnerabilities(t *testing.T) { - integration.NeedDB(t) - ctx := zlog.Test(context.Background(), t) - - cases := []latestVulnTestCase{ - { - TestName: "test initial op vuln still relevant", - Updater: updater, - VulnCount: 1, - FirstOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-123", - Package: &claircore.Package{ - Name: "vi", - }, - }, - }, - SecondOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-456", - Package: &claircore.Package{ - Name: "vim", - }, - }, - { - Updater: updater, - Name: "CVE-789", - Package: &claircore.Package{ - Name: "nano", - }, - }, - }, - Records: []*claircore.IndexRecord{ - { - Package: &claircore.Package{ - Name: "vi", - Source: &claircore.Package{ - Name: "vi", - Version: "v1.0.0", - }, - }, - }, - }, - }, - { - TestName: "test vuln is overwritten not duped", - Updater: updater, - VulnCount: 1, - FirstOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-123", - Package: &claircore.Package{ - Name: "grep", - }, - Severity: "BAD", - }, - { - Updater: updater, - Name: "CVE-456", - Package: &claircore.Package{ - Name: "sed", - }, - }, - }, - SecondOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-123", - Package: &claircore.Package{ - Name: "grep", - }, - Severity: "NOT AS BAD AS WE THOUGHT", - }, - }, - Records: []*claircore.IndexRecord{ - { - Package: &claircore.Package{ - Name: "grep", - Source: &claircore.Package{ - Name: "grep", - Version: "v1.0.0", - }, - }, - }, - }, - }, - { - TestName: "test two vulns same package different uo", - Updater: updater, - VulnCount: 2, - FirstOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-000", - Package: &claircore.Package{ - Name: "python3", - }, - }, - }, - SecondOp: []*claircore.Vulnerability{ - { - Updater: updater, - Name: "CVE-123", - Package: &claircore.Package{ - Name: "python3", - }, - }, - { - Updater: updater, - Name: "CVE-456", - Package: &claircore.Package{ - Name: "python3-crypto", - }, - }, - { - Updater: updater, - Name: "CVE-789", - Package: &claircore.Package{ - Name: "python3-urllib3", - }, - }, - }, - Records: []*claircore.IndexRecord{ - { - Package: &claircore.Package{ - Name: "python3", - Source: &claircore.Package{ - Name: "python3", - Version: "v1.0.0", - }, - }, - }, - }, - }, - } - - // prepare DB - pool := pgtest.TestMatcherDB(ctx, t) - store := NewMatcherStore(pool) - - // run test cases - for _, tc := range cases { - t.Run(tc.TestName, func(t *testing.T) { - _, err := store.DeltaUpdateVulnerabilities(ctx, tc.Updater, driver.Fingerprint(uuid.New().String()), tc.FirstOp) - if err != nil { - t.Fatalf("failed to perform update for first op: %v", err) - } - _, err = store.DeltaUpdateVulnerabilities(ctx, tc.Updater, driver.Fingerprint(uuid.New().String()), tc.SecondOp) - if err != nil { - t.Fatalf("failed to perform update for second op: %v", err) - } - - res, err := store.Get(ctx, tc.Records, datastore.GetOpts{}) - if err != nil { - t.Fatalf("failed to get vulns: %v", err) - } - ct := 0 - for _, vs := range res { - ct = ct + len(vs) - } - - if ct != tc.VulnCount { - t.Fatalf("got %d vulns, want %d", ct, tc.VulnCount) - } - }) - } -} diff --git a/datastore/postgres/updatevulnerabilities.go b/datastore/postgres/updatevulnerabilities.go index 5fa09d1ef..eb65dc0ac 100644 --- a/datastore/postgres/updatevulnerabilities.go +++ b/datastore/postgres/updatevulnerabilities.go @@ -51,9 +51,48 @@ var ( // provided vulnerabilities and computes a diff comprising the removed // and added vulnerabilities for this UpdateOperation. func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { + return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false) +} + +// DeltaUpdateVulnerabilities implements vulnstore.Updater. +// +// It shares functionality with UpdateVulnerabilities but with delta +// support, so the process goes: +// * Create a new UpdateOperation +// * Query existing vulnerabilities for the updater +// * Discount and vulnerabilities with newer updates and deleted vulnerabilities +// * Update the associated updateOperation for the remaining existing vulnerabilities +// * Insert the new vulnerabilities +// * Associate new vulnerabilities with new updateOperation +func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) { + return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true) +} + +func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) { const ( // Create makes a new update operation and returns the reference and ID. create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;` + // Select existing vulnerabilities that are associated with the latest_update_operation. + selectExisting = `UpdateOperation + SELECT + "name", + "vuln"."id" + FROM + "vuln" + INNER JOIN "uo_vuln" ON ("vuln"."id" = "uo_vuln"."vuln") + INNER JOIN "latest_update_operations" ON ( + "latest_update_operations"."id" = "uo_vuln"."uo" + ) + WHERE + ( + "latest_update_operations"."kind" = 'vulnerability' + ) + AND + ( + "vuln"."updater" = $1 + )` + // assocExisting associates existing vulnerabilities with new update operations + assocExisting = `INSERT INTO uo_vuln (uo, vuln) VALUES ($1, $2) ON CONFLICT DO NOTHING;` // Insert attempts to create a new vulnerability. It fails silently. insert = ` INSERT INTO vuln ( @@ -84,18 +123,18 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string ) ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/updateVulnerabilities") - var id uint64 + var uoID uint64 var ref uuid.UUID start := time.Now() - if err := s.pool.QueryRow(ctx, create, updater, string(fingerprint)).Scan(&id, &ref); err != nil { + if err := s.pool.QueryRow(ctx, create, updater, string(fingerprint)).Scan(&uoID, &ref); err != nil { return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err) } var success bool defer func() { if !success { - if _, err := s.pool.Exec(ctx, undo, id); err != nil { + if _, err := s.pool.Exec(ctx, undo, uoID); err != nil { zlog.Error(ctx). Err(err). Stringer("ref", ref). @@ -117,6 +156,62 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string Str("ref", ref.String()). Msg("update_operation created") + if delta { + // Get existing vulns + // The reason this still works even though the new update_operation + // is already created is because the latest_update_operation view isn't updated until + // the end of this function. + start = time.Now() + rows, err := s.pool.Query(ctx, selectExisting, updater) + if err != nil { + return uuid.Nil, fmt.Errorf("failed to get existing vulns: %w", err) + } + updateVulnerabilitiesCounter.WithLabelValues("selectExisting").Add(1) + updateVulnerabilitiesDuration.WithLabelValues("selectExisting").Observe(time.Since(start).Seconds()) + + oldVulns := make(map[string]string) + for rows.Next() { + var tmpID int64 + var ID, name string + err := rows.Scan( + &name, + &tmpID, + ) + + ID = strconv.FormatInt(tmpID, 10) + if err != nil { + return uuid.Nil, fmt.Errorf("failed to scan vulnerability: %w", err) + } + oldVulns[name] = ID + } + + if len(oldVulns) > 0 { + for _, v := range vulns { + // If we have an existing vuln in the new batch + // delete it from the oldVulns map so it doesn't + // get associated with the new update_operation. + delete(oldVulns, v.Name) + } + for _, delName := range deletedVulns { + // If we have an existing vuln that has been signaled + // as deleted by the updater then delete it so it doesn't + // get associated with the new update_operation. + delete(oldVulns, delName) + } + } + start = time.Now() + // Update oldVulns' update_operations + for _, vID := range oldVulns { + _, err := tx.Exec(ctx, assocExisting, uoID, vID) + if err != nil { + return uuid.Nil, fmt.Errorf("could not update old vulnerability with new UO: %w", err) + } + } + updateVulnerabilitiesCounter.WithLabelValues("assocExisting").Add(float64(len(oldVulns))) + updateVulnerabilitiesDuration.WithLabelValues("assocExisting").Observe(time.Since(start).Seconds()) + + } + // batch insert vulnerabilities skipCt := 0 @@ -153,7 +248,7 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err) } - if err := mBatcher.Queue(ctx, assoc, hashKind, hash, id); err != nil { + if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil { return uuid.Nil, fmt.Errorf("failed to queue association: %w", err) } } diff --git a/datastore/postgres/updatevulnerabilities_test.go b/datastore/postgres/updatevulnerabilities_test.go new file mode 100644 index 000000000..3d4a2369e --- /dev/null +++ b/datastore/postgres/updatevulnerabilities_test.go @@ -0,0 +1,264 @@ +package postgres + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/datastore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres" +) + +type latestVulnTestCase struct { + TestName string + Updater string + VulnCount int + FirstOp, SecondOp *op + Records []*claircore.IndexRecord +} + +type op struct { + vulns []*claircore.Vulnerability + deletedVulns []string +} + +func TestGetLatestVulnerabilities(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + + cases := []latestVulnTestCase{ + { + TestName: "test initial op vuln still relevant", + Updater: updater, + VulnCount: 1, + FirstOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-123", + Package: &claircore.Package{ + Name: "vi", + }, + }, + }, + }, + SecondOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-456", + Package: &claircore.Package{ + Name: "vim", + }, + }, + { + Updater: updater, + Name: "CVE-789", + Package: &claircore.Package{ + Name: "nano", + }, + }, + }, + }, + Records: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ + Name: "vi", + Source: &claircore.Package{ + Name: "vi", + Version: "v1.0.0", + }, + }, + }, + }, + }, + { + TestName: "test vuln is overwritten not duped", + Updater: updater, + VulnCount: 1, + FirstOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-123", + Package: &claircore.Package{ + Name: "grep", + }, + Severity: "BAD", + }, + { + Updater: updater, + Name: "CVE-456", + Package: &claircore.Package{ + Name: "sed", + }, + }, + }, + }, + SecondOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-123", + Package: &claircore.Package{ + Name: "grep", + }, + Severity: "NOT AS BAD AS WE THOUGHT", + }, + }, + }, + Records: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ + Name: "grep", + Source: &claircore.Package{ + Name: "grep", + Version: "v1.0.0", + }, + }, + }, + }, + }, + { + TestName: "test two vulns same package different uo", + Updater: updater, + VulnCount: 2, + FirstOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-000", + Package: &claircore.Package{ + Name: "python3", + }, + }, + }, + }, + SecondOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-123", + Package: &claircore.Package{ + Name: "python3", + }, + }, + { + Updater: updater, + Name: "CVE-456", + Package: &claircore.Package{ + Name: "python3-crypto", + }, + }, + { + Updater: updater, + Name: "CVE-789", + Package: &claircore.Package{ + Name: "python3-urllib3", + }, + }, + }, + }, + Records: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ + Name: "python3", + Source: &claircore.Package{ + Name: "python3", + Version: "v1.0.0", + }, + }, + }, + }, + }, + { + TestName: "test deleting vuln", + Updater: updater, + VulnCount: 0, + FirstOp: &op{ + deletedVulns: []string{}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-000", + Package: &claircore.Package{ + Name: "jq", + }, + }, + }, + }, + SecondOp: &op{ + deletedVulns: []string{"CVE-000"}, + vulns: []*claircore.Vulnerability{ + { + Updater: updater, + Name: "CVE-456", + Package: &claircore.Package{ + Name: "jq-libs", + }, + }, + { + Updater: updater, + Name: "CVE-789", + Package: &claircore.Package{ + Name: "jq-docs", + }, + }, + }, + }, + Records: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ + Name: "jq", + Source: &claircore.Package{ + Name: "jq", + Version: "v1.0.0", + }, + }, + }, + }, + }, + } + + // prepare DB + pool := pgtest.TestMatcherDB(ctx, t) + store := NewMatcherStore(pool) + + // run test cases + for _, tc := range cases { + t.Run(tc.TestName, func(t *testing.T) { + _, err := store.DeltaUpdateVulnerabilities(ctx, tc.Updater, driver.Fingerprint(uuid.New().String()), tc.FirstOp.vulns, tc.FirstOp.deletedVulns) + if err != nil { + t.Fatalf("failed to perform update for first op: %v", err) + } + _, err = store.DeltaUpdateVulnerabilities(ctx, tc.Updater, driver.Fingerprint(uuid.New().String()), tc.SecondOp.vulns, tc.SecondOp.deletedVulns) + if err != nil { + t.Fatalf("failed to perform update for second op: %v", err) + } + + res, err := store.Get(ctx, tc.Records, datastore.GetOpts{}) + if err != nil { + t.Fatalf("failed to get vulns: %v", err) + } + ct := 0 + for _, vs := range res { + ct = ct + len(vs) + } + + if ct != tc.VulnCount { + t.Fatalf("got %d vulns, want %d", ct, tc.VulnCount) + } + }) + } +} diff --git a/datastore/updater.go b/datastore/updater.go index 72037d391..705286427 100644 --- a/datastore/updater.go +++ b/datastore/updater.go @@ -19,7 +19,10 @@ type Updater interface { // vulnerabilities, and ensures vulnerabilities from previous updates are // not queried by clients. UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) - DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) + // DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing + // vulnerabilities and new vulnerabilities. It also takes an array of deleted + // vulnerability names which should no longer be available to query. + DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) // GetUpdateOperations returns a list of UpdateOperations in date descending // order for the given updaters. // diff --git a/libvuln/driver/updater.go b/libvuln/driver/updater.go index 7e8590fe0..4a06bdabb 100644 --- a/libvuln/driver/updater.go +++ b/libvuln/driver/updater.go @@ -60,6 +60,10 @@ type Configurable interface { Configure(context.Context, ConfigUnmarshaler, *http.Client) error } +// DeltaUpdater is an interface that Updaters can implement to force the manager to call +// DeltaParse() in lieu of Parse() with the understanding that the resulting vulnerabilities +// represent a delta and not the entire vulnerability database. DeltaParse can also return +// a slice of strings that represent the names of deleted vulnerabilities. type DeltaUpdater interface { - DeltaFetch(context.Context, Fingerprint) (io.ReadCloser, Fingerprint, error) + DeltaParse(ctx context.Context, contents io.ReadCloser) ([]*claircore.Vulnerability, []string, error) } diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index 5a426e2c6..1ce17632d 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -445,8 +445,8 @@ func (s *Store) RecordUpdaterSetStatus(ctx context.Context, updaterSet string, u return nil } -// DeltaUpdateVulnerabilities is unimplemented -func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { +// DeltaUpdateVulnerabilities is a noop +func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deleted []string) (uuid.UUID, error) { return uuid.Nil, nil } diff --git a/libvuln/updates/manager.go b/libvuln/updates/manager.go index 310fc84c1..28252d9d1 100644 --- a/libvuln/updates/manager.go +++ b/libvuln/updates/manager.go @@ -305,12 +305,18 @@ func (m *Manager) driveUpdater(ctx context.Context, u driver.Updater) (err error defer zlog.Info(ctx).Msg("finished update") uoKind := driver.VulnerabilityKind + // Do some assertions eu, euOK := u.(driver.EnrichmentUpdater) if euOK { zlog.Info(ctx). Msg("found EnrichmentUpdater") uoKind = driver.EnrichmentKind } + du, duOK := u.(driver.DeltaUpdater) + if duOK { + zlog.Info(ctx). + Msg("found DeltaUpdater") + } var prevFP driver.Fingerprint opmap, err := m.store.GetUpdateOperations(ctx, uoKind, name) @@ -322,24 +328,12 @@ func (m *Manager) driveUpdater(ctx context.Context, u driver.Updater) (err error prevFP = s[0].Fingerprint } - updateFunc := m.store.UpdateVulnerabilities - fetchFunc := u.Fetch - if du, duOK := u.(driver.DeltaUpdater); duOK { - zlog.Info(ctx). - Str("updater", u.Name()). - Msg("found DeltaUpdater") - updateFunc = m.store.DeltaUpdateVulnerabilities - // TODO (crozzy): Still on the fench about how to inform the - // manager what is is a delta updater. - fetchFunc = du.DeltaFetch - } - var vulnDB io.ReadCloser switch { case euOK: vulnDB, newFP, err = eu.FetchEnrichment(ctx, prevFP) default: - vulnDB, newFP, err = fetchFunc(ctx, prevFP) + vulnDB, newFP, err = u.Fetch(ctx, prevFP) } if vulnDB != nil { defer vulnDB.Close() @@ -367,13 +361,26 @@ func (m *Manager) driveUpdater(ctx context.Context, u driver.Updater) (err error ref, err = m.store.UpdateEnrichments(ctx, name, newFP, ers) default: var vulns []*claircore.Vulnerability - vulns, err = u.Parse(ctx, vulnDB) - if err != nil { - err = fmt.Errorf("vulnerability database parse failed: %v", err) - return - } + switch { + case duOK: + var deletedVulns []string + vulns, deletedVulns, err = du.DeltaParse(ctx, vulnDB) + if err != nil { + err = fmt.Errorf("vulnerability database delta parse failed: %v", err) + return + } - ref, err = updateFunc(ctx, name, newFP, vulns) + ref, err = m.store.DeltaUpdateVulnerabilities(ctx, name, newFP, vulns, deletedVulns) + + default: + vulns, err = u.Parse(ctx, vulnDB) + if err != nil { + err = fmt.Errorf("vulnerability database parse failed: %v", err) + return + } + + ref, err = m.store.UpdateVulnerabilities(ctx, name, newFP, vulns) + } } if err != nil { err = fmt.Errorf("failed to update: %v", err)