Skip to content

Commit

Permalink
fixup! updater: Support delta updates
Browse files Browse the repository at this point in the history
  • Loading branch information
crozzy committed Dec 19, 2023
1 parent 9323ca5 commit 426d8b5
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 196 deletions.
230 changes: 230 additions & 0 deletions datastore/postgres/deltaupdatevulnerabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package postgres

import (
"context"
"fmt"
"strconv"
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
)

var (
deltaUpdateVulnerabilitiesCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "claircore",
Subsystem: "vulnstore",
Name: "deltaupdatevulnerabilities_total",
Help: "Total number of database queries issued in the DeltaUpdateVulnerabilities method.",
},
[]string{"query"},
)
deltaUpdateVulnerabilitiesDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "claircore",
Subsystem: "vulnstore",
Name: "deltaupdatevulnerabilities_duration_seconds",
Help: "The duration of all queries issued in the DeltaUpdateVulnerabilities method",
},
[]string{"query"},
)
)

// DeltaUpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
// provided vulnerabilities and computes a diff comprising the removed
// and added vulnerabilities for this UpdateOperation.
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
const (
existingQuery = `
SELECT
"name",
"vuln"."id"
FROM
"vuln"
INNER JOIN "uo_vuln" ON ("vuln"."id" = "uo_vuln"."vuln")
INNER JOIN "latest_update_operations" ON (
"latest_update_operations"."id" = "uo_vuln"."uo"
)
WHERE
(
"latest_update_operations"."kind" = 'vulnerability'
)
AND
(
"vuln"."updater" = $1
)`
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
// Insert attempts to create a new vulnerability. It fails silently.
insert = `
INSERT INTO vuln (
hash_kind, hash,
name, updater, description, issued, links, severity, normalized_severity,
package_name, package_version, package_module, package_arch, package_kind,
dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name,
repo_name, repo_key, repo_uri,
fixed_in_version, arch_operation, version_kind, vulnerable_range
) VALUES (
$1, $2,
$3, $4, $5, $6, $7, $8, $9,
$10, $11, $12, $13, $14,
$15, $16, $17, $18, $19, $20, $21, $22,
$23, $24, $25,
$26, $27, $28, VersionRange($29, $30)
)
ON CONFLICT (hash_kind, hash) DO NOTHING;`
// Assoc associates an update operation and a vulnerability. It fails
// silently.
assoc = `
INSERT INTO uo_vuln (uo, vuln) VALUES (
$3,
(SELECT id FROM vuln WHERE hash_kind = $1 AND hash = $2))
ON CONFLICT DO NOTHING;`
assocExisting = `INSERT INTO uo_vuln (uo, vuln) VALUES ($1, $2) ON CONFLICT DO NOTHING;`
undo = `DELETE FROM update_operation WHERE id = $1;`
refreshView = `REFRESH MATERIALIZED VIEW CONCURRENTLY latest_update_operations;`
)
ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/updateVulnerabilities")

// Get existing vulns
rows, err := s.pool.Query(ctx, existingQuery, updater)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to get existing vulns: %w", err)
}
oldVulns := make(map[string]string)
for rows.Next() {
var tmpID int64
var ID, name string
err := rows.Scan(
&name,
&tmpID,
)

ID = strconv.FormatInt(tmpID, 10)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to scan vulnerability: %w", err)
}
oldVulns[name] = ID
}

if len(oldVulns) > 0 {
for _, v := range vulns {
// If we have an existing vuln in the new batch
// Delete it from the oldVulns map so it doesn't
// get associated with the new uo.
delete(oldVulns, v.Name)
}
}

var uoID uint64
var ref uuid.UUID

start := time.Now()

if err := s.pool.QueryRow(ctx, create, updater, string(fingerprint)).Scan(&uoID, &ref); err != nil {
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
}
var success bool
defer func() {
if !success {
if _, err := s.pool.Exec(ctx, undo, uoID); err != nil {
zlog.Error(ctx).
Err(err).
Stringer("ref", ref).
Msg("unable to remove update operation")
}
}
}()

deltaUpdateVulnerabilitiesCounter.WithLabelValues("create").Add(1)
deltaUpdateVulnerabilitiesDuration.WithLabelValues("create").Observe(time.Since(start).Seconds())

tx, err := s.pool.Begin(ctx)
if err != nil {
return uuid.Nil, fmt.Errorf("unable to start transaction: %w", err)
}
defer tx.Rollback(ctx)

zlog.Debug(ctx).
Str("ref", ref.String()).
Msg("update_operation created")

// Update oldVuln's uos
for _, vID := range oldVulns {
_, err := tx.Exec(ctx, assocExisting, uoID, vID)
if err != nil {
return uuid.Nil, fmt.Errorf("could not update old vulnerability with new UO: %w", err)
}
}

// batch insert vulnerabilities
skipCt := 0

start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
}

pkg := vuln.Package
dist := vuln.Dist
repo := vuln.Repo
if dist == nil {
dist = &zeroDist
}
if repo == nil {
repo = &zeroRepo
}
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)

err := mBatcher.Queue(ctx, insert,
hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName,
repo.Name, repo.Key, repo.URI,
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
}
}
if err := mBatcher.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err)
}

updateVulnerabilitiesCounter.WithLabelValues("insert_batch").Add(1)
updateVulnerabilitiesDuration.WithLabelValues("insert_batch").Observe(time.Since(start).Seconds())

if err := tx.Commit(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to commit transaction: %w", err)
}
if _, err = s.pool.Exec(ctx, refreshView); err != nil {
return uuid.Nil, fmt.Errorf("could not refresh latest_update_operations: %w", err)
}

success = true
zlog.Debug(ctx).
Str("ref", ref.String()).
Int("skipped", skipCt).
Int("inserted", len(vulns)-skipCt).
Msg("update_operation committed")
return ref, nil
}
Loading

0 comments on commit 426d8b5

Please sign in to comment.