From d03012bf95479fc4fcf859f5019a311647b2510c Mon Sep 17 00:00:00 2001 From: crozzy Date: Wed, 17 Jan 2024 15:46:49 -0800 Subject: [PATCH] fixup! updater: Support delta updates --- datastore/postgres/updatevulnerabilities.go | 47 ++++++++++++--------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/datastore/postgres/updatevulnerabilities.go b/datastore/postgres/updatevulnerabilities.go index 82ea3c260..45d4b4638 100644 --- a/datastore/postgres/updatevulnerabilities.go +++ b/datastore/postgres/updatevulnerabilities.go @@ -32,7 +32,7 @@ var ( Name: "updatevulnerabilities_total", Help: "Total number of database queries issued in the updateVulnerabilities method.", }, - []string{"query"}, + []string{"query", "is_delta"}, ) updateVulnerabilitiesDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ @@ -41,7 +41,7 @@ var ( Name: "updatevulnerabilities_duration_seconds", Help: "The duration of all queries issued in the updateVulnerabilities method", }, - []string{"query"}, + []string{"query", "is_delta"}, ) ) @@ -51,20 +51,23 @@ var ( // provided vulnerabilities and computes a diff comprising the removed // and added vulnerabilities for this UpdateOperation. func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities") return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false) } // DeltaUpdateVulnerabilities implements vulnstore.Updater. // -// It shares functionality with UpdateVulnerabilities but with delta -// support, so the process goes: -// * Create a new UpdateOperation -// * Query existing vulnerabilities for the updater -// * Discount and vulnerabilities with newer updates and deleted vulnerabilities -// * Update the associated updateOperation for the remaining existing vulnerabilities -// * Insert the new vulnerabilities -// * Associate new vulnerabilities with new updateOperation +// It is similar to UpdateVulnerabilities but support processing of +// partial data as opposed to needing an entire vulnerability database +// Order of operations: +// - Create a new UpdateOperation +// - Query existing vulnerabilities for the updater +// - Discount and vulnerabilities with newer updates and deleted vulnerabilities +// - Update the associated updateOperation for the remaining existing vulnerabilities +// - Insert the new vulnerabilities +// - Associate new vulnerabilities with new updateOperation func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) { + ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities") return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true) } @@ -121,7 +124,6 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string undo = `DELETE FROM update_operation WHERE id = $1;` refreshView = `REFRESH MATERIALIZED VIEW CONCURRENTLY latest_update_operations;` ) - ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/updateVulnerabilities") var uoID uint64 var ref uuid.UUID @@ -143,8 +145,8 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } }() - updateVulnerabilitiesCounter.WithLabelValues("create").Add(1) - updateVulnerabilitiesDuration.WithLabelValues("create").Observe(time.Since(start).Seconds()) + updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1) + updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) tx, err := s.pool.Begin(ctx) if err != nil { @@ -157,6 +159,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string Msg("update_operation created") if delta { + ctx = zlog.ContextWithValues(ctx, "mode", "delta") // Get existing vulns // The reason this still works even though the new update_operation // is already created is because the latest_update_operation view isn't updated until @@ -166,8 +169,9 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string if err != nil { return uuid.Nil, fmt.Errorf("failed to get existing vulns: %w", err) } - updateVulnerabilitiesCounter.WithLabelValues("selectExisting").Add(1) - updateVulnerabilitiesDuration.WithLabelValues("selectExisting").Observe(time.Since(start).Seconds()) + defer rows.Close() + updateVulnerabilitiesCounter.WithLabelValues("selectExisting", strconv.FormatBool(delta)).Add(1) + updateVulnerabilitiesDuration.WithLabelValues("selectExisting", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) oldVulns := make(map[string][]string) for rows.Next() { @@ -184,6 +188,9 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } oldVulns[name] = append(oldVulns[name], ID) } + if err := rows.Err(); err != nil { + return uuid.Nil, fmt.Errorf("error reading existing vulnerabilities: %w", err) + } if len(oldVulns) > 0 { for _, v := range vulns { @@ -200,7 +207,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } } start = time.Now() - // Update oldVulns' update_operations + // Associate already existing vulnerabilities with new update_operation. for _, vs := range oldVulns { for _, vID := range vs { _, err := tx.Exec(ctx, assocExisting, uoID, vID) @@ -209,8 +216,8 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string } } } - updateVulnerabilitiesCounter.WithLabelValues("assocExisting").Add(float64(len(oldVulns))) - updateVulnerabilitiesDuration.WithLabelValues("assocExisting").Observe(time.Since(start).Seconds()) + updateVulnerabilitiesCounter.WithLabelValues("assocExisting", strconv.FormatBool(delta)).Add(float64(len(oldVulns))) + updateVulnerabilitiesDuration.WithLabelValues("assocExisting", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) } @@ -258,8 +265,8 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err) } - updateVulnerabilitiesCounter.WithLabelValues("insert_batch").Add(1) - updateVulnerabilitiesDuration.WithLabelValues("insert_batch").Observe(time.Since(start).Seconds()) + updateVulnerabilitiesCounter.WithLabelValues("insert_batch", strconv.FormatBool(delta)).Add(1) + updateVulnerabilitiesDuration.WithLabelValues("insert_batch", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds()) if err := tx.Commit(ctx); err != nil { return uuid.Nil, fmt.Errorf("failed to commit transaction: %w", err)