Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to avoid timing windows in WebSocket eventstream resulting in blocked streams in edge case reconnect scenario #152

Merged
merged 3 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
500 changes: 500 additions & 0 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/contractgateway/smartcontractgw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type mockWebSocketServer struct {
testChan chan interface{}
}

func (m *mockWebSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
return nil, nil, nil, nil
func (m *mockWebSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error) {
return nil, nil, nil
}

func (m *mockWebSocketServer) SendReply(message interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ const (
LevelDBFailedRetriveOriginalKey = "Failed to retrieve the entry for the original key: %s. %s"
// LevelDBFailedRetriveGeneratedID problem retrieving entry - generated ID
LevelDBFailedRetriveGeneratedID = "Failed to retrieve the entry for the generated ID: %s. %s"

// WebSocketClosed websocket was closed
WebSocketClosed = "WebSocket '%s' closed"
)

type Error string
Expand Down
6 changes: 3 additions & 3 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (a *eventStream) eventPoller() {
// We do the reset on the event processing thread, to avoid any concurrency issue.
// It's just an unsubscribe, which clears the resetRequested flag and sets us stale.
if sub.resetRequested {
sub.unsubscribe(ctx, false)
_ = sub.unsubscribe(ctx, false)
// Clear any checkpoint
delete(checkpoint, sub.info.ID)
}
Expand Down Expand Up @@ -597,8 +597,8 @@ func (a *eventStream) processBatch(batchNumber uint64, events []*eventData) {
// handler failed, then the ErrorHandling strategy kicks in
processed = (err == nil)
if !processed {
log.Errorf("%s: Batch %d attempt %d failed. ErrorHandling=%s BlockedRetryDelay=%ds",
a.spec.ID, batchNumber, attempt, a.spec.ErrorHandling, a.spec.BlockedRetryDelaySec)
log.Errorf("%s: Batch %d attempt %d failed. ErrorHandling=%s BlockedRetryDelay=%ds err=%s",
a.spec.ID, batchNumber, attempt, a.spec.ErrorHandling, a.spec.BlockedRetryDelaySec, err)
processed = (a.spec.ErrorHandling == ErrorHandlingSkip)
}
}
Expand Down
70 changes: 1 addition & 69 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func TestProcessEventsEnd2EndWebSocket(t *testing.T) {
WebSocket: &webSocketActionInfo{},
Timestamps: false,
}, db, 200)
mockWebSocket.receiver <- fmt.Errorf("Spurious ack from a previvous socket - to be ignored")

s := setupTestSubscription(assert, sm, stream, "mySubName")
assert.Equal("mySubName", s.Name)
Expand Down Expand Up @@ -1001,7 +1002,6 @@ func TestInterruptWebSocketReceive(t *testing.T) {
sender: make(chan interface{}),
broadcast: make(chan interface{}),
receiver: make(chan error),
closing: make(chan struct{}),
}
es := &eventStream{
wsChannels: wsChannels,
Expand Down Expand Up @@ -1402,74 +1402,6 @@ func TestUpdateWebSocket(t *testing.T) {
assert.NoError(err)
}

func TestWebSocketClientClosedOnSend(t *testing.T) {

dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
BatchSize: 5,
Type: "websocket",
WebSocket: &webSocketActionInfo{
Topic: "test1",
},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop()

mws := stream.wsChannels.(*mockWebSocket)
wsa := stream.action.(*webSocketAction)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wsa.attemptBatch(0, 0, []*eventData{})
wg.Done()
}()

close(mws.closing)
wg.Wait()

}

func TestWebSocketClientClosedOnReceive(t *testing.T) {

dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
BatchSize: 5,
Type: "websocket",
WebSocket: &webSocketActionInfo{
Topic: "test1",
},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop()

mws := stream.wsChannels.(*mockWebSocket)
wsa := stream.action.(*webSocketAction)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wsa.attemptBatch(0, 0, []*eventData{})
wg.Done()
}()

<-mws.sender

close(mws.closing)
wg.Wait()

}

func TestUpdateStreamMissingWebhookURL(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
Expand Down
8 changes: 3 additions & 5 deletions internal/events/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ type mockWebSocket struct {
sender chan interface{}
broadcast chan interface{}
receiver chan error
closing chan struct{}
}

func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error) {
m.capturedNamespace = namespace
return m.sender, m.broadcast, m.receiver, m.closing
return m.sender, m.broadcast, m.receiver
}

func (m *mockWebSocket) SendReply(message interface{}) {}
Expand All @@ -66,8 +65,7 @@ func newMockWebSocket() *mockWebSocket {
return &mockWebSocket{
sender: make(chan interface{}),
broadcast: make(chan interface{}),
receiver: make(chan error),
closing: make(chan struct{}),
receiver: make(chan error, 1),
}
}

Expand Down
28 changes: 19 additions & 9 deletions internal/events/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package events

import (
"github.com/hyperledger/firefly-ethconnect/internal/errors"
log "github.com/sirupsen/logrus"
)

type webSocketAction struct {
Expand Down Expand Up @@ -44,7 +45,7 @@ func (w *webSocketAction) attemptBatch(batchNumber, attempt uint64, events []*ev
}

// Get a blocking channel to send and receive on our chosen namespace
sender, broadcaster, receiver, closing := w.es.wsChannels.GetChannels(topic)
sender, broadcaster, receiver := w.es.wsChannels.GetChannels(topic)

var channel chan<- interface{}
switch w.spec.DistributionMode {
Expand All @@ -54,28 +55,37 @@ func (w *webSocketAction) attemptBatch(batchNumber, attempt uint64, events []*ev
channel = sender
}

// Clear out any current ack/error
purging := true
for purging {
select {
case err1 := <-receiver:
log.Warnf("Cleared out suprious ack (could be from previous disonnect). err=%s", err1)
default:
purging = false
}
}

// Sent the batch of events
select {
case channel <- events:
break
case <-w.es.updateInterrupt:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
case <-closing:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
err = errors.Errorf(errors.EventStreamsWebSocketInterruptedSend)
}

// If we ever add more distribution modes, we may want to change this logic from a simple if statement
if w.spec.DistributionMode != DistributionModeBroadcast {
if err == nil && w.spec.DistributionMode != DistributionModeBroadcast {
// Wait for the next ack or exception
select {
case err = <-receiver:
break
case <-w.es.updateInterrupt:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
case <-closing:
return errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
err = errors.Errorf(errors.EventStreamsWebSocketInterruptedReceive)
}
// Pass back any exception from the client
}

// Pass back any exception from the client
log.Infof("Attempt batch %d complete. ok=%t", batchNumber, err == nil)
return err
}
5 changes: 3 additions & 2 deletions internal/ws/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (c *webSocketConnection) close() {
c.mux.Unlock()

for _, t := range c.topics {
c.server.cycleTopic(t)
c.server.cycleTopic(c.id, t)
log.Infof("WS/%s: Websocket closed while active on topic '%s'", c.id, t.topic)
}
c.server.connectionClosed(c)
log.Infof("WS/%s: Disconnected", c.id)
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *webSocketConnection) sender() {
cases = buildCases()
} else {
// Message from one of the existing topics
c.conn.WriteJSON(value.Interface())
_ = c.conn.WriteJSON(value.Interface())
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions internal/ws/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/hyperledger/firefly-ethconnect/internal/errors"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
)

// WebSocketChannels is provided to allow us to do a blocking send to a namespace that will complete once a client connects on it
// We also provide a channel to listen on for closing of the connection, to allow a select to wake on a blocking send
type WebSocketChannels interface {
GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{})
GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error)
SendReply(message interface{})
}

Expand Down Expand Up @@ -56,7 +57,6 @@ type webSocketTopic struct {
senderChannel chan interface{}
broadcastChannel chan interface{}
receiverChannel chan error
closingChannel chan struct{}
}

// NewWebSocketServer create a new server with a simplified interface
Expand Down Expand Up @@ -91,14 +91,16 @@ func (s *webSocketServer) handler(w http.ResponseWriter, r *http.Request, p http
s.connections[c.id] = c
}

func (s *webSocketServer) cycleTopic(t *webSocketTopic) {
func (s *webSocketServer) cycleTopic(connInfo string, t *webSocketTopic) {
s.mux.Lock()
defer s.mux.Unlock()

// When a connection that was listening on a topic closes, we need to wake anyone
// that was listening for a response
close(t.closingChannel)
t.closingChannel = make(chan struct{})
select {
case t.receiverChannel <- errors.Errorf(errors.WebSocketClosed, connInfo):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of the change.

default:
}
}

func (s *webSocketServer) connectionClosed(c *webSocketConnection) {
Expand Down Expand Up @@ -130,7 +132,6 @@ func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
senderChannel: make(chan interface{}),
broadcastChannel: make(chan interface{}),
receiverChannel: make(chan error, 1),
closingChannel: make(chan struct{}),
}
s.topics[topic] = t
s.topicMap[topic] = make(map[string]*webSocketConnection)
Expand All @@ -143,9 +144,9 @@ func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
return t
}

func (s *webSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) {
func (s *webSocketServer) GetChannels(topic string) (chan<- interface{}, chan<- interface{}, <-chan error) {
t := s.getTopic(topic)
return t.senderChannel, t.broadcastChannel, t.receiverChannel, t.closingChannel
return t.senderChannel, t.broadcastChannel, t.receiverChannel
}

func (s *webSocketServer) ListenOnTopic(c *webSocketConnection, topic string) {
Expand Down
16 changes: 7 additions & 9 deletions internal/ws/wsserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestConnectSendReceiveCycle(t *testing.T) {
Type: "listen",
})

s, _, r, _ := w.GetChannels("")
s, _, r := w.GetChannels("")

s <- "Hello World"

Expand Down Expand Up @@ -111,8 +111,8 @@ func TestConnectTopicIsolation(t *testing.T) {
Topic: "topic2",
})

s1, _, r1, _ := w.GetChannels("topic1")
s2, _, r2, _ := w.GetChannels("topic2")
s1, _, r1 := w.GetChannels("topic1")
s2, _, r2 := w.GetChannels("topic2")

s1 <- "Hello Number 1"
s2 <- "Hello Number 2"
Expand Down Expand Up @@ -155,16 +155,14 @@ func TestConnectAbandonRequest(t *testing.T) {
c.WriteJSON(&webSocketCommandMessage{
Type: "listen",
})
_, _, r, closing := w.GetChannels("")
_, _, r := w.GetChannels("")

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
select {
case <-r:
break
case <-closing:
break
}
wg.Done()
}()
Expand Down Expand Up @@ -247,7 +245,7 @@ func TestBroadcast(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
b <- "Hello World"

var val string
Expand Down Expand Up @@ -284,7 +282,7 @@ func TestBroadcastDefaultTopic(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
b <- "Hello World"

var val string
Expand Down Expand Up @@ -321,7 +319,7 @@ func TestRecvNotOk(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

_, b, _, _ := w.GetChannels(topic)
_, b, _ := w.GetChannels(topic)
close(b)
w.Close()
}
Expand Down