Skip to content

Commit

Permalink
Added version information to peers storage. (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov authored Aug 19, 2021
1 parent 57fe238 commit 86b30fc
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 19 deletions.
106 changes: 90 additions & 16 deletions pkg/node/peer_manager/storage/cbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
peersStorageDir = "peers_storage"
// if you change peers storage data format, you have to increment peersStorageCurrentVersion
peersStorageCurrentVersion = 1
peersStorageDir = "peers_storage"
)

type CBORStorage struct {
Expand All @@ -27,10 +31,10 @@ type CBORStorage struct {

func NewCBORStorage(baseDir string, now time.Time) (*CBORStorage, error) {
storageDir := filepath.Join(baseDir, peersStorageDir)
return newCBORStorageInDir(storageDir, now)
return newCBORStorageInDir(storageDir, now, peersStorageCurrentVersion)
}

func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error) {
func newCBORStorageInDir(storageDir string, now time.Time, currVersion int) (*CBORStorage, error) {
if err := os.MkdirAll(storageDir, os.ModePerm); err != nil {
return nil, errors.Wrapf(err, "failed to create peers storage directory %q", storageDir)
}
Expand All @@ -39,30 +43,50 @@ func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error)
if err := createFileIfNotExist(knownFile); err != nil {
return nil, errors.Wrap(err, "failed to create known peers storage file")
}

suspendedFile := suspendedFilePath(storageDir)
if err := createFileIfNotExist(suspendedFile); err != nil {
return nil, errors.Wrap(err, "failed to create suspended peers storage file")
}

known := knownPeers{}
if err := unmarshalCborFromFile(knownFile, &known); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile)
}

suspended := suspendedPeers{}
if err := unmarshalCborFromFile(suspendedFile, &suspended); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile)
}

storage := &CBORStorage{
storageDir: storageDir,
suspended: suspended,
suspended: suspendedPeers{},
suspendedFilePath: suspendedFile,
known: known,
known: knownPeers{},
knownFilePath: knownFile,
}

versionFile := storageVersionFilePath(storageDir)
oldVersion, err := getPeersStorageVersion(versionFile)
switch {
case err == io.EOF:
// Empty version file, set new version
if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil {
return nil, errors.Wrap(err, "failed set version to new peers storage")
}
case err != nil:
return nil, errors.Wrap(err, "failed to validate peers storage version")
}

if oldVersion != currVersion {
// Invalidating old peers storage
zap.S().Debugf(
"Detected different peers storage versions: old='%d', current='%d'. Removing old peers storage.",
oldVersion,
currVersion,
)
if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil {
return nil, errors.Wrap(err, "failed invalidate storage and set new version to peers storage")
}
}

if err := unmarshalCborFromFile(knownFile, &storage.known); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile)
}
if err := unmarshalCborFromFile(suspendedFile, &storage.suspended); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile)
}

if len(storage.suspended) != 0 {
// Remove expired peers
if err := storage.RefreshSuspended(now); err != nil {
Expand Down Expand Up @@ -274,6 +298,24 @@ func (bs *CBORStorage) DropStorage() error {
return nil
}

func (bs *CBORStorage) invalidateStorageAndUpdateVersion(versionFile string, currVersion, oldVersion int) error {
if err := bs.DropStorage(); err != nil {
return errors.Wrapf(err,
"failed to drop peers storage in case of different versions, old='%d', current='%d'",
oldVersion,
currVersion,
)
}
if err := updatePeersStorageVersion(versionFile, currVersion); err != nil {
return errors.Wrapf(err,
"failed to update peers storage file, old='%d', current='%d'",
oldVersion,
currVersion,
)
}
return nil
}

func (bs *CBORStorage) unsafeSyncKnown(newEntries []KnownPeer, backup knownPeers) error {
if err := marshalToCborAndSyncToFile(bs.knownFilePath, bs.known); err != nil {
// In case of failure restore initial state from backup
Expand Down Expand Up @@ -391,6 +433,10 @@ func suspendedFilePath(storageDir string) string {
return filepath.Join(storageDir, "peers_suspended.cbor")
}

func storageVersionFilePath(storageDir string) string {
return filepath.Join(storageDir, "peers_storage_version.txt")
}

func createFileIfNotExist(path string) (err error) {
knownFile, err := os.OpenFile(path, os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
Expand All @@ -403,3 +449,31 @@ func createFileIfNotExist(path string) (err error) {
}()
return nil
}

func updatePeersStorageVersion(storageVersionFile string, newVersion int) error {
stringVersion := strconv.Itoa(newVersion)
err := ioutil.WriteFile(storageVersionFile, []byte(stringVersion), 0644)
if err != nil {
return errors.Wrapf(err, "failed to write data in file %q", storageVersionFile)
}
return nil
}

func getPeersStorageVersion(storageVersionFile string) (int, error) {
if err := createFileIfNotExist(storageVersionFile); err != nil {
return 0, errors.Wrap(err, "failed to create if not exists storage version file")
}
versionData, err := ioutil.ReadFile(storageVersionFile)
if err != nil {
return 0, errors.Wrapf(err, "failed to read from file %q", storageVersionFile)
}
if len(versionData) == 0 {
// it's a new peers storage
return 0, io.EOF
}
oldVersion, err := strconv.Atoi(string(versionData))
if err != nil {
return 0, errors.Wrapf(err, "failed to parse peers storage version from file %q", storageVersionFile)
}
return oldVersion, nil
}
33 changes: 30 additions & 3 deletions pkg/node/peer_manager/storage/cbor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *binaryStorageCborSuite) SetupTest() {
}
}()
now := time.Now()
storage, err := newCBORStorageInDir(tmpdir, now)
storage, err := newCBORStorageInDir(tmpdir, now, peersStorageCurrentVersion)
require.NoError(s.T(), err)

s.storage = storage
Expand Down Expand Up @@ -297,7 +297,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() {
}()

newNow := now.Add(suspendDuration)
storage, err := newCBORStorageInDir(s.storage.storageDir, newNow)
storage, err := newCBORStorageInDir(s.storage.storageDir, newNow, peersStorageCurrentVersion)
require.NoError(s.T(), err)
s.storage = storage

Expand Down Expand Up @@ -333,7 +333,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() {
})
}

func (s *binaryStorageCborSuite) TestCBORStorageDrops() {
func (s *binaryStorageCborSuite) TestCBORStorageDropsAndVersioning() {
suspendDuration := time.Minute * 5
now := s.now.Truncate(time.Millisecond)
suspended := []SuspendedPeer{
Expand Down Expand Up @@ -422,4 +422,31 @@ func (s *binaryStorageCborSuite) TestCBORStorageDrops() {
checkSuspendedStorageFile()
checkKnownStorageFile()
})

s.Run("drop peers storage in case of different version", func() {
versionFilePath := storageVersionFilePath(s.storage.storageDir)
defer func() {
storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, peersStorageCurrentVersion)
require.NoError(s.T(), err)
s.storage = storage

version, err := getPeersStorageVersion(versionFilePath)
require.NoError(s.T(), err)
require.Equal(s.T(), peersStorageCurrentVersion, version)

require.NoError(s.T(), s.storage.AddSuspended(suspended))
require.NoError(s.T(), s.storage.AddKnown(known))
}()

storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, -1)
require.NoError(s.T(), err)
s.storage = storage

version, err := getPeersStorageVersion(versionFilePath)
require.NoError(s.T(), err)
require.Equal(s.T(), -1, version)

checkSuspendedStorageFile()
checkKnownStorageFile()
})
}

0 comments on commit 86b30fc

Please sign in to comment.