Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced SMTP notification #5535

Merged
merged 33 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b11f2cc
Introduced SMTP notification
robert-ulbrich-mercedes-benz Jul 1, 2024
3296a1a
Fixing lint issues
robert-ulbrich-mercedes-benz Jul 3, 2024
d6bc1a4
Fixing tests
robert-ulbrich-mercedes-benz Jul 4, 2024
a740c8e
Fixing lint
robert-ulbrich-mercedes-benz Jul 4, 2024
4be6a0e
Fixing lint
robert-ulbrich-mercedes-benz Jul 5, 2024
4353452
Fix linting issues
robert-ulbrich-mercedes-benz Jul 15, 2024
9aa1b3e
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Jul 29, 2024
3d74ea9
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Aug 16, 2024
cada9d8
Fix go mod
robert-ulbrich-mercedes-benz Aug 16, 2024
fccc16a
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Sep 5, 2024
73bb305
Fixing location of no sec comment
robert-ulbrich-mercedes-benz Sep 5, 2024
c7901bc
Removing unused import
robert-ulbrich-mercedes-benz Sep 5, 2024
b265304
Running gci for file
robert-ulbrich-mercedes-benz Sep 9, 2024
eda4d3e
Implemented connection reuse for smtp emailer
robert-ulbrich-mercedes-benz Sep 10, 2024
445b979
Introduced SMTP notification
robert-ulbrich-mercedes-benz Jul 1, 2024
a720425
Fixing lint issues
robert-ulbrich-mercedes-benz Jul 3, 2024
0bf9654
Fixing tests
robert-ulbrich-mercedes-benz Jul 4, 2024
f513352
Fixing lint
robert-ulbrich-mercedes-benz Jul 4, 2024
1a0de4a
Fixing lint
robert-ulbrich-mercedes-benz Jul 5, 2024
dc88a99
Fix linting issues
robert-ulbrich-mercedes-benz Jul 15, 2024
b54a932
Fix go mod
robert-ulbrich-mercedes-benz Aug 16, 2024
5eac9ea
Fixing location of no sec comment
robert-ulbrich-mercedes-benz Sep 5, 2024
0926604
Removing unused import
robert-ulbrich-mercedes-benz Sep 5, 2024
9d72fac
Running gci for file
robert-ulbrich-mercedes-benz Sep 9, 2024
d995db7
Implemented connection reuse for smtp emailer
robert-ulbrich-mercedes-benz Sep 10, 2024
f04c0fd
Merge remote-tracking branch 'origin/smtp-notifications' into smtp-no…
robert-ulbrich-mercedes-benz Sep 11, 2024
5da05e1
Adapting interface of smtp_emailer
robert-ulbrich-mercedes-benz Sep 11, 2024
1050f4b
Fixing linter issue
robert-ulbrich-mercedes-benz Sep 12, 2024
6d0e4bf
Adding unit tests
robert-ulbrich-mercedes-benz Sep 12, 2024
39369ba
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Sep 12, 2024
8be8c46
Applying gci
robert-ulbrich-mercedes-benz Sep 13, 2024
64f3333
Faking mockery generation
robert-ulbrich-mercedes-benz Sep 16, 2024
50ce320
Using mocker 1.0.1
robert-ulbrich-mercedes-benz Sep 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
github.com/wI2L/jsondiff v0.5.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.24.0
golang.org/x/net v0.23.0
golang.org/x/oauth2 v0.16.0
golang.org/x/time v0.5.0
google.golang.org/api v0.155.0
Expand Down
14 changes: 9 additions & 5 deletions flyteadmin/pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/NYTimes/gizmo/pubsub"
gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws"
gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp"
Expand Down Expand Up @@ -50,13 +52,15 @@ func CreateMsgChan() {
})
}

func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Emailer {
func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope, sm core.SecretManager) interfaces.Emailer {
// If an external email service is specified use that instead.
// TODO: Handling of this is messy, see https://github.com/flyteorg/flyte/issues/1063
if config.NotificationsEmailerConfig.EmailerConfig.ServiceName != "" {
switch config.NotificationsEmailerConfig.EmailerConfig.ServiceName {
case implementations.Sendgrid:
return implementations.NewSendGridEmailer(config, scope)
case implementations.SMTP:
return implementations.NewSMTPEmailer(context.Background(), config, scope, sm)
default:
panic(fmt.Errorf("No matching email implementation for %s", config.NotificationsEmailerConfig.EmailerConfig.ServiceName))
}
Expand Down Expand Up @@ -87,7 +91,7 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc
}
}

