Skip to content

Commit

Permalink
Merge pull request #152 from kaleido-io/close-revamp
Browse files Browse the repository at this point in the history
Updates to avoid timing windows in WebSocket eventstream resulting in blocked streams in edge case reconnect scenario
  • Loading branch information
gabriel-indik authored Sep 28, 2021
2 parents 8545f1c + 9aeade1 commit e708f38
Show file tree
Hide file tree
Showing 10 changed files with 571 additions and 107 deletions.
500 changes: 500 additions & 0 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/contractgateway/smartcontractgw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type mockWebSocketServer struct {
testChan chan interface{}
}

func (m *mockWebSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
return nil, nil, nil, nil
func (m *mockWebSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error) {
return nil, nil, nil
}

func (m *mockWebSocketServer) SendReply(message interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ const (
LevelDBFailedRetriveOriginalKey = "Failed to retrieve the entry for the original key: %s. %s"
// LevelDBFailedRetriveGeneratedID problem retrieving entry - generated ID
LevelDBFailedRetriveGeneratedID = "Failed to retrieve the entry for the generated ID: %s. %s"

// WebSocketClosed websocket was closed
WebSocketClosed = "WebSocket '%s' closed"
)

type Error string
Expand Down
6 changes: 3 additions & 3 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (a *eventStream) eventPoller() {
// We do the reset on the event processing thread, to avoid any concurrency issue.
// It's just an unsubscribe, which clears the resetRequested flag and sets us stale.
if sub.resetRequested {
sub.unsubscribe(ctx, false)
_ = sub.unsubscribe(ctx, false)
// Clear any checkpoint
delete(checkpoint, sub.info.ID)
}
Expand Down Expand Up @@ -597,8 +597,8 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {
// handler failed, then the ErrorHandling strategy kicks in
processed = (err == nil)
if !processed {
log.Errorf("%s: Batch %d attempt %d failed. ErrorHandling=%s BlockedRetryDelay=%ds",
a.spec.ID, batchNumber, attempt, a.spec.ErrorHandling, a.spec.BlockedRetryDelaySec)
log.Errorf("%s: Batch %d attempt %d failed. ErrorHandling=%s BlockedRetryDelay=%ds err=%s",
a.spec.ID, batchNumber, attempt, a.spec.ErrorHandling, a.spec.BlockedRetryDelaySec, err)
processed = (a.spec.ErrorHandling == ErrorHandlingSkip)
}
}
Expand Down
91 changes: 22 additions & 69 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func TestProcessEventsEnd2EndWebSocket(t *testing.T) {
WebSocket: &webSocketActionInfo{},
Timestamps: false,
}, db, 200)
mockWebSocket.receiver <- fmt.Errorf("Spurious ack from a previvous socket - to be ignored")

s := setupTestSubscription(assert, sm, stream, "mySubName")
assert.Equal("mySubName", s.Name)
Expand Down Expand Up @@ -1001,7 +1002,6 @@ func TestInterruptWebSocketReceive(t *testing.T) {
sender: make(chan interface{}),
broadcast: make(chan interface{}),
receiver: make(chan error),
closing: make(chan struct{}),
}
es := &eventStream{
wsChannels: wsChannels,
Expand Down Expand Up @@ -1339,6 +1339,27 @@ func TestUpdateStreamSwapType(t *testing.T) {
assert.EqualError(err, "The type of an event stream cannot be changed")
}

// TestUpdateStreamInProgress checks that calling update() on a stream whose
// updateInProgress flag is already set returns an error matching
// "Update to event stream already in progress".
func TestUpdateStreamInProgress(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
defer cleanup(t, dir)

// Build a webhook-type stream backed by a LevelDB store in the temp directory
db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
BatchSize: 5,
Webhook: &webhookActionInfo{},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop()

// Force the in-progress flag on before attempting the update
stream.updateInProgress = true
_, err := stream.update(&StreamInfo{})
assert.Regexp("Update to event stream already in progress", err)
}

func TestUpdateWebSocketBadDistributionMode(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
Expand Down Expand Up @@ -1402,74 +1423,6 @@ func TestUpdateWebSocket(t *testing.T) {
assert.NoError(err)
}

// TestWebSocketClientClosedOnSend runs attemptBatch on a websocket stream in a
// goroutine and then closes the mock's closing channel without ever reading
// from the mock sender channel. The test completes only if attemptBatch
// returns, which is what releases wg.Wait().
func TestWebSocketClientClosedOnSend(t *testing.T) {

dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
BatchSize: 5,
Type: "websocket",
WebSocket: &webSocketActionInfo{
Topic: "test1",
},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop()

// Reach into the stream for the mock channels and the websocket action under test
mws := stream.wsChannels.(*mockWebSocket)
wsa := stream.action.(*webSocketAction)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// Return value is intentionally ignored; only completion matters here
wsa.attemptBatch(0, 0, []*eventData{})
wg.Done()
}()

// Signal closure while nothing has consumed the batch from the sender channel
close(mws.closing)
wg.Wait()

}

// TestWebSocketClientClosedOnReceive runs attemptBatch on a websocket stream
// in a goroutine, consumes the batch from the mock sender channel, and then
// closes the mock's closing channel without sending anything on the receiver
// channel. The test completes only if attemptBatch returns, which is what
// releases wg.Wait().
func TestWebSocketClientClosedOnReceive(t *testing.T) {

dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
BatchSize: 5,
Type: "websocket",
WebSocket: &webSocketActionInfo{
Topic: "test1",
},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop()

// Reach into the stream for the mock channels and the websocket action under test
mws := stream.wsChannels.(*mockWebSocket)
wsa := stream.action.(*webSocketAction)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// Return value is intentionally ignored; only completion matters here
wsa.attemptBatch(0, 0, []*eventData{})
wg.Done()
}()

// Take the batch off the sender channel so the send side has completed
<-mws.sender

// Signal closure instead of delivering an ack/error on the receiver channel
close(mws.closing)
wg.Wait()

}

func TestUpdateStreamMissingWebhookURL(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
Expand Down
8 changes: 3 additions & 5 deletions internal/events/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ type mockWebSocket struct {
sender chan interface{}
broadcast chan interface{}
receiver chan error
closing chan struct{}
}

func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error) {
m.capturedNamespace = namespace
return m.sender, m.broadcast, m.receiver, m.closing
return m.sender, m.broadcast, m.receiver
}

func (m *mockWebSocket) SendReply(message interface{}) {}
Expand All @@ -66,8 +65,7 @@ func newMockWebSocket() *mockWebSocket {
return &mockWebSocket{
sender: make(chan interface{}),
broadcast: make(chan interface{}),
receiver: make(chan error),
closing: make(chan struct{}),
receiver: make(chan error, 1),
}
}

Expand Down
28 changes: 19 additions & 9 deletions internal/events/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package events

import (
"github.com/hyperledger/firefly-ethconnect/internal/errors"
log "github.com/sirupsen/logrus"
)

type webSocketAction struct {
Expand Down Expand Up @@ -44,7 +45,7 @@ func (w *webSocketAction) attemptBatch(batchNumber, attempt uint64, events []*ev
}

// Get a blocking channel to send and receive on our chosen namespace
sender, broadcaster, receiver, closing := w.es.wsChannels.GetChannels(topic)
sender, broadcaster, receiver := w.es.wsChannels.GetChannels(topic)

var channel chan<- interface{}
switch w.spec.DistributionMode {
Expand All @@ -54,28 +55,37 @@ func (w *webSocketAction) attemptBatch(batchNumber, attempt uint64, events []*ev
channel = sender
}

// Clear out any current ack/error
purging := true
for purging {
select {
case err1 := <-receiver:
log.Warnf("Cleared out suprious ack (could be from previous disonnect). err=%s", err1)
default:
purging = false
}
}

// Send the batch of events
select {
case channel <- events:
break
case <-w.es.updateInterrupt:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
case <-closing:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
err = errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
}

// If we ever add more distribution modes, we may want to change this logic from a simple if statement
if w.spec.DistributionMode != DistributionModeBroadcast {
if err == nil && w.spec.DistributionMode != DistributionModeBroadcast {
// Wait for the next ack or exception
select {
case err = <-receiver:
break
case <-w.es.updateInterrupt:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
case <-closing:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
err = errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
}
// Pass back any exception from the client
}

// Pass back any exception from the client
log.Infof("Attempt batch %d complete. ok=%t", batchNumber, err == nil)
return err
}
5 changes: 3 additions & 2 deletions internal/ws/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (c *webSocketConnection) close() {
c.mux.Unlock()

for _, t := range c.topics {
c.server.cycleTopic(t)
c.server.cycleTopic(c.id, t)
log.Infof("WS/%s: Websocket closed while active on topic '%s'", c.id, t.topic)
}
c.server.connectionClosed(c)
log.Infof("WS/%s: Disconnected", c.id)
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *webSocketConnection) sender() {
cases = buildCases()
} else {
// Message from one of the existing topics
c.conn.WriteJSON(value.Interface())
_ = c.conn.WriteJSON(value.Interface())
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions internal/ws/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/hyperledger/firefly-ethconnect/internal/errors"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
)

// WebSocketChannels is provided to allow us to do a blocking send to a namespace that will complete once a client connects on it
// We also provide a channel to listen on for closing of the connection, to allow a select to wake on a blocking send
type WebSocketChannels interface {
GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{})
GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error)
SendReply(message interface{})
}

Expand Down Expand Up @@ -56,7 +57,6 @@ type webSocketTopic struct {
senderChannel chan interface{}
broadcastChannel chan interface{}
receiverChannel chan error
closingChannel chan struct{}
}

// NewWebSocketServer create a new server with a simplified interface
Expand Down Expand Up @@ -91,14 +91,16 @@ func (s *webSocketServer) handler(w http.ResponseWriter, r *http.Request, p http
s.connections[c.id] = c
}

func (s *webSocketServer) cycleTopic(t *webSocketTopic) {
func (s *webSocketServer) cycleTopic(connInfo string, t *webSocketTopic) {
s.mux.Lock()
defer s.mux.Unlock()

// When a connection that was listening on a topic closes, we need to wake anyone
// that was listening for a response
close(t.closingChannel)
t.closingChannel = make(chan struct{})
select {
case t.receiverChannel <- errors.Errorf(errors.WebSocketClosed, connInfo):
default:
}
}

func (s *webSocketServer) connectionClosed(c *webSocketConnection) {
Expand Down Expand Up @@ -130,7 +132,6 @@ func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
senderChannel: make(chan interface{}),
broadcastChannel: make(chan interface{}),
receiverChannel: make(chan error, 1),
closingChannel: make(chan struct{}),
}
s.topics[topic] = t
s.topicMap[topic] = make(map[string]*webSocketConnection)
Expand All @@ -143,9 +144,9 @@ func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
return t
}

func (s *webSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
func (s *webSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error) {
t := s.getTopic(topic)
return t.senderChannel, t.broadcastChannel, t.receiverChannel, t.closingChannel
return t.senderChannel, t.broadcastChannel, t.receiverChannel
}

func (s *webSocketServer) ListenOnTopic(c *webSocketConnection, topic string) {
Expand Down
16 changes: 7 additions & 9 deletions internal/ws/wsserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestConnectSendReceiveCycle(t *testing.T) {
Type: "listen",
})

s, _, r, _ := w.GetChannels("")
s, _, r := w.GetChannels("")

s <- "Hello World"

Expand Down Expand Up @@ -111,8 +111,8 @@ func TestConnectTopicIsolation(t *testing.T) {
Topic: "topic2",
})

s1, _, r1, _ := w.GetChannels("topic1")
s2, _, r2, _ := w.GetChannels("topic2")
s1, _, r1 := w.GetChannels("topic1")
s2, _, r2 := w.GetChannels("topic2")

s1 <- "Hello Number 1"
s2 <- "Hello Number 2"
Expand Down Expand Up @@ -155,16 +155,14 @@ func TestConnectAbandonRequest(t *testing.T) {
c.WriteJSON(&webSocketCommandMessage{
Type: "listen",
})
_, _, r, closing := w.GetChannels("")
_, _, r := w.GetChannels("")

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
select {
case <-r:
break
case <-closing:
break
}
wg.Done()
}()
Expand Down Expand Up @@ -247,7 +245,7 @@ func TestBroadcast(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
b <- "Hello World"

var val string
Expand Down Expand Up @@ -284,7 +282,7 @@ func TestBroadcastDefaultTopic(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
b <- "Hello World"

var val string
Expand Down Expand Up @@ -321,7 +319,7 @@ func TestRecvNotOk(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
close(b)
w.Close()
}
Expand Down

0 comments on commit e708f38

Please sign in to comment.