Skip to content

Commit

Permalink
feat: change logging, add MySQL email forward lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
d--j committed Apr 3, 2023
1 parent 25280d9 commit 400c181
Show file tree
Hide file tree
Showing 21 changed files with 386 additions and 174 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
GO_MILTER_INTEGRATION_DIR := $(shell cd integration && go list -f '{{.Dir}}' github.com/d--j/go-milter/integration)

integration:
docker build -q --progress=plain -t go-milter-integration "$(GO_MILTER_INTEGRATION_DIR)/docker" && \
docker build -q -t go-milter-integration "$(GO_MILTER_INTEGRATION_DIR)/docker" && \
docker run --rm --hostname=mx.example.com -w /usr/src/root/integration -v $(PWD):/usr/src/root go-milter-integration \
go run github.com/d--j/go-milter/integration/runner -filter '.*' -mtaFilter '.*' ./tests

Expand Down
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ localDomains:
- 'example.com'
```
You can also specify an optional MySQL query for email forwarding lookups:
```yaml
# Optional: You can specify a MySQL connection/query to lookup mail forwarding replacements
dbDriver: 'mysql'
dbDSN: 'user:password@tcp(host:port)/dbname'
dbForwardQuery: "SELECT destination from mail_forwarding WHERE source = ? AND active = 'y' AND server_id = 1;"
```
If your machine does not have public IP addresses (NATed/firewalled) or you deployed the milter on another machine, you
need to specify the IPs that we check against the SPF records. These IPs should be the IPs that get used for outgoing
SMTP connections.
Expand All @@ -75,9 +84,9 @@ You can use command line parameters to change this default:
```
$ srs-milter -help
Usage of ./srs-milter:
-addr string
-milterAddr string
Bind to address/port or unix domain socket path (default "127.0.0.1:10382")
-proto string
-milterProto unix or tcp
Protocol family (unix or tcp) (default "tcp")
-forward email
email to do forward SRS lookup for. If specified the milter will not be started.
Expand All @@ -98,6 +107,8 @@ Add this to `/etc/postfix/main.cf`
smtpd_milters = inet:127.0.0.1:10382
non_smtpd_milters = inet:127.0.0.1:10382
milter_protocol = 6
# so that SRS bounces can be accepted - otherwise Postfix will reject the bounces
mydestination = $myhostname, srs.example.com
```
If you already have milters defined (e.g. Rspamd),
Expand Down
2 changes: 1 addition & 1 deletion cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package srsMilter
package srsmilter

import (
"time"
Expand Down
2 changes: 1 addition & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package srsMilter
package srsmilter

import (
"net"
Expand Down
18 changes: 5 additions & 13 deletions cmd/srs-milter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package main

import (
"errors"
"log"
"net"
"reflect"
"strings"

"github.com/d--j/srs-milter"
_ "github.com/go-sql-driver/mysql"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"golang.org/x/net/idna"
Expand Down Expand Up @@ -52,32 +52,30 @@ func determineExternalIPs() ([]net.IP, error) {

func ipsToString(ips []net.IP) string {
var s strings.Builder
s.WriteString(`"`)
for i, ip := range ips {
if i > 0 {
s.WriteString(",")
}
s.WriteString(ip.String())
}
s.WriteString(`"`)
return s.String()
}

