From f4a2a597a0cb8d019bfa114550c5240757483cfb Mon Sep 17 00:00:00 2001 From: Ronan Amsterdam Date: Fri, 28 Dec 2018 16:44:06 +0800 Subject: [PATCH 1/2] response header trailers added plus GRPC_HEADER_METADATA_KEY_ID to config Grpc-Metadata headers key --- http/handlers.go | 6 +++++- http/server.go | 9 ++++----- metadata/metadata.go | 7 +++++-- proxy/proxy.go | 38 ++++++++++++++++++++------------------ proxy/stub/stub.go | 34 +++++++++++++++++++--------------- utils/utils.go | 19 +++++++++++++++++++ 6 files changed, 72 insertions(+), 41 deletions(-) create mode 100644 utils/utils.go diff --git a/http/handlers.go b/http/handlers.go index d889e2e..51977ee 100644 --- a/http/handlers.go +++ b/http/handlers.go @@ -86,7 +86,7 @@ func (s *Server) RPCCallHandler(newClient func() Client) http.HandlerFunc { w.WriteHeader(http.StatusInternalServerError) return } - response, err := client.Call(ctx, c.Service, c.Method, inputMessage, &md) + response, responseTrailer, err := client.Call(ctx, c.Service, c.Method, inputMessage, &md) if err != nil { returnError(w, errors.Cause(err).(perrors.Error)) s.logger.Error("error in handling call", @@ -95,6 +95,10 @@ func (s *Server) RPCCallHandler(newClient func() Client) http.HandlerFunc { } w.Header().Set("Content-Type", "application/json") + for key, val := range responseTrailer { + w.Header().Set(key, val[0]) + } + w.WriteHeader(http.StatusOK) w.Write(response) return diff --git a/http/server.go b/http/server.go index 8c7ff57..99eabc8 100644 --- a/http/server.go +++ b/http/server.go @@ -2,13 +2,12 @@ package http import ( "context" + "github.com/mercari/grpc-http-proxy/metadata" + "go.uber.org/zap" + grpc_metadata "google.golang.org/grpc/metadata" "net" "net/http" "net/url" - - "go.uber.org/zap" - - "github.com/mercari/grpc-http-proxy/metadata" ) // Server is an grpc-http-proxy server @@ -56,7 +55,7 @@ type Client interface { string, []byte, *metadata.Metadata, - ) ([]byte, error) + ) ([]byte, grpc_metadata.MD, error) } // Discoverer performs service discover diff --git a/metadata/metadata.go b/metadata/metadata.go index 0e6535f..9bfde83 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -1,9 +1,12 @@ package metadata -import "strings" +import ( + "github.com/mercari/grpc-http-proxy/utils" + "strings" +) // This is from an old grpc-gateway (https://github.com/grpc-ecosystem/grpc-gateway) specification -const metadataHeaderPrefix = "Grpc-Metadata-" +var metadataHeaderPrefix = utils.GetEnvVar( "GRPC_HEADER_METADATA_KEY_ID", "Grpc-Metadata-") // Metadata is gRPC metadata sent to and from upstream type Metadata map[string][]string diff --git a/proxy/proxy.go b/proxy/proxy.go index 55e691b..18b1d09 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -2,22 +2,21 @@ package proxy import ( "context" - "net/url" - "github.com/jhump/protoreflect/dynamic/grpcdynamic" "github.com/jhump/protoreflect/grpcreflect" - "github.com/pkg/errors" - "google.golang.org/grpc" - rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" - "github.com/mercari/grpc-http-proxy/metadata" "github.com/mercari/grpc-http-proxy/proxy/reflection" pstub "github.com/mercari/grpc-http-proxy/proxy/stub" + "github.com/pkg/errors" + "google.golang.org/grpc" + grpc_metadata "google.golang.org/grpc/metadata" + rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + "net/url" ) // Proxy is a dynamic gRPC client that performs reflection type Proxy struct { - cc *grpc.ClientConn + conn *grpc.ClientConn reflector reflection.Reflector stub pstub.Stub } @@ -29,20 +28,20 @@ func NewProxy() *Proxy { // Connect opens a connection to target. func (p *Proxy) Connect(ctx context.Context, target *url.URL) error { - cc, err := grpc.DialContext(ctx, target.String(), grpc.WithInsecure()) + conn, err := grpc.DialContext(ctx, target.String(), grpc.WithInsecure()) if err != nil { return err } - p.cc = cc - rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.cc)) + p.conn = conn + rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.conn)) p.reflector = reflection.NewReflector(rc) - p.stub = pstub.NewStub(grpcdynamic.NewStub(p.cc)) + p.stub = pstub.NewStub(grpcdynamic.NewStub(p.conn)) return err } // CloseConn closes the underlying connection func (p *Proxy) CloseConn() error { - return p.cc.Close() + return p.conn.Close() } // Call performs the gRPC call after doing reflection to obtain type information @@ -50,19 +49,22 @@ func (p *Proxy) Call(ctx context.Context, serviceName, methodName string, message []byte, md *metadata.Metadata, -) ([]byte, error) { +) ([]byte, grpc_metadata.MD, error) { invocation, err := p.reflector.CreateInvocation(ctx, serviceName, methodName, message) if err != nil { - return nil, err + return nil, nil, err } - outputMsg, err := p.stub.InvokeRPC(ctx, invocation, md) + outputMsg, responseTrailer, err := p.stub.InvokeRPC(ctx, invocation, md) + if err != nil { - return nil, err + return nil, nil, err } + m, err := outputMsg.MarshalJSON() + if err != nil { - return nil, errors.Wrap(err, "failed to marshal output JSON") + return nil, nil, errors.Wrap(err, "failed to marshal output JSON") } - return m, err + return m, responseTrailer, err } diff --git a/proxy/stub/stub.go b/proxy/stub/stub.go index 34578ae..d3af0f1 100644 --- a/proxy/stub/stub.go +++ b/proxy/stub/stub.go @@ -3,17 +3,15 @@ package stub import ( "context" "fmt" - "github.com/golang/protobuf/proto" "github.com/jhump/protoreflect/desc" + "github.com/mercari/grpc-http-proxy/errors" + "github.com/mercari/grpc-http-proxy/metadata" + "github.com/mercari/grpc-http-proxy/proxy/reflection" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpc_metadata "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - - "github.com/mercari/grpc-http-proxy/errors" - "github.com/mercari/grpc-http-proxy/metadata" - "github.com/mercari/grpc-http-proxy/proxy/reflection" ) // Stub performs gRPC calls based on descriptors obtained through reflection @@ -23,7 +21,7 @@ type Stub interface { InvokeRPC( ctx context.Context, invocation *reflection.MethodInvocation, - md *metadata.Metadata) (reflection.Message, error) + md *metadata.Metadata) (reflection.Message, grpc_metadata.MD, error) } type stubImpl struct { @@ -45,37 +43,43 @@ func NewStub(s grpcdynamicStub) Stub { func (s *stubImpl) InvokeRPC( ctx context.Context, invocation *reflection.MethodInvocation, - md *metadata.Metadata) (reflection.Message, error) { + md *metadata.Metadata) (reflection.Message, grpc_metadata.MD, error) { + + var responseTrailer grpc_metadata.MD // variable to store responseTrailer - o, err := s.stub.InvokeRpc(ctx, + message, err := s.stub.InvokeRpc(ctx, invocation.MethodDescriptor.AsProtoreflectDescriptor(), invocation.Message.AsProtoreflectMessage(), - grpc.Header((*grpc_metadata.MD)(md))) + grpc.Header((*grpc_metadata.MD)(md)), + grpc.Trailer(&responseTrailer)) + if err != nil { stat := status.Convert(err) + if err != nil && stat.Code() == codes.Unavailable { - return nil, &errors.ProxyError{ + return nil, nil, &errors.ProxyError{ Code: errors.UpstreamConnFailure, Message: fmt.Sprintf("could not connect to backend"), } } // When InvokeRPC returns an error, it should always be a gRPC error, so this should not panic - return nil, &errors.GRPCError{ + return nil, nil, &errors.GRPCError{ StatusCode: int(stat.Code()), Message: stat.Message(), Details: stat.Proto().Details, } } - outputMsg := invocation.MethodDescriptor.GetOutputType().NewMessage() - err = outputMsg.ConvertFrom(o) + + outputMsg := invocation.MethodDescriptor.GetOutputType().NewMessage() + err = outputMsg.ConvertFrom(message) if err != nil { - return nil, &errors.ProxyError{ + return nil, nil, &errors.ProxyError{ Code: errors.Unknown, Message: "response from backend could not be converted internally; this is a bug", } } - return outputMsg, nil + return outputMsg, responseTrailer, nil } diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..d267731 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,19 @@ +/* +* created by Roman Zhyliov +*/ + +package utils + +import ( + "os" +) + +func GetEnvVar(key, fallback string) string { + + returnVal := fallback + + if value, ok := os.LookupEnv(key); ok { + returnVal = value + } + return returnVal +} \ No newline at end of file From 0aa873bd2e38f18563fe875a6e7c3a363466fb0a Mon Sep 17 00:00:00 2001 From: Ronan Amsterdam Date: Sat, 29 Dec 2018 16:16:08 +0800 Subject: [PATCH 2/2] failing tests fixed --- http/handlers_test.go | 10 +++++----- proxy/proxy_test.go | 6 +++--- proxy/stub/stub_test.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/http/handlers_test.go b/http/handlers_test.go index 4c4577d..61c1126 100644 --- a/http/handlers_test.go +++ b/http/handlers_test.go @@ -3,14 +3,14 @@ package http import ( "context" "fmt" + "github.com/mercari/grpc-http-proxy/log" + "github.com/mercari/grpc-http-proxy/metadata" + grpc_metadata "google.golang.org/grpc/metadata" "net/http" "net/http/httptest" "net/url" "strings" "testing" - - "github.com/mercari/grpc-http-proxy/log" - "github.com/mercari/grpc-http-proxy/metadata" ) type fakeDiscoverer struct { @@ -67,12 +67,12 @@ func (c *fakeClient) Call(ctx context.Context, serviceName, methodName string, message []byte, md *metadata.Metadata, -) ([]byte, error) { +) ([]byte, grpc_metadata.MD, error) { response := fmt.Sprintf("{\"serviceVersion\":\"%s\",\"service\":\"%s\",\"method\":\"%s\"}\n", c.version, c.service, methodName) - return []byte(response), nil + return []byte(response), nil, nil } func TestServer_LivenessProbeHandler(t *testing.T) { diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 6075d05..b285362 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -35,7 +35,7 @@ func TestProxy_Call(t *testing.T) { sd := reflection.ServiceDescriptorFromFileDescriptor(fd, proxytest.TestService) p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{ServiceDescriptor: sd.ServiceDescriptor}) - _, err := p.Call(ctx, proxytest.TestService, proxytest.EmptyCall, []byte("{}"), &md) + _, _, err := p.Call(ctx, proxytest.TestService, proxytest.EmptyCall, []byte("{}"), &md) if err != nil { t.Fatalf("err should be nil, got %s", err.Error()) } @@ -49,7 +49,7 @@ func TestProxy_Call(t *testing.T) { p.stub = pstub.NewStub(&proxytest.FakeGrpcdynamicStub{}) p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{}) - _, err := p.Call(ctx, proxytest.NotFoundService, proxytest.EmptyCall, []byte("{}"), &md) + _, _, err := p.Call(ctx, proxytest.NotFoundService, proxytest.EmptyCall, []byte("{}"), &md) if err == nil { t.Fatalf("err should be not nil") } @@ -65,7 +65,7 @@ func TestProxy_Call(t *testing.T) { sd := reflection.ServiceDescriptorFromFileDescriptor(fd, proxytest.TestService) p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{ServiceDescriptor: sd.ServiceDescriptor}) - _, err := p.Call(ctx, proxytest.TestService, proxytest.UnaryCall, []byte("{}"), &md) + _, _, err := p.Call(ctx, proxytest.TestService, proxytest.UnaryCall, []byte("{}"), &md) if err == nil { t.Fatalf("err should be not nil") } diff --git a/proxy/stub/stub_test.go b/proxy/stub/stub_test.go index 547b3f0..0ca2e9c 100644 --- a/proxy/stub/stub_test.go +++ b/proxy/stub/stub_test.go @@ -70,7 +70,7 @@ func TestStub_InvokeRPC(t *testing.T) { MethodDescriptor: methodDesc, Message: inputMsg, } - outputMsg, err := stub.InvokeRPC(ctx, invocation, (*metadata.Metadata)(&map[string][]string{})) + outputMsg, _, err := stub.InvokeRPC(ctx, invocation, (*metadata.Metadata)(&map[string][]string{})) if err != nil { switch v := err.(type) { case *errors.ProxyError: