Skip to content

Commit

Permalink
Add mutex to secure access to senders and subsribers maps
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Dec 18, 2024
1 parent 2d4aeec commit c71a2e4
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 8 deletions.
46 changes: 42 additions & 4 deletions pkg/queue/rabbitmq/internal/event/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/th2-net/th2-common-go/pkg/queue/event"
"github.com/th2-net/th2-common-go/pkg/queue/rabbitmq/internal"
"github.com/th2-net/th2-common-go/pkg/queue/rabbitmq/internal/connection"
"sync"
)

type CommonEventRouter struct {
Expand All @@ -34,6 +35,7 @@ type CommonEventRouter struct {
senders map[string]*CommonEventSender
config *queue.RouterConfig
Logger zerolog.Logger
mutex *sync.RWMutex
}

func NewRouter(
Expand All @@ -47,6 +49,7 @@ func NewRouter(
senders: make(map[string]*CommonEventSender),
config: config,
Logger: logger,
mutex: &sync.RWMutex{},
}
}

Expand Down Expand Up @@ -158,10 +161,17 @@ func (cer *CommonEventRouter) subByPinWithAck(listener event.ConformationListene
func (cer *CommonEventRouter) getSubscriber(pin string, subscriberType internal.SubscriberType) (internal.Subscriber, error) {
queueConfig := cer.config.Queues[pin] // get queue by pin
var result internal.Subscriber
if _, ok := cer.subscribers[pin]; ok {
result = cer.subscribers[pin]
result = cer.findSubscriber(pin)
if result != nil {
return result, nil
}
cer.mutex.Lock()
defer cer.mutex.Unlock()

// check if someone already created the subscriber in a different goroutine
if existing, ok := cer.subscribers[pin]; ok {
return existing, nil
}
result, err := newSubscriber(cer.connManager, &queueConfig, pin, subscriberType)
if err != nil {
return nil, err
Expand All @@ -171,16 +181,44 @@ func (cer *CommonEventRouter) getSubscriber(pin string, subscriberType internal.
return result, nil
}

func (cer *CommonEventRouter) findSubscriber(pin string) internal.Subscriber {
cer.mutex.RLock()
defer cer.mutex.RUnlock()

if existing, ok := cer.subscribers[pin]; ok {
return existing
}
return nil
}

func (cer *CommonEventRouter) getSender(pin string) *CommonEventSender {
queueConfig := cer.config.Queues[pin] // get queue by pin
var result *CommonEventSender
if _, ok := cer.senders[pin]; ok {
result = cer.senders[pin]
result = cer.findSender(pin)
if result != nil {
return result
}

cer.mutex.Lock()
defer cer.mutex.Unlock()

// check if someone already created the sender in a different goroutine
if existing, ok := cer.senders[pin]; ok {
return existing
}
result = &CommonEventSender{ConnManager: cer.connManager, exchangeName: queueConfig.Exchange,
sendQueue: queueConfig.RoutingKey, th2Pin: pin, Logger: log.ForComponent("event_sender")}
cer.senders[pin] = result
cer.Logger.Trace().Str("Pin", pin).Msg("Created sender")
return result
}

func (cer *CommonEventRouter) findSender(pin string) *CommonEventSender {
cer.mutex.RLock()
defer cer.mutex.RUnlock()

if existing, ok := cer.senders[pin]; ok {
return existing
}
return nil
}
49 changes: 45 additions & 4 deletions pkg/queue/rabbitmq/internal/message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/th2-net/th2-common-go/pkg/queue/message"
"github.com/th2-net/th2-common-go/pkg/queue/rabbitmq/internal"
"github.com/th2-net/th2-common-go/pkg/queue/rabbitmq/internal/connection"
"sync"
)

type CommonMessageRouter struct {
Expand All @@ -36,6 +37,7 @@ type CommonMessageRouter struct {
filterStrategy filter.Strategy
config *queue.RouterConfig
Logger zerolog.Logger
mutex *sync.RWMutex
}

func NewRouter(
Expand All @@ -50,6 +52,7 @@ func NewRouter(
filterStrategy: filter.Default,
Logger: logger,
config: config,
mutex: &sync.RWMutex{},
}
}

Expand Down Expand Up @@ -248,10 +251,18 @@ func (cmr *CommonMessageRouter) getSubscriber(pin string, subscriberType interna
// TODO: probably, we should use lock here to make subscriber creation atomic
queueConfig := cmr.config.Queues[pin] // get queue by pin
var result internal.Subscriber
if _, ok := cmr.subscribers[pin]; ok {
result = cmr.subscribers[pin]
result = cmr.findSubscriber(pin)
if result != nil {
return result, nil
}
cmr.mutex.Lock()
defer cmr.mutex.Unlock()

// check if someone already created the subscriber in a different goroutine
if existing, ok := cmr.subscribers[pin]; ok {
return existing, nil
}

result, err := newSubscriber(cmr.connManager, &queueConfig, pin, subscriberType, contentType)
if err != nil {
return nil, err
Expand All @@ -262,16 +273,46 @@ func (cmr *CommonMessageRouter) getSubscriber(pin string, subscriberType interna
return result, nil
}

func (cmr *CommonMessageRouter) findSubscriber(pin string) internal.Subscriber {
cmr.mutex.RLock()
defer cmr.mutex.RUnlock()

if existing, ok := cmr.subscribers[pin]; ok {
return existing
}
return nil
}

func (cmr *CommonMessageRouter) getSender(pin string) *CommonMessageSender {
queueConfig := cmr.config.Queues[pin] // get queue by pin
var result *CommonMessageSender
if _, ok := cmr.senders[pin]; ok {
result = cmr.senders[pin]

result = cmr.findSender(pin)
if result != nil {
return result
}

cmr.mutex.Lock()
defer cmr.mutex.Unlock()

// check if someone already created the sender in a different goroutine
if existing, ok := cmr.senders[pin]; ok {
return existing
}

result = &CommonMessageSender{ConnManager: cmr.connManager, exchangeName: queueConfig.Exchange,
sendQueue: queueConfig.RoutingKey, th2Pin: pin, Logger: log.ForComponent("rabbitmq_message_sender")}
cmr.senders[pin] = result
cmr.Logger.Trace().Str("Pin", pin).Msg("Created sender")
return result
}

func (cmr *CommonMessageRouter) findSender(pin string) *CommonMessageSender {
cmr.mutex.RLock()
defer cmr.mutex.RUnlock()

if existing, ok := cmr.senders[pin]; ok {
return existing
}
return nil
}

0 comments on commit c71a2e4

Please sign in to comment.