Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network switch mangos to nats #306

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Dockerfile-messaging-server
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
FROM golang:1.23.3-alpine3.20 as builder
ARG APP=/app
WORKDIR ${APP}

RUN apk add --no-cache make
# disable cgo for go build
ENV CGO_ENABLED=0

COPY go.mod .
COPY go.sum .

RUN go mod download

COPY Makefile .
COPY cmd cmd
COPY pkg pkg
COPY internal internal

RUN make build-messaging-server-linux-amd64

FROM alpine:3.20
ARG APP=/app
ENV TZ=Etc/UTC \
APP_USER=appuser

STOPSIGNAL SIGINT

RUN addgroup -S $APP_USER \
&& adduser -S $APP_USER -G $APP_USER

RUN apk add --no-cache bind-tools

USER $APP_USER
WORKDIR ${APP}
# Considered as a default HTTP API Port
EXPOSE 8080

COPY --from=builder ${APP}/build/linux-amd64/messaging-server ${APP}/messaging-server

ENTRYPOINT ["./messaging-server"]
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ build-bots-linux-amd64:

build-nodemon-linux-amd64:
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/nodemon -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/nodemon

build-messaging-server-linux-amd64:
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/linux-amd64/messaging-server -ldflags="-X 'nodemon/internal.version=$(VERSION)'" ./cmd/messaging
33 changes: 20 additions & 13 deletions cmd/bots/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,22 @@ func main() {
}

type discordBotConfig struct {
nanomsgPubSubURL string
nanomsgPairURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
natsPubSubURL string
natsPairURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
scheme string
}

func newDiscordBotConfigConfig() *discordBotConfig {
c := new(discordBotConfig)
tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url",
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket")
tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url",
"ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket")
tools.StringVarFlagWithEnv(&c.natsPubSubURL, "nats-pubsub-url",
"nats://127.0.0.1:4222", "NATS server URL for pubsub messaging")
tools.StringVarFlagWithEnv(&c.natsPairURL, "nats-pair-discord-url",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still necessary for discord?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still type some commands as far as I remember

"nats://127.0.0.1:4222", "NATS server URL for pair messaging")
tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token",
"", "The secret token used to authenticate the bot")
tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id",
Expand All @@ -62,6 +63,8 @@ func newDiscordBotConfigConfig() *discordBotConfig {
tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.")
tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "",
"Local network address to bind the HTTP API of the service on.")
tools.StringVarFlagWithEnv(&c.scheme, "scheme",
"testnet", "Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service")
return c
}

