Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌟feat(responseTrailers): support for response header trailers added #7

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion http/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Server) RPCCallHandler(newClient func() Client) http.HandlerFunc {
w.WriteHeader(http.StatusInternalServerError)
return
}
response, err := client.Call(ctx, c.Service, c.Method, inputMessage, &md)
response, responseTrailer, err := client.Call(ctx, c.Service, c.Method, inputMessage, &md)
if err != nil {
returnError(w, errors.Cause(err).(perrors.Error))
s.logger.Error("error in handling call",
Expand All @@ -95,6 +95,10 @@ func (s *Server) RPCCallHandler(newClient func() Client) http.HandlerFunc {
}

w.Header().Set("Content-Type", "application/json")
for key, val := range responseTrailer {
w.Header().Set(key, val[0])
}

w.WriteHeader(http.StatusOK)
w.Write(response)
return
Expand Down
10 changes: 5 additions & 5 deletions http/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package http
import (
"context"
"fmt"
"github.com/mercari/grpc-http-proxy/log"
"github.com/mercari/grpc-http-proxy/metadata"
grpc_metadata "google.golang.org/grpc/metadata"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"

"github.com/mercari/grpc-http-proxy/log"
"github.com/mercari/grpc-http-proxy/metadata"
)

type fakeDiscoverer struct {
Expand Down Expand Up @@ -67,12 +67,12 @@ func (c *fakeClient) Call(ctx context.Context,
serviceName, methodName string,
message []byte,
md *metadata.Metadata,
) ([]byte, error) {
) ([]byte, grpc_metadata.MD, error) {
response := fmt.Sprintf("{\"serviceVersion\":\"%s\",\"service\":\"%s\",\"method\":\"%s\"}\n",
c.version,
c.service,
methodName)
return []byte(response), nil
return []byte(response), nil, nil
}

func TestServer_LivenessProbeHandler(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package http

import (
"context"
"github.com/mercari/grpc-http-proxy/metadata"
"go.uber.org/zap"
grpc_metadata "google.golang.org/grpc/metadata"
"net"
"net/http"
"net/url"

"go.uber.org/zap"

"github.com/mercari/grpc-http-proxy/metadata"
)

// Server is an grpc-http-proxy server
Expand Down Expand Up @@ -56,7 +55,7 @@ type Client interface {
string,
[]byte,
*metadata.Metadata,
) ([]byte, error)
) ([]byte, grpc_metadata.MD, error)
}

// Discoverer performs service discover
Expand Down
7 changes: 5 additions & 2 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package metadata

import "strings"
import (
"github.com/mercari/grpc-http-proxy/utils"
"strings"
)

// This is from an old grpc-gateway (https://github.com/grpc-ecosystem/grpc-gateway) specification
const metadataHeaderPrefix = "Grpc-Metadata-"
var metadataHeaderPrefix = utils.GetEnvVar( "GRPC_HEADER_METADATA_KEY_ID", "Grpc-Metadata-")

// Metadata is gRPC metadata sent to and from upstream
type Metadata map[string][]string
Expand Down
38 changes: 20 additions & 18 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package proxy

import (
"context"
"net/url"

"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/pkg/errors"
"google.golang.org/grpc"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/mercari/grpc-http-proxy/metadata"
"github.com/mercari/grpc-http-proxy/proxy/reflection"
pstub "github.com/mercari/grpc-http-proxy/proxy/stub"
"github.com/pkg/errors"
"google.golang.org/grpc"
grpc_metadata "google.golang.org/grpc/metadata"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"net/url"
)

// Proxy is a dynamic gRPC client that performs reflection
type Proxy struct {
cc *grpc.ClientConn
conn *grpc.ClientConn
reflector reflection.Reflector
stub pstub.Stub
}
Expand All @@ -29,40 +28,43 @@ func NewProxy() *Proxy {

// Connect opens a connection to target.
func (p *Proxy) Connect(ctx context.Context, target *url.URL) error {
cc, err := grpc.DialContext(ctx, target.String(), grpc.WithInsecure())
conn, err := grpc.DialContext(ctx, target.String(), grpc.WithInsecure())
if err != nil {
return err
}
p.cc = cc
rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.cc))
p.conn = conn
rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.conn))
p.reflector = reflection.NewReflector(rc)
p.stub = pstub.NewStub(grpcdynamic.NewStub(p.cc))
p.stub = pstub.NewStub(grpcdynamic.NewStub(p.conn))
return err
}

// CloseConn closes the underlying connection
func (p *Proxy) CloseConn() error {
return p.cc.Close()
return p.conn.Close()
}

// Call performs the gRPC call after doing reflection to obtain type information
func (p *Proxy) Call(ctx context.Context,
serviceName, methodName string,
message []byte,
md *metadata.Metadata,
) ([]byte, error) {
) ([]byte, grpc_metadata.MD, error) {
invocation, err := p.reflector.CreateInvocation(ctx, serviceName, methodName, message)
if err != nil {
return nil, err
return nil, nil, err
}

outputMsg, err := p.stub.InvokeRPC(ctx, invocation, md)
outputMsg, responseTrailer, err := p.stub.InvokeRPC(ctx, invocation, md)

if err != nil {
return nil, err
return nil, nil, err
}

m, err := outputMsg.MarshalJSON()

if err != nil {
return nil, errors.Wrap(err, "failed to marshal output JSON")
return nil, nil, errors.Wrap(err, "failed to marshal output JSON")
}
return m, err
return m, responseTrailer, err
}
6 changes: 3 additions & 3 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestProxy_Call(t *testing.T) {
sd := reflection.ServiceDescriptorFromFileDescriptor(fd, proxytest.TestService)
p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{ServiceDescriptor: sd.ServiceDescriptor})

