From 710434779a632e96fc8e77deebb6be377ac97a32 Mon Sep 17 00:00:00 2001 From: nkmr-jp Date: Fri, 22 Jul 2022 11:32:27 +0900 Subject: [PATCH 1/3] :art: Update test --- Makefile | 2 +- fetch_test.go | 83 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index 8f9ee7c..7c655d0 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ PROJECT_NUMBER=$(shell gcloud projects list --filter="project_id:$(PROJECT_ID)" FUNC_NAME=fetch ENTRY_POINT=Fetch TOPIC_NAME=$(FUNC_NAME)-topic -BUCKET_NAME=$(PROJECT_ID)-fetch +BUCKET_NAME=$(PROJECT_ID)-data-fetch # VERSION=$(shell git rev-parse --short HEAD) VERSION=$(shell git describe --abbrev=0 --tags) diff --git a/fetch_test.go b/fetch_test.go index bacc865..aed205d 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -14,53 +14,70 @@ import ( ) func TestRun(t *testing.T) { - test := NewTestFetch() - test.setup(t) - objPath := "api.github.com/users/github" - ctx := context.Background() - client, _ := storage.NewClient(ctx) + test := NewTestFetch(t) - // Get generation before send pubsub message. - var preGeneration int64 - rc, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) - defer rc.Close() // nolint - if err == nil { - preGeneration = rc.Attrs.Generation - } + t.Run("single url", func(t *testing.T) { + objPath := "api.github.com/users/github" + pubsubData := "https://" + objPath + ctx := context.Background() - // Send pubsub message - if err := fetch.Run(ctx, test.event); err != nil { - assert.Fail(t, err.Error()) - } + // Send pubsub message1 + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader1 := test.getObject(ctx, objPath) - // Get generation after send pubsub message. - rc2, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) - defer rc.Close() // nolint - if err != nil { - assert.Fail(t, err.Error()) - } + // Send pubsub message2 + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader2 := test.getObject(ctx, objPath) + + // Get generation after send pubsub message. + assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation) + }) - assert.NotEqual(t, preGeneration, rc2.Attrs.Generation) + t.Run("multi url", func(t *testing.T) { + t.Skip() + // if got := add(tt.args.a, tt.args.b); got != tt.want { + // t.Errorf("add() = %v, want %v", got, tt.want) + // } + }) } type TestFetch struct { - event event.Event + t *testing.T } -func NewTestFetch() *TestFetch { - return &TestFetch{} +func NewTestFetch(t *testing.T) *TestFetch { + return &TestFetch{t} } -func (f *TestFetch) setup(t *testing.T) { +func (f *TestFetch) getObject(ctx context.Context, objPath string) *storage.Reader { + client, _ := storage.NewClient(ctx) + reader, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) + defer func(rc *storage.Reader) { + err := rc.Close() + if err != nil { + assert.Fail(f.t, err.Error()) + } + }(reader) + if err != nil { + assert.Fail(f.t, err.Error()) + } + return reader +} + +func (f *TestFetch) event(data string) event.Event { msg := pubsub.MessagePublishedData{ Message: &pubsub.Message{ - Data: []byte("https://api.github.com/users/github"), + Data: []byte(data), }, } - - f.event = event.New() - f.event.SetDataContentType("application/json") - if err := f.event.SetData(f.event.DataContentType(), msg); err != nil { - assert.Fail(t, err.Error()) + e := event.New() + e.SetDataContentType("application/json") + if err := e.SetData(e.DataContentType(), msg); err != nil { + assert.Fail(f.t, err.Error()) } + return e } From 55b8fe01ef31f160198dc4c5e21b4d6a56b89ec3 Mon Sep 17 00:00:00 2001 From: nkmr-jp Date: Fri, 22 Jul 2022 12:26:14 +0900 Subject: [PATCH 2/3] :white_check_mark: Add deleteObjects to test --- fetch_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/fetch_test.go b/fetch_test.go index aed205d..15f26c7 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -10,7 +10,9 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/googleapis/google-cloudevents-go/cloud/pubsub/v1" fetch "github.com/nkmr-jp/gcf-fetch" + "github.com/nkmr-jp/zl" "github.com/stretchr/testify/assert" + "google.golang.org/api/iterator" ) func TestRun(t *testing.T) { @@ -18,8 +20,9 @@ func TestRun(t *testing.T) { t.Run("single url", func(t *testing.T) { objPath := "api.github.com/users/github" - pubsubData := "https://" + objPath + pubsubData := "https://api.github.com/users/github" ctx := context.Background() + test.deleteObjects(ctx, "api.github.com") // Send pubsub message1 if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { @@ -32,16 +35,34 @@ func TestRun(t *testing.T) { assert.Fail(t, err.Error()) } reader2 := test.getObject(ctx, objPath) - // Get generation after send pubsub message. assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation) }) t.Run("multi url", func(t *testing.T) { t.Skip() - // if got := add(tt.args.a, tt.args.b); got != tt.want { - // t.Errorf("add() = %v, want %v", got, tt.want) - // } + objPath := ` +api.github.com/users/github +api.github.com/users/github/followers +` + pubsubData := ` +https://api.github.com/users/github +https://api.github.com/users/github/followers +` + ctx := context.Background() + + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader1 := test.getObject(ctx, objPath) + + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader2 := test.getObject(ctx, objPath) + + // Get generation after send pubsub message. + assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation) }) } @@ -53,6 +74,26 @@ func NewTestFetch(t *testing.T) *TestFetch { return &TestFetch{t} } +func (f *TestFetch) deleteObjects(ctx context.Context, objPath string) { + client, _ := storage.NewClient(ctx) + query := storage.Query{Prefix: objPath, Versions: true} + bucket := client.Bucket(os.Getenv("BUCKET_NAME")) + it := bucket.Objects(ctx, &query) + for { + objAttrs, err := it.Next() + if err != nil && err != iterator.Done { + assert.Fail(f.t, err.Error()) + } + if err == iterator.Done { + break + } + if err := bucket.Object(objAttrs.Name).Generation(objAttrs.Generation).Delete(ctx); err != nil { + assert.Fail(f.t, err.Error()) + } + zl.Dump(objAttrs) + } +} + func (f *TestFetch) getObject(ctx context.Context, objPath string) *storage.Reader { client, _ := storage.NewClient(ctx) reader, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) From 5a45b05b8adbe508bcd8e29184e829f5a4ba7126 Mon Sep 17 00:00:00 2001 From: nkmr-jp Date: Fri, 22 Jul 2022 14:51:07 +0900 Subject: [PATCH 3/3] :sparkles: Add multi urls support --- fetch.go | 27 ++++++++++++++++++--------- fetch_test.go | 19 +++---------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/fetch.go b/fetch.go index cc954e3..9f0a75c 100644 --- a/fetch.go +++ b/fetch.go @@ -9,6 +9,7 @@ import ( "net/http" nu "net/url" "os" + "strings" "time" "cloud.google.com/go/storage" @@ -28,12 +29,17 @@ func Run(ctx context.Context, event event.Event) error { defer zl.Sync() // Flush log file buffer. for debug in mac local. bucket := getEnv() - url := parseEvent(event) - gcsPath := parseURL(url) - buf := get(url) - - if err := save(ctx, bucket, gcsPath, buf); err != nil { - return err + urls := parseEvent(event) + if urls == nil { + return fmt.Errorf("urls is nil") + } + for i := range urls { + gcsPath := parseURL(urls[i]) + buf := get(urls[i]) + if err := save(ctx, bucket, gcsPath, buf); err != nil { + return err + } + time.Sleep(time.Second) } return nil @@ -78,20 +84,22 @@ func parseURL(s string) string { return url.Host + url.Path } -func parseEvent(event event.Event) string { +func parseEvent(event event.Event) []string { var data pubsub.MessagePublishedData err := json.Unmarshal(event.Data(), &data) if err != nil { zl.Error("UNMARSHAL_ERROR", err) - return "" + return nil } zl.Info("CLOUD_EVENT_RECEIVED", zap.String("cloudEventContext", event.Context.String()), zap.Any("cloudEventData", data), ) - return string(data.Message.Data) + // return string(data.Message.Data) + dataStr := strings.TrimSpace(string(data.Message.Data)) + return strings.Split(dataStr, "\n") } func getEnv() (bucket string) { @@ -135,6 +143,7 @@ func writeBuf(ctx context.Context, client *storage.Client, bucket, object string // writer close fields := []zap.Field{ + zl.Console(object), zap.String("bucket", bucket), zap.String("object", object), } diff --git a/fetch_test.go b/fetch_test.go index 15f26c7..c55ca2c 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -10,7 +10,6 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/googleapis/google-cloudevents-go/cloud/pubsub/v1" fetch "github.com/nkmr-jp/gcf-fetch" - "github.com/nkmr-jp/zl" "github.com/stretchr/testify/assert" "google.golang.org/api/iterator" ) @@ -40,29 +39,18 @@ func TestRun(t *testing.T) { }) t.Run("multi url", func(t *testing.T) { - t.Skip() - objPath := ` -api.github.com/users/github -api.github.com/users/github/followers -` pubsubData := ` https://api.github.com/users/github https://api.github.com/users/github/followers ` ctx := context.Background() - - if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { - assert.Fail(t, err.Error()) - } - reader1 := test.getObject(ctx, objPath) - + test.deleteObjects(ctx, "api.github.com") if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { assert.Fail(t, err.Error()) } - reader2 := test.getObject(ctx, objPath) - // Get generation after send pubsub message. - assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation) + assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github"), "reader1") + assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github/followers"), "reader2") }) } @@ -90,7 +78,6 @@ func (f *TestFetch) deleteObjects(ctx context.Context, objPath string) { if err := bucket.Object(objAttrs.Name).Generation(objAttrs.Generation).Delete(ctx); err != nil { assert.Fail(f.t, err.Error()) } - zl.Dump(objAttrs) } }