Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network switch mangos to nats #306

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Dockerfile-messaging-server
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
FROM golang:1.23.3-alpine3.20 as builder
ARG APP=/app
WORKDIR ${APP}

RUN apk add --no-cache make
# disable cgo for go build
ENV CGO_ENABLED=0

COPY go.mod .
COPY go.sum .

RUN go mod download

COPY Makefile .
COPY cmd cmd
COPY pkg pkg
COPY internal internal

RUN make build-messaging-server-linux-amd64

FROM alpine:3.20
ARG APP=/app
ENV TZ=Etc/UTC \
APP_USER=appuser

STOPSIGNAL SIGINT

RUN addgroup -S $APP_USER \
&& adduser -S $APP_USER -G $APP_USER

RUN apk add --no-cache bind-tools

USER $APP_USER
WORKDIR ${APP}
# Considered as a default HTTP API Port
EXPOSE 8080

COPY --from=builder ${APP}/build/linux-amd64/messaging-server ${APP}/messaging-server

ENTRYPOINT ["./messaging-server"]
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ build-bots-linux-amd64:

build-nodemon-linux-amd64:
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/nodemon -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/nodemon

build-messaging-server-linux-amd64:
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/messaging-server -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/messaging
33 changes: 20 additions & 13 deletions cmd/bots/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,22 @@ func main() {
}

type discordBotConfig struct {
nanomsgPubSubURL string
nanomsgPairURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
natsPubSubURL string
natsPairURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
scheme string
}

func newDiscordBotConfigConfig() *discordBotConfig {
c := new(discordBotConfig)
tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url",
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket")
tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url",
"ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket")
tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nats-pubsub-url",
"nats://127.0.0.1:4222", "NATS server URL for pubsub messaging")
tools.StringVarFlagWithEnv(&c.natsPairURL, "nats-pair-discord-url",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still necessary for discord?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still type some commands as far as I remember

"nats://127.0.0.1:4222", "NATS server URL for pair messaging")
tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token",
"", "The secret token used to authenticate the bot")
tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id",
Expand All @@ -62,6 +63,8 @@ func newDiscordBotConfigConfig() *discordBotConfig {
tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.")
tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "",
"Local network address to bind the HTTP API of the service on.")
tools.StringVarFlagWithEnv(&c.scheme, "scheme",
"testnet", "Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service")
return c
}