func loadViperConfig() (*srsMilter.Configuration, error) {
var conf srsMilter.Configuration
func loadViperConfig() (*srsmilter.Configuration, error) {
var conf srsmilter.Configuration
err := viper.Unmarshal(&conf, viper.DecodeHook(mapstructure.StringToIPHookFunc()), viper.DecodeHook(func(
f reflect.Type,
t reflect.Type,
data interface{}) (interface{}, error) {
if f.Kind() != reflect.String {
return data, nil
}
if t != reflect.TypeOf(srsMilter.Domain("")) {
if t != reflect.TypeOf(srsmilter.Domain("")) {
return data, nil
}

asciiDomain, err := idna.Lookup.ToASCII(data.(string))
return srsMilter.Domain(asciiDomain), err
return srsmilter.Domain(asciiDomain), err
}), viper.DecodeHook(func(
f reflect.Type,
t reflect.Type,
Expand Down Expand Up @@ -106,11 +104,5 @@ func loadViperConfig() (*srsMilter.Configuration, error) {
return nil, err
}
}
if len(conf.LocalDomains) == 0 {
log.Printf("warn=\"local domain list is empty: only relying on SPF lookups\"")
}
if conf.LogLevel > 0 {
log.Printf("info=\"config loaded\" srsDomain=%q numSrsKeys=%d numLocalDomains=%d localIPs=%s", conf.SrsDomain, len(conf.SrsKeys), len(conf.LocalDomains), ipsToString(conf.LocalIps))
}
return &conf, nil
}
24 changes: 24 additions & 0 deletions cmd/srs-milter/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"github.com/go-logfmt/logfmt"
"github.com/inconshreveable/log15"
)

func LogfmtFormatWithTime() log15.Format {
return log15.FormatFunc(func(r *log15.Record) []byte {
common := []interface{}{r.KeyNames.Time, r.Time, r.KeyNames.Lvl, r.Lvl, r.KeyNames.Msg, r.Msg}
b, _ := logfmt.MarshalKeyvals(append(common, r.Ctx...)...)
b = append(b, '\n')
return b
})
}

func LogfmtFormatWithoutTime() log15.Format {
return log15.FormatFunc(func(r *log15.Record) []byte {
common := []interface{}{r.KeyNames.Lvl, r.Lvl, r.KeyNames.Msg, r.Msg}
b, _ := logfmt.MarshalKeyvals(append(common, r.Ctx...)...)
b = append(b, '\n')
return b
})
}
102 changes: 72 additions & 30 deletions cmd/srs-milter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package main
import (
"context"
"flag"
"log"
"os"
"sync"

"github.com/d--j/go-milter/mailfilter"
"github.com/d--j/srs-milter"
"github.com/fsnotify/fsnotify"
"github.com/inconshreveable/log15"
"github.com/spf13/viper"
)

var RuntimeConfig *srsMilter.Configuration
var RuntimeCache *srsMilter.Cache
var RuntimeConfig *srsmilter.Configuration
var RuntimeCache *srsmilter.Cache
var RuntimeConfigMutex sync.RWMutex
var LogHandler log15.Handler

var (
version = "dev"
Expand All @@ -26,13 +28,13 @@ var (
func main() {
// parse commandline arguments
var systemd bool
var protocol, address, forward, reverse string
flag.StringVar(&protocol,
"proto",
var milterProtocol, milterAddress, forward, reverse string
flag.StringVar(&milterProtocol,
"milterProto",
"tcp",
"Protocol family (unix or tcp)")
flag.StringVar(&address,
"addr",
"Protocol family (`unix or tcp`)")
flag.StringVar(&milterAddress,
"milterAddr",
"127.0.0.1:10382",
"Bind to address/port or unix domain socket path")
flag.StringVar(&forward,
Expand All @@ -48,37 +50,74 @@ func main() {

// disable logging date/time when called as systemd service – journald will add those anyway
if systemd {
log.Default().SetFlags(0)
LogHandler = log15.StreamHandler(os.Stdout, LogfmtFormatWithoutTime())
} else {
LogHandler = log15.StreamHandler(os.Stdout, LogfmtFormatWithTime())
}
logger := log15.New()
logger.SetHandler(LogHandler)

log.Printf("info=\"start\" version=%q commit=%q buildDate=%q", version, commit, date)
logger.Info("start", log15.Ctx{"version": version, "commit": commit, "build": date})

// make sure the specified protocol is either unix or tcp
if protocol != "unix" && protocol != "tcp" {
log.Fatal("invalid protocol name")
if milterProtocol != "unix" && milterProtocol != "tcp" {
logger.Crit("invalid protocol name", "protocol", milterProtocol)
os.Exit(1)
}

var err error
viper.SetConfigName("srs-milter")
viper.AddConfigPath("/etc/srs-milter")
viper.AddConfigPath(".")
if err = viper.ReadInConfig(); err != nil {
log.Fatal(err)
logger.Crit("error reading config file", log15.Ctx{"err": err})
os.Exit(1)
}
RuntimeConfig, err = loadViperConfig()
if err != nil {
log.Fatal(err)
logger.Crit("error parsing config file", log15.Ctx{"err": err})
os.Exit(1)
}
RuntimeConfig.Setup()
RuntimeCache = srsMilter.NewCache(RuntimeConfig)
err = RuntimeConfig.Setup()
if err != nil {
logger.Crit("error in config file", log15.Ctx{"err": err})
os.Exit(1)
}
RuntimeCache = srsmilter.NewCache(RuntimeConfig)
configureLogging := func() {
if systemd {
LogHandler = log15.StreamHandler(os.Stdout, LogfmtFormatWithoutTime())
} else {
LogHandler = log15.StreamHandler(os.Stdout, LogfmtFormatWithTime())
}
switch RuntimeConfig.LogLevel {
case 0:
LogHandler = log15.LvlFilterHandler(log15.LvlCrit, LogHandler)
case 1:
LogHandler = log15.LvlFilterHandler(log15.LvlError, LogHandler)
case 2:
LogHandler = log15.LvlFilterHandler(log15.LvlWarn, LogHandler)
case 3:
LogHandler = log15.LvlFilterHandler(log15.LvlInfo, LogHandler)
default:
LogHandler = log15.LvlFilterHandler(log15.LvlDebug, LogHandler)
}
logger.SetHandler(LogHandler)
srsmilter.Log.SetHandler(LogHandler)
logger.Info("config loaded", log15.Ctx{"srsDomain": RuntimeConfig.SrsDomain, "localIps": ipsToString(RuntimeConfig.LocalIps), "numKeys": len(RuntimeConfig.SrsKeys), "numLocalDomains": len(RuntimeConfig.LocalDomains)})
if len(RuntimeConfig.LocalDomains) == 0 {
logger.Warn("local domain list is empty: only relying on SPF lookups")
}
}
configureLogging()

if forward != "" {
srsAddress, err := srsMilter.ForwardSrs(forward, RuntimeConfig)
log.Printf("address=<%s> srsAddress=<%s> error=%v", forward, srsAddress, err)
srsAddress, err := srsmilter.ForwardSrs(forward, RuntimeConfig)
logger.Info("forward SRS", log15.Ctx{"ofrom": forward, "from": srsAddress, "err": err})
}
if reverse != "" {
address, err := srsMilter.ReverseSrs(reverse, RuntimeConfig)
log.Printf("srsAddress=<%s> address=<%s> error=%v", reverse, address, err)
address, err := srsmilter.ReverseSrs(reverse, RuntimeConfig)
logger.Info("reverse SRS", log15.Ctx{"oto": reverse, "to": address, "err": err})
}
if forward != "" || reverse != "" {
return
Expand All @@ -87,30 +126,33 @@ func main() {
viper.OnConfigChange(func(_ fsnotify.Event) {
newConfig, err := loadViperConfig()
if err != nil {
log.Printf("warn=\"could not load new config on change\" error=%q", err)
logger.Error("could not load new config on change", "err", err)
} else {
newConfig.Setup()
err = newConfig.Setup()
if err != nil {
logger.Error("could not load new config on change", "err", err)
}
RuntimeConfigMutex.Lock()
RuntimeConfig = newConfig
RuntimeCache = srsMilter.NewCache(RuntimeConfig)
RuntimeCache = srsmilter.NewCache(RuntimeConfig)
configureLogging()
RuntimeConfigMutex.Unlock()
}
})
viper.WatchConfig()

filter, err := mailfilter.New(protocol, address, func(ctx context.Context, trx mailfilter.Trx) (mailfilter.Decision, error) {
filter, err := mailfilter.New(milterProtocol, milterAddress, func(ctx context.Context, trx mailfilter.Trx) (mailfilter.Decision, error) {
RuntimeConfigMutex.RLock()
config := RuntimeConfig
cache := RuntimeCache
RuntimeConfigMutex.RUnlock()
return srsMilter.Filter(ctx, trx, config, cache)
return srsmilter.Filter(ctx, trx, config, cache)
}, mailfilter.WithDecisionAt(mailfilter.DecisionAtEndOfHeaders))
if err != nil {
log.Fatal(err)
}
if RuntimeConfig.LogLevel > 0 {
log.Printf("info=\"ready\" network=%q address=%q", filter.Addr().Network(), filter.Addr().String())
logger.Crit("error creating milter", "err", err)
os.Exit(1)
}
logger.Info("milter ready", log15.Ctx{"network": filter.Addr().Network(), "address": filter.Addr().String()})

// quit when milter quits
filter.Wait()
Expand Down
Loading

0 comments on commit 400c181

Please sign in to comment.