-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
104 lines (89 loc) · 2.62 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"context"
"fmt"
"github.com/vadiminshakov/marti/config"
"log"
"os"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/vadiminshakov/marti/services/windowfinder"
"github.com/adshao/go-binance/v2"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// restartWaitSec is the base delay, in seconds, that a worker waits before
// recreating its trade service after a failure. It is deliberately left as an
// untyped constant so it can be multiplied by time.Second directly.
const restartWaitSec = 30
// main reads the Binance credentials from the APIKEY and SECRETKEY environment
// variables, loads the configurations via config.Get, and starts one worker
// goroutine per configuration entry. It then blocks on the errgroup.
//
// Each worker loops forever: it builds a window finder and a trade service for
// its pair, runs the service under a context that expires after
// conf.RebalanceInterval, and then recreates everything. A deadline expiry is
// treated as the normal "time to rebalance" signal; any other error is logged
// and followed by a pause of restartWaitSec seconds (twice that when the
// service could not even be created).
func main() {
	apikey := os.Getenv("APIKEY")
	if len(apikey) == 0 {
		log.Fatal("APIKEY env is not set")
	}

	secretKey := os.Getenv("SECRETKEY")
	// Validate the secret itself; checking apikey a second time would let an
	// empty SECRETKEY slip through.
	if len(secretKey) == 0 {
		log.Fatal("SECRETKEY env is not set")
	}

	logger, err := zap.NewProduction()
	if err != nil {
		// Without a logger nothing below can report errors, so bail out early
		// instead of dereferencing a nil logger later.
		log.Fatalf("failed to create logger: %v", err)
	}
	// Best-effort flush on exit; the Sync error is intentionally ignored.
	defer logger.Sync()

	configs, err := config.Get()
	if err != nil {
		logger.Fatal("failed to get configuration", zap.Error(err))
	}

	binanceClient := binance.NewClient(apikey, secretKey)

	g := new(errgroup.Group)

	// timerStarted is shared by all workers so that only one countdown is
	// printed at a time; its zero value is false.
	var timerStarted atomic.Bool

	for _, c := range configs {
		conf := c // save value for goroutine
		g.Go(func() error {
			for {
				ctx, cancel := context.WithTimeout(context.Background(), conf.RebalanceInterval)
				go timer(ctx, conf.RebalanceInterval, &timerStarted)

				wf := windowfinder.NewBinanceWindowFinder(binanceClient, conf.Minwindow, conf.Pair, conf.StatHours)
				fn, err := binanceTradeServiceCreator(logger, wf, binanceClient, conf.Pair, conf.Usebalance, conf.PollPriceInterval)
				if err != nil {
					// Release the context (and stop the countdown) before
					// sleeping; otherwise its timer leaks until the deadline.
					cancel()
					logger.Error(fmt.Sprintf("failed to create binance trader service for pair %s, recreate instance after %ds", conf.Pair.String(),
						restartWaitSec*2), zap.Error(err))
					time.Sleep(restartWaitSec * 2 * time.Second)
					continue
				}

				runErr := fn(ctx)
				// Always release the context once the service has returned,
				// including the nil-error path.
				cancel()

				switch {
				case runErr == nil:
					// Service finished without error: recreate right away.
				case errors.Is(runErr, context.DeadlineExceeded):
					// The rebalance interval elapsed: recreate without delay.
					logger.Info("recreate instance", zap.String("pair", conf.Pair.String()))
				default:
					logger.Error(fmt.Sprintf("error, recreate instance for pair %s after %ds", conf.Pair.String(), restartWaitSec), zap.Error(runErr))
					time.Sleep(restartWaitSec * time.Second)
				}
			}
		})
		logger.Info("started", zap.String("pair", conf.Pair.String()))
	}

	if err := g.Wait(); err != nil {
		logger.Error(err.Error())
	}
}
// timer prints, once per second, the time remaining before the next rebalance
// until ctx is done.
//
// ctx bounds the lifetime of the countdown. recreateInterval is the span, from
// the moment timer starts, after which the rebalance is expected. timerStarted
// is a shared flag that guarantees a single active countdown: if it is already
// set, timer returns immediately without printing; otherwise timer sets it and
// clears it again on exit.
func timer(ctx context.Context, recreateInterval time.Duration, timerStarted *atomic.Bool) {
	if !timerStarted.CompareAndSwap(false, true) {
		return
	}
	// Only the goroutine that won the CAS above reaches this point, so a plain
	// store is enough to release the flag.
	defer timerStarted.Store(false)

	endpoint := time.Now().Add(recreateInterval)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		// Do not print anything once the context is already finished.
		if ctx.Err() != nil {
			return
		}

		remain := time.Until(endpoint)
		if remain < 0 {
			// Never show a negative countdown if ctx outlives the interval.
			remain = 0
		}
		fmt.Printf("%.0fs remaining before rebalance\r", remain.Seconds())

		// Wait for the next tick, but wake up immediately on cancellation so
		// the flag is released in time for the next countdown to start.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}