Skip to content

Commit

Permalink
fix: update criteria context on missing msg verifier start
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Dec 24, 2024
1 parent 6dcf177 commit 684cda3
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type MissingMessageVerifier struct {
storenodeRequestor common.StorenodeRequestor
messageTracker MessageTracker

criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex

C chan *protocol.Envelope
Expand All @@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
criteriaInterest: make(map[string]criteriaInterest),
criteriaInterest: make(map[string]*criteriaInterest),
C: make(chan *protocol.Envelope, 1000),
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
currMessageVerificationRequest.cancel()
}

m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest
}

func (m *MissingMessageVerifier) setRunning(running bool) {
Expand All @@ -121,6 +121,15 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.ctx = ctx
m.cancel = cancelFunc

// updating context for existing criteria
m.criteriaInterestMu.Lock()
for _, value := range m.criteriaInterest {
ctx, cancel := context.WithCancel(m.ctx)
value.ctx = ctx
value.cancel = cancel
}
m.criteriaInterestMu.Unlock()

go func() {
defer utils.LogOnPanic()
t := time.NewTicker(m.params.interval)
Expand All @@ -134,7 +143,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.criteriaInterestMu.RLock()
critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
for _, value := range m.criteriaInterest {
critIntList = append(critIntList, value)
critIntList = append(critIntList, *value)
}
m.criteriaInterestMu.RUnlock()
for _, interest := range critIntList {
Expand Down

0 comments on commit 684cda3

Please sign in to comment.