Skip to content

Commit

Permalink
Adding optional logger interface to client so logs can be easily capt…
Browse files Browse the repository at this point in the history
…ured (#82)

* Adding optional logger interface to client so logs can be captured by programs using the sdk

* Adding null loger and removing unused receiver from simple logger
  • Loading branch information
RachelTucker authored and rpmoore committed Jun 13, 2019
1 parent 864c055 commit 4e22fa1
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 42 deletions.
24 changes: 20 additions & 4 deletions ds3/ds3Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ds3
import (
"github.com/SpectraLogic/ds3_go_sdk/ds3/networking"
"net/url"
"github.com/SpectraLogic/ds3_go_sdk/sdk_log"
)

const (
Expand All @@ -18,11 +19,15 @@ type Client struct {
sendNetwork networking.Network
clientPolicy *ClientPolicy
connectionInfo *networking.ConnectionInfo

// Logger where all messages will be logged to
sdk_log.Logger
}

type ClientBuilder struct {
connectionInfo *networking.ConnectionInfo
clientPolicy *ClientPolicy
clientPolicy *ClientPolicy
logger sdk_log.Logger
}

type ClientPolicy struct {
Expand All @@ -35,13 +40,14 @@ const DEFAULT_MAX_REDIRECTS = 5

func NewClientBuilder(endpoint *url.URL, creds *networking.Credentials) *ClientBuilder {
return &ClientBuilder{
&networking.ConnectionInfo{
connectionInfo: &networking.ConnectionInfo{
Endpoint: endpoint,
Credentials: creds,
Proxy: nil},
&ClientPolicy{
clientPolicy: &ClientPolicy{
maxRetries: DEFAULT_MAX_RETRIES,
maxRedirect: DEFAULT_MAX_REDIRECTS}}
maxRedirect: DEFAULT_MAX_REDIRECTS},
}
}

func (clientBuilder *ClientBuilder) WithProxy(proxy *url.URL) *ClientBuilder {
Expand All @@ -59,10 +65,20 @@ func (clientBuilder *ClientBuilder) WithNetworkRetryCount(count int) *ClientBuil
return clientBuilder
}

func (clientBuilder *ClientBuilder) WithLogger(logger sdk_log.Logger) *ClientBuilder {
clientBuilder.logger = logger
return clientBuilder
}

func (clientBuilder *ClientBuilder) BuildClient() *Client {
if clientBuilder.logger == nil {
clientBuilder.logger = sdk_log.NewSimpleLogger()
}

return &Client{
sendNetwork: networking.NewSendNetwork(clientBuilder.connectionInfo),
clientPolicy: clientBuilder.clientPolicy,
connectionInfo: clientBuilder.connectionInfo,
Logger: clientBuilder.logger,
}
}
11 changes: 4 additions & 7 deletions helpers/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package helpers
import (
"testing"
"sync"
"fmt"
"github.com/SpectraLogic/ds3_go_sdk/ds3_utils/ds3Testing"
)

func testTransferBuilder(i int, waitGroup *sync.WaitGroup, resultCount *int, resultMux *sync.Mutex) TransferOperation {
func testTransferBuilder(t *testing.T, i int, resultCount *int, resultMux *sync.Mutex) TransferOperation {
return func() {
//defer waitGroup.Done()

resultMux.Lock()
*resultCount++
resultMux.Unlock()

fmt.Printf("Transfer Op: '%d'\n", i)
t.Logf("Transfer Op: '%d'\n", i)
}
}

Expand All @@ -31,9 +28,9 @@ func TestProducerConsumerModel(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)

var transferOf = testTransferBuilder(i, &wg, &resultCount, &resultMux)
var transferOf = testTransferBuilder(t, i, &resultCount, &resultMux)

fmt.Printf("Producer: '%d'\n", i)
t.Logf("Producer: '%d'\n", i)

*queue <- transferOf
}
Expand Down
31 changes: 17 additions & 14 deletions helpers/getProducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
"github.com/SpectraLogic/ds3_go_sdk/ds3"
"sync"
"log"
"io"
"github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
"github.com/SpectraLogic/ds3_go_sdk/sdk_log"
)

type getProducer struct {
Expand All @@ -21,6 +21,7 @@ type getProducer struct {
processedBlobTracker blobTracker
deferredBlobQueue BlobDescriptionQueue // queue of blobs whose channels are not yet ready for transfer
rangeFinder ranges.BlobRangeFinder
sdk_log.Logger
}

func newGetProducer(jobMasterObjectList *ds3Models.MasterObjectList, getObjects *[]helperModels.GetObject, queue *chan TransferOperation, strategy *ReadTransferStrategy, client *ds3.Client, waitGroup *sync.WaitGroup) *getProducer {
Expand All @@ -35,6 +36,7 @@ func newGetProducer(jobMasterObjectList *ds3Models.MasterObjectList, getObjects
processedBlobTracker: newProcessedBlobTracker(),
deferredBlobQueue: NewBlobDescriptionQueue(),
rangeFinder: ranges.NewBlobRangeFinder(getObjects),
Logger: client.Logger, //use the same logger as the client
}
}

Expand All @@ -55,11 +57,11 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode

// Processes all the blobs in a chunk that are ready for transfer from BP
func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
log.Printf("DEBUG begin chunk processing %s", curChunk.ChunkId)
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)

// transfer blobs that are ready, and queue those that are waiting for channel
for _, curObj := range curChunk.Objects {
log.Printf("DEBUG queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
producer.queueBlobForTransfer(&blob, bucketName, jobId, aggErr)
}
Expand All @@ -78,7 +80,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
return func() {
blobRanges := producer.rangeFinder.GetRanges(info.blob.Name(), info.blob.Offset(), info.blob.Length())

log.Printf("TRANSFER: objectName='%s' offset=%d ranges=%v", info.blob.Name(), info.blob.Offset(), blobRanges)
producer.Debugf("transferring objectName='%s' offset=%d ranges=%v", info.blob.Name(), info.blob.Offset(), blobRanges)

getObjRequest := ds3Models.NewGetObjectRequest(info.bucketName, info.blob.Name()).
WithOffset(info.blob.Offset()).
Expand All @@ -91,22 +93,22 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
getObjResponse, err := producer.client.GetObject(getObjRequest)
if err != nil {
aggErr.Append(err)
log.Printf("ERROR during retrieval of %s: %s", info.blob.Name(), err.Error())
producer.Errorf("unable to retrieve object '%s' at offset %d: %s", info.blob.Name(), info.blob.Offset(), err.Error())
return
}

if len(blobRanges) == 0 {
writer, err := info.channelBuilder.GetChannel(info.blob.Offset())
if err != nil {
aggErr.Append(err)
log.Printf("ERROR when copying content for object '%s' at offset '%d': %s", info.blob.Name(), info.blob.Offset(), err.Error())
producer.Errorf("unable to read contents of object '%s' at offset '%d': %s", info.blob.Name(), info.blob.Offset(), err.Error())
return
}
defer info.channelBuilder.OnDone(writer)
_, err = io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer
if err != nil {
aggErr.Append(err)
log.Printf("ERROR when copying content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
}
return
}
Expand All @@ -116,7 +118,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
err := writeRangeToDestination(info.channelBuilder, r, getObjResponse.Content)
if err != nil {
aggErr.Append(err)
log.Printf("ERROR when writing to destination channel for object '%s' with range '%v': %s", info.blob.Name(), r, err.Error())
producer.Errorf("unable to write to destination channel for object '%s' with range '%v': %s", info.blob.Name(), r, err.Error())
}
}
}
Expand Down Expand Up @@ -145,12 +147,12 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
curReadObj := producer.readObjectMap[blob.Name()]

if !curReadObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
log.Printf("DEBUG channel is NOT available for getting blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
producer.Debugf("channel is not currently available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
producer.deferredBlobQueue.Push(blob)
return
}

log.Printf("DEBUG channel is available for getting blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
producer.Debugf("channel is available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())

// Create transfer operation
objInfo := getObjectInfo{
Expand Down Expand Up @@ -178,11 +180,11 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string
for i := 0; i < waitingBlobs; i++ {
//attempt transfer
curBlob, err := producer.deferredBlobQueue.Pop()
log.Printf("DEBUG attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
if err != nil {
//should not be possible to get here
aggErr.Append(err)
log.Printf("ERROR when attempting blob transfer: %s", err.Error())
producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error())
}
producer.queueBlobForTransfer(curBlob, bucketName, jobId, aggErr)
}
Expand All @@ -197,7 +199,7 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {

// determine number of blobs to be processed
var totalBlobCount int64 = producer.totalBlobCount()
log.Printf("DEBUG totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())

// process all chunks and make sure all blobs are queued for transfer
for producer.processedBlobTracker.NumberOfProcessedBlobs() < totalBlobCount || producer.deferredBlobQueue.Size() > 0 {
Expand All @@ -208,7 +210,8 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady)
if err != nil {
aggErr.Append(err)
log.Fatal(err)
producer.Errorf("unrecoverable error: %v", err)
return
}

// Check to see if any chunks can be processed
Expand Down
2 changes: 1 addition & 1 deletion helpers/getTransfernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (transceiver *getTransceiver) transfer() (string, error) {
// init queue, producer and consumer
var waitGroup sync.WaitGroup

queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers())
queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers(), transceiver.Client.Logger)
producer := newGetProducer(&bulkGetResponse.MasterObjectList, transceiver.ReadObjects, &queue, transceiver.Strategy, transceiver.Client, &waitGroup)
consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers())

Expand Down
10 changes: 6 additions & 4 deletions helpers/operationQueue.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package helpers

import "log"
import (
"github.com/SpectraLogic/ds3_go_sdk/sdk_log"
)

type TransferOperation func() // transfer operation that sends/gets stuff from BP

const MinQueueSize uint = 1
const MaxQueueSize uint = 100

func newOperationQueue(size uint) chan TransferOperation {
func newOperationQueue(size uint, logger sdk_log.Logger) chan TransferOperation {
var queue chan TransferOperation

if size > MaxQueueSize {
log.Printf("WARNING Invalid operation queue size: specified value '%d' which exceeds the maximum, defaulting to '%d'\n", size, MaxQueueSize)
logger.Warningf("invalid operation queue size: specified value '%d' which exceeds the maximum, defaulting to '%d'", size, MaxQueueSize)
queue = make(chan TransferOperation, MaxQueueSize)
} else if size < MinQueueSize {
log.Printf("WARNING Invalid operation queue size: specified value '%d' which is below the minimum, defaulting to '%d'\n", size, MinQueueSize)
logger.Warningf("invalid operation queue size: specified value '%d' which is below the minimum, defaulting to '%d'", size, MinQueueSize)
queue = make(chan TransferOperation, MinQueueSize)
} else {
queue = make(chan TransferOperation, size)
Expand Down
27 changes: 16 additions & 11 deletions helpers/putProducer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package helpers

import (
"log"
ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
"github.com/SpectraLogic/ds3_go_sdk/ds3"
"sync"
"github.com/SpectraLogic/ds3_go_sdk/sdk_log"
)

type putProducer struct {
Expand All @@ -18,6 +18,7 @@ type putProducer struct {
writeObjectMap map[string]helperModels.PutObject
processedBlobTracker blobTracker
deferredBlobQueue BlobDescriptionQueue // queue of blobs whose channels are not yet ready for transfer
sdk_log.Logger
}

func newPutProducer(jobMasterObjectList *ds3Models.MasterObjectList, putObjects *[]helperModels.PutObject, queue *chan TransferOperation, strategy *WriteTransferStrategy, client *ds3.Client, waitGroup *sync.WaitGroup) *putProducer {
Expand All @@ -31,6 +32,7 @@ func newPutProducer(jobMasterObjectList *ds3Models.MasterObjectList, putObjects
writeObjectMap: toWriteObjectMap(putObjects),
deferredBlobQueue: NewBlobDescriptionQueue(),
processedBlobTracker: newProcessedBlobTracker(),
Logger: client.Logger, // use the same logger as the client
}
}

Expand Down Expand Up @@ -63,7 +65,7 @@ func (producer *putProducer) transferOperationBuilder(info putObjectInfo, aggErr
reader, err := info.channelBuilder.GetChannel(info.blob.Offset())
if err != nil {
aggErr.Append(err)
log.Printf("ERROR could not get reader for object with name='%s' offset=%d length=%d", info.blob.Name(), info.blob.Offset(), info.blob.Length())
producer.Errorf("could not get reader for object with name='%s' offset=%d length=%d", info.blob.Name(), info.blob.Offset(), info.blob.Length())
return
}
defer info.channelBuilder.OnDone(reader)
Expand All @@ -78,7 +80,7 @@ func (producer *putProducer) transferOperationBuilder(info putObjectInfo, aggErr
_, err = producer.client.PutObject(putObjRequest)
if err != nil {
aggErr.Append(err)
log.Printf("ERROR during transfer of %s: %s\n", info.blob.Name(), err.Error())
producer.Errorf("problem during transfer of %s: %s", info.blob.Name(), err.Error())
}
}
}
Expand Down Expand Up @@ -111,11 +113,11 @@ func (producer *putProducer) metadataFrom(info putObjectInfo) map[string]string
// Processes all the blobs in a chunk and attempts to add them to the transfer queue.
// If a blob is not ready for transfer, then it is added to the waiting to be transferred queue.
func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
log.Printf("DEBUG begin chunk processing %s", curChunk.ChunkId)
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)

// transfer blobs that are ready, and queue those that are waiting for channel
for _, curObj := range curChunk.Objects {
log.Printf("DEBUG queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
producer.queueBlobForTransfer(&blob, bucketName, jobId, aggErr)
}
Expand All @@ -129,11 +131,12 @@ func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string
for i := 0; i < waitingBlobs; i++ {
//attempt transfer
curBlob, err := producer.deferredBlobQueue.Pop()
log.Printf("DEBUG attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
if err != nil {
aggErr.Append(err)
log.Printf("ERROR when attempting blob transfer: %s", err.Error())
producer.Errorf("problem when getting next blob to be transferred: %s", err.Error())
continue
}
producer.Debugf("attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
producer.queueBlobForTransfer(curBlob, bucketName, jobId, aggErr)
}
}
Expand All @@ -148,13 +151,13 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
curWriteObj := producer.writeObjectMap[blob.Name()]

if !curWriteObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
log.Printf("DEBUG channel is NOT available for blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
producer.Debugf("channel is not currently available for blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
// Not ready to be transferred
producer.deferredBlobQueue.Push(blob)
return
}

log.Printf("DEBUG channel is available for blob %s offset=%d length=%d", curWriteObj.PutObject.Name, blob.Offset(), blob.Length())
producer.Debugf("channel is available for blob %s offset=%d length=%d", curWriteObj.PutObject.Name, blob.Offset(), blob.Length())
// Blob ready to be transferred

// Create transfer operation
Expand Down Expand Up @@ -184,7 +187,7 @@ func (producer *putProducer) run(aggErr *ds3Models.AggregateError) {

// determine number of blobs to be processed
var totalBlobCount int64 = producer.totalBlobCount()
log.Printf("DEBUG totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())

// process all chunks and make sure all blobs are queued for transfer
for producer.processedBlobTracker.NumberOfProcessedBlobs() < totalBlobCount || producer.deferredBlobQueue.Size() > 0 {
Expand All @@ -194,7 +197,9 @@ func (producer *putProducer) run(aggErr *ds3Models.AggregateError) {
chunksReady := ds3Models.NewGetJobChunksReadyForClientProcessingSpectraS3Request(producer.JobMasterObjectList.JobId)
chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady)
if err != nil {
log.Fatal(err)
aggErr.Append(err)
producer.Errorf("unrecoverable error: %v", err)
return
}

// Check to see if any chunks can be processed
Expand Down
2 changes: 1 addition & 1 deletion helpers/putTransceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (transceiver *putTransceiver) transfer() (string, error) {
// init queue, producer and consumer
var waitGroup sync.WaitGroup

queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers())
queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers(), transceiver.Client.Logger)
producer := newPutProducer(&bulkPutResponse.MasterObjectList, transceiver.WriteObjects, &queue, transceiver.Strategy, transceiver.Client, &waitGroup)
consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers())

Expand Down
8 changes: 8 additions & 0 deletions sdk_log/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package sdk_log

type Logger interface {
Infof(format string, args ...interface{})
Debugf(format string, args ...interface{})
Warningf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
Loading

0 comments on commit 4e22fa1

Please sign in to comment.