Skip to content

Commit

Permalink
Merge pull request #146 from nepet/update-version
Browse files Browse the repository at this point in the history
swap: Update version
  • Loading branch information
wtogami authored Oct 15, 2022
2 parents bf7bb2f + ae1aa11 commit d1d88e4
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 23 deletions.
2 changes: 1 addition & 1 deletion clightning/clightning_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
}

// get polls
polls, err := l.cl.pollService.GetPolls()
polls, err := l.cl.pollService.GetCompatiblePolls()
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/peerswap-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func run() error {
<-initChan
log.SetLogger(clightning.NewGlightninglogger(lightningPlugin.Plugin))
log.Infof("PeerSwap CLN starting up with commit %s", GitCommit)
log.Infof("DB version: %s, Protocol version: %d", version.GetCurrentVersion(), swap.PEERSWAP_PROTOCOL_VERSION)
config, err := lightningPlugin.GetConfig()
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/peerswaplnd/peerswapd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func run() error {
return err
}
log.Infof("PeerSwap LND starting up with commit %s and cfg: %s", GitCommit, cfg)
log.Infof("DB version: %s, Protocol version: %d", version.GetCurrentVersion(), swap.PEERSWAP_PROTOCOL_VERSION)

// setup lnd connection
cc, err := lnd.GetClientConnection(ctx, cfg.LndConfig)
Expand Down
2 changes: 1 addition & 1 deletion lnd/messagelistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (m *MessageListener) Start() error {
}

peerId := hex.EncodeToString(msg.Peer)
log.Debugf("[MsgListener]: Received custom message type %s from %s", messages.MessageTypeToHexString(messages.MessageType(msg.Type)), peerId)
// log.Debugf("[MsgListener]: Received custom message type %s from %s", messages.MessageTypeToHexString(messages.MessageType(msg.Type)), peerId)

m.Lock()
for _, handler := range m.handlers {
Expand Down
2 changes: 1 addition & 1 deletion peerswaprpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (p *PeerswapServer) ListPeers(ctx context.Context, request *ListPeersReques
return nil, err
}

polls, err := p.pollService.GetPolls()
polls, err := p.pollService.GetCompatiblePolls()
if err != nil {
return nil, err
}
Expand Down
68 changes: 55 additions & 13 deletions poll/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"time"

"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/swap"

"github.com/elementsproject/peerswap/messages"
)

const version uint64 = 0

type PollNotFoundErr string

func (p PollNotFoundErr) Error() string {
Expand Down Expand Up @@ -48,9 +47,10 @@ type Store interface {
}

type PollInfo struct {
Assets []string `json:"assets"`
PeerAllowed bool
LastSeen time.Time
ProtocolVersion uint64 `json:"version"`
Assets []string `json:"assets"`
PeerAllowed bool
LastSeen time.Time
}
type Service struct {
sync.RWMutex
Expand All @@ -63,6 +63,7 @@ type Service struct {
policy Policy
peers PeerGetter
store Store
tmpStore map[string]string
removeDuration time.Duration
}

Expand All @@ -78,6 +79,7 @@ func NewService(tickDuration time.Duration, removeDuration time.Duration, store
policy: policy,
peers: peers,
store: store,
tmpStore: make(map[string]string),
removeDuration: removeDuration,
}

Expand Down Expand Up @@ -114,7 +116,7 @@ func (s *Service) Stop() {
// Poll sends the POLL message to a single peer.
func (s *Service) Poll(peer string) {
poll := PollMessage{
Version: version,
Version: swap.PEERSWAP_PROTOCOL_VERSION,
Assets: s.assets,
PeerAllowed: s.policy.IsPeerAllowed(peer),
}
Expand All @@ -140,7 +142,7 @@ func (s *Service) PollAllPeers() {
// single peer.
func (s *Service) RequestPoll(peer string) {
request := RequestPollMessage{
Version: version,
Version: swap.PEERSWAP_PROTOCOL_VERSION,
Assets: s.assets,
PeerAllowed: s.policy.IsPeerAllowed(peer),
}
Expand Down Expand Up @@ -181,10 +183,22 @@ func (s *Service) MessageHandler(peerId string, msgType string, payload []byte)
return err
}
s.store.Update(peerId, PollInfo{
Assets: msg.Assets,
PeerAllowed: msg.PeerAllowed,
LastSeen: time.Now(),
ProtocolVersion: msg.Version,
Assets: msg.Assets,
PeerAllowed: msg.PeerAllowed,
LastSeen: time.Now(),
})
if ti, ok := s.tmpStore[peerId]; ok {
if ti == string(payload) {
return nil
}
}
if msg.Version != swap.PEERSWAP_PROTOCOL_VERSION {
log.Debugf("Received poll from INCOMPATIBLE peer %s: %s", peerId, string(payload))
} else {
log.Debugf("Received poll from peer %s: %s", peerId, string(payload))
}
s.tmpStore[peerId] = string(payload)
return nil
case messages.MESSAGETYPE_REQUEST_POLL:
var msg RequestPollMessage
Expand All @@ -193,12 +207,24 @@ func (s *Service) MessageHandler(peerId string, msgType string, payload []byte)
return err
}
s.store.Update(peerId, PollInfo{
Assets: msg.Assets,
PeerAllowed: msg.PeerAllowed,
LastSeen: time.Now(),
ProtocolVersion: msg.Version,
Assets: msg.Assets,
PeerAllowed: msg.PeerAllowed,
LastSeen: time.Now(),
})
// Send a poll on request
s.Poll(peerId)
if ti, ok := s.tmpStore[peerId]; ok {
if ti == string(payload) {
return nil
}
}
if msg.Version != swap.PEERSWAP_PROTOCOL_VERSION {
log.Debugf("Received poll from INCOMPATIBLE peer %s: %s", peerId, string(payload))
} else {
log.Debugf("Received poll from peer %s: %s", peerId, string(payload))
}
s.tmpStore[peerId] = string(payload)
return nil
default:
return nil
Expand All @@ -209,6 +235,22 @@ func (s *Service) GetPolls() (map[string]PollInfo, error) {
return s.store.GetAll()
}

// GetCompatiblePolls returns all polls from peers that are running the same
// protocol version.
func (s *Service) GetCompatiblePolls() (map[string]PollInfo, error) {
var compPeers = make(map[string]PollInfo)
peers, err := s.store.GetAll()
if err != nil {
return nil, err
}
for id, p := range peers {
if p.ProtocolVersion == swap.PEERSWAP_PROTOCOL_VERSION {
compPeers[id] = p
}
}
return compPeers, nil
}

// GetPollFrom returns the PollInfo for a single peer with peerId. Returns a
// PollNotFoundErr if no PollInfo for the peer is present.
func (s *Service) GetPollFrom(peerId string) (*PollInfo, error) {
Expand Down
10 changes: 8 additions & 2 deletions swap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
PEERSWAP_PROTOCOL_VERSION = 2
PEERSWAP_PROTOCOL_VERSION = 3
)

var (
Expand Down Expand Up @@ -147,12 +147,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
msgBytes := []byte(payload)
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
switch msgType {
default:
// Do nothing here, as it will spam the cln log.
return nil
case messages.MESSAGETYPE_SWAPOUTREQUEST:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapOutRequestMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -163,6 +163,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_SWAPOUTAGREEMENT:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapOutAgreementMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -183,6 +184,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_OPENINGTXBROADCASTED:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *OpeningTxBroadcastedMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -203,6 +205,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_CANCELED:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *CancelMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -223,6 +226,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_SWAPINREQUEST:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapInRequestMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -233,6 +237,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_SWAPINAGREEMENT:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapInAgreementMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand All @@ -253,6 +258,7 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_COOPCLOSE:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *CoopCloseMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,6 @@ func clnclnElementsSetup(t *testing.T, fundAmt uint64) (*testframework.BitcoinNo
t.Fatalf("ListPeers %v", err)
}

lightningds[1].WaitForLog(fmt.Sprintf("From: %s got msgtype: a463", lightningds[0].Info.Id), testframework.TIMEOUT)
lightningds[0].WaitForLog(fmt.Sprintf("From: %s got msgtype: a463", lightningds[1].Info.Id), testframework.TIMEOUT)

syncPoll(&clnPollableNode{lightningds[0]}, &clnPollableNode{lightningds[1]})

return bitcoind, liquidd, []*CLightningNodeWithLiquid{{lightningds[0]}, {lightningds[1]}}, scid
Expand Down
4 changes: 2 additions & 2 deletions test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (n *clnPollableNode) TriggerPoll() error {
}

func (n *clnPollableNode) AwaitPollFrom(node pollableNode) error {
return n.WaitForLog(fmt.Sprintf("From: %s got msgtype: a463", node.GetId()), testframework.TIMEOUT)
return n.WaitForLog(fmt.Sprintf("Received poll from peer %s", node.GetId()), testframework.TIMEOUT)
}

type peerswapPollableNode struct {
Expand All @@ -132,7 +132,7 @@ func (n *peerswapPollableNode) TriggerPoll() error {
}

func (n *peerswapPollableNode) AwaitPollFrom(node pollableNode) error {
return n.WaitForLog(fmt.Sprintf("From: %s got msgtype: a463", node.GetId()), testframework.TIMEOUT)
return n.WaitForLog(fmt.Sprintf("Received poll from peer %s", node.GetId()), testframework.TIMEOUT)
}

func syncPoll(a, b pollableNode) error {
Expand Down
7 changes: 7 additions & 0 deletions version/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package version

import (
"fmt"

"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -59,6 +60,12 @@ func (vs *VersionService) SafeUpgrade(swapService ActiveSwapGetter) error {

}

// GetCurrentVersion returns the hardcoded implementation version, sometimes
// also referred to as database version.
func GetCurrentVersion() string {
return version
}

type ActiveSwapsError struct {
version string
}
Expand Down

0 comments on commit d1d88e4

Please sign in to comment.