Expand All @@ -70,6 +73,9 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error {
zap.Error("discord bot token is required")
return common.ErrInvalidParameters
}
if c.scheme == "" {
zap.Error("the blockchain scheme must be specified")
}
if c.discordChatID == "" {
zap.Error("discord chat ID is required")
return common.ErrInvalidParameters
Expand Down Expand Up @@ -111,6 +117,7 @@ func runDiscordBot() error {
logger,
requestChan,
responseChan,
cfg.scheme,
)
if initErr != nil {
return errors.Wrap(initErr, "failed to init discord bot")
Expand Down Expand Up @@ -178,15 +185,15 @@ func runMessagingClients(
responseChan chan pair.Response,
) {
go func() {
clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger)
clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsPubSubURL, discordBotEnv, logger, cfg.scheme)
if clientErr != nil {
logger.Fatal("failed to start sub messaging client", zap.Error(clientErr))
return
}
}()

go func() {
err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger)
err := messaging.StartPairMessagingClient(ctx, cfg.natsPairURL, requestChan, responseChan, logger, cfg.scheme)
if err != nil {
logger.Fatal("failed to start pair messaging client", zap.Error(err))
return
Expand Down
99 changes: 67 additions & 32 deletions cmd/bots/internal/common/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (

"codnect.io/chrono"
"github.com/bwmarrin/discordgo"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/crypto"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol"
"go.uber.org/zap"
"gopkg.in/telebot.v3"
)
Expand All @@ -46,19 +45,25 @@ const (

var errUnknownAlertType = errors.New("received unknown alert type")

type AlertSubscription struct {
alertName entities.AlertName
subscription *nats.Subscription
}

type subscriptions struct {
mu *sync.RWMutex
subs map[entities.AlertType]entities.AlertName
subs map[entities.AlertType]AlertSubscription
}

func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName) {
func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName,
subscription *nats.Subscription) {
s.mu.Lock()
s.subs[alertType] = alertName
s.subs[alertType] = AlertSubscription{alertName, subscription}
s.mu.Unlock()
}

// Read returns alert name.
func (s *subscriptions) Read(alertType entities.AlertType) (entities.AlertName, bool) {
func (s *subscriptions) Read(alertType entities.AlertType) (AlertSubscription, bool) {
s.mu.RLock()
elem, ok := s.subs[alertType]
s.mu.RUnlock()
Expand All @@ -80,12 +85,15 @@ func (s *subscriptions) MapR(f func()) {
type DiscordBotEnvironment struct {
ChatID string
Bot *discordgo.Session
subSocket protocol.Socket
Subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
topic string
}

func NewDiscordBotEnvironment(
Expand All @@ -94,18 +102,20 @@ func NewDiscordBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *DiscordBotEnvironment {
return &DiscordBotEnvironment{
Bot: bot,
ChatID: chatID,
Subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -121,10 +131,6 @@ func (dscBot *DiscordBotEnvironment) Start() error {
return nil
}

func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
dscBot.subSocket = subSocket
}

func (dscBot *DiscordBotEnvironment) SendMessage(msg string) {
_, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg)
if err != nil {
Expand Down Expand Up @@ -189,16 +195,28 @@ func (dscBot *DiscordBotEnvironment) SendAlertMessage(msg generalMessaging.Alert
dscBot.unhandledAlertMessages.Add(alertID, messageID)
}

func (dscBot *DiscordBotEnvironment) SetNatsConnection(nc *nats.Conn) {
dscBot.nc = nc
}

func (dscBot *DiscordBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
dscBot.alertHandlerFunc = alertHandlerFunc
}

func (dscBot *DiscordBotEnvironment) SetTopic(topic string) {
dscBot.topic = topic
}

func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error {
for alertType, alertName := range entities.GetAllAlertTypesAndNames() {
if dscBot.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
subscription, err := dscBot.nc.Subscribe(dscBot.topic+string(alertType), dscBot.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
dscBot.Subscriptions.Add(alertType, alertName)
dscBot.Subscriptions.Add(alertType, alertName, subscription)
dscBot.zap.Sugar().Infof("subscribed to %s", alertName)
}
return nil
Expand Down Expand Up @@ -244,13 +262,16 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) {
type TelegramBotEnvironment struct {
ChatID int64
Bot *telebot.Bot
Mute bool // If it used elsewhere, should be protected by mutex
subSocket protocol.Socket
Mute bool // If it used elsewhere, should be protected by mutex.
subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
topic string
}

func NewTelegramBotEnvironment(
Expand All @@ -260,19 +281,21 @@ func NewTelegramBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *TelegramBotEnvironment {
return &TelegramBotEnvironment{
Bot: bot,
ChatID: chatID,
Mute: mute,
subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -289,10 +312,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error {
return nil
}

func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
tgEnv.subSocket = subSocket
}

func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) {
if tgEnv.Mute {
tgEnv.zap.Info("received an alert, but asleep now")
Expand Down Expand Up @@ -425,11 +444,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error {
if tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
subscription, err := tgEnv.nc.Subscribe(tgEnv.topic+string(alertType), tgEnv.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)
}

Expand All @@ -446,12 +465,13 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
subscription, err := tgEnv.nc.Subscribe(tgEnv.topic+string(alertType), tgEnv.alertHandlerFunc)
if err != nil {
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)

return nil
}

Expand All @@ -464,20 +484,35 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale
if !tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)})
alertSub, ok := tgEnv.subscriptions.Read(alertType)
if !ok {
return errors.Errorf("subscription didn't exist even though I was subscribed to it")
}
err := alertSub.subscription.Unsubscribe()
if err != nil {
return errors.Wrap(err, "failed to unsubscribe from alert")
return errors.New("failed to unsubscribe from alert")
}
ok = tgEnv.IsAlreadySubscribed(alertType)
if !ok {
return errors.New("failed to unsubscribe from alert: was not subscribed to it")
return errors.New("tried to unsubscribe from alert, but still subscribed to it")
}
tgEnv.subscriptions.Delete(alertType)
tgEnv.zap.Sugar().Infof("Telegram bot unsubscribed from %s", alertName)
return nil
}

func (tgEnv *TelegramBotEnvironment) SetNatsConnection(nc *nats.Conn) {
tgEnv.nc = nc
}

func (tgEnv *TelegramBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
tgEnv.alertHandlerFunc = alertHandlerFunc
}

func (tgEnv *TelegramBotEnvironment) SetTopic(topic string) {
tgEnv.topic = topic
}

type subscribed struct {
AlertName string
}
Expand All @@ -501,7 +536,7 @@ func (tgEnv *TelegramBotEnvironment) SubscriptionsList() (string, error) {
var subscribedTo []subscribed
tgEnv.subscriptions.MapR(func() {
for _, alertName := range tgEnv.subscriptions.subs {
s := subscribed{AlertName: string(alertName) + "\n\n"}
s := subscribed{AlertName: string(alertName.alertName) + "\n\n"}
subscribedTo = append(subscribedTo, s)
}
})
Expand Down
Loading
Loading