Expand All @@ -70,6 +73,9 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error {
zap.Error("discord bot token is required")
return common.ErrInvalidParameters
}
if c.scheme == "" {
zap.Error("the blockchain scheme must be specified")
}
if c.discordChatID == "" {
zap.Error("discord chat ID is required")
return common.ErrInvalidParameters
Expand Down Expand Up @@ -111,6 +117,7 @@ func runDiscordBot() error {
logger,
requestChan,
responseChan,
cfg.scheme,
)
if initErr != nil {
return errors.Wrap(initErr, "failed to init discord bot")
Expand Down Expand Up @@ -178,15 +185,15 @@ func runMessagingClients(
responseChan chan pair.Response,
) {
go func() {
clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger)
clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsPubSubURL, discordBotEnv, logger, cfg.scheme)
if clientErr != nil {
logger.Fatal("failed to start sub messaging client", zap.Error(clientErr))
return
}
}()

go func() {
err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger)
err := messaging.StartPairMessagingClient(ctx, cfg.natsPairURL, requestChan, responseChan, logger, cfg.scheme)
if err != nil {
logger.Fatal("failed to start pair messaging client", zap.Error(err))
return
Expand Down
56 changes: 26 additions & 30 deletions cmd/bots/internal/common/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/bwmarrin/discordgo"
"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/crypto"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol"
"go.uber.org/zap"
"gopkg.in/telebot.v3"
)
Expand Down Expand Up @@ -80,12 +78,12 @@ func (s *subscriptions) MapR(f func()) {
type DiscordBotEnvironment struct {
ChatID string
Bot *discordgo.Session
subSocket protocol.Socket
Subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
}

func NewDiscordBotEnvironment(
Expand All @@ -94,6 +92,7 @@ func NewDiscordBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *DiscordBotEnvironment {
return &DiscordBotEnvironment{
Bot: bot,
Expand All @@ -106,6 +105,7 @@ func NewDiscordBotEnvironment(
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -121,10 +121,6 @@ func (dscBot *DiscordBotEnvironment) Start() error {
return nil
}

func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
dscBot.subSocket = subSocket
}

func (dscBot *DiscordBotEnvironment) SendMessage(msg string) {
_, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg)
if err != nil {
Expand Down Expand Up @@ -194,10 +190,10 @@ func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error {
if dscBot.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
if err != nil {
return err
}
// err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file you viewed is outdated

// if err != nil {
// return err
// }
dscBot.Subscriptions.Add(alertType, alertName)
dscBot.zap.Sugar().Infof("subscribed to %s", alertName)
}
Expand Down Expand Up @@ -244,13 +240,13 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) {
type TelegramBotEnvironment struct {
ChatID int64
Bot *telebot.Bot
Mute bool // If it used elsewhere, should be protected by mutex
subSocket protocol.Socket
Mute bool // If it used elsewhere, should be protected by mutex.
subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
}

func NewTelegramBotEnvironment(
Expand All @@ -260,6 +256,7 @@ func NewTelegramBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *TelegramBotEnvironment {
return &TelegramBotEnvironment{
Bot: bot,
Expand All @@ -273,6 +270,7 @@ func NewTelegramBotEnvironment(
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -289,10 +287,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error {
return nil
}

func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
tgEnv.subSocket = subSocket
}

func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) {
if tgEnv.Mute {
tgEnv.zap.Info("received an alert, but asleep now")
Expand Down Expand Up @@ -425,10 +419,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error {
if tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
if err != nil {
return err
}
// todo fix this. send (topic, handlerFunc) into this function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

// err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}).
// if err != nil {.
// return err.
// }.
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)
}
Expand All @@ -446,10 +441,11 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
if err != nil {
return errors.Wrap(err, "failed to subscribe to alert")
}
// todo fix this. send (topic, handlerFunc) into this function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

// err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)}).
// if err != nil {.
// return errors.Wrap(err, "failed to subscribe to alert").
// }.
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)
return nil
Expand All @@ -464,11 +460,11 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale
if !tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)})
if err != nil {
return errors.Wrap(err, "failed to unsubscribe from alert")
}
// TODO fix this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly should be fixed?

// err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)})
// if err != nil {
// return errors.Wrap(err, "failed to unsubscribe from alert")
// }
ok = tgEnv.IsAlreadySubscribed(alertType)
if !ok {
return errors.New("failed to unsubscribe from alert: was not subscribed to it")
Expand Down
6 changes: 4 additions & 2 deletions cmd/bots/internal/common/initial/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func InitTgBot(behavior string,
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.TelegramBotEnvironment, error) {
botSettings, err := config.NewTgBotSettings(behavior, webhookLocalAddress, publicURL, botToken)
if err != nil {
Expand All @@ -31,7 +32,7 @@ func InitTgBot(behavior string,

logger.Sugar().Debugf("telegram chat id for sending alerts is %d", chatID)

tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType)
tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType, scheme)
return tgBotEnv, nil
}

Expand All @@ -41,6 +42,7 @@ func InitDiscordBot(
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.DiscordBotEnvironment, error) {
bot, err := discordgo.New("Bot " + botToken)
if err != nil {
Expand All @@ -49,6 +51,6 @@ func InitDiscordBot(
logger.Sugar().Debugf("discord chat id for sending alerts is %s", chatID)

bot.Identify.Intents = discordgo.IntentsGuildMessages | discordgo.IntentsMessageContent
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType)
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType, scheme)
return dscBotEnv, nil
}
3 changes: 0 additions & 3 deletions cmd/bots/internal/common/messaging/bots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package messaging

import (
"nodemon/pkg/messaging"

"go.nanomsg.org/mangos/v3/protocol"
)

type Bot interface {
SendAlertMessage(msg messaging.AlertMessage)
SendMessage(msg string)
SubscribeToAllAlerts() error
SetSubSocket(subSocket protocol.Socket)
IsEligibleForAction(chatID string) bool
}
Loading
Loading