Skip to content

Commit

Permalink
Use stdlib md5 + tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
evenh committed Feb 17, 2022
1 parent 39141e8 commit 289d24a
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 32 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/evenh/az-blob-hashdeep
go 1.17

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0
github.com/openlyinc/pointy v1.1.2
github.com/pkg/errors v0.9.1
Expand All @@ -11,11 +12,8 @@ require (
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,6 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.1 h1:lb04bBEJoAoV48eHs4Eq0UyhmJCkRSdIjQ3uS8WJRM4=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand All @@ -270,8 +268,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
Expand Down
34 changes: 20 additions & 14 deletions internal/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/evenh/az-blob-hashdeep/internal/hashes"
md5simd "github.com/minio/md5-simd"
"github.com/openlyinc/pointy"
log "github.com/sirupsen/logrus"
)
Expand All @@ -34,11 +34,7 @@ const maxAzResults int32 = 5000
const channelSize = maxAzResults * 2
const progressInterval = 5 * time.Minute

var mdFivess = md5simd.NewServer()

func Generate(ctx context.Context, c *GenerateConfig) {
defer mdFivess.Close()

var wg sync.WaitGroup
files := make(chan *HashdeepEntry, channelSize)
writer := &HashdeepOutputFile{OutputFile: c.OutputFile, PathPrefix: c.Prefix}
Expand Down Expand Up @@ -99,13 +95,15 @@ func traverseBlobStorage(ctx context.Context, files chan *HashdeepEntry, c *Gene
var hasher hashes.Hasher
if c.Calculate {
logger.Info("hashing strategy: Download files and calculate hashes locally")
hasher = hashes.DownloadAndCalculateHasher{
Client: &container,
MdFiveHashServer: &mdFivess,
hasher = &hashes.DownloadAndCalculateHasher{
Client: &container,
}
// hasher = &hashes.BuiltinDownloadAndCalculateHasher{
// Client: &container,
// }
} else {
logger.Info("hashing strategy: Use hash from blob metadata")
hasher = hashes.MetadataHasher{}
hasher = &hashes.MetadataHasher{}
}
hashJobs, workersGroup := configureBackgroundWorkers(ctx, c.WorkerCount, hasher, files)

Expand Down Expand Up @@ -138,7 +136,7 @@ func traverseBlobStorage(ctx context.Context, files chan *HashdeepEntry, c *Gene
close(hashJobs)

if err := pager.Err(); err != nil {
handleErrors("list_blobs", err)
handleErrors("list_blobs", err)(logger)
}

logger.Debug("awaiting workersGroup")
Expand All @@ -152,15 +150,15 @@ func azureCheck(ctx context.Context, c *GenerateConfig) azblob.ContainerClient {

container, err := configureContainerClient(c)
if err != nil {
handleErrors("az_client_configuration", err)
handleErrors("az_client_configuration", err)(logger)
os.Exit(1)
}

// Self test: Can we reach the container via the API?
logger.Debug("performing connectivity test")
_, err = container.GetProperties(ctx, nil)
if err != nil {
handleErrors("connectivity_test", err)
handleErrors("connectivity_test", err)(logger)
os.Exit(1)
}

Expand All @@ -172,11 +170,19 @@ func azureCheck(ctx context.Context, c *GenerateConfig) azblob.ContainerClient {
func configureContainerClient(c *GenerateConfig) (azblob.ContainerClient, error) {
logger := log.WithField("phase", "configure_auth")
u := fmt.Sprintf("https://%s.blob.core.windows.net/%s", c.AccountName, c.Container)
opts := &azblob.ClientOptions{
Transporter: customHttpClient(c.WorkerCount*2, 10*time.Second),
Retry: policy.RetryOptions{
MaxRetries: 3,
RetryDelay: 4,
MaxRetryDelay: 3 * 3,
},
}

if len(c.SasToken) > 0 {
logger.Infof("Using SAS token")
sasFormat := fmt.Sprintf("%s?%s", u, c.SasToken)
return azblob.NewContainerClientWithNoCredential(sasFormat, nil)
return azblob.NewContainerClientWithNoCredential(sasFormat, opts)
}

// Account key
Expand All @@ -186,5 +192,5 @@ func configureContainerClient(c *GenerateConfig) (azblob.ContainerClient, error)
log.Fatalf("could not configure account key: %+v", err)
}

return azblob.NewContainerClientWithSharedKey(u, credential, nil)
return azblob.NewContainerClientWithSharedKey(u, credential, opts)
}
19 changes: 11 additions & 8 deletions internal/hashes/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ package hashes

import (
"context"
"crypto/md5"
"fmt"
"io"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
md5simd "github.com/minio/md5-simd"
"github.com/openlyinc/pointy"
log "github.com/sirupsen/logrus"
)
Expand All @@ -37,26 +37,29 @@ var (

// Stream bytes to memory and perform MD5 hashing locally.
type DownloadAndCalculateHasher struct {
Client *azblob.ContainerClient
MdFiveHashServer *md5simd.Server
Client *azblob.ContainerClient
}

func (d DownloadAndCalculateHasher) Hash(ctx context.Context, item azblob.BlobItemInternal) (*string, error) {
func (d *DownloadAndCalculateHasher) Hash(ctx context.Context, item azblob.BlobItemInternal) (*string, error) {
url := d.Client.NewBlobClient(*item.Name)
resp, err := url.Download(ctx, downloadBlobOptions)
if err != nil {
return nil, err
}

mdFive := (*d.MdFiveHashServer).NewHash()
defer mdFive.Close()
h := md5.New()

blobStream := resp.Body(azblob.RetryReaderOptions{MaxRetryRequests: 5})
defer func(blobStream io.ReadCloser) {
if err := blobStream.Close(); err != nil {
logger.Warnf("could not close blob stream for %s", url.URL())
}
}(blobStream)

if _, err = io.Copy(mdFive, blobStream); err != nil {
if _, err = io.Copy(h, blobStream); err != nil {
logger.Warnf("could not download %s for local hash calculation", url.URL())
return nil, nil
}

return pointy.String(fmt.Sprintf("%x", mdFive.Sum(nil))), nil
return pointy.String(fmt.Sprintf("%x", h.Sum(nil))), nil
}
4 changes: 2 additions & 2 deletions internal/hashes/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Hasher interface {
type MetadataHasher struct {
}

func (m MetadataHasher) Hash(_ context.Context, item azblob.BlobItemInternal) (*string, error) {
func (m *MetadataHasher) Hash(_ context.Context, item azblob.BlobItemInternal) (*string, error) {
return pointy.String(hex.EncodeToString(item.Properties.ContentMD5)), nil
}

Expand All @@ -40,6 +40,6 @@ type DummyHasher struct {
StaticValue string
}

func (d DummyHasher) Hash(_ context.Context, _ azblob.BlobItemInternal) (*string, error) {
func (d *DummyHasher) Hash(_ context.Context, _ azblob.BlobItemInternal) (*string, error) {
return pointy.String(d.StaticValue), nil
}
20 changes: 19 additions & 1 deletion internal/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ limitations under the License.
*/
package internal

import log "github.com/sirupsen/logrus"
import (
"net/http"
"time"

log "github.com/sirupsen/logrus"
)

func handleErrors(step string, err error) func(logger *log.Entry) {
return func(logger *log.Entry) {
Expand All @@ -27,3 +32,16 @@ func handleErrors(step string, err error) func(logger *log.Entry) {
logger.WithField("step", step).Warnf("encountered error: %v", err)
}
}

func customHttpClient(maxConnections int, idleConnectionTimeout time.Duration) *http.Client {
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = maxConnections
t.MaxConnsPerHost = maxConnections
t.MaxIdleConnsPerHost = maxConnections
t.IdleConnTimeout = idleConnectionTimeout

return &http.Client{
Timeout: idleConnectionTimeout,
Transport: t,
}
}

0 comments on commit 289d24a

Please sign in to comment.