Skip to content

Commit

Permalink
fix: close subs on identifying network change
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Dec 24, 2024
1 parent 5893927 commit c976ba0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
63 changes: 43 additions & 20 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// filterSubscriptions is the map of filter subscription IDs to subscriptions

const filterSubBatchSize = 90
const initNetworkConnType = 255

type appFilterMap map[string]filterConfig

Expand All @@ -43,6 +44,7 @@ type FilterManager struct {
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
envProcessor EnevelopeProcessor
networkConnType byte
}

type SubDetails struct {
Expand Down Expand Up @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType

//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
Expand Down Expand Up @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
}
}

// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batchlimit is hit, all filters are subscribed to and new batch is created.
// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batch-limit is hit, all filters are subscribed to and new batch is created.
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created

func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
Expand Down Expand Up @@ -182,37 +185,57 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}

func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
}

func (mgr *FilterManager) closeAllSubscriptions() {
mgr.Lock()
mgr.logger.Debug("closing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions)))
for _, asub := range mgr.filterSubscriptions {
for _, sub := range asub.sub.subs {
sub.SetClosing()
}
}
mgr.Unlock()
}

// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa
// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic,
// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online.
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
subs := mgr.node.Subscriptions()
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if mgr.networkConnType != initNetworkConnType && //checking for initialization condition
mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters
// resubscribe all existing filters
mgr.closeAllSubscriptions()
}
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
mgr.checkAndProcessQueue(pubsubTopic)
mgr.Unlock()
}

mgr.networkConnType = connType
mgr.onlineChecker.SetOnline(newStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() {
// Mock peers going down
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())

fm.OnConnectionStatusChange("", false)
fm.OnConnectionStatusChange("", false, 0)
time.Sleep(2 * time.Second)
fm.OnConnectionStatusChange("", true)
fm.OnConnectionStatusChange("", true, 0)
s.ConnectToFullNode(s.LightNode, s.FullNode)
time.Sleep(3 * time.Second)

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
logger.Warn("received message push from unknown peer")
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
if err := stream.Reset(); err != nil {
Expand Down

0 comments on commit c976ba0

Please sign in to comment.