func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Processor {
func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope, sm core.SecretManager) interfaces.Processor {
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
var sub pubsub.Subscriber
Expand Down Expand Up @@ -119,7 +123,7 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
if err != nil {
panic(err)
}
emailer = GetEmailer(config, scope)
emailer = GetEmailer(config, scope, sm)
return implementations.NewProcessor(sub, emailer, scope)
case common.GCP:
projectID := config.GCPConfig.ProjectID
Expand All @@ -135,10 +139,10 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
if err != nil {
panic(err)
}
emailer = GetEmailer(config, scope)
emailer = GetEmailer(config, scope, sm)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
emailer = GetEmailer(config, scope, sm)
return implementations.NewSandboxProcessor(msgChan, emailer)
case common.Local:
fallthrough
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/pkg/async/notifications/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -38,7 +38,7 @@ func TestGetEmailer(t *testing.T) {
},
}

GetEmailer(cfg, promutils.NewTestScope())
GetEmailer(cfg, promutils.NewTestScope(), &mocks.SecretManager{})

// shouldn't reach here
t.Errorf("did not panic")
Expand All @@ -47,7 +47,7 @@ func TestGetEmailer(t *testing.T) {
func TestNewNotificationPublisherAndProcessor(t *testing.T) {
testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope)
assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{})
testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope)
testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope, &mocks.SecretManager{})
assert.IsType(t, testSandboxProcessor, &implementations.SandboxProcessor{})

go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ type ExternalEmailer = string

const (
Sendgrid ExternalEmailer = "sendgrid"
SMTP ExternalEmailer = "smtp"
)
138 changes: 138 additions & 0 deletions flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package implementations

import (
"crypto/tls"
"fmt"
"net/smtp"
"strings"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)

type SMTPEmailer struct {
config *runtimeInterfaces.NotificationsEmailerConfig
systemMetrics emailMetrics
tlsConf *tls.Config
auth *smtp.Auth
}

func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error {

newClient, err := smtp.Dial(s.config.EmailerConfig.SMTPServer + ":" + s.config.EmailerConfig.SMTPPort)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, this is the intended behavior? If admin needs to send a thousand emails, it has to make a new client each time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intended behavior. When using the built-in email client of Go, most examples you find work that way. Also with the integration into the Notifications processor this is the simplest approach. Establishing a connection does not seem to be a very complex undertaking when looking at the source code of the email client.

I agree that keeping the connection is more efficient. But it also makes the code more complex, because the connection state needs to be maintained over an instance - so whenever an email is meant to be sent, it first needs to be checked if the connection is still alive.

If you want me to, I can refactor it to only open a connection once to the SMTP server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked again the internal implementation of Golang's smtp client: It appears that the client implementation seems to be not multi-thread-safe. Every client instance keeps an internal state between function calls - so when reusing the same client while sending an email for writing another mail can lead to weird behavior.

My assumption is that there is only a single email processor per Flyte Admin instance. That means that there should be no concurrent email processing on any Flyte Admin instance, which makes it safe to reuse an existing Flyste client.


if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err))
}

defer newClient.Close()

if err = newClient.Hello("localhost"); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err))
}

if ok, _ := newClient.Extension("STARTTLS"); ok {
if err = newClient.StartTLS(s.tlsConf); err != nil {
return err
}
}

if ok, _ := newClient.Extension("AUTH"); ok {
if err = newClient.Auth(*s.auth); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err))
}
}

if err = newClient.Mail(email.SenderEmail); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email instance: %s", err))
}

for _, recipient := range email.RecipientsEmail {
if err = newClient.Rcpt(recipient); err != nil {
logger.Errorf(ctx, "Error adding email recipient: %s", err)
}
}

writer, err := newClient.Data()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error adding email recipient: %s", err))
}

_, err = writer.Write([]byte(createMailBody(s.config.Sender, email)))

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error writing mail body: %s", err))
}

err = writer.Close()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error closing mail body: %s", err))
}

err = newClient.Quit()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error quitting mail agent: %s", err))
}

s.systemMetrics.SendSuccess.Inc()
return nil
}

func (s *SMTPEmailer) emailError(ctx context.Context, error string) error {
s.systemMetrics.SendError.Inc()
logger.Error(ctx, error)
return errors.NewFlyteAdminErrorf(codes.Internal, "errors were seen while sending emails")
}

func createMailBody(emailSender string, email admin.EmailMessage) string {
headerMap := make(map[string]string)
headerMap["From"] = emailSender
headerMap["To"] = strings.Join(email.RecipientsEmail, ",")
headerMap["Subject"] = email.SubjectLine
headerMap["Content-Type"] = "text/html; charset=\"UTF-8\""

mailMessage := ""

for k, v := range headerMap {
mailMessage += fmt.Sprintf("%s: %s\r\n", k, v)
}

mailMessage += "\r\n" + email.Body

return mailMessage
}