_, err := p.Call(ctx, proxytest.TestService, proxytest.EmptyCall, []byte("{}"), &md)
_, _, err := p.Call(ctx, proxytest.TestService, proxytest.EmptyCall, []byte("{}"), &md)
if err != nil {
t.Fatalf("err should be nil, got %s", err.Error())
}
Expand All @@ -49,7 +49,7 @@ func TestProxy_Call(t *testing.T) {
p.stub = pstub.NewStub(&proxytest.FakeGrpcdynamicStub{})
p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{})

_, err := p.Call(ctx, proxytest.NotFoundService, proxytest.EmptyCall, []byte("{}"), &md)
_, _, err := p.Call(ctx, proxytest.NotFoundService, proxytest.EmptyCall, []byte("{}"), &md)
if err == nil {
t.Fatalf("err should be not nil")
}
Expand All @@ -65,7 +65,7 @@ func TestProxy_Call(t *testing.T) {
sd := reflection.ServiceDescriptorFromFileDescriptor(fd, proxytest.TestService)
p.reflector = reflection.NewReflector(&proxytest.FakeGrpcreflectClient{ServiceDescriptor: sd.ServiceDescriptor})

_, err := p.Call(ctx, proxytest.TestService, proxytest.UnaryCall, []byte("{}"), &md)
_, _, err := p.Call(ctx, proxytest.TestService, proxytest.UnaryCall, []byte("{}"), &md)
if err == nil {
t.Fatalf("err should be not nil")
}
Expand Down
34 changes: 19 additions & 15 deletions proxy/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ package stub
import (
"context"
"fmt"

"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/desc"
"github.com/mercari/grpc-http-proxy/errors"
"github.com/mercari/grpc-http-proxy/metadata"
"github.com/mercari/grpc-http-proxy/proxy/reflection"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpc_metadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/mercari/grpc-http-proxy/errors"
"github.com/mercari/grpc-http-proxy/metadata"
"github.com/mercari/grpc-http-proxy/proxy/reflection"
)

// Stub performs gRPC calls based on descriptors obtained through reflection
Expand All @@ -23,7 +21,7 @@ type Stub interface {
InvokeRPC(
ctx context.Context,
invocation *reflection.MethodInvocation,
md *metadata.Metadata) (reflection.Message, error)
md *metadata.Metadata) (reflection.Message, grpc_metadata.MD, error)
}

type stubImpl struct {
Expand All @@ -45,37 +43,43 @@ func NewStub(s grpcdynamicStub) Stub {
func (s *stubImpl) InvokeRPC(
ctx context.Context,
invocation *reflection.MethodInvocation,
md *metadata.Metadata) (reflection.Message, error) {
md *metadata.Metadata) (reflection.Message, grpc_metadata.MD, error) {

var responseTrailer grpc_metadata.MD // variable to store responseTrailer

o, err := s.stub.InvokeRpc(ctx,
message, err := s.stub.InvokeRpc(ctx,
invocation.MethodDescriptor.AsProtoreflectDescriptor(),
invocation.Message.AsProtoreflectMessage(),
grpc.Header((*grpc_metadata.MD)(md)))
grpc.Header((*grpc_metadata.MD)(md)),
grpc.Trailer(&responseTrailer))

if err != nil {
stat := status.Convert(err)

if err != nil && stat.Code() == codes.Unavailable {
return nil, &errors.ProxyError{
return nil, nil, &errors.ProxyError{
Code: errors.UpstreamConnFailure,
Message: fmt.Sprintf("could not connect to backend"),
}
}

// When InvokeRPC returns an error, it should always be a gRPC error, so this should not panic
return nil, &errors.GRPCError{
return nil, nil, &errors.GRPCError{
StatusCode: int(stat.Code()),
Message: stat.Message(),
Details: stat.Proto().Details,
}
}
outputMsg := invocation.MethodDescriptor.GetOutputType().NewMessage()
err = outputMsg.ConvertFrom(o)

outputMsg := invocation.MethodDescriptor.GetOutputType().NewMessage()
err = outputMsg.ConvertFrom(message)

if err != nil {
return nil, &errors.ProxyError{
return nil, nil, &errors.ProxyError{
Code: errors.Unknown,
Message: "response from backend could not be converted internally; this is a bug",
}
}

return outputMsg, nil
return outputMsg, responseTrailer, nil
}
2 changes: 1 addition & 1 deletion proxy/stub/stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestStub_InvokeRPC(t *testing.T) {
MethodDescriptor: methodDesc,
Message: inputMsg,
}
outputMsg, err := stub.InvokeRPC(ctx, invocation, (*metadata.Metadata)(&map[string][]string{}))
outputMsg, _, err := stub.InvokeRPC(ctx, invocation, (*metadata.Metadata)(&map[string][]string{}))
if err != nil {
switch v := err.(type) {
case *errors.ProxyError:
Expand Down
19 changes: 19 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* created by Roman Zhyliov <[email protected]>
*/

package utils

import (
"os"
)

func GetEnvVar(key, fallback string) string {

returnVal := fallback

if value, ok := os.LookupEnv(key); ok {
returnVal = value
}
return returnVal
}