From df0d89ffc430f69e4631a782ac725102ebaf0001 Mon Sep 17 00:00:00 2001 From: esuwu Date: Wed, 13 Nov 2024 02:25:29 -0600 Subject: [PATCH 1/4] Changed pubsub communication to nats --- cmd/bots/internal/common/environment.go | 40 +++++----- cmd/bots/internal/common/messaging/bots.go | 3 - .../common/messaging/pubsub_client.go | 69 +++++----------- cmd/nodemon/nodemon.go | 17 ++-- go.mod | 16 ++-- go.sum | 33 +++++--- pkg/messaging/pubsub/server.go | 80 ++++++++++++++----- 7 files changed, 143 insertions(+), 115 deletions(-) diff --git a/cmd/bots/internal/common/environment.go b/cmd/bots/internal/common/environment.go index e1ea1a76..de73a70c 100644 --- a/cmd/bots/internal/common/environment.go +++ b/cmd/bots/internal/common/environment.go @@ -243,10 +243,10 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) { } type TelegramBotEnvironment struct { - ChatID int64 - Bot *telebot.Bot - Mute bool // If it used elsewhere, should be protected by mutex - subSocket protocol.Socket + ChatID int64 + Bot *telebot.Bot + Mute bool // If it used elsewhere, should be protected by mutex + //subSocket protocol.Socket subscriptions subscriptions zap *zap.Logger requestType chan<- pair.Request @@ -290,10 +290,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error { return nil } -func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) { - tgEnv.subSocket = subSocket -} - func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) { if tgEnv.Mute { tgEnv.zap.Info("received an alert, but asleep now") @@ -426,10 +422,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error { if tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) - if err != nil { - return err - } + // todo fix this. send (topic, handlerFunc) into this function + //err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) + //if err != nil { + // return err + //} tgEnv.subscriptions.Add(alertType, alertName) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) } @@ -447,10 +444,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) - if err != nil { - return errors.Wrap(err, "failed to subscribe to alert") - } + // todo fix this. send (topic, handlerFunc) into this function + //err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) + //if err != nil { + // return errors.Wrap(err, "failed to subscribe to alert") + //} tgEnv.subscriptions.Add(alertType, alertName) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) return nil @@ -465,11 +463,11 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale if !tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName) } - - err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) - if err != nil { - return errors.Wrap(err, "failed to unsubscribe from alert") - } + // TODO fix this + //err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) + //if err != nil { + // return errors.Wrap(err, "failed to unsubscribe from alert") + //} ok = tgEnv.IsAlreadySubscribed(alertType) if !ok { return errors.New("failed to unsubscribe from alert: was not subscribed to it") diff --git a/cmd/bots/internal/common/messaging/bots.go b/cmd/bots/internal/common/messaging/bots.go index fe5aad83..45aed7be 100644 --- a/cmd/bots/internal/common/messaging/bots.go +++ b/cmd/bots/internal/common/messaging/bots.go @@ -2,14 +2,11 @@ package messaging import ( "nodemon/pkg/messaging" - - "go.nanomsg.org/mangos/v3/protocol" ) type Bot interface { SendAlertMessage(msg messaging.AlertMessage) SendMessage(msg string) SubscribeToAllAlerts() error - SetSubSocket(subSocket protocol.Socket) IsEligibleForAction(chatID string) bool } diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 40af251b..719d431d 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -2,77 +2,48 @@ package messaging import ( "context" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/sub" _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports "go.uber.org/zap" "nodemon/pkg/messaging" ) -func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, logger *zap.Logger) error { - subSocket, sockErr := sub.NewSocket() - if sockErr != nil { - return sockErr - } - defer func() { - _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() +const pubSubTopic = "pubsub" - bot.SetSubSocket(subSocket) +func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, logger *zap.Logger) error { - if dialErr := subSocket.Dial(nanomsgURL); dialErr != nil { - return dialErr + // Connect to a NATS server + nc, err := nats.Connect(natsServerURL) + if err != nil { + zap.S().Fatalf("Failed to connect to nats server: %v", err) + return err } + defer nc.Close() + _, err = nc.Subscribe(pubSubTopic, func(msg *nats.Msg) { + err := handleReceivedMessage(msg.Data, bot) + if err != nil { + zap.S().Errorf("failed to handle received message from pubsub server %v", err) + } + }) + if err != nil { + zap.S().Fatalf("Failed to subscribe to block updates: %v", err) + return err + } if err := bot.SubscribeToAllAlerts(); err != nil { return err } - done := runSubLoop(ctx, subSocket, logger, bot) - <-ctx.Done() logger.Info("stopping sub messaging service...") - <-done logger.Info("sub messaging service finished") return nil } -func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} { - sockCh := make(chan struct{}) - go func() { // run socket closer goroutine - defer close(sockCh) - <-ctx.Done() - _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() - ch := make(chan struct{}) - go func(done chan<- struct{}, closedSock <-chan struct{}) { - defer func() { - <-closedSock - close(done) - }() - for { - if ctx.Err() != nil { - return - } - if err := recvMessage(subSocket, bot); err != nil { - if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled - return - } - logger.Error("failed to receive message", zap.Error(err)) - } - } - }(ch, sockCh) - return ch -} - -func recvMessage(subSocket protocol.Socket, bot Bot) error { - msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block - if err != nil { - return errors.Wrap(err, "failed to receive message from sub socket") - } +func handleReceivedMessage(msg []byte, bot Bot) error { alertMsg, err := messaging.NewAlertMessageFromBytes(msg) if err != nil { return errors.Wrap(err, "failed to parse alert message from bytes") diff --git a/cmd/nodemon/nodemon.go b/cmd/nodemon/nodemon.go index 9b7bd147..8c2a8827 100644 --- a/cmd/nodemon/nodemon.go +++ b/cmd/nodemon/nodemon.go @@ -4,6 +4,7 @@ import ( "context" stderrs "errors" "flag" + "github.com/nats-io/nats-server/v2/server" "log" "net/url" "os" @@ -189,9 +190,11 @@ type nodemonConfig struct { bindAddress string interval time.Duration timeout time.Duration - nanomsgPubSubURL string + natsPubSubURL string nanomsgPairTelegramURL string nanomsgPairDiscordURL string + natsMaxPayloadSize int + natsTimeout time.Duration retention time.Duration apiReadTimeout time.Duration baseTargetThreshold uint64 @@ -215,12 +218,16 @@ func newNodemonConfig() *nodemonConfig { defaultNetworkTimeout, "Network timeout, seconds. Default value is 15") tools.Uint64VarFlagWithEnv(&c.baseTargetThreshold, "base-target-threshold", 0, "Base target threshold. Must be specified") - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/nano-msg-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairTelegramURL, "nano-msg-pair-telegram-url", + tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nats-msg-pubsub-url", + "nats://127.0.0.1:4222", "Nats URL for pubsub socket") + tools.StringVarFlagWithEnv(&c.nanomsgPairTelegramURL, "nats-msg-pair-telegram-url", "", "Nanomsg IPC URL for pair socket") tools.StringVarFlagWithEnv(&c.nanomsgPairDiscordURL, "nano-msg-pair-discord-url", "", "Nanomsg IPC URL for pair socket") + tools.DurationVarFlagWithEnv(&c.natsTimeout, "nats-server-timeout", + server.AUTH_TIMEOUT, "Nanomsg IPC URL for pair socket") + tools.IntVarFlagWithEnv(&c.natsMaxPayloadSize, "nats-msg-payload-size", 1024*1024, + "The size of the payload passed between nats server and clients.Default 1MB") tools.DurationVarFlagWithEnv(&c.retention, "retention", defaultRetentionDuration, "Events retention duration. Default value is 12h") tools.DurationVarFlagWithEnv(&c.apiReadTimeout, "api-read-timeout", defaultAPIReadTimeout, @@ -415,7 +422,7 @@ func runMessagingServices( pew specific.PrivateNodesEventsWriter, ) { go func() { - pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.nanomsgPubSubURL, alerts, logger) + pubSubErr := pubsub.StartPubMessagingServer(ctx, int32(cfg.natsMaxPayloadSize), cfg.natsPubSubURL, alerts, logger) if pubSubErr != nil { logger.Fatal("failed to start pub messaging server", zap.Error(pubSubErr)) } diff --git a/go.mod b/go.mod index 4fb5828f..b9e7621f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/go-chi/chi v4.1.2+incompatible github.com/hashicorp/vault/api v1.15.0 github.com/hashicorp/vault/api/auth/userpass v0.8.0 + github.com/nats-io/nats-server/v2 v2.10.22 + github.com/nats-io/nats.go v1.37.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 github.com/stoewer/go-strcase v1.3.0 @@ -50,14 +52,18 @@ require ( github.com/ingonyama-zk/icicle v1.1.0 // indirect github.com/ingonyama-zk/iciclegnark v0.1.0 // indirect github.com/jinzhu/copier v0.4.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.5.8 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect @@ -78,13 +84,13 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.27.0 // indirect + golang.org/x/crypto v0.28.0 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect - golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + golang.org/x/time v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/grpc v1.67.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/go.sum b/go.sum index 582377c7..32ec5bb1 100644 --- a/go.sum +++ b/go.sum @@ -337,8 +337,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -379,6 +379,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -405,6 +407,16 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA= +github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -564,8 +576,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -767,8 +779,9 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -780,13 +793,13 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= +golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index 9179ed89..158d5734 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -2,49 +2,85 @@ package pubsub import ( "context" - "strings" - + "fmt" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports "nodemon/pkg/entities" "nodemon/pkg/messaging" + "strconv" + "strings" - "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/pub" - _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports + "github.com/nats-io/nats-server/v2/server" "go.uber.org/zap" ) +const ConnectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT +const pubsubTopic = "pubsub" + +func parseHostAndPortFromUrl(natsPubSubUrl string) (string, int, error) { + // Find the position of "://" and trim everything before it + withoutProtocol := strings.SplitN(natsPubSubUrl, "://", 2) + if len(withoutProtocol) != 2 { + return "", 0, errors.New(fmt.Sprintf("failed to split the URL into pieces, URL: %s", natsPubSubUrl)) + } + + // Split the remaining part into host and port + hostPort := strings.Split(withoutProtocol[1], ":") + if len(hostPort) != 2 { + return "", 0, errors.New(fmt.Sprintf("failed to split host port string into host and port, %s", hostPort)) + } + + host := hostPort[0] + // Convert the port to an integer + port, err := strconv.Atoi(hostPort[1]) + if err != nil { + return "", 0, errors.Errorf("failed to parse port, %v", err) + } + return host, port, nil +} + func StartPubMessagingServer( ctx context.Context, - nanomsgURL string, + maxPayloadSize int32, + natsPubSubUrl string, // expected nats://host:port alerts <-chan entities.Alert, logger *zap.Logger, ) error { - if len(nanomsgURL) == 0 || len(strings.Fields(nanomsgURL)) > 1 { - return errors.New("invalid nanomsg IPC URL for pub sub socket") + if len(natsPubSubUrl) == 0 { + return errors.New("invalid nats pubsub URL") + } + host, port, err := parseHostAndPortFromUrl(natsPubSubUrl) + if err != nil { + return err } - socketPub, sockErr := pub.NewSocket() - if sockErr != nil { - return sockErr + opts := &server.Options{ + MaxPayload: maxPayloadSize, + Host: host, + Port: port, } - defer func(socketPub protocol.Socket) { - if err := socketPub.Close(); err != nil { - logger.Error("Failed to close pub socket", zap.Error(err)) - } - }(socketPub) + s, err := server.NewServer(opts) + if err != nil { + return errors.Errorf("failed to create NATS server: %v", err) + } + go s.Start() - if err := socketPub.Listen(nanomsgURL); err != nil { - return err + if !s.ReadyForConnections(ConnectionsTimeoutDefault) { + return errors.Errorf("NATS Server not ready for connections") } - loopErr := enterLoop(ctx, alerts, logger, socketPub) + logger.Info("NATS PubSub Server is running...") + + socket, err := nats.Connect(fmt.Sprintf("nats://%s:%d", host, port)) + + loopErr := enterLoop(ctx, alerts, logger, socket) if loopErr != nil && !errors.Is(loopErr, context.Canceled) { return loopErr } return nil } -func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socketPub protocol.Socket) error { +func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socket *nats.Conn) error { for { select { case <-ctx.Done(): @@ -62,7 +98,7 @@ func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Lo logger.Error("Failed to marshal binary alert message", zap.Error(err)) continue } - err = socketPub.Send(data) + err = socket.Publish(pubsubTopic, data) if err != nil { logger.Error("Failed to send alert to socket", zap.Error(err)) } From d1cbca662cb7b7e2ff9c9a0af88187c02c7141c7 Mon Sep 17 00:00:00 2001 From: esuwu Date: Fri, 22 Nov 2024 03:46:12 -0600 Subject: [PATCH 2/4] Moved the nats server to cmd --- cmd/bots/discord/discord.go | 22 +-- cmd/bots/internal/common/environment.go | 29 ++-- .../internal/common/messaging/pair_client.go | 148 ++++++------------ .../common/messaging/pubsub_client.go | 17 +- cmd/bots/telegram/telegram.go | 13 +- cmd/messaging/nats_server.go | 96 ++++++++++++ cmd/nodemon/nodemon.go | 68 ++++---- go.mod | 8 +- go.sum | 20 +-- pkg/analysis/analyzer_test.go | 4 +- pkg/analysis/criteria/base_target_test.go | 8 +- .../criteria/challenged_block_test.go | 4 +- pkg/analysis/criteria/height_test.go | 4 +- pkg/analysis/criteria/incomplete_test.go | 4 +- pkg/analysis/criteria/state_hash_test.go | 4 +- pkg/analysis/criteria/unreachable_test.go | 4 +- pkg/messaging/helper.go | 38 +++++ pkg/messaging/pair/server.go | 111 ++++++------- pkg/messaging/pubsub/server.go | 67 +------- 19 files changed, 340 insertions(+), 329 deletions(-) create mode 100644 cmd/messaging/nats_server.go create mode 100644 pkg/messaging/helper.go diff --git a/cmd/bots/discord/discord.go b/cmd/bots/discord/discord.go index 8e375e30..96afafc9 100644 --- a/cmd/bots/discord/discord.go +++ b/cmd/bots/discord/discord.go @@ -38,20 +38,20 @@ func main() { } type discordBotConfig struct { - nanomsgPubSubURL string - nanomsgPairURL string - discordBotToken string - discordChatID string - logLevel string - development bool - bindAddress string + natsPubSubURL string + natsPairURL string + discordBotToken string + discordChatID string + logLevel string + development bool + bindAddress string } func newDiscordBotConfigConfig() *discordBotConfig { c := new(discordBotConfig) - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", + tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nano-msg-pubsub-url", "ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url", + tools.StringVarFlagWithEnv(&c.natsPairURL, "nano-msg-pair-discord-url", "ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket") tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token", "", "The secret token used to authenticate the bot") @@ -178,7 +178,7 @@ func runMessagingClients( responseChan chan pair.Response, ) { go func() { - clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger) + clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsPubSubURL, discordBotEnv, logger) if clientErr != nil { logger.Fatal("failed to start sub messaging client", zap.Error(clientErr)) return @@ -186,7 +186,7 @@ func runMessagingClients( }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger) + err := messaging.StartPairMessagingClient(ctx, cfg.natsPairURL, requestChan, responseChan, logger) if err != nil { logger.Fatal("failed to start pair messaging client", zap.Error(err)) return diff --git a/cmd/bots/internal/common/environment.go b/cmd/bots/internal/common/environment.go index de73a70c..b3ca9896 100644 --- a/cmd/bots/internal/common/environment.go +++ b/cmd/bots/internal/common/environment.go @@ -243,10 +243,9 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) { } type TelegramBotEnvironment struct { - ChatID int64 - Bot *telebot.Bot - Mute bool // If it used elsewhere, should be protected by mutex - //subSocket protocol.Socket + ChatID int64 + Bot *telebot.Bot + Mute bool // If it used elsewhere, should be protected by mutex. subscriptions subscriptions zap *zap.Logger requestType chan<- pair.Request @@ -423,10 +422,10 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } // todo fix this. send (topic, handlerFunc) into this function - //err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) - //if err != nil { - // return err - //} + // err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). + // if err != nil {. + // return err. + // }. tgEnv.subscriptions.Add(alertType, alertName) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) } @@ -445,10 +444,10 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy } // todo fix this. send (topic, handlerFunc) into this function - //err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) - //if err != nil { - // return errors.Wrap(err, "failed to subscribe to alert") - //} + // err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). + // if err != nil {. + // return errors.Wrap(err, "failed to subscribe to alert"). + // }. tgEnv.subscriptions.Add(alertType, alertName) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) return nil @@ -464,10 +463,10 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName) } // TODO fix this - //err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) - //if err != nil { + // err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) + // if err != nil { // return errors.Wrap(err, "failed to unsubscribe from alert") - //} + // } ok = tgEnv.IsAlreadySubscribed(alertType) if !ok { return errors.New("failed to unsubscribe from alert: was not subscribed to it") diff --git a/cmd/bots/internal/common/messaging/pair_client.go b/cmd/bots/internal/common/messaging/pair_client.go index 61d69585..4777e863 100644 --- a/cmd/bots/internal/common/messaging/pair_client.go +++ b/cmd/bots/internal/common/messaging/pair_client.go @@ -4,87 +4,37 @@ import ( "bytes" "context" "encoding/json" - stderr "errors" "fmt" "strings" "time" - "nodemon/pkg/entities" - "nodemon/pkg/messaging/pair" + "go.uber.org/zap" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - pairProtocol "go.nanomsg.org/mangos/v3/protocol/pair" - "go.uber.org/zap" -) -const ( - defaultSendTimeout = 5 * time.Second - defaultRecvTimeout = 5 * time.Second + "nodemon/pkg/entities" + "nodemon/pkg/messaging" + "nodemon/pkg/messaging/pair" ) const defaultResponseTimeout = 5 * time.Second -type sendRecvDeadlineSocketWrapper struct { - protocol.Socket -} - -func (w sendRecvDeadlineSocketWrapper) Send(data []byte) (err error) { - defer func() { // reset deadline in any case - setOptErr := w.Socket.SetOption(protocol.OptionSendDeadline, time.Duration(0)) - if setOptErr != nil { - if err != nil { - err = stderr.Join(err, setOptErr) - } else { - err = setOptErr - } - } - }() - // set deadline for current send - if setOptErr := w.Socket.SetOption(protocol.OptionSendDeadline, defaultSendTimeout); setOptErr != nil { - return setOptErr - } - return w.Socket.Send(data) -} - -func (w sendRecvDeadlineSocketWrapper) Recv() (_ []byte, err error) { - defer func() { // reset deadline in any case - setOptErr := w.Socket.SetOption(protocol.OptionRecvDeadline, time.Duration(0)) - if setOptErr != nil { - if err != nil { - err = stderr.Join(err, setOptErr) - } else { - err = setOptErr - } - } - }() - // set deadline for current recv - if setOptErr := w.Socket.SetOption(protocol.OptionRecvDeadline, defaultRecvTimeout); setOptErr != nil { - return nil, setOptErr - } - return w.Socket.Recv() -} - func StartPairMessagingClient( ctx context.Context, - nanomsgURL string, + natsServerURL string, requestPair <-chan pair.Request, responsePair chan<- pair.Response, logger *zap.Logger, ) error { - pairSocket, sockErr := pairProtocol.NewSocket() - if sockErr != nil { - return errors.Wrap(sockErr, "failed to get new pair socket") - } - defer func() { - _ = pairSocket.Close() // can be ignored, only possible error is protocol.ErrClosed - }() - - if err := pairSocket.Dial(nanomsgURL); err != nil { - return errors.Wrap(err, "failed to dial on pair socket") + nc, err := nats.Connect(natsServerURL) + if err != nil { + zap.S().Fatalf("Failed to connect to nats server: %v", err) + return err } + defer nc.Close() - done := runPairLoop(ctx, requestPair, sendRecvDeadlineSocketWrapper{pairSocket}, logger, responsePair) + done := runPairLoop(ctx, requestPair, nc, logger, responsePair) <-ctx.Done() logger.Info("stopping pair messaging service...") @@ -96,7 +46,7 @@ func StartPairMessagingClient( func runPairLoop( ctx context.Context, requestPair <-chan pair.Request, - pairSocket protocol.Socket, + nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, ) <-chan struct{} { @@ -114,7 +64,7 @@ func runPairLoop( message := &bytes.Buffer{} message.WriteByte(byte(request.RequestType())) - err := handlePairRequest(ctx, request, pairSocket, message, logger, responsePair) + err := handlePairRequest(ctx, request, nc, message, logger, responsePair) if err != nil { logger.Error("failed to handle pair request", zap.String("request-type", fmt.Sprintf("(%T)", request)), @@ -130,28 +80,30 @@ func runPairLoop( func handlePairRequest( ctx context.Context, request pair.Request, - pairSocket protocol.Socket, + nc *nats.Conn, message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, ) error { switch r := request.(type) { case *pair.NodesListRequest: - return handleNodesListRequest(ctx, pairSocket, message, logger, responsePair) + return handleNodesListRequest(ctx, nc, message, logger, responsePair) case *pair.InsertNewNodeRequest: - return handleInsertNewNodeRequest(r.URL, message, pairSocket) + return handleInsertNewNodeRequest(r.URL, message, nc) case *pair.UpdateNodeRequest: - return handleUpdateNodeRequest(r.URL, r.Alias, message, pairSocket) + return handleUpdateNodeRequest(r.URL, r.Alias, message, nc) case *pair.DeleteNodeRequest: message.WriteString(r.URL) - if sendErr := pairSocket.Send(message.Bytes()); sendErr != nil { - return errors.Wrap(sendErr, "failed handle delete node request and send data to a pair socket") + // ignore response + _, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + if err != nil { + return errors.Wrap(err, "failed to receive message from nodemon") } return nil case *pair.NodesStatusRequest: - return handleNodesStatementsRequest(ctx, r.URLs, message, pairSocket, logger, responsePair) + return handleNodesStatementsRequest(ctx, r.URLs, message, nc, logger, responsePair) case *pair.NodeStatementRequest: - return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, pairSocket, responsePair) + return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, nc, responsePair) default: return errors.New("unknown request type to pair socket") } @@ -163,7 +115,7 @@ func handleNodesStatementRequest( height int, logger *zap.Logger, message *bytes.Buffer, - pairSocket protocol.Socket, + nc *nats.Conn, responsePair chan<- pair.Response, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) @@ -175,17 +127,14 @@ func handleNodesStatementRequest( } message.Write(req) - err = pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send a request to pair socket") - } - response, err := pairSocket.Recv() + response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { - return errors.Wrap(err, "failed to receive message from pair socket") + return errors.Wrap(err, "failed to receive message from nodemon") } + nodeStatementResp := pair.NodeStatementResponse{} - err = json.Unmarshal(response, &nodeStatementResp) + err = json.Unmarshal(response.Data, &nodeStatementResp) if err != nil { return errors.Wrap(err, "failed to unmarshal message from pair socket") } @@ -195,7 +144,7 @@ func handleNodesStatementRequest( case <-ctx.Done(): logger.Error("failed to send node statement response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("node-statement-response", response), + zap.ByteString("node-statement-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() @@ -206,7 +155,7 @@ func handleNodesStatementsRequest( ctx context.Context, urls []string, message *bytes.Buffer, - pairSocket protocol.Socket, + nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, ) error { @@ -214,17 +163,13 @@ func handleNodesStatementsRequest( defer cancel() message.WriteString(strings.Join(urls, ",")) - err := pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send a request to pair socket") - } - response, err := pairSocket.Recv() + response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { - return errors.Wrap(err, "failed to receive message from pair socket") + return errors.Wrap(err, "failed to receive message from nodemon") } nodesStatusResp := pair.NodesStatementsResponse{} - err = json.Unmarshal(response, &nodesStatusResp) + err = json.Unmarshal(response.Data, &nodesStatusResp) if err != nil { return errors.Wrap(err, "failed to unmarshal message from pair socket") } @@ -234,30 +179,32 @@ func handleNodesStatementsRequest( case <-ctx.Done(): logger.Error("failed to send nodes status response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("nodes-status-response", response), + zap.ByteString("nodes-status-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() } } -func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, pairSocket protocol.Socket) error { +func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, nc *nats.Conn) error { node := entities.Node{URL: url, Enabled: true, Alias: alias} nodeInfo, err := json.Marshal(node) if err != nil { return errors.Wrap(err, "failed to marshal node's info") } message.Write(nodeInfo) - err = pairSocket.Send(message.Bytes()) + // ignore a response + _, err = nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } return nil } -func handleInsertNewNodeRequest(url string, message *bytes.Buffer, pairSocket protocol.Socket) error { +func handleInsertNewNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn) error { message.WriteString(url) - err := pairSocket.Send(message.Bytes()) + // ignore a response + _, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } @@ -266,7 +213,7 @@ func handleInsertNewNodeRequest(url string, message *bytes.Buffer, pairSocket pr func handleNodesListRequest( ctx context.Context, - pairSocket protocol.Socket, + nc *nats.Conn, message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, @@ -274,17 +221,12 @@ func handleNodesListRequest( ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() - err := pairSocket.Send(message.Bytes()) - if err != nil { - return errors.Wrap(err, "failed to send message") - } - - response, err := pairSocket.Recv() + response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to receive message") } nodeList := pair.NodesListResponse{} - err = json.Unmarshal(response, &nodeList) + err = json.Unmarshal(response.Data, &nodeList) if err != nil { return errors.Wrap(err, "failed to unmarshal message") } @@ -294,7 +236,7 @@ func handleNodesListRequest( case <-ctx.Done(): logger.Error("failed to send nodes list response, timeout exceeded", zap.Duration("timeout", defaultResponseTimeout), - zap.ByteString("nodes-status-response", response), + zap.ByteString("nodes-status-response", response.Data), zap.Error(ctx.Err()), ) return ctx.Err() diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 719d431d..04d0f2be 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -2,8 +2,8 @@ package messaging import ( "context" - "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go" "github.com/pkg/errors" _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports "go.uber.org/zap" @@ -11,10 +11,7 @@ import ( "nodemon/pkg/messaging" ) -const pubSubTopic = "pubsub" - func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, logger *zap.Logger) error { - // Connect to a NATS server nc, err := nats.Connect(natsServerURL) if err != nil { @@ -23,18 +20,18 @@ func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, } defer nc.Close() - _, err = nc.Subscribe(pubSubTopic, func(msg *nats.Msg) { - err := handleReceivedMessage(msg.Data, bot) - if err != nil { - zap.S().Errorf("failed to handle received message from pubsub server %v", err) + _, err = nc.Subscribe(messaging.PubSubTopic, func(msg *nats.Msg) { + hndlErr := handleReceivedMessage(msg.Data, bot) + if hndlErr != nil { + zap.S().Errorf("failed to handle received message from pubsub server %v", hndlErr) } }) if err != nil { zap.S().Fatalf("Failed to subscribe to block updates: %v", err) return err } - if err := bot.SubscribeToAllAlerts(); err != nil { - return err + if subscrErr := bot.SubscribeToAllAlerts(); subscrErr != nil { + return subscrErr } <-ctx.Done() diff --git a/cmd/bots/telegram/telegram.go b/cmd/bots/telegram/telegram.go index db4c06ed..dd61c03b 100644 --- a/cmd/bots/telegram/telegram.go +++ b/cmd/bots/telegram/telegram.go @@ -40,8 +40,7 @@ func main() { } type telegramBotConfig struct { - nanomsgPubSubURL string - nanomsgPairURL string + natsMessagingURL string behavior string webhookLocalAddress string // only for webhook method publicURL string // only for webhook method @@ -54,10 +53,8 @@ type telegramBotConfig struct { func newTelegramBotConfig() *telegramBotConfig { c := new(telegramBotConfig) - tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/telegram/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-telegram-url", - "ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket") + tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-pubsub-url", + "nats://127.0.0.1:4222", "Nats URL for pubsub socket") tools.StringVarFlagWithEnv(&c.behavior, "behavior", "webhook", "Behavior is either webhook or polling") tools.StringVarFlagWithEnv(&c.webhookLocalAddress, "webhook-local-address", @@ -178,14 +175,14 @@ func runMessagingClients( pairResponse chan<- pair.Response, ) { go func() { - err := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, tgBotEnv, logger) + err := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, tgBotEnv, logger) if err != nil { logger.Fatal("failed to start sub messaging service", zap.Error(err)) } }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, pairRequest, pairResponse, logger) + err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, pairRequest, pairResponse, logger) if err != nil { logger.Fatal("failed to start pair messaging service", zap.Error(err)) } diff --git a/cmd/messaging/nats_server.go b/cmd/messaging/nats_server.go new file mode 100644 index 00000000..e123f6d1 --- /dev/null +++ b/cmd/messaging/nats_server.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math" + "os" + "os/signal" + "syscall" + "time" + + "go.uber.org/zap" + + "nodemon/pkg/messaging" + "nodemon/pkg/tools" + + "github.com/nats-io/nats-server/v2/server" +) + +const NatsMaxPayloadSize int32 = 1024 * 1024 // 1 MB +const ConnectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT + +type natsConfig struct { + serverURL string + maxPayload int64 + logLevel string + development bool + connectionTimeoutDefault time.Duration +} + +func parseNatsConfig() *natsConfig { + c := new(natsConfig) + tools.StringVarFlagWithEnv(&c.serverURL, "nats-url", + "nats://127.0.0.1:4222", "NATS server URL") + tools.Int64VarFlagWithEnv(&c.maxPayload, "max-payload", int64(NatsMaxPayloadSize), + "Max server payload size in bytes") + tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", + "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") + tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") + tools.DurationVarFlagWithEnv(&c.connectionTimeoutDefault, "connection-timeout", ConnectionsTimeoutDefault, + "HTTP API read timeout. Default value is 30s.") + return c +} + +func main() { + ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer done() + + cfg := parseNatsConfig() + flag.Parse() + + logger, _, err := tools.SetupZapLogger(cfg.logLevel, cfg.development) + if err != nil { + log.Printf("Failed to setup zap logger: %v", err) + return + } + defer func(zap *zap.Logger) { + if syncErr := zap.Sync(); syncErr != nil { + log.Println(syncErr) + } + }(logger) + + host, port, err := messaging.ParseHostAndPortFromURL(cfg.serverURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to parse host and port %v", err)) + } + + if cfg.maxPayload > math.MaxInt32 || cfg.maxPayload < math.MinInt32 { + logger.Fatal("max payload is too big or too small, must be in range of int32") + } + + opts := &server.Options{ + MaxPayload: int32(cfg.maxPayload), + Host: host, + Port: port, + NoSigs: true, + } + s, err := server.NewServer(opts) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to create NATS server, %v", err)) + } + go s.Start() + defer func() { + s.Shutdown() + s.WaitForShutdown() + }() + if !s.ReadyForConnections(cfg.connectionTimeoutDefault) { + logger.Fatal("NATS server is not ready for connections") + } + logger.Info(fmt.Sprintf("NATS Server is running on host %v, port %d", host, port)) + <-ctx.Done() + + logger.Info("NATS Server finished") +} diff --git a/cmd/nodemon/nodemon.go b/cmd/nodemon/nodemon.go index 8c2a8827..f883092f 100644 --- a/cmd/nodemon/nodemon.go +++ b/cmd/nodemon/nodemon.go @@ -4,7 +4,6 @@ import ( "context" stderrs "errors" "flag" - "github.com/nats-io/nats-server/v2/server" "log" "net/url" "os" @@ -13,15 +12,12 @@ import ( "syscall" "time" - "github.com/pkg/errors" - - "nodemon/internal" - "nodemon/pkg/analysis/l2" - "go.uber.org/zap" + "nodemon/internal" "nodemon/pkg/analysis" "nodemon/pkg/analysis/criteria" + "nodemon/pkg/analysis/l2" "nodemon/pkg/api" "nodemon/pkg/clients" "nodemon/pkg/entities" @@ -32,6 +28,9 @@ import ( "nodemon/pkg/storing/nodes" "nodemon/pkg/storing/specific" "nodemon/pkg/tools" + + "github.com/nats-io/nats-server/v2/server" + "github.com/pkg/errors" ) const ( @@ -183,25 +182,24 @@ func (n *nodemonVaultConfig) validate(logger *zap.Logger) error { } type nodemonConfig struct { - storage string - nodes string - L2nodeName string - L2nodeURL string - bindAddress string - interval time.Duration - timeout time.Duration - natsPubSubURL string - nanomsgPairTelegramURL string - nanomsgPairDiscordURL string - natsMaxPayloadSize int - natsTimeout time.Duration - retention time.Duration - apiReadTimeout time.Duration - baseTargetThreshold uint64 - logLevel string - development bool - vault *nodemonVaultConfig - l2 *nodemonL2Config + storage string + nodes string + L2nodeName string + L2nodeURL string + bindAddress string + interval time.Duration + timeout time.Duration + natsMessagingURL string + natsPairTelegram bool + natsPairDiscord bool + natsTimeout time.Duration + retention time.Duration + apiReadTimeout time.Duration + baseTargetThreshold uint64 + logLevel string + development bool + vault *nodemonVaultConfig + l2 *nodemonL2Config } func newNodemonConfig() *nodemonConfig { @@ -218,21 +216,17 @@ func newNodemonConfig() *nodemonConfig { defaultNetworkTimeout, "Network timeout, seconds. Default value is 15") tools.Uint64VarFlagWithEnv(&c.baseTargetThreshold, "base-target-threshold", 0, "Base target threshold. Must be specified") - tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nats-msg-pubsub-url", + tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-pubsub-url", "nats://127.0.0.1:4222", "Nats URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairTelegramURL, "nats-msg-pair-telegram-url", - "", "Nanomsg IPC URL for pair socket") - tools.StringVarFlagWithEnv(&c.nanomsgPairDiscordURL, "nano-msg-pair-discord-url", - "", "Nanomsg IPC URL for pair socket") tools.DurationVarFlagWithEnv(&c.natsTimeout, "nats-server-timeout", server.AUTH_TIMEOUT, "Nanomsg IPC URL for pair socket") - tools.IntVarFlagWithEnv(&c.natsMaxPayloadSize, "nats-msg-payload-size", 1024*1024, - "The size of the payload passed between nats server and clients.Default 1MB") tools.DurationVarFlagWithEnv(&c.retention, "retention", defaultRetentionDuration, "Events retention duration. Default value is 12h") tools.DurationVarFlagWithEnv(&c.apiReadTimeout, "api-read-timeout", defaultAPIReadTimeout, "HTTP API read timeout. Default value is 30s.") tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") + tools.BoolVarFlagWithEnv(&c.natsPairDiscord, "bot-requests-discord", false, "Should let discord bot send commands?") + tools.BoolVarFlagWithEnv(&c.natsPairTelegram, "bot-requests-telegram", true, "Should let telegram bot send commands?") tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") c.vault = newNodemonVaultConfig() @@ -266,9 +260,9 @@ func (c *nodemonConfig) validate(logger *zap.Logger) error { return stderrs.Join(c.vault.validate(logger), c.l2.validate(logger)) } -func (c *nodemonConfig) runDiscordPairServer() bool { return c.nanomsgPairDiscordURL != "" } +func (c *nodemonConfig) runDiscordPairServer() bool { return c.natsPairDiscord } -func (c *nodemonConfig) runTelegramPairServer() bool { return c.nanomsgPairTelegramURL != "" } +func (c *nodemonConfig) runTelegramPairServer() bool { return c.natsPairTelegram } func (c *nodemonConfig) runAnalyzers( ctx context.Context, @@ -422,7 +416,7 @@ func runMessagingServices( pew specific.PrivateNodesEventsWriter, ) { go func() { - pubSubErr := pubsub.StartPubMessagingServer(ctx, int32(cfg.natsMaxPayloadSize), cfg.natsPubSubURL, alerts, logger) + pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.natsMessagingURL, alerts, logger) if pubSubErr != nil { logger.Fatal("failed to start pub messaging server", zap.Error(pubSubErr)) } @@ -430,7 +424,7 @@ func runMessagingServices( if cfg.runTelegramPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairTelegramURL, ns, es, pew, logger) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } @@ -439,7 +433,7 @@ func runMessagingServices( if cfg.runDiscordPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairDiscordURL, ns, es, pew, logger) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } diff --git a/go.mod b/go.mod index b9e7621f..9600d265 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/consensys/gnark-crypto v0.14.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/fatih/color v1.18.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-jose/go-jose/v4 v4.0.1 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -68,6 +69,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/ronanh/intcomp v1.1.0 // indirect github.com/rs/zerolog v1.33.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect @@ -85,10 +87,10 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.28.0 // indirect - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/net v0.29.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.26.0 // indirect + golang.org/x/sync v0.9.0 // indirect + golang.org/x/sys v0.27.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/go.sum b/go.sum index 32ec5bb1..e8c511cb 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= @@ -459,8 +459,8 @@ github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoG github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/ronanh/intcomp v1.1.0 h1:i54kxmpmSoOZFcWPMWryuakN0vLxLswASsGa07zkvLU= github.com/ronanh/intcomp v1.1.0/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -588,8 +588,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -696,8 +696,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -780,8 +780,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/analysis/analyzer_test.go b/pkg/analysis/analyzer_test.go index 1d01923c..c780cee5 100644 --- a/pkg/analysis/analyzer_test.go +++ b/pkg/analysis/analyzer_test.go @@ -179,7 +179,9 @@ func runTestCase(zap *zap.Logger, test testCase) func(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - stateHashAlert := *actualAlert.(*entities.StateHashAlert) + actualA, ok := actualAlert.(*entities.StateHashAlert) + require.True(t, ok) + stateHashAlert := *actualA require.Contains(t, test.expectedAlerts, stateHashAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/analysis/criteria/base_target_test.go b/pkg/analysis/criteria/base_target_test.go index a67752cb..4218f66f 100644 --- a/pkg/analysis/criteria/base_target_test.go +++ b/pkg/analysis/criteria/base_target_test.go @@ -78,7 +78,9 @@ func TestBaseTargetCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - baseTargetAlert := *actualAlert.(*entities.BaseTargetAlert) + actualA, ok := actualAlert.(*entities.BaseTargetAlert) + require.True(t, ok) + baseTargetAlert := *actualA require.Contains(t, test.expectedAlerts, baseTargetAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") @@ -125,7 +127,9 @@ func TestNoBaseTargetAlertCriterion_Analyze(t *testing.T) { select { case actualAlert, ok := <-alerts: if ok { - baseTargetAlert := *actualAlert.(*entities.BaseTargetAlert) + actualA, okay := actualAlert.(*entities.BaseTargetAlert) + require.True(t, okay) + baseTargetAlert := *actualA require.Fail(t, "unexpected alert: %v", baseTargetAlert) } return diff --git a/pkg/analysis/criteria/challenged_block_test.go b/pkg/analysis/criteria/challenged_block_test.go index 08e4bb21..38e08f05 100644 --- a/pkg/analysis/criteria/challenged_block_test.go +++ b/pkg/analysis/criteria/challenged_block_test.go @@ -92,7 +92,9 @@ func TestChallengedBlockCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - challengedBlockAlert := *actualAlert.(*entities.ChallengedBlockAlert) + actualA, ok := actualAlert.(*entities.ChallengedBlockAlert) + require.True(t, ok) + challengedBlockAlert := *actualA require.Contains(t, test.expectedAlerts, challengedBlockAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/analysis/criteria/height_test.go b/pkg/analysis/criteria/height_test.go index ad9d330f..3749c3ab 100644 --- a/pkg/analysis/criteria/height_test.go +++ b/pkg/analysis/criteria/height_test.go @@ -101,7 +101,9 @@ func TestHeightCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - heightAlert := *actualAlert.(*entities.HeightAlert) + actualA, ok := actualAlert.(*entities.HeightAlert) + require.True(t, ok) + heightAlert := *actualA require.Contains(t, test.expectedAlerts, heightAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/analysis/criteria/incomplete_test.go b/pkg/analysis/criteria/incomplete_test.go index 3c3f9274..d1b54d8d 100644 --- a/pkg/analysis/criteria/incomplete_test.go +++ b/pkg/analysis/criteria/incomplete_test.go @@ -132,7 +132,9 @@ func TestIncompleteCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - incompleteAlert := *actualAlert.(*entities.IncompleteAlert) + actualA, ok := actualAlert.(*entities.IncompleteAlert) + require.True(t, ok) + incompleteAlert := *actualA require.Contains(t, test.expectedAlerts, incompleteAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/analysis/criteria/state_hash_test.go b/pkg/analysis/criteria/state_hash_test.go index c254784a..cb1bf228 100644 --- a/pkg/analysis/criteria/state_hash_test.go +++ b/pkg/analysis/criteria/state_hash_test.go @@ -245,7 +245,9 @@ func TestStateHashCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - stateHashAlert := *actualAlert.(*entities.StateHashAlert) + actualA, ok := actualAlert.(*entities.StateHashAlert) + require.True(t, ok) + stateHashAlert := *actualA require.Contains(t, test.expectedAlerts, stateHashAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/analysis/criteria/unreachable_test.go b/pkg/analysis/criteria/unreachable_test.go index c4bd40ec..32aeeccc 100644 --- a/pkg/analysis/criteria/unreachable_test.go +++ b/pkg/analysis/criteria/unreachable_test.go @@ -96,7 +96,9 @@ func TestUnreachableCriterion_Analyze(t *testing.T) { for j := range test.expectedAlerts { select { case actualAlert := <-alerts: - unreachableAlert := *actualAlert.(*entities.UnreachableAlert) + actualA, ok := actualAlert.(*entities.UnreachableAlert) + require.True(t, ok) + unreachableAlert := *actualA require.Contains(t, test.expectedAlerts, unreachableAlert, "test case #%d", j+1) case <-time.After(5 * time.Second): require.Fail(t, "timeout exceeded") diff --git a/pkg/messaging/helper.go b/pkg/messaging/helper.go new file mode 100644 index 00000000..eee7cabf --- /dev/null +++ b/pkg/messaging/helper.go @@ -0,0 +1,38 @@ +package messaging + +import ( + "fmt" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +const ( + PubSubTopic = "alerts" + BotRequestsTopic = "bot_requests" + + numberOfStringsAfterSlash = 2 +) + +func ParseHostAndPortFromURL(natsPubSubURL string) (string, int, error) { + // Find the position of "://" and trim everything before it + withoutProtocol := strings.SplitN(natsPubSubURL, "://", numberOfStringsAfterSlash) + if len(withoutProtocol) != numberOfStringsAfterSlash { + return "", 0, fmt.Errorf("failed to split the URL into pieces, URL: %s", natsPubSubURL) + } + + // Split the remaining part into host and port + hostPort := strings.Split(withoutProtocol[1], ":") + if len(hostPort) != numberOfStringsAfterSlash { + return "", 0, fmt.Errorf("failed to split host port string into host and port, %s", hostPort) + } + + host := hostPort[0] + // Convert the port to an integer + port, err := strconv.Atoi(hostPort[1]) + if err != nil { + return "", 0, errors.Errorf("failed to parse port, %v", err) + } + return host, port, nil +} diff --git a/pkg/messaging/pair/server.go b/pkg/messaging/pair/server.go index 4d2dd828..e6f45eb9 100644 --- a/pkg/messaging/pair/server.go +++ b/pkg/messaging/pair/server.go @@ -7,85 +7,66 @@ import ( "time" "nodemon/pkg/entities" + "nodemon/pkg/messaging" "nodemon/pkg/storing/events" "nodemon/pkg/storing/nodes" "nodemon/pkg/storing/specific" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - "go.nanomsg.org/mangos/v3/protocol" - "go.nanomsg.org/mangos/v3/protocol/pair" "go.uber.org/zap" ) +const okMessage = "ok" + func StartPairMessagingServer( ctx context.Context, - nanomsgURL string, + natsPairURL string, ns nodes.Storage, es *events.Storage, pew specific.PrivateNodesEventsWriter, logger *zap.Logger, ) error { - if len(nanomsgURL) == 0 || len(strings.Fields(nanomsgURL)) > 1 { - return errors.New("invalid nanomsg IPC URL for pair socket") - } - socket, sockErr := pair.NewSocket() - if sockErr != nil { - return sockErr - } - defer func(socketPair protocol.Socket) { - if err := socketPair.Close(); err != nil { - logger.Error("Failed to close pair socket", zap.Error(err)) - } - }(socket) - - if err := socket.Listen(nanomsgURL); err != nil { + nc, err := nats.Connect(natsPairURL) + if err != nil { + logger.Fatal("Failed to connect to nats server", zap.Error(err)) return err } + defer nc.Close() - loopErr := enterLoop(ctx, socket, logger, ns, es, pew) - if loopErr != nil && !errors.Is(loopErr, context.Canceled) { - return loopErr + if len(natsPairURL) == 0 { + return errors.New("invalid nats URL for pair messaging") } - return nil -} -func enterLoop( - ctx context.Context, - socket protocol.Socket, - logger *zap.Logger, - ns nodes.Storage, - es *events.Storage, - pew specific.PrivateNodesEventsWriter, -) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - rawMsg, recvErr := socket.Recv() - if recvErr != nil { - logger.Error("Failed to receive a message from pair socket", zap.Error(recvErr)) - return recvErr - } - err := handleMessage(rawMsg, ns, logger, socket, es, pew) - if err != nil { - return err - } + _, subErr := nc.Subscribe(messaging.BotRequestsTopic, func(request *nats.Msg) { + response, handleErr := handleMessage(request.Data, ns, logger, es, pew) + if handleErr != nil { + logger.Error("failed to handle bot request", zap.Error(handleErr)) + return + } + respndErr := request.Respond(response) + if respndErr != nil { + logger.Error("failed to respond to bot request", zap.Error(respndErr)) + return } + }) + if subErr != nil { + return subErr } + <-ctx.Done() + return nil } func handleMessage( rawMsg []byte, ns nodes.Storage, logger *zap.Logger, - socket protocol.Socket, es *events.Storage, pew specific.PrivateNodesEventsWriter, -) error { +) ([]byte, error) { if len(rawMsg) == 0 { logger.Warn("empty raw message received from pair socket") - return nil + return nil, nil } var ( t = RequestPairType(rawMsg[0]) @@ -93,13 +74,17 @@ func handleMessage( ) switch t { case RequestNodeListType: - if err := handleNodesRequest(ns, false, logger, socket); err != nil { - return err + response, err := handleNodesRequest(ns, false, logger) + if err != nil { + return nil, err } + return response, nil case RequestSpecificNodeListType: - if err := handleNodesRequest(ns, true, logger, socket); err != nil { - return err + response, err := handleNodesRequest(ns, true, logger) + if err != nil { + return nil, err } + return response, nil case RequestInsertNewNodeType: insertRegularNodeIfNew(msg, ns, logger) case RequestInsertSpecificNewNodeType: @@ -109,11 +94,13 @@ func handleMessage( case RequestDeleteNodeType: handleDeleteNodeRequest(msg, ns, logger) case RequestNodesStatusType, RequestNodeStatementType: - handleNodesStatementsRequest(msg, es, logger, socket) + response := handleNodesStatementsRequest(msg, es, logger) + return response, nil default: logger.Error("Unknown request type", zap.Int("type", int(t)), zap.Binary("message", msg)) } - return nil + // nats considers a message delivered only if there was a not nil response + return []byte(okMessage), nil } func insertNodeIfNew(url string, ns nodes.Storage, specific bool, logger *zap.Logger) bool { @@ -160,27 +147,24 @@ func handleUpdateNodeRequest(msg []byte, logger *zap.Logger, ns nodes.Storage) { } } -func handleNodesRequest(ns nodes.Storage, specific bool, logger *zap.Logger, socketPair protocol.Socket) error { +func handleNodesRequest(ns nodes.Storage, specific bool, logger *zap.Logger) ([]byte, error) { nodesList, err := ns.Nodes(specific) if err != nil { logger.Error("Failed to get list of nodes from storage", zap.Error(err), zap.Bool("specific", specific), ) - return err + return nil, err } response := NodesListResponse{Nodes: nodesList} marshaledResponse, err := json.Marshal(response) if err != nil { logger.Error("Failed to marshal node list to json", zap.Error(err)) + return nil, errors.Wrapf(err, "Failed to marshal node list to json") } - err = socketPair.Send(marshaledResponse) - if err != nil { - logger.Error("Failed to send a node list to pair socket", zap.Error(err)) - } - return nil + return marshaledResponse, nil } -func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Logger, socketPair protocol.Socket) { +func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Logger) []byte { listOfNodes := strings.Split(string(msg), ",") var nodesStatusResp NodesStatementsResponse @@ -211,8 +195,5 @@ func handleNodesStatementsRequest(msg []byte, es *events.Storage, logger *zap.Lo if err != nil { logger.Error("Failed to marshal node status to json", zap.Error(err)) } - err = socketPair.Send(response) - if err != nil { - logger.Error("Failed to send a response from pair socket", zap.Error(err)) - } + return response } diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index 158d5734..f782606e 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -2,77 +2,26 @@ package pubsub import ( "context" - "fmt" + "github.com/nats-io/nats.go" "github.com/pkg/errors" - _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports - "nodemon/pkg/entities" - "nodemon/pkg/messaging" - "strconv" - "strings" - "github.com/nats-io/nats-server/v2/server" "go.uber.org/zap" -) - -const ConnectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT -const pubsubTopic = "pubsub" - -func parseHostAndPortFromUrl(natsPubSubUrl string) (string, int, error) { - // Find the position of "://" and trim everything before it - withoutProtocol := strings.SplitN(natsPubSubUrl, "://", 2) - if len(withoutProtocol) != 2 { - return "", 0, errors.New(fmt.Sprintf("failed to split the URL into pieces, URL: %s", natsPubSubUrl)) - } - - // Split the remaining part into host and port - hostPort := strings.Split(withoutProtocol[1], ":") - if len(hostPort) != 2 { - return "", 0, errors.New(fmt.Sprintf("failed to split host port string into host and port, %s", hostPort)) - } - host := hostPort[0] - // Convert the port to an integer - port, err := strconv.Atoi(hostPort[1]) - if err != nil { - return "", 0, errors.Errorf("failed to parse port, %v", err) - } - return host, port, nil -} + "nodemon/pkg/entities" + "nodemon/pkg/messaging" +) func StartPubMessagingServer( ctx context.Context, - maxPayloadSize int32, - natsPubSubUrl string, // expected nats://host:port + natsPubSubURL string, // expected nats://host:port alerts <-chan entities.Alert, logger *zap.Logger, ) error { - if len(natsPubSubUrl) == 0 { - return errors.New("invalid nats pubsub URL") - } - host, port, err := parseHostAndPortFromUrl(natsPubSubUrl) + socket, err := nats.Connect(natsPubSubURL) if err != nil { return err } - - opts := &server.Options{ - MaxPayload: maxPayloadSize, - Host: host, - Port: port, - } - s, err := server.NewServer(opts) - if err != nil { - return errors.Errorf("failed to create NATS server: %v", err) - } - go s.Start() - - if !s.ReadyForConnections(ConnectionsTimeoutDefault) { - return errors.Errorf("NATS Server not ready for connections") - } - logger.Info("NATS PubSub Server is running...") - - socket, err := nats.Connect(fmt.Sprintf("nats://%s:%d", host, port)) - loopErr := enterLoop(ctx, alerts, logger, socket) if loopErr != nil && !errors.Is(loopErr, context.Canceled) { return loopErr @@ -80,7 +29,7 @@ func StartPubMessagingServer( return nil } -func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, socket *nats.Conn) error { +func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, nc *nats.Conn) error { for { select { case <-ctx.Done(): @@ -98,7 +47,7 @@ func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Lo logger.Error("Failed to marshal binary alert message", zap.Error(err)) continue } - err = socket.Publish(pubsubTopic, data) + err = nc.Publish(messaging.PubSubTopic, data) if err != nil { logger.Error("Failed to send alert to socket", zap.Error(err)) } From 9cd6bf694b04c47684d05fa790d8fd7c088349d3 Mon Sep 17 00:00:00 2001 From: esuwu Date: Mon, 9 Dec 2024 22:17:21 -0600 Subject: [PATCH 3/4] Added a nodemon-built-in optional messaging server, dockerfile and make command --- Dockerfile-messaging-server | 40 ++++++ Makefile | 3 + cmd/bots/discord/discord.go | 19 ++- cmd/bots/internal/common/environment.go | 21 ++-- cmd/bots/internal/common/initial/initial.go | 6 +- .../internal/common/messaging/pair_client.go | 52 ++++---- .../common/messaging/pubsub_client.go | 6 +- cmd/bots/telegram/telegram.go | 13 +- cmd/messaging/messaging_server.go | 70 +++++++++++ cmd/messaging/nats_server.go | 96 --------------- cmd/nodemon/nodemon.go | 114 +++++++++++++----- go.mod | 2 - go.sum | 7 -- pkg/messaging/helper.go | 32 ----- pkg/messaging/messaging_server.go | 52 ++++++++ pkg/messaging/pair/server.go | 9 +- pkg/messaging/pubsub/server.go | 10 +- 17 files changed, 332 insertions(+), 220 deletions(-) create mode 100644 Dockerfile-messaging-server create mode 100644 cmd/messaging/messaging_server.go delete mode 100644 cmd/messaging/nats_server.go create mode 100644 pkg/messaging/messaging_server.go diff --git a/Dockerfile-messaging-server b/Dockerfile-messaging-server new file mode 100644 index 00000000..533f7ebf --- /dev/null +++ b/Dockerfile-messaging-server @@ -0,0 +1,40 @@ +FROM golang:1.23.3-alpine3.20 as builder +ARG APP=/app +WORKDIR ${APP} + +RUN apk add --no-cache make +# disable cgo for go build +ENV CGO_ENABLED=0 + +COPY go.mod . +COPY go.sum . + +RUN go mod download + +COPY Makefile . +COPY cmd cmd +COPY pkg pkg +COPY internal internal + +RUN make build-messaging-server-linux-amd64 + +FROM alpine:3.20 +ARG APP=/app +ENV TZ=Etc/UTC \ + APP_USER=appuser + +STOPSIGNAL SIGINT + +RUN addgroup -S $APP_USER \ + && adduser -S $APP_USER -G $APP_USER + +RUN apk add --no-cache bind-tools + +USER $APP_USER +WORKDIR ${APP} +# Considered as a default HTTP API Port +EXPOSE 8080 + +COPY --from=builder ${APP}/build/linux-amd64/messaging-server ${APP}/messaging-server + +ENTRYPOINT ["./messaging-server"] diff --git a/Makefile b/Makefile index 5abdb78e..95f934b7 100644 --- a/Makefile +++ b/Makefile @@ -35,3 +35,6 @@ build-bots-linux-amd64: build-nodemon-linux-amd64: @CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/nodemon -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/nodemon + +build-messaging-server-linux-amd64: + @CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/messaging-server -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/messaging diff --git a/cmd/bots/discord/discord.go b/cmd/bots/discord/discord.go index 96afafc9..113a0986 100644 --- a/cmd/bots/discord/discord.go +++ b/cmd/bots/discord/discord.go @@ -45,14 +45,15 @@ type discordBotConfig struct { logLevel string development bool bindAddress string + scheme string } func newDiscordBotConfigConfig() *discordBotConfig { c := new(discordBotConfig) - tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nano-msg-pubsub-url", - "ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket") - tools.StringVarFlagWithEnv(&c.natsPairURL, "nano-msg-pair-discord-url", - "ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket") + tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nats-pubsub-url", + "nats://127.0.0.1:4222", "NATS server URL for pubsub messaging") + tools.StringVarFlagWithEnv(&c.natsPairURL, "nats-pair-discord-url", + "nats://127.0.0.1:4222", "NATS server URL for pair messaging") tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token", "", "The secret token used to authenticate the bot") tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id", @@ -62,6 +63,8 @@ func newDiscordBotConfigConfig() *discordBotConfig { tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "", "Local network address to bind the HTTP API of the service on.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", + "testnet", "Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service") return c } @@ -70,6 +73,9 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error { zap.Error("discord bot token is required") return common.ErrInvalidParameters } + if c.scheme == "" { + zap.Error("the blockchain scheme must be specified") + } if c.discordChatID == "" { zap.Error("discord chat ID is required") return common.ErrInvalidParameters @@ -111,6 +117,7 @@ func runDiscordBot() error { logger, requestChan, responseChan, + cfg.scheme, ) if initErr != nil { return errors.Wrap(initErr, "failed to init discord bot") @@ -178,7 +185,7 @@ func runMessagingClients( responseChan chan pair.Response, ) { go func() { - clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsPubSubURL, discordBotEnv, logger) + clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsPubSubURL, discordBotEnv, logger, cfg.scheme) if clientErr != nil { logger.Fatal("failed to start sub messaging client", zap.Error(clientErr)) return @@ -186,7 +193,7 @@ func runMessagingClients( }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.natsPairURL, requestChan, responseChan, logger) + err := messaging.StartPairMessagingClient(ctx, cfg.natsPairURL, requestChan, responseChan, logger, cfg.scheme) if err != nil { logger.Fatal("failed to start pair messaging client", zap.Error(err)) return diff --git a/cmd/bots/internal/common/environment.go b/cmd/bots/internal/common/environment.go index 036655be..d972e8d2 100644 --- a/cmd/bots/internal/common/environment.go +++ b/cmd/bots/internal/common/environment.go @@ -22,8 +22,6 @@ import ( "github.com/bwmarrin/discordgo" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" - "go.nanomsg.org/mangos/v3" - "go.nanomsg.org/mangos/v3/protocol" "go.uber.org/zap" "gopkg.in/telebot.v3" ) @@ -80,12 +78,12 @@ func (s *subscriptions) MapR(f func()) { type DiscordBotEnvironment struct { ChatID string Bot *discordgo.Session - subSocket protocol.Socket Subscriptions subscriptions zap *zap.Logger requestType chan<- pair.Request responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages + scheme string } func NewDiscordBotEnvironment( @@ -94,6 +92,7 @@ func NewDiscordBotEnvironment( zap *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) *DiscordBotEnvironment { return &DiscordBotEnvironment{ Bot: bot, @@ -106,6 +105,7 @@ func NewDiscordBotEnvironment( requestType: requestType, responsePairType: responsePairType, unhandledAlertMessages: newUnhandledAlertMessages(), + scheme: scheme, } } @@ -121,10 +121,6 @@ func (dscBot *DiscordBotEnvironment) Start() error { return nil } -func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) { - dscBot.subSocket = subSocket -} - func (dscBot *DiscordBotEnvironment) SendMessage(msg string) { _, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg) if err != nil { @@ -194,10 +190,10 @@ func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error { if dscBot.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}) - if err != nil { - return err - } + // err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). + // if err != nil { + // return err + // } dscBot.Subscriptions.Add(alertType, alertName) dscBot.zap.Sugar().Infof("subscribed to %s", alertName) } @@ -250,6 +246,7 @@ type TelegramBotEnvironment struct { requestType chan<- pair.Request responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages + scheme string } func NewTelegramBotEnvironment( @@ -259,6 +256,7 @@ func NewTelegramBotEnvironment( zap *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) *TelegramBotEnvironment { return &TelegramBotEnvironment{ Bot: bot, @@ -272,6 +270,7 @@ func NewTelegramBotEnvironment( requestType: requestType, responsePairType: responsePairType, unhandledAlertMessages: newUnhandledAlertMessages(), + scheme: scheme, } } diff --git a/cmd/bots/internal/common/initial/initial.go b/cmd/bots/internal/common/initial/initial.go index fcf0d939..de68a9ce 100644 --- a/cmd/bots/internal/common/initial/initial.go +++ b/cmd/bots/internal/common/initial/initial.go @@ -19,6 +19,7 @@ func InitTgBot(behavior string, logger *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) (*common.TelegramBotEnvironment, error) { botSettings, err := config.NewTgBotSettings(behavior, webhookLocalAddress, publicURL, botToken) if err != nil { @@ -31,7 +32,7 @@ func InitTgBot(behavior string, logger.Sugar().Debugf("telegram chat id for sending alerts is %d", chatID) - tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType) + tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType, scheme) return tgBotEnv, nil } @@ -41,6 +42,7 @@ func InitDiscordBot( logger *zap.Logger, requestType chan<- pair.Request, responsePairType <-chan pair.Response, + scheme string, ) (*common.DiscordBotEnvironment, error) { bot, err := discordgo.New("Bot " + botToken) if err != nil { @@ -49,6 +51,6 @@ func InitDiscordBot( logger.Sugar().Debugf("discord chat id for sending alerts is %s", chatID) bot.Identify.Intents = discordgo.IntentsGuildMessages | discordgo.IntentsMessageContent - dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType) + dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType, scheme) return dscBotEnv, nil } diff --git a/cmd/bots/internal/common/messaging/pair_client.go b/cmd/bots/internal/common/messaging/pair_client.go index 4777e863..6d82fe7b 100644 --- a/cmd/bots/internal/common/messaging/pair_client.go +++ b/cmd/bots/internal/common/messaging/pair_client.go @@ -26,6 +26,7 @@ func StartPairMessagingClient( requestPair <-chan pair.Request, responsePair chan<- pair.Response, logger *zap.Logger, + scheme string, ) error { nc, err := nats.Connect(natsServerURL) if err != nil { @@ -34,7 +35,7 @@ func StartPairMessagingClient( } defer nc.Close() - done := runPairLoop(ctx, requestPair, nc, logger, responsePair) + done := runPairLoop(ctx, requestPair, nc, logger, responsePair, scheme) <-ctx.Done() logger.Info("stopping pair messaging service...") @@ -49,6 +50,7 @@ func runPairLoop( nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, + scheme string, ) <-chan struct{} { ch := make(chan struct{}) go func(done chan<- struct{}) { @@ -64,7 +66,7 @@ func runPairLoop( message := &bytes.Buffer{} message.WriteByte(byte(request.RequestType())) - err := handlePairRequest(ctx, request, nc, message, logger, responsePair) + err := handlePairRequest(ctx, request, nc, message, logger, responsePair, scheme) if err != nil { logger.Error("failed to handle pair request", zap.String("request-type", fmt.Sprintf("(%T)", request)), @@ -84,26 +86,21 @@ func handlePairRequest( message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, + scheme string, ) error { switch r := request.(type) { case *pair.NodesListRequest: - return handleNodesListRequest(ctx, nc, message, logger, responsePair) + return handleNodesListRequest(ctx, nc, message, logger, responsePair, scheme) case *pair.InsertNewNodeRequest: - return handleInsertNewNodeRequest(r.URL, message, nc) + return handleInsertNewNodeRequest(r.URL, message, nc, scheme) case *pair.UpdateNodeRequest: - return handleUpdateNodeRequest(r.URL, r.Alias, message, nc) + return handleUpdateNodeRequest(r.URL, r.Alias, message, nc, scheme) case *pair.DeleteNodeRequest: - message.WriteString(r.URL) - // ignore response - _, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) - if err != nil { - return errors.Wrap(err, "failed to receive message from nodemon") - } - return nil + return handleDeleteNodeRequest(r.URL, message, nc, scheme) case *pair.NodesStatusRequest: - return handleNodesStatementsRequest(ctx, r.URLs, message, nc, logger, responsePair) + return handleNodesStatementsRequest(ctx, r.URLs, message, nc, logger, responsePair, scheme) case *pair.NodeStatementRequest: - return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, nc, responsePair) + return handleNodesStatementRequest(ctx, r.URL, r.Height, logger, message, nc, responsePair, scheme) default: return errors.New("unknown request type to pair socket") } @@ -117,6 +114,7 @@ func handleNodesStatementRequest( message *bytes.Buffer, nc *nats.Conn, responsePair chan<- pair.Response, + scheme string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() @@ -128,7 +126,7 @@ func handleNodesStatementRequest( message.Write(req) - response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + response, err := nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to receive message from nodemon") } @@ -158,13 +156,14 @@ func handleNodesStatementsRequest( nc *nats.Conn, logger *zap.Logger, responsePair chan<- pair.Response, + scheme string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() message.WriteString(strings.Join(urls, ",")) - response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + response, err := nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to receive message from nodemon") } @@ -186,7 +185,7 @@ func handleNodesStatementsRequest( } } -func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, nc *nats.Conn) error { +func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, nc *nats.Conn, scheme string) error { node := entities.Node{URL: url, Enabled: true, Alias: alias} nodeInfo, err := json.Marshal(node) if err != nil { @@ -194,17 +193,27 @@ func handleUpdateNodeRequest(url, alias string, message *bytes.Buffer, nc *nats. } message.Write(nodeInfo) // ignore a response - _, err = nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + _, err = nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } return nil } -func handleInsertNewNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn) error { +func handleDeleteNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn, scheme string) error { + message.WriteString(url) + // ignore response + _, err := nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) + if err != nil { + return errors.Wrap(err, "failed to receive message from nodemon") + } + return nil +} + +func handleInsertNewNodeRequest(url string, message *bytes.Buffer, nc *nats.Conn, scheme string) error { message.WriteString(url) // ignore a response - _, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + _, err := nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to send message") } @@ -217,11 +226,12 @@ func handleNodesListRequest( message *bytes.Buffer, logger *zap.Logger, responsePair chan<- pair.Response, + scheme string, ) error { ctx, cancel := context.WithTimeout(ctx, defaultResponseTimeout) defer cancel() - response, err := nc.Request(messaging.BotRequestsTopic, message.Bytes(), defaultResponseTimeout) + response, err := nc.Request(messaging.BotRequestsTopic+scheme, message.Bytes(), defaultResponseTimeout) if err != nil { return errors.Wrap(err, "failed to receive message") } diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 04d0f2be..678bf617 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -5,13 +5,13 @@ import ( "github.com/nats-io/nats.go" "github.com/pkg/errors" - _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports "go.uber.org/zap" "nodemon/pkg/messaging" ) -func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, logger *zap.Logger) error { +func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, + logger *zap.Logger, scheme string) error { // Connect to a NATS server nc, err := nats.Connect(natsServerURL) if err != nil { @@ -20,7 +20,7 @@ func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, } defer nc.Close() - _, err = nc.Subscribe(messaging.PubSubTopic, func(msg *nats.Msg) { + _, err = nc.Subscribe(messaging.PubSubTopic+scheme, func(msg *nats.Msg) { hndlErr := handleReceivedMessage(msg.Data, bot) if hndlErr != nil { zap.S().Errorf("failed to handle received message from pubsub server %v", hndlErr) diff --git a/cmd/bots/telegram/telegram.go b/cmd/bots/telegram/telegram.go index dd61c03b..9d527d42 100644 --- a/cmd/bots/telegram/telegram.go +++ b/cmd/bots/telegram/telegram.go @@ -49,6 +49,7 @@ type telegramBotConfig struct { logLevel string development bool bindAddress string + scheme string } func newTelegramBotConfig() *telegramBotConfig { @@ -70,6 +71,8 @@ func newTelegramBotConfig() *telegramBotConfig { tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "", "Local network address to bind the HTTP API of the service on.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", + "testnet", "Blockchain scheme i.e. mainnet, testnet, stagenet") return c } @@ -82,6 +85,9 @@ func (c *telegramBotConfig) validate(logger *zap.Logger) error { logger.Error("public url is required for webhook method") return common.ErrInvalidParameters } + if c.scheme == "" { + logger.Error("the blockchain scheme must be specified") + } if c.tgChatID == 0 { logger.Error("telegram chat ID is required") return common.ErrInvalidParameters @@ -118,8 +124,7 @@ func runTelegramBot() error { responseChan := make(chan pair.Response) tgBotEnv, initErr := initial.InitTgBot(cfg.behavior, cfg.webhookLocalAddress, cfg.publicURL, - cfg.tgBotToken, cfg.tgChatID, logger, requestChan, responseChan, - ) + cfg.tgBotToken, cfg.tgChatID, logger, requestChan, responseChan, cfg.scheme) if initErr != nil { logger.Fatal("failed to initialize telegram bot", zap.Error(initErr)) } @@ -175,14 +180,14 @@ func runMessagingClients( pairResponse chan<- pair.Response, ) { go func() { - err := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, tgBotEnv, logger) + err := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, tgBotEnv, logger, cfg.scheme) if err != nil { logger.Fatal("failed to start sub messaging service", zap.Error(err)) } }() go func() { - err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, pairRequest, pairResponse, logger) + err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, pairRequest, pairResponse, logger, cfg.scheme) if err != nil { logger.Fatal("failed to start pair messaging service", zap.Error(err)) } diff --git a/cmd/messaging/messaging_server.go b/cmd/messaging/messaging_server.go new file mode 100644 index 00000000..d03e72fa --- /dev/null +++ b/cmd/messaging/messaging_server.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "flag" + "log" + "nodemon/pkg/messaging" + "os" + "os/signal" + "syscall" + "time" + + "go.uber.org/zap" + + "nodemon/pkg/tools" + + "github.com/nats-io/nats-server/v2/server" +) + +const natsMaxPayloadSize int32 = 1024 * 1024 // 1 MB +const connectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT + +type natsConfig struct { + serverURL string + maxPayload int64 + logLevel string + development bool + connectionTimeout time.Duration +} + +func parseNatsConfig() *natsConfig { + c := new(natsConfig) + tools.StringVarFlagWithEnv(&c.serverURL, "nats-url", + "nats://127.0.0.1:4222", "NATS server URL") + tools.Int64VarFlagWithEnv(&c.maxPayload, "max-payload", int64(natsMaxPayloadSize), + "Max server payload size in bytes") + tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", + "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") + tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") + tools.DurationVarFlagWithEnv(&c.connectionTimeout, "connection-timeout", connectionsTimeoutDefault, + "HTTP API read timeout. Default value is 30s.") + return c +} + +func main() { + ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer done() + + cfg := parseNatsConfig() + flag.Parse() + + logger, _, err := tools.SetupZapLogger(cfg.logLevel, cfg.development) + if err != nil { + log.Printf("Failed to setup zap logger: %v", err) + return + } + defer func(zap *zap.Logger) { + if syncErr := zap.Sync(); syncErr != nil { + log.Println(syncErr) + } + }(logger) + + err = messaging.RunNatsMessagingServer(cfg.serverURL, logger, cfg.maxPayload, cfg.connectionTimeout) + if err != nil { + log.Printf("Failed run nats messaging server: %v", err) + return + } + <-ctx.Done() + logger.Info("NATS Server finished") +} diff --git a/cmd/messaging/nats_server.go b/cmd/messaging/nats_server.go deleted file mode 100644 index e123f6d1..00000000 --- a/cmd/messaging/nats_server.go +++ /dev/null @@ -1,96 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "log" - "math" - "os" - "os/signal" - "syscall" - "time" - - "go.uber.org/zap" - - "nodemon/pkg/messaging" - "nodemon/pkg/tools" - - "github.com/nats-io/nats-server/v2/server" -) - -const NatsMaxPayloadSize int32 = 1024 * 1024 // 1 MB -const ConnectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT - -type natsConfig struct { - serverURL string - maxPayload int64 - logLevel string - development bool - connectionTimeoutDefault time.Duration -} - -func parseNatsConfig() *natsConfig { - c := new(natsConfig) - tools.StringVarFlagWithEnv(&c.serverURL, "nats-url", - "nats://127.0.0.1:4222", "NATS server URL") - tools.Int64VarFlagWithEnv(&c.maxPayload, "max-payload", int64(NatsMaxPayloadSize), - "Max server payload size in bytes") - tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", - "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") - tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.") - tools.DurationVarFlagWithEnv(&c.connectionTimeoutDefault, "connection-timeout", ConnectionsTimeoutDefault, - "HTTP API read timeout. Default value is 30s.") - return c -} - -func main() { - ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer done() - - cfg := parseNatsConfig() - flag.Parse() - - logger, _, err := tools.SetupZapLogger(cfg.logLevel, cfg.development) - if err != nil { - log.Printf("Failed to setup zap logger: %v", err) - return - } - defer func(zap *zap.Logger) { - if syncErr := zap.Sync(); syncErr != nil { - log.Println(syncErr) - } - }(logger) - - host, port, err := messaging.ParseHostAndPortFromURL(cfg.serverURL) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to parse host and port %v", err)) - } - - if cfg.maxPayload > math.MaxInt32 || cfg.maxPayload < math.MinInt32 { - logger.Fatal("max payload is too big or too small, must be in range of int32") - } - - opts := &server.Options{ - MaxPayload: int32(cfg.maxPayload), - Host: host, - Port: port, - NoSigs: true, - } - s, err := server.NewServer(opts) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to create NATS server, %v", err)) - } - go s.Start() - defer func() { - s.Shutdown() - s.WaitForShutdown() - }() - if !s.ReadyForConnections(cfg.connectionTimeoutDefault) { - logger.Fatal("NATS server is not ready for connections") - } - logger.Info(fmt.Sprintf("NATS Server is running on host %v, port %d", host, port)) - <-ctx.Done() - - logger.Info("NATS Server finished") -} diff --git a/cmd/nodemon/nodemon.go b/cmd/nodemon/nodemon.go index f883092f..601b1a3f 100644 --- a/cmd/nodemon/nodemon.go +++ b/cmd/nodemon/nodemon.go @@ -6,6 +6,7 @@ import ( "flag" "log" "net/url" + "nodemon/pkg/messaging" "os" "os/signal" "strings" @@ -38,6 +39,9 @@ const ( defaultPollingInterval = 60 * time.Second defaultRetentionDuration = 12 * time.Hour defaultAPIReadTimeout = 30 * time.Second + + natsMaxPayloadSize int32 = 1024 * 1024 // 1 MB + connectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT ) var ( @@ -142,6 +146,12 @@ type nodemonVaultConfig struct { secretPath string } +type natsOptionalConfig struct { + serverURL string + maxPayload int64 + connectionTimeoutDefault time.Duration +} + func newNodemonVaultConfig() *nodemonVaultConfig { c := new(nodemonVaultConfig) tools.StringVarFlagWithEnv(&c.address, "vault-address", "", "Vault server address.") @@ -154,6 +164,17 @@ func newNodemonVaultConfig() *nodemonVaultConfig { return c } +func newNatsOptionalConfig() *natsOptionalConfig { + c := new(natsOptionalConfig) + tools.StringVarFlagWithEnv(&c.serverURL, "nats-url", + "nats://127.0.0.1:4222", "NATS server URL") + tools.Int64VarFlagWithEnv(&c.maxPayload, "max-payload", int64(natsMaxPayloadSize), + "Max server payload size in bytes") + tools.DurationVarFlagWithEnv(&c.connectionTimeoutDefault, "connection-timeout", connectionsTimeoutDefault, + "HTTP API read timeout. Default value is 30s.") + return c +} + func (n *nodemonVaultConfig) present() bool { return n.address != "" } @@ -200,6 +221,8 @@ type nodemonConfig struct { development bool vault *nodemonVaultConfig l2 *nodemonL2Config + scheme string + natsOptionalConfig *natsOptionalConfig } func newNodemonConfig() *nodemonConfig { @@ -229,8 +252,11 @@ func newNodemonConfig() *nodemonConfig { tools.BoolVarFlagWithEnv(&c.natsPairTelegram, "bot-requests-telegram", true, "Should let telegram bot send commands?") tools.StringVarFlagWithEnv(&c.logLevel, "log-level", "INFO", "Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.") + tools.StringVarFlagWithEnv(&c.scheme, "scheme", + "testnet", "Blockchain scheme i.e. mainnet, testnet, stagenet") c.vault = newNodemonVaultConfig() c.l2 = newNodemonL2Config() + c.natsOptionalConfig = newNatsOptionalConfig() return c } @@ -308,26 +334,11 @@ func run() error { ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer done() - ns, err := createNodesStorage(ctx, cfg, logger) - if err != nil { - return err - } - defer func(cs nodes.Storage) { - if closeErr := cs.Close(); closeErr != nil { - logger.Error("failed to close nodes storage", zap.Error(closeErr)) - } - }(ns) - - es, err := events.NewStorage(cfg.retention, logger) + ns, es, err := initializeStorages(ctx, cfg, logger) if err != nil { - logger.Error("failed to initialize events storage", zap.Error(err)) return err } - defer func(es *events.Storage) { - if closeErr := es.Close(); closeErr != nil { - logger.Error("failed to close events storage", zap.Error(closeErr)) - } - }(es) + defer closeStorages(ns, es, logger) scraper, err := scraping.NewScraper(ns, es, cfg.interval, cfg.timeout, logger) if err != nil { @@ -341,26 +352,73 @@ func run() error { return err } + a, serviceErr := startServices(ctx, cfg, ns, es, scraper, privateNodesHandler, atom, logger) + if serviceErr != nil { + return serviceErr + } + + <-ctx.Done() + a.Shutdown() + logger.Info("shutting down") + return nil +} + +func initializeStorages(ctx context.Context, cfg *nodemonConfig, + logger *zap.Logger) (nodes.Storage, *events.Storage, error) { + ns, err := createNodesStorage(ctx, cfg, logger) + if err != nil { + logger.Error("failed to initialize nodes storage", zap.Error(err)) + return nil, nil, err + } + + es, err := events.NewStorage(cfg.retention, logger) + if err != nil { + logger.Error("failed to initialize events storage", zap.Error(err)) + return nil, nil, err + } + + return ns, es, nil +} + +func closeStorages(ns nodes.Storage, es *events.Storage, logger *zap.Logger) { + if err := ns.Close(); err != nil { + logger.Error("failed to close nodes storage", zap.Error(err)) + } + if err := es.Close(); err != nil { + logger.Error("failed to close events storage", zap.Error(err)) + } +} + +func startServices(ctx context.Context, cfg *nodemonConfig, ns nodes.Storage, es *events.Storage, + scraper *scraping.Scraper, privateNodesHandler *specific.PrivateNodesHandler, + atom *zap.AtomicLevel, logger *zap.Logger) (*api.API, error) { notifications := scraper.Start(ctx) - notifications = privateNodesHandler.Run(notifications) // wraps scrapper's notification with private nodes handler - pew := privateNodesHandler.PrivateNodesEventsWriter() + notifications = privateNodesHandler.Run(notifications) // wraps scraper's notifications + pew := privateNodesHandler.PrivateNodesEventsWriter() a, err := api.NewAPI(cfg.bindAddress, ns, es, cfg.apiReadTimeout, logger, pew, atom, cfg.development) if err != nil { logger.Error("failed to initialize API", zap.Error(err)) - return err + return nil, err } + if apiErr := a.Start(); apiErr != nil { logger.Error("failed to start API", zap.Error(apiErr)) - return apiErr + return a, apiErr } cfg.runAnalyzers(ctx, cfg, es, ns, logger, pew, notifications) - <-ctx.Done() - a.Shutdown() - logger.Info("shutting down") - return nil + if cfg.natsOptionalConfig.serverURL != "" { + err = messaging.RunNatsMessagingServer(cfg.natsOptionalConfig.serverURL, logger, + cfg.natsOptionalConfig.maxPayload, cfg.natsOptionalConfig.connectionTimeoutDefault) + if err != nil { + logger.Error("failed to start NATS server", zap.Error(err)) + return a, err + } + } + + return a, err } func createNodesStorage(ctx context.Context, cfg *nodemonConfig, logger *zap.Logger) (nodes.Storage, error) { @@ -416,7 +474,7 @@ func runMessagingServices( pew specific.PrivateNodesEventsWriter, ) { go func() { - pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.natsMessagingURL, alerts, logger) + pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.natsMessagingURL, alerts, logger, cfg.scheme) if pubSubErr != nil { logger.Fatal("failed to start pub messaging server", zap.Error(pubSubErr)) } @@ -424,7 +482,7 @@ func runMessagingServices( if cfg.runTelegramPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger, cfg.scheme) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } @@ -433,7 +491,7 @@ func runMessagingServices( if cfg.runDiscordPairServer() { go func() { - pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger) + pairErr := pair.StartPairMessagingServer(ctx, cfg.natsMessagingURL, ns, es, pew, logger, cfg.scheme) if pairErr != nil { logger.Fatal("failed to start pair messaging server", zap.Error(pairErr)) } diff --git a/go.mod b/go.mod index 9600d265..752dbef5 100644 --- a/go.mod +++ b/go.mod @@ -16,14 +16,12 @@ require ( github.com/stretchr/testify v1.9.0 github.com/tidwall/buntdb v1.3.2 github.com/wavesplatform/gowaves v0.10.7-0.20240927070807-c256c5d98bfa - go.nanomsg.org/mangos/v3 v3.4.2 go.uber.org/zap v1.27.0 gopkg.in/telebot.v3 v3.3.8 ) require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/Microsoft/go-winio v0.6.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.14.2 // indirect github.com/blang/semver/v4 v4.0.0 // indirect diff --git a/go.sum b/go.sum index e8c511cb..32e2b86c 100644 --- a/go.sum +++ b/go.sum @@ -61,9 +61,6 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= -github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= -github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -151,7 +148,6 @@ github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUork github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= @@ -475,7 +471,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= @@ -545,8 +540,6 @@ go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dY go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+DHwTGEbU= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= -go.nanomsg.org/mangos/v3 v3.4.2 h1:gHlopxjWvJcVCcUilQIsRQk9jdj6/HB7wrTiUN8Ki7Q= -go.nanomsg.org/mangos/v3 v3.4.2/go.mod h1:8+hjBMQub6HvXmuGvIq6hf19uxGQIjCofmc62lbedLA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/pkg/messaging/helper.go b/pkg/messaging/helper.go index eee7cabf..6c314058 100644 --- a/pkg/messaging/helper.go +++ b/pkg/messaging/helper.go @@ -1,38 +1,6 @@ package messaging -import ( - "fmt" - "strconv" - "strings" - - "github.com/pkg/errors" -) - const ( PubSubTopic = "alerts" BotRequestsTopic = "bot_requests" - - numberOfStringsAfterSlash = 2 ) - -func ParseHostAndPortFromURL(natsPubSubURL string) (string, int, error) { - // Find the position of "://" and trim everything before it - withoutProtocol := strings.SplitN(natsPubSubURL, "://", numberOfStringsAfterSlash) - if len(withoutProtocol) != numberOfStringsAfterSlash { - return "", 0, fmt.Errorf("failed to split the URL into pieces, URL: %s", natsPubSubURL) - } - - // Split the remaining part into host and port - hostPort := strings.Split(withoutProtocol[1], ":") - if len(hostPort) != numberOfStringsAfterSlash { - return "", 0, fmt.Errorf("failed to split host port string into host and port, %s", hostPort) - } - - host := hostPort[0] - // Convert the port to an integer - port, err := strconv.Atoi(hostPort[1]) - if err != nil { - return "", 0, errors.Errorf("failed to parse port, %v", err) - } - return host, port, nil -} diff --git a/pkg/messaging/messaging_server.go b/pkg/messaging/messaging_server.go new file mode 100644 index 00000000..68a118ee --- /dev/null +++ b/pkg/messaging/messaging_server.go @@ -0,0 +1,52 @@ +package messaging + +import ( + "fmt" + "math" + "net" + "strconv" + "time" + + "go.uber.org/zap" + + "github.com/nats-io/nats-server/v2/server" + "github.com/pkg/errors" +) + +func RunNatsMessagingServer(serverURL string, logger *zap.Logger, + maxPayload int64, connectionTimeout time.Duration) error { + host, portString, err := net.SplitHostPort(serverURL) + if err != nil { + return errors.Errorf("failed to parse host and port: %v", err) + } + + port, err := strconv.Atoi(portString) + if err != nil { + return errors.Errorf("failed to parse port from the URL: %v", err) + } + + if maxPayload > math.MaxInt32 || maxPayload < math.MinInt32 { + return errors.Errorf("max payload is too big or too small, must be in range of int32") + } + + opts := &server.Options{ + MaxPayload: int32(maxPayload), + Host: host, + Port: port, + NoSigs: true, + } + s, err := server.NewServer(opts) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to create NATS server, %v", err)) + } + go s.Start() + defer func() { + s.Shutdown() + s.WaitForShutdown() + }() + if !s.ReadyForConnections(connectionTimeout) { + logger.Fatal("NATS server is not ready for connections") + } + logger.Info(fmt.Sprintf("NATS Server is running on %s:%d", host, port)) + return nil +} diff --git a/pkg/messaging/pair/server.go b/pkg/messaging/pair/server.go index e6f45eb9..d5aee6ff 100644 --- a/pkg/messaging/pair/server.go +++ b/pkg/messaging/pair/server.go @@ -26,6 +26,7 @@ func StartPairMessagingServer( es *events.Storage, pew specific.PrivateNodesEventsWriter, logger *zap.Logger, + scheme string, ) error { nc, err := nats.Connect(natsPairURL) if err != nil { @@ -38,15 +39,15 @@ func StartPairMessagingServer( return errors.New("invalid nats URL for pair messaging") } - _, subErr := nc.Subscribe(messaging.BotRequestsTopic, func(request *nats.Msg) { + _, subErr := nc.Subscribe(messaging.BotRequestsTopic+scheme, func(request *nats.Msg) { response, handleErr := handleMessage(request.Data, ns, logger, es, pew) if handleErr != nil { logger.Error("failed to handle bot request", zap.Error(handleErr)) return } - respndErr := request.Respond(response) - if respndErr != nil { - logger.Error("failed to respond to bot request", zap.Error(respndErr)) + respondErr := request.Respond(response) + if respondErr != nil { + logger.Error("failed to respond to bot request", zap.Error(respondErr)) return } }) diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index f782606e..85a5fa8a 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -17,19 +17,21 @@ func StartPubMessagingServer( natsPubSubURL string, // expected nats://host:port alerts <-chan entities.Alert, logger *zap.Logger, + scheme string, ) error { - socket, err := nats.Connect(natsPubSubURL) + nc, err := nats.Connect(natsPubSubURL) if err != nil { return err } - loopErr := enterLoop(ctx, alerts, logger, socket) + loopErr := enterLoop(ctx, alerts, logger, nc, scheme) if loopErr != nil && !errors.Is(loopErr, context.Canceled) { return loopErr } return nil } -func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Logger, nc *nats.Conn) error { +func enterLoop(ctx context.Context, alerts <-chan entities.Alert, + logger *zap.Logger, nc *nats.Conn, scheme string) error { for { select { case <-ctx.Done(): @@ -47,7 +49,7 @@ func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger *zap.Lo logger.Error("Failed to marshal binary alert message", zap.Error(err)) continue } - err = nc.Publish(messaging.PubSubTopic, data) + err = nc.Publish(messaging.PubSubTopic+scheme, data) if err != nil { logger.Error("Failed to send alert to socket", zap.Error(err)) } From ea9f9c08d49e687ed31259d005ae11db848ec134 Mon Sep 17 00:00:00 2001 From: esuwu Date: Mon, 9 Dec 2024 23:15:08 -0600 Subject: [PATCH 4/4] Fixed subscription to alerts --- cmd/bots/internal/common/environment.go | 99 +++++++++++++------ cmd/bots/internal/common/messaging/bots.go | 5 + .../common/messaging/pubsub_client.go | 11 +-- pkg/messaging/pubsub/server.go | 3 +- 4 files changed, 81 insertions(+), 37 deletions(-) diff --git a/cmd/bots/internal/common/environment.go b/cmd/bots/internal/common/environment.go index d972e8d2..17219706 100644 --- a/cmd/bots/internal/common/environment.go +++ b/cmd/bots/internal/common/environment.go @@ -20,6 +20,7 @@ import ( "codnect.io/chrono" "github.com/bwmarrin/discordgo" + "github.com/nats-io/nats.go" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" "go.uber.org/zap" @@ -44,19 +45,25 @@ const ( var errUnknownAlertType = errors.New("received unknown alert type") +type AlertSubscription struct { + alertName entities.AlertName + subscription *nats.Subscription +} + type subscriptions struct { mu *sync.RWMutex - subs map[entities.AlertType]entities.AlertName + subs map[entities.AlertType]AlertSubscription } -func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName) { +func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName, + subscription *nats.Subscription) { s.mu.Lock() - s.subs[alertType] = alertName + s.subs[alertType] = AlertSubscription{alertName, subscription} s.mu.Unlock() } // Read returns alert name. -func (s *subscriptions) Read(alertType entities.AlertType) (entities.AlertName, bool) { +func (s *subscriptions) Read(alertType entities.AlertType) (AlertSubscription, bool) { s.mu.RLock() elem, ok := s.subs[alertType] s.mu.RUnlock() @@ -84,6 +91,9 @@ type DiscordBotEnvironment struct { responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages scheme string + nc *nats.Conn + alertHandlerFunc func(msg *nats.Msg) + topic string } func NewDiscordBotEnvironment( @@ -98,7 +108,7 @@ func NewDiscordBotEnvironment( Bot: bot, ChatID: chatID, Subscriptions: subscriptions{ - subs: make(map[entities.AlertType]entities.AlertName), + subs: make(map[entities.AlertType]AlertSubscription), mu: new(sync.RWMutex), }, zap: zap, @@ -185,16 +195,28 @@ func (dscBot *DiscordBotEnvironment) SendAlertMessage(msg generalMessaging.Alert dscBot.unhandledAlertMessages.Add(alertID, messageID) } +func (dscBot *DiscordBotEnvironment) SetNatsConnection(nc *nats.Conn) { + dscBot.nc = nc +} + +func (dscBot *DiscordBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) { + dscBot.alertHandlerFunc = alertHandlerFunc +} + +func (dscBot *DiscordBotEnvironment) SetTopic(topic string) { + dscBot.topic = topic +} + func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error { for alertType, alertName := range entities.GetAllAlertTypesAndNames() { if dscBot.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - // err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). - // if err != nil { - // return err - // } - dscBot.Subscriptions.Add(alertType, alertName) + subscription, err := dscBot.nc.Subscribe(dscBot.topic+string(alertType), dscBot.alertHandlerFunc) + if err != nil { + return errors.Wrap(err, "failed to subscribe to alert") + } + dscBot.Subscriptions.Add(alertType, alertName, subscription) dscBot.zap.Sugar().Infof("subscribed to %s", alertName) } return nil @@ -247,6 +269,9 @@ type TelegramBotEnvironment struct { responsePairType <-chan pair.Response unhandledAlertMessages unhandledAlertMessages scheme string + nc *nats.Conn + alertHandlerFunc func(msg *nats.Msg) + topic string } func NewTelegramBotEnvironment( @@ -263,7 +288,7 @@ func NewTelegramBotEnvironment( ChatID: chatID, Mute: mute, subscriptions: subscriptions{ - subs: make(map[entities.AlertType]entities.AlertName), + subs: make(map[entities.AlertType]AlertSubscription), mu: new(sync.RWMutex), }, zap: zap, @@ -419,12 +444,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error { if tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - // todo fix this. send (topic, handlerFunc) into this function - // err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). - // if err != nil {. - // return err. - // }. - tgEnv.subscriptions.Add(alertType, alertName) + subscription, err := tgEnv.nc.Subscribe(tgEnv.topic+string(alertType), tgEnv.alertHandlerFunc) + if err != nil { + return errors.Wrap(err, "failed to subscribe to alert") + } + tgEnv.subscriptions.Add(alertType, alertName, subscription) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) } @@ -441,13 +465,13 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName) } - // todo fix this. send (topic, handlerFunc) into this function - // err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}). - // if err != nil {. - // return errors.Wrap(err, "failed to subscribe to alert"). - // }. - tgEnv.subscriptions.Add(alertType, alertName) + subscription, err := tgEnv.nc.Subscribe(tgEnv.topic+string(alertType), tgEnv.alertHandlerFunc) + if err != nil { + return errors.Wrap(err, "failed to subscribe to alert") + } + tgEnv.subscriptions.Add(alertType, alertName, subscription) tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName) + return nil } @@ -460,20 +484,35 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale if !tgEnv.IsAlreadySubscribed(alertType) { return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName) } - // TODO fix this - // err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)}) - // if err != nil { - // return errors.Wrap(err, "failed to unsubscribe from alert") - // } + alertSub, ok := tgEnv.subscriptions.Read(alertType) + if !ok { + return errors.Errorf("subscription didn't exist even though I was subscribed to it") + } + err := alertSub.subscription.Unsubscribe() + if err != nil { + return errors.New("failed to unsubscribe from alert") + } ok = tgEnv.IsAlreadySubscribed(alertType) if !ok { - return errors.New("failed to unsubscribe from alert: was not subscribed to it") + return errors.New("tried to unsubscribe from alert, but still subscribed to it") } tgEnv.subscriptions.Delete(alertType) tgEnv.zap.Sugar().Infof("Telegram bot unsubscribed from %s", alertName) return nil } +func (tgEnv *TelegramBotEnvironment) SetNatsConnection(nc *nats.Conn) { + tgEnv.nc = nc +} + +func (tgEnv *TelegramBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) { + tgEnv.alertHandlerFunc = alertHandlerFunc +} + +func (tgEnv *TelegramBotEnvironment) SetTopic(topic string) { + tgEnv.topic = topic +} + type subscribed struct { AlertName string } @@ -497,7 +536,7 @@ func (tgEnv *TelegramBotEnvironment) SubscriptionsList() (string, error) { var subscribedTo []subscribed tgEnv.subscriptions.MapR(func() { for _, alertName := range tgEnv.subscriptions.subs { - s := subscribed{AlertName: string(alertName) + "\n\n"} + s := subscribed{AlertName: string(alertName.alertName) + "\n\n"} subscribedTo = append(subscribedTo, s) } }) diff --git a/cmd/bots/internal/common/messaging/bots.go b/cmd/bots/internal/common/messaging/bots.go index 45aed7be..62c0a2ae 100644 --- a/cmd/bots/internal/common/messaging/bots.go +++ b/cmd/bots/internal/common/messaging/bots.go @@ -2,11 +2,16 @@ package messaging import ( "nodemon/pkg/messaging" + + "github.com/nats-io/nats.go" ) type Bot interface { SendAlertMessage(msg messaging.AlertMessage) SendMessage(msg string) + SetNatsConnection(nc *nats.Conn) + SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) + SetTopic(topic string) SubscribeToAllAlerts() error IsEligibleForAction(chatID string) bool } diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index 678bf617..f42e5b46 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -19,17 +19,16 @@ func StartSubMessagingClient(ctx context.Context, natsServerURL string, bot Bot, return err } defer nc.Close() - - _, err = nc.Subscribe(messaging.PubSubTopic+scheme, func(msg *nats.Msg) { + bot.SetNatsConnection(nc) + bot.SetTopic(messaging.PubSubTopic + scheme) + alertHandlerFunc := func(msg *nats.Msg) { hndlErr := handleReceivedMessage(msg.Data, bot) if hndlErr != nil { zap.S().Errorf("failed to handle received message from pubsub server %v", hndlErr) } - }) - if err != nil { - zap.S().Fatalf("Failed to subscribe to block updates: %v", err) - return err } + bot.SetAlertHandlerFunc(alertHandlerFunc) + if subscrErr := bot.SubscribeToAllAlerts(); subscrErr != nil { return subscrErr } diff --git a/pkg/messaging/pubsub/server.go b/pkg/messaging/pubsub/server.go index 85a5fa8a..185fbe4d 100644 --- a/pkg/messaging/pubsub/server.go +++ b/pkg/messaging/pubsub/server.go @@ -49,7 +49,8 @@ func enterLoop(ctx context.Context, alerts <-chan entities.Alert, logger.Error("Failed to marshal binary alert message", zap.Error(err)) continue } - err = nc.Publish(messaging.PubSubTopic+scheme, data) + topic := messaging.PubSubTopic + scheme + err = nc.Publish(topic+string(alert.Type()), data) if err != nil { logger.Error("Failed to send alert to socket", zap.Error(err)) }