func NewSMTPEmailer(ctx context.Context, config runtimeInterfaces.NotificationsConfig, scope promutils.Scope, sm core.SecretManager) interfaces.Emailer {
var tlsConfiguration *tls.Config
emailConf := config.NotificationsEmailerConfig.EmailerConfig

smtpPassword, err := sm.Get(ctx, emailConf.SMTPPasswordSecretName)
if err != nil {
logger.Debug(ctx, "No SMTP password found.")
smtpPassword = ""
}

auth := smtp.PlainAuth("", emailConf.SMTPUsername, smtpPassword, emailConf.SMTPServer)

tlsConfiguration = &tls.Config{
InsecureSkipVerify: emailConf.SMTPSkipTLSVerify, // #nosec G402
ServerName: emailConf.SMTPServer,
}

return &SMTPEmailer{
config: &config.NotificationsEmailerConfig,
systemMetrics: newEmailMetrics(scope.NewSubScope("smtp")),
tlsConf: tlsConfiguration,
auth: &auth,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package implementations

import (
"context"
"testing"

"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func getNotificationsEmailerConfig() interfaces.NotificationsConfig {
return interfaces.NotificationsConfig{
Type: "",
Region: "",
AWSConfig: interfaces.AWSConfig{},
GCPConfig: interfaces.GCPConfig{},
NotificationsPublisherConfig: interfaces.NotificationsPublisherConfig{},
NotificationsProcessorConfig: interfaces.NotificationsProcessorConfig{},
NotificationsEmailerConfig: interfaces.NotificationsEmailerConfig{
EmailerConfig: interfaces.EmailServerConfig{
ServiceName: SMTP,
SMTPServer: "smtpServer",
SMTPPort: "smtpPort",
SMTPUsername: "smtpUsername",
SMTPPasswordSecretName: "smtp_password",
},
Subject: "subject",
Sender: "sender",
Body: "body"},
ReconnectAttempts: 1,
ReconnectDelaySeconds: 2}
}

func TestEmailCreation(t *testing.T) {
email := admin.EmailMessage{
RecipientsEmail: []string{"[email protected]", "[email protected]"},
SubjectLine: "subject",
Body: "Email Body",
SenderEmail: "[email protected]",
}

body := createMailBody("[email protected]", email)
assert.Equal(t, "From: [email protected]\r\nTo: [email protected],[email protected]\r\nSubject: subject\r\nContent-Type: text/html; charset=\"UTF-8\"\r\n\r\nEmail Body", body)
}

func TestNewSmtpEmailer(t *testing.T) {
secretManagerMock := mocks.SecretManager{}
secretManagerMock.On("Get", mock.Anything, "smtp_password").Return("password", nil)

notificationsConfig := getNotificationsEmailerConfig()

smtpEmailer := NewSMTPEmailer(context.Background(), notificationsConfig, promutils.NewTestScope(), &secretManagerMock)

assert.NotNil(t, smtpEmailer)
}
6 changes: 4 additions & 2 deletions flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
workflowengineImpl "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/impl"
"github.com/flyteorg/flyte/flyteadmin/plugins"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/proto"

Check failure on line 27 in flyteadmin/pkg/rpc/adminservice/base.go

View workflow job for this annotation

GitHub Actions / compile

"github.com/golang/protobuf/proto" imported and not used
)

type AdminService struct {
Expand All @@ -45,7 +47,7 @@
const defaultRetries = 3

func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, configuration runtimeIfaces.Configuration,
kubeConfig, master string, dataStorageClient *storage.DataStore, adminScope promutils.Scope) *AdminService {
kubeConfig, master string, dataStorageClient *storage.DataStore, adminScope promutils.Scope, sm core.SecretManager) *AdminService {
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()

panicCounter := adminScope.MustNewCounter("initialization_panic",
Expand Down Expand Up @@ -81,7 +83,7 @@
pluginRegistry.RegisterDefault(plugins.PluginIDWorkflowExecutor, workflowExecutor)

publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope, sm)
eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope)
go func() {
logger.Info(ctx, "Started processing notifications.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,13 @@ type NotificationsProcessorConfig struct {
type EmailServerConfig struct {
ServiceName string `json:"serviceName"`
// Only one of these should be set.
APIKeyEnvVar string `json:"apiKeyEnvVar"`
APIKeyFilePath string `json:"apiKeyFilePath"`
APIKeyEnvVar string `json:"apiKeyEnvVar"`
APIKeyFilePath string `json:"apiKeyFilePath"`
SMTPServer string `json:"smtpServer"`
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
SMTPPort string `json:"smtpPort"`
SMTPSkipTLSVerify bool `json:"smtpSkipTLSVerify"`
SMTPUsername string `json:"smtpUsername"`
SMTPPasswordSecretName string `json:"smtpPasswordSecretName"`
}

// This section handles the configuration of notifications emails.
Expand Down
Loading
Loading