Skip to content

Commit

Permalink
change getpdf
Browse files Browse the repository at this point in the history
  • Loading branch information
purplenicole730 committed Nov 21, 2024
1 parent 4073cc8 commit d94b8a3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 100 deletions.
112 changes: 23 additions & 89 deletions app/billing_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package app

import (
"context"
"sync"
"errors"
"io"

pb "go.viam.com/api/app/v1"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/logging"
)

// UsageCostType specifies the type of usage cost.
Expand Down Expand Up @@ -230,89 +228,14 @@ func invoiceSummaryFromProto(summary *pb.InvoiceSummary) *InvoiceSummary {
}
}

type invoiceStream struct {
client *BillingClient
streamCancel context.CancelFunc
streamMu sync.Mutex

activeBackgroundWorkers sync.WaitGroup
}

func (s *invoiceStream) startStream(ctx context.Context, id, orgID string, ch chan []byte) error {
s.streamMu.Lock()
defer s.streamMu.Unlock()

if ctx.Err() != nil {
return ctx.Err()
}

ctx, cancel := context.WithCancel(ctx)
s.streamCancel = cancel

select {
case <-ctx.Done():
return ctx.Err()
default:
}

// This call won't return any errors it had until the client tries to receive.
//nolint:errcheck
stream, _ := s.client.client.GetInvoicePdf(ctx, &pb.GetInvoicePdfRequest{
Id: id,
OrgId: orgID,
})
_, err := stream.Recv()
if err != nil {
s.client.logger.CError(ctx, err)
return err
}

// Create a background go routine to receive from the server stream.
// We rely on calling the Done function here rather than in close stream
// since managed go calls that function when the routine exits.
s.activeBackgroundWorkers.Add(1)
utils.ManagedGo(func() {
s.receiveFromStream(ctx, stream, ch)
},
s.activeBackgroundWorkers.Done)
return nil
}

func (s *invoiceStream) receiveFromStream(ctx context.Context, stream pb.BillingService_GetInvoicePdfClient, ch chan []byte) {
defer s.streamCancel()

// repeatedly receive from the stream
for {
select {
case <-ctx.Done():
s.client.logger.Debug(ctx.Err())
return
default:
}
streamResp, err := stream.Recv()
if err != nil {
// only debug log the context canceled error
s.client.logger.Debug(err)
return
}
// If there is a response, send to the channel.
var pdf []byte
pdf = append(pdf, streamResp.Chunk...)
ch <- pdf
}
}

// BillingClient is a gRPC client for method calls to the Billing API.
type BillingClient struct {
client pb.BillingServiceClient
logger logging.Logger

mu sync.Mutex
}

// NewBillingClient constructs a new BillingClient using the connection passed in by the Viam client.
func NewBillingClient(conn rpc.ClientConn, logger logging.Logger) *BillingClient {
return &BillingClient{client: pb.NewBillingServiceClient(conn), logger: logger}
func NewBillingClient(conn rpc.ClientConn) *BillingClient {
return &BillingClient{client: pb.NewBillingServiceClient(conn)}
}

// GetCurrentMonthUsage gets the data usage information for the current month for an organization.
Expand Down Expand Up @@ -353,17 +276,28 @@ func (c *BillingClient) GetInvoicesSummary(ctx context.Context, orgID string) (f
}

// GetInvoicePDF gets the invoice PDF data.
func (c *BillingClient) GetInvoicePDF(ctx context.Context, id, orgID string, ch chan []byte) error {
stream := &invoiceStream{client: c}

err := stream.startStream(ctx, id, orgID, ch)
func (c *BillingClient) GetInvoicePDF(ctx context.Context, id, orgID string) ([]byte, error) {
stream, err := c.client.GetInvoicePdf(ctx, &pb.GetInvoicePdfRequest{
Id: id,
OrgId: orgID,
})
if err != nil {
return nil
return nil, err
}

var data []byte
for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return data, err
}
data = append(data, resp.Chunk...)
}

c.mu.Lock()
defer c.mu.Unlock()
return nil
return data, nil
}

// SendPaymentRequiredEmail sends an email about payment requirement.
Expand Down
26 changes: 18 additions & 8 deletions app/billing_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"io"
"testing"

pb "go.viam.com/api/app/v1"
Expand Down Expand Up @@ -77,7 +78,11 @@ var (
DueDate: &dueDate,
PaidDate: &paidDate,
}
chunk = []byte{4, 8}
chunk1 = []byte{4, 8}
chunk2 = []byte("chunk1")
chunk3 = []byte("chunk2")
chunks = [][]byte{chunk1, chunk2, chunk3}
chunkCount = len(chunks)
)

func sourceTypeToProto(sourceType SourceType) pb.SourceType {
Expand Down Expand Up @@ -219,28 +224,33 @@ func TestBillingClient(t *testing.T) {
})

t.Run("GetInvoicePDF", func(t *testing.T) {
var expectedData []byte
expectedData = append(expectedData, chunk1...)
expectedData = append(expectedData, chunk2...)
expectedData = append(expectedData, chunk3...)
var count int
mockStream := &inject.BillingServiceGetInvoicePdfClient{
RecvFunc: func() (*pb.GetInvoicePdfResponse, error) {
if count >= chunkCount {
return nil, io.EOF
}
chunk := chunks[count]
count++
return &pb.GetInvoicePdfResponse{
Chunk: chunk,
}, nil
},
}
ch := make(chan []byte)
grpcClient.GetInvoicePdfFunc = func(
ctx context.Context, in *pb.GetInvoicePdfRequest, opts ...grpc.CallOption,
) (pb.BillingService_GetInvoicePdfClient, error) {
test.That(t, in.Id, test.ShouldEqual, invoiceID)
test.That(t, in.OrgId, test.ShouldEqual, organizationID)
return mockStream, nil
}
err := client.GetInvoicePDF(context.Background(), invoiceID, organizationID, ch)
data, err := client.GetInvoicePDF(context.Background(), invoiceID, organizationID)
test.That(t, err, test.ShouldBeNil)
// var resp []byte
// for chunkByte := range ch {
// resp = append(resp, chunkByte...)
// }
// test.That(t, resp, test.ShouldResemble, chunk)
test.That(t, data, test.ShouldResemble, expectedData)
})

t.Run("SendPaymentRequiredEmail", func(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions app/viam_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
// ViamClient is a gRPC client for method calls to Viam app.
type ViamClient struct {
conn rpc.ClientConn
logger logging.Logger
billingClient *BillingClient
dataClient *DataClient
}
Expand Down Expand Up @@ -50,7 +49,7 @@ func CreateViamClientWithOptions(ctx context.Context, options Options, logger lo
if err != nil {
return nil, err
}
return &ViamClient{conn: conn, logger: logger}, nil
return &ViamClient{conn: conn}, nil
}

// CreateViamClientWithAPIKey creates a ViamClient with an API key.
Expand All @@ -71,7 +70,7 @@ func (c *ViamClient) Billingclient() *BillingClient {
if c.billingClient != nil {
return c.billingClient
}
c.billingClient = NewBillingClient(c.conn, c.logger)
c.billingClient = NewBillingClient(c.conn)
return c.billingClient
}

Expand Down

0 comments on commit d94b8a3

Please sign in to comment.