diff --git a/Makefile b/Makefile index 8f9ee7c..7c655d0 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ PROJECT_NUMBER=$(shell gcloud projects list --filter="project_id:$(PROJECT_ID)" FUNC_NAME=fetch ENTRY_POINT=Fetch TOPIC_NAME=$(FUNC_NAME)-topic -BUCKET_NAME=$(PROJECT_ID)-fetch +BUCKET_NAME=$(PROJECT_ID)-data-fetch # VERSION=$(shell git rev-parse --short HEAD) VERSION=$(shell git describe --abbrev=0 --tags) diff --git a/fetch.go b/fetch.go index cc954e3..9f0a75c 100644 --- a/fetch.go +++ b/fetch.go @@ -9,6 +9,7 @@ import ( "net/http" nu "net/url" "os" + "strings" "time" "cloud.google.com/go/storage" @@ -28,12 +29,17 @@ func Run(ctx context.Context, event event.Event) error { defer zl.Sync() // Flush log file buffer. for debug in mac local. bucket := getEnv() - url := parseEvent(event) - gcsPath := parseURL(url) - buf := get(url) - - if err := save(ctx, bucket, gcsPath, buf); err != nil { - return err + urls := parseEvent(event) + if urls == nil { + return fmt.Errorf("urls is nil") + } + for i := range urls { + gcsPath := parseURL(urls[i]) + buf := get(urls[i]) + if err := save(ctx, bucket, gcsPath, buf); err != nil { + return err + } + time.Sleep(time.Second) } return nil @@ -78,20 +84,22 @@ func parseURL(s string) string { return url.Host + url.Path } -func parseEvent(event event.Event) string { +func parseEvent(event event.Event) []string { var data pubsub.MessagePublishedData err := json.Unmarshal(event.Data(), &data) if err != nil { zl.Error("UNMARSHAL_ERROR", err) - return "" + return nil } zl.Info("CLOUD_EVENT_RECEIVED", zap.String("cloudEventContext", event.Context.String()), zap.Any("cloudEventData", data), ) - return string(data.Message.Data) + // return string(data.Message.Data) + dataStr := strings.TrimSpace(string(data.Message.Data)) + return strings.Split(dataStr, "\n") } func getEnv() (bucket string) { @@ -135,6 +143,7 @@ func writeBuf(ctx context.Context, client *storage.Client, bucket, object string // writer close fields := []zap.Field{ + zl.Console(object), zap.String("bucket", bucket), zap.String("object", object), } diff --git a/fetch_test.go b/fetch_test.go index bacc865..c55ca2c 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -11,56 +11,101 @@ import ( "github.com/googleapis/google-cloudevents-go/cloud/pubsub/v1" fetch "github.com/nkmr-jp/gcf-fetch" "github.com/stretchr/testify/assert" + "google.golang.org/api/iterator" ) func TestRun(t *testing.T) { - test := NewTestFetch() - test.setup(t) - objPath := "api.github.com/users/github" - ctx := context.Background() - client, _ := storage.NewClient(ctx) + test := NewTestFetch(t) - // Get generation before send pubsub message. - var preGeneration int64 - rc, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) - defer rc.Close() // nolint - if err == nil { - preGeneration = rc.Attrs.Generation - } + t.Run("single url", func(t *testing.T) { + objPath := "api.github.com/users/github" + pubsubData := "https://api.github.com/users/github" + ctx := context.Background() + test.deleteObjects(ctx, "api.github.com") - // Send pubsub message - if err := fetch.Run(ctx, test.event); err != nil { - assert.Fail(t, err.Error()) - } + // Send pubsub message1 + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader1 := test.getObject(ctx, objPath) - // Get generation after send pubsub message. - rc2, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) - defer rc.Close() // nolint - if err != nil { - assert.Fail(t, err.Error()) - } + // Send pubsub message2 + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } + reader2 := test.getObject(ctx, objPath) + // Get generation after send pubsub message. + assert.NotEqual(t, reader1.Attrs.Generation, reader2.Attrs.Generation) + }) + + t.Run("multi url", func(t *testing.T) { + pubsubData := ` +https://api.github.com/users/github +https://api.github.com/users/github/followers +` + ctx := context.Background() + test.deleteObjects(ctx, "api.github.com") + if err := fetch.Run(ctx, test.event(pubsubData)); err != nil { + assert.Fail(t, err.Error()) + } - assert.NotEqual(t, preGeneration, rc2.Attrs.Generation) + assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github"), "reader1") + assert.NotNilf(t, test.getObject(ctx, "api.github.com/users/github/followers"), "reader2") + }) } type TestFetch struct { - event event.Event + t *testing.T +} + +func NewTestFetch(t *testing.T) *TestFetch { + return &TestFetch{t} } -func NewTestFetch() *TestFetch { - return &TestFetch{} +func (f *TestFetch) deleteObjects(ctx context.Context, objPath string) { + client, _ := storage.NewClient(ctx) + query := storage.Query{Prefix: objPath, Versions: true} + bucket := client.Bucket(os.Getenv("BUCKET_NAME")) + it := bucket.Objects(ctx, &query) + for { + objAttrs, err := it.Next() + if err != nil && err != iterator.Done { + assert.Fail(f.t, err.Error()) + } + if err == iterator.Done { + break + } + if err := bucket.Object(objAttrs.Name).Generation(objAttrs.Generation).Delete(ctx); err != nil { + assert.Fail(f.t, err.Error()) + } + } +} + +func (f *TestFetch) getObject(ctx context.Context, objPath string) *storage.Reader { + client, _ := storage.NewClient(ctx) + reader, err := client.Bucket(os.Getenv("BUCKET_NAME")).Object(objPath).NewReader(ctx) + defer func(rc *storage.Reader) { + err := rc.Close() + if err != nil { + assert.Fail(f.t, err.Error()) + } + }(reader) + if err != nil { + assert.Fail(f.t, err.Error()) + } + return reader } -func (f *TestFetch) setup(t *testing.T) { +func (f *TestFetch) event(data string) event.Event { msg := pubsub.MessagePublishedData{ Message: &pubsub.Message{ - Data: []byte("https://api.github.com/users/github"), + Data: []byte(data), }, } - - f.event = event.New() - f.event.SetDataContentType("application/json") - if err := f.event.SetData(f.event.DataContentType(), msg); err != nil { - assert.Fail(t, err.Error()) + e := event.New() + e.SetDataContentType("application/json") + if err := e.SetData(e.DataContentType(), msg); err != nil { + assert.Fail(f.t, err.Error()) } + return e }