Skip to content

Commit

Permalink
Better initialisation handling and on/off state
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-maurice committed May 18, 2024
1 parent c11d011 commit 318cf3d
Showing 1 changed file with 21 additions and 35 deletions.
56 changes: 21 additions & 35 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,40 +70,33 @@ type NatsPlugin struct {
config *NatsPluginConfig

natsClient *nats.Conn

enabled bool
}

func (p *NatsPlugin) Enable() error {
p.logger.Info("Initialising NATS plugin")

p.enabled = true

return p.RecycleNatsConnection()
}

func (p *NatsPlugin) Disable() error {
p.logger.Debug("Disabling plugin")
if p.natsClient != nil {
p.natsClient.Close()
}

conn, err := p.getNatsClient()
if err != nil {
p.logger.WithError(err).Error("Failed to connect to NATS")
return err
}

p.natsClient = conn

go func() {
p.logger.Debug("Started listening on NATS messages")
p.consumeMessages()
p.natsClient.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
p.logger.WithError(err).Error("NATS error occured")
})
p.natsClient.SetDisconnectHandler(func(c *nats.Conn) {
p.logger.Warn("NATS connection lost")
})
p.natsClient.SetReconnectHandler(func(c *nats.Conn) {
p.logger.Info("Connection to NATS server restored")
})
}()
p.enabled = false

return nil
}

func (p *NatsPlugin) SetMessageHandler(h plugin.MessageHandler) {
p.msgHandler = h
}

// RecycleNatsConnection replaces the currently used NATS connection.
// If none is establish it just creates a new one, otherwise it closes
// the existing one and creates a new one.
Expand Down Expand Up @@ -137,15 +130,6 @@ func (p *NatsPlugin) RecycleNatsConnection() error {
return nil
}

func (p *NatsPlugin) Disable() error {
p.logger.Debug("Disabling plugin")
if p.natsClient != nil {
p.natsClient.Close()
}

return nil
}

func (p *NatsPlugin) consumeMessages() {
p.logger.Debugf("Listening on subject: %s", p.config.Subject)
p.natsClient.Subscribe(p.config.Subject, func(m *nats.Msg) {
Expand Down Expand Up @@ -230,11 +214,13 @@ func (p *NatsPlugin) ValidateAndSetConfig(c interface{}) error {
}
p.logger.Info("Validated configuration")

p.logger.Debug("Replacing NATS connection with the new config")
err := p.RecycleNatsConnection()
if err != nil {
p.logger.WithError(err).Error("Failed to replace the old NATS connection with the new one")
return err
if p.enabled {
p.logger.Debug("Replacing NATS connection with the new config")
err := p.RecycleNatsConnection()
if err != nil {
p.logger.WithError(err).Error("Failed to replace the old NATS connection with the new one")
return err
}
}

return nil
Expand Down

0 comments on commit 318cf3d

Please sign in to comment.