Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libvuln SQL: POC to introduce bulk inserts #996

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 74 additions & 53 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ var (
)
)

type HashData struct {
HashKind string
Hash interface{}
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
Expand All @@ -54,26 +59,6 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
// Insert attempts to create a new vulnerability. It fails silently.
insert = `
INSERT INTO vuln (
hash_kind, hash,
name, updater, description, issued, links, severity, normalized_severity,
package_name, package_version, package_module, package_arch, package_kind,
dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name,
repo_name, repo_key, repo_uri,
fixed_in_version, arch_operation, version_kind, vulnerable_range
) VALUES (
$1, $2,
$3, $4, $5, $6, $7, $8, $9,
$10, $11, $12, $13, $14,
$15, $16, $17, $18, $19, $20, $21, $22,
$23, $24, $25,
$26, $27, $28, VersionRange($29, $30)
)
ON CONFLICT (hash_kind, hash) DO NOTHING;`
// Assoc associates an update operation and a vulnerability. It fails
// silently.
assoc = `
INSERT INTO uo_vuln (uo, vuln) VALUES (
$3,
Expand Down Expand Up @@ -118,42 +103,78 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string

// batch insert vulnerabilities
skipCt := 0

start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
}

pkg := vuln.Package
dist := vuln.Dist
repo := vuln.Repo
if dist == nil {
dist = &zeroDist
batchSize := 1000
totalVulns := len(vulns)
numBatches := (totalVulns + batchSize - 1) / batchSize
mBatcher := microbatch.NewInsert(tx, 1000, time.Minute)
for batchIndex := 0; batchIndex < numBatches; batchIndex++ {
// Insert attempts to create a new vulnerabilities. It fails silently.
insert_query := `
INSERT INTO vuln (
hash_kind, hash,
name, updater, description, issued, links, severity, normalized_severity,
package_name, package_version, package_module, package_arch, package_kind,
dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name,
repo_name, repo_key, repo_uri,
fixed_in_version, arch_operation, version_kind, vulnerable_range
) VALUES %s
ON CONFLICT (hash_kind, hash) DO NOTHING;`
insert_values := []interface{}{}
assoc_values := []HashData{}
placeholders := []string{}
startIndex := batchIndex * batchSize
endIndex := (batchIndex + 1) * batchSize
if endIndex > totalVulns {
endIndex = totalVulns
}
if repo == nil {
repo = &zeroRepo
j := 0
for i := startIndex; i < endIndex; i++ {
vuln := vulns[i]
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
}
pkg := vuln.Package
dist := vuln.Dist
repo := vuln.Repo
if dist == nil {
dist = &zeroDist
}
if repo == nil {
repo = &zeroRepo
}
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)
rowValues := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, VersionRange($%d, $%d))", j*30+1, j*30+2, j*30+3, j*30+4, j*30+5, j*30+6, j*30+7, j*30+8, j*30+9, j*30+10, j*30+11, j*30+12, j*30+13, j*30+14, j*30+15, j*30+16, j*30+17, j*30+18, j*30+19, j*30+20, j*30+21, j*30+22, j*30+23, j*30+24, j*30+25, j*30+26, j*30+27, j*30+28, j*30+29, j*30+30)
placeholders = append(placeholders, rowValues)
insert_values = append(insert_values, hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName,
repo.Name, repo.Key, repo.URI,
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper)
hashData := HashData{
HashKind: hashKind,
Hash: hash,
}
assoc_values = append(assoc_values, hashData)
j += 1
}
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)

err := mBatcher.Queue(ctx, insert,
hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName,
repo.Name, repo.Key, repo.URI,
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
bulkValues := strings.Join(placeholders, ", ")
insert_query = fmt.Sprintf(insert_query, bulkValues)
if len(insert_values) == 0 {
zlog.Debug(ctx).Msg("Bulk operations omitted because of no data")
} else {
_, err = s.pool.Exec(context.Background(), insert_query, insert_values...)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to perform bulk insert vulnerabilities: %w", err)
}
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
for _, hashData := range assoc_values {
if err := mBatcher.Queue(ctx, assoc, hashData.HashKind, hashData.Hash, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
}
}
}
if err := mBatcher.Done(ctx); err != nil {
Expand Down Expand Up @@ -251,4 +272,4 @@ func rangefmt(r *claircore.Range) (kind *string, lower, upper string) {
upper = buf.String()

return kind, lower, upper
}
}