diff --git a/rhel/vex/fetcher.go b/rhel/vex/fetcher.go
index c82c20a56..577eee4ae 100644
--- a/rhel/vex/fetcher.go
+++ b/rhel/vex/fetcher.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
 	"path"
 	"regexp"
 	"strconv"
@@ -31,6 +32,17 @@ var (
 	cvePathRegex = regexp.MustCompile(`^\d{4}/(cve-\d{4}-\d{4,}).json$`)
 )
 
+// Fetch pulls data down from the Red Hat VEX endpoints. The order of operations is:
+//  1. Check if we need to process the entire archive of data. If yes:
+//     - Make a request to discover the latest archive endpoint.
+//     - Make a HEAD request to the archive endpoint to get the last-modified header.
+//     - Save the last-modified time in the fingerprint's requestTime.
+//  2. Process the changes.csv file, requesting and appending the entries that changed since the fingerprint's requestTime.
+//  3. Process the deletions.csv file, appending records for the entries deleted since the fingerprint's requestTime.
+//  4. If we need to process the entire archive, request the archive data and append the entries that have not been changed or deleted.
+//
+// This helps to ensure that we only persist one copy of an advisory, even in the worst case. In most cases,
+// after the initial load, the number of processed files should be very small.
 func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCloser, driver.Fingerprint, error) {
 	ctx = zlog.ContextWithValues(ctx, "component", "rhel/vex/Updater.Fetch")
 	fp, err := parseFingerprint(hint)
@@ -62,48 +74,46 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl
 		}
 	}()
 
+	var compressedURL *url.URL
 	// Is this the first run or has the updater changed since the last run?
-	if fp.changesEtag == "" || fp.version != updaterVersion {
+	processArchive := fp.changesEtag == "" || fp.version != updaterVersion
+	if processArchive {
 		// We need to go after the full corpus of vulnerabilities
-		// First we target the archive_latest.txt file
-		latestURI, err := u.url.Parse(latestFile)
+		// First we target the archive_latest.txt file.
+		var err error
+		compressedURL, err = u.getCompressedFileURL(ctx)
 		if err != nil {
-			return nil, hint, err
-		}
-		latestReq, err := http.NewRequestWithContext(ctx, http.MethodGet, latestURI.String(), nil)
-		if err != nil {
-			return nil, hint, err
-		}
-		latestRes, err := u.client.Do(latestReq)
-		if err != nil {
-			return nil, hint, err
-		}
-		defer latestRes.Body.Close()
-
-		err = httputil.CheckResponse(latestRes, http.StatusOK)
-		if err != nil {
-			return nil, hint, fmt.Errorf("unexpected response from archive_latest.txt: %w", err)
+			return nil, hint, fmt.Errorf("could not get compressed file URL: %w", err)
 		}
+		zlog.Debug(ctx).
+			Str("url", compressedURL.String()).
+			Msg("got compressed URL")
 
-		body, err := io.ReadAll(latestRes.Body) // Fine to use as expecting small number of bytes.
+		fp.requestTime, err = u.getLastModified(ctx, compressedURL)
 		if err != nil {
-			return nil, hint, err
+			return nil, hint, fmt.Errorf("could not get last-modified header: %w", err)
 		}
+	}
 
-		compressedFilename := string(body)
-		zlog.Debug(ctx).
-			Str("filename", compressedFilename).
- Msg("requesting latest compressed file") + changed := map[string]bool{} + err = u.processChanges(ctx, cw, fp, changed) + if err != nil { + return nil, hint, err + } - uri, err := u.url.Parse(compressedFilename) - if err != nil { - return nil, hint, err - } + err = u.processDeletions(ctx, cw, fp, changed) + if err != nil { + return nil, hint, err + } + if processArchive { rctx, cancel := context.WithTimeout(ctx, compressedFileTimeout) defer cancel() - req, err := http.NewRequestWithContext(rctx, http.MethodGet, uri.String(), nil) + if compressedURL == nil { + return nil, hint, fmt.Errorf("compressed file URL needs to be populated") + } + req, err := http.NewRequestWithContext(rctx, http.MethodGet, compressedURL.String(), nil) if err != nil { return nil, hint, err } @@ -119,11 +129,6 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl return nil, hint, fmt.Errorf("unexpected response from latest compressed file: %w", err) } - lm := res.Header.Get("last-modified") - fp.requestTime, err = time.Parse(http.TimeFormat, lm) - if err != nil { - return nil, hint, fmt.Errorf("could not parse last-modified header %s: %w", lm, err) - } z, err := zreader.Reader(res.Body) if err != nil { return nil, hint, err @@ -149,6 +154,10 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl if year < lookBackToYear { continue } + if changed[path.Base(h.Name)] { + // We've already processed this file don't bother appending it to the output + continue + } buf.Grow(int(h.Size)) if _, err := buf.ReadFrom(r); err != nil { return nil, hint, err @@ -176,26 +185,71 @@ func (u *Updater) Fetch(ctx context.Context, hint driver.Fingerprint) (io.ReadCl Msg("finished writing compressed data to spool") } - err = u.processChanges(ctx, cw, fp) + fp.version = updaterVersion + fp.requestTime = time.Now() + success = true + return f, driver.Fingerprint(fp.String()), nil +} + +func (u *Updater) getCompressedFileURL(ctx context.Context) (*url.URL, error) { + latestURI, err := u.url.Parse(latestFile) + if err != nil { + return nil, err + } + latestReq, err := http.NewRequestWithContext(ctx, http.MethodGet, latestURI.String(), nil) + if err != nil { + return nil, err + } + latestRes, err := u.client.Do(latestReq) + if err != nil { + return nil, err + } + defer latestRes.Body.Close() + + err = httputil.CheckResponse(latestRes, http.StatusOK) if err != nil { - return nil, hint, err + return nil, fmt.Errorf("unexpected response from archive_latest.txt: %w", err) } - err = u.processDeletions(ctx, cw, fp) + body, err := io.ReadAll(latestRes.Body) // Fine to use as expecting small number of bytes. 
 	if err != nil {
-		return nil, hint, err
+		return nil, err
 	}
 
-	fp.version = updaterVersion
-	fp.requestTime = time.Now()
-	success = true
-	return f, driver.Fingerprint(fp.String()), nil
+	compressedFilename := string(body)
+	compressedURL, err := u.url.Parse(compressedFilename)
+	if err != nil {
+		return nil, err
+	}
+	return compressedURL, nil
+}
+
+func (u *Updater) getLastModified(ctx context.Context, cu *url.URL) (time.Time, error) {
+	var empty time.Time
+	req, err := http.NewRequestWithContext(ctx, http.MethodHead, cu.String(), nil)
+	if err != nil {
+		return empty, err
+	}
+
+	res, err := u.client.Do(req)
+	if err != nil {
+		return empty, err
+	}
+	defer res.Body.Close()
+
+	err = httputil.CheckResponse(res, http.StatusOK)
+	if err != nil {
+		return empty, fmt.Errorf("unexpected HEAD response from latest compressed file: %w", err)
+	}
+
+	lm := res.Header.Get("last-modified")
+	return time.Parse(http.TimeFormat, lm)
 }
 
 // ProcessChanges deals with the published changes.csv, adding records
 // to w means they are deemed to have changed since the compressed
 // file was last processed. w and fp can be modified.
-func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerprint) error {
+func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerprint, changed map[string]bool) error {
 	tf, err := tmp.NewFile("", "rhel-vex-changes.")
 	if err != nil {
 		return err
@@ -271,6 +325,8 @@ func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerpri
 			continue
 		}
 
+		changed[path.Base(cvePath)] = true
+
 		advisoryURI, err := u.url.Parse(cvePath)
 		if err != nil {
 			return err
@@ -323,7 +379,7 @@ func (u *Updater) processChanges(ctx context.Context, w io.Writer, fp *fingerpri
 // ProcessDeletions deals with the published deletions.csv, adding records
 // to w mean they are deemed to have been deleted since the last compressed
 // file was last processed. w and fp can be modified.
-func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerprint) error {
+func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerprint, changed map[string]bool) error {
 	deletionURI, err := u.url.Parse(deletionsFile)
 	if err != nil {
 		return err
@@ -375,6 +431,8 @@ func (u *Updater) processDeletions(ctx context.Context, w io.Writer, fp *fingerp
 		if updatedTime.Before(fp.requestTime) {
 			continue
 		}
+		changed[path.Base(cvePath)] = true
+
 		deletedJSON, err := createDeletedJSON(cvePath)
 		if err != nil {
 			zlog.Warn(ctx).Err(err).Msg("error creating JSON object denoting deletion")
diff --git a/rhel/vex/fetcher_test.go b/rhel/vex/fetcher_test.go
index 71de45ae2..8fd921c17 100644
--- a/rhel/vex/fetcher_test.go
+++ b/rhel/vex/fetcher_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/quay/zlog"
 	"golang.org/x/tools/txtar"
 
+	"github.com/quay/claircore/libvuln/driver"
 	"github.com/quay/claircore/toolkit/types/csaf"
 )
 
@@ -43,17 +44,20 @@ func serveSecDB(t *testing.T, txtarFile string) (string, *http.Client) {
 			t.Fatal(err)
 		}
 		filename := filepath.Base(relFilepath)
-		mux.HandleFunc("/"+filename, func(w http.ResponseWriter, _ *http.Request) {
+		mux.HandleFunc("/"+filename, func(w http.ResponseWriter, r *http.Request) {
 			for k, v := range headers {
 				w.Header().Set(k, v[0])
 			}
-
-			f, err := os.Open("testdata/" + relFilepath)
-			if err != nil {
-				t.Fatal(err)
-			}
-			if _, err := io.Copy(w, f); err != nil {
-				t.Fatal(err)
+			switch r.Method {
+			case http.MethodHead:
+			case http.MethodGet:
+				f, err := os.Open("testdata/" + relFilepath)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if _, err := io.Copy(w, f); err != nil {
+					t.Fatal(err)
+				}
 			}
 		})
 		for _, f := range archive.Files {
@@ -111,9 +115,12 @@ func TestFactory(t *testing.T) {
 	if f.changesEtag != "something" {
 		t.Errorf("bad etag for the changes.csv endpoint: %s", f.changesEtag)
 	}
+	if f.deletionsEtag != "somethingelse" {
+		t.Errorf("bad etag for the deletions.csv endpoint: %s", f.deletionsEtag)
+	}
 
 	// Check saved vulns
-	expectedLnCt := 8
+	expectedLnCt := 7
 	lnCt := 0
 	r := bufio.NewReader(snappy.NewReader(data))
 	for b, err := r.ReadBytes('\n'); err == nil; b, err = r.ReadBytes('\n') {
@@ -127,7 +134,7 @@ func TestFactory(t *testing.T) {
 		t.Errorf("got %d entries but expected %d", lnCt, expectedLnCt)
 	}
 
-	newData, newFP, err := s.Updaters()[0].Fetch(ctx, "")
+	newData, newFP, err := s.Updaters()[0].Fetch(ctx, driver.Fingerprint(f.String()))
 	if err != nil {
 		t.Fatalf("error re-Fetching, cannot continue: %v", err)
 	}
@@ -143,9 +150,9 @@ func TestFactory(t *testing.T) {
 	if f.deletionsEtag != "somethingelse" {
 		t.Errorf("bad etag for the deletions.csv endpoint: %s", f.deletionsEtag)
 	}
-	buf := &bytes.Buffer{}
-	sz, _ := newData.Read(buf.Bytes())
-	if sz != 0 {
-		t.Errorf("got too much data: %s", buf.String())
+
+	r = bufio.NewReader(snappy.NewReader(newData))
+	for _, err := r.ReadBytes('\n'); err == nil; _, err = r.ReadBytes('\n') {
+		t.Fatal("should not have any more data")
 	}
 }
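
Reviewer note: the core of this patch is the dedup scheme in Fetch: the CSV passes record every file they emit in a `changed` set, and the archive replay consults that set so each advisory is written at most once. The sketch below is a minimal, self-contained model of that idea under stated assumptions; `skipChanged`, `emit`, and the sample CVE entries are hypothetical names for illustration, not claircore API.

package main

import (
	"archive/tar"
	"bytes"
	"errors"
	"fmt"
	"io"
	"path"
)

// skipChanged streams tar entries to emit, skipping any entry whose base name
// is already in changed. This models the archive replay in Fetch: entries
// written while processing changes.csv/deletions.csv are not written again.
func skipChanged(tr *tar.Reader, changed map[string]bool, emit func(name string, r io.Reader) error) error {
	for {
		h, err := tr.Next()
		switch {
		case errors.Is(err, io.EOF):
			return nil
		case err != nil:
			return err
		}
		name := path.Base(h.Name)
		if changed[name] {
			// Already emitted during the CSV pass; keep only one copy.
			continue
		}
		if err := emit(name, tr); err != nil {
			return err
		}
	}
}

func main() {
	// Build a tiny in-memory archive standing in for the full VEX tarball.
	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	for _, e := range []struct{ name, body string }{
		{"2024/cve-2024-0001.json", `{"id":"cve-2024-0001"}`},
		{"2024/cve-2024-0002.json", `{"id":"cve-2024-0002"}`},
	} {
		if err := tw.WriteHeader(&tar.Header{Name: e.name, Mode: 0o644, Size: int64(len(e.body))}); err != nil {
			panic(err)
		}
		if _, err := io.WriteString(tw, e.body); err != nil {
			panic(err)
		}
	}
	if err := tw.Close(); err != nil {
		panic(err)
	}

	// Pretend the changes.csv pass already delivered cve-2024-0001.json.
	changed := map[string]bool{"cve-2024-0001.json": true}
	err := skipChanged(tar.NewReader(&buf), changed, func(name string, r io.Reader) error {
		b, err := io.ReadAll(r)
		if err != nil {
			return err
		}
		fmt.Printf("archive entry %s: %s\n", name, b)
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Run standalone, only cve-2024-0002.json reaches the output, mirroring how Fetch skips archive members whose base name was already recorded while processing changes.csv and deletions.csv.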