-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.go
219 lines (195 loc) · 5.36 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package lori
import (
"context"
"errors"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"github.com/cr-mao/lori/log"
"github.com/cr-mao/lori/registry"
"github.com/cr-mao/lori/transport"
)
// AppInfo is application context value.
type AppInfo interface {
ID() string // app id
Name() string // 应用名
Version() string // 应用版本
Metadata() map[string]string // meta信息
Endpoint() []string // 暴露端点
}
type App struct {
opts options
ctx context.Context
locker sync.Mutex
instance *registry.ServiceInstance
cancel func()
}
func New(opts ...Option) *App {
o := options{
ctx: context.Background(),
sigs: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT},
registrarTimeout: 10 * time.Second,
stopTimeout: 10 * time.Second, //注意长链接,这个时间要给足够长。
}
if id, err := uuid.NewUUID(); err == nil {
o.id = id.String()
}
for _, opt := range opts {
opt(&o)
}
if o.logger != nil {
log.SetLogger(o.logger)
}
ctx, cancel := context.WithCancel(o.ctx)
return &App{
ctx: ctx,
cancel: cancel,
opts: o,
}
}
// Run executes all OnStart hooks registered with the application's Lifecycle.
func (a *App) Run() error {
//app 实例信息
instance, err := a.buildInstance()
if err != nil {
return err
}
a.locker.Lock()
a.instance = instance
a.locker.Unlock()
sctx := NewContext(a.ctx, a)
eg, ctx := errgroup.WithContext(sctx)
wg := sync.WaitGroup{}
for _, fn := range a.opts.beforeStart {
if err = fn(sctx); err != nil {
return err
}
}
for _, srv := range a.opts.servers {
// 复制一个,防止..
server := srv
eg.Go(func() error {
<-ctx.Done() // wait for stop signal , 收到信号后走Stop方法,然后把app的context cancel掉,那么它的子context就会关闭。
// 现在是收到信号后,里面进行关闭 todo 确认是否能这样。
stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
defer cancel()
return server.Stop(stopCtx) //执行server的stop 下面的start 方法会停止阻塞。
})
wg.Add(1)
// 在go程中异步start
eg.Go(func() error {
// 服务启动 是不用一定一定先于注册, 注册中心自己会自己进行探测
wg.Done() // here is to ensure server start has begun running before register, so defer is not needed
return server.Start(sctx)
})
}
//等待所有server启动
wg.Wait()
if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel()
// 进行注册
if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err
}
}
// after start 钩子函数
for _, fn := range a.opts.afterStart {
if err = fn(sctx); err != nil {
return err
}
}
c := make(chan os.Signal, 1)
signal.Notify(c, a.opts.sigs...)
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
case <-c:
return a.Stop()
}
})
// 阻塞等待 直到有错误发生。 非canceld的 则是错的
if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
for _, fn := range a.opts.afterStop {
err = fn(sctx)
}
return err
}
// Stop gracefully stops the application.
func (a *App) Stop() (err error) {
sctx := NewContext(a.ctx, a)
for _, fn := range a.opts.beforeStop {
err = fn(sctx)
}
// 启动和关闭其实是异步的 ,加锁比较好。
a.locker.Lock()
instance := a.instance
a.locker.Unlock()
if a.opts.registrar != nil && instance != nil {
ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout)
defer cancel()
if err = a.opts.registrar.Deregister(ctx, instance); err != nil {
return err
}
}
if a.cancel != nil {
a.cancel()
}
return err
}
func (a *App) buildInstance() (*registry.ServiceInstance, error) {
endpoints := make([]string, 0, len(a.opts.endpoints))
for _, e := range a.opts.endpoints {
endpoints = append(endpoints, e.String())
}
if len(endpoints) == 0 {
for _, srv := range a.opts.servers {
if r, ok := srv.(transport.Endpointer); ok {
e, err := r.Endpoint()
if err != nil {
return nil, err
}
endpoints = append(endpoints, e.String())
}
}
}
return ®istry.ServiceInstance{
ID: a.opts.id,
Name: a.opts.name,
Version: a.opts.version,
Metadata: a.opts.metadata,
Endpoints: endpoints,
}, nil
}
// ID returns app instance id. app实例id
func (a *App) ID() string { return a.opts.id }
// Name returns service name. app名
func (a *App) Name() string { return a.opts.name }
// Version returns app version. app版本
func (a *App) Version() string { return a.opts.version }
// Metadata returns service metadata.
func (a *App) Metadata() map[string]string { return a.opts.metadata }
// Endpoint returns endpoints. app 服务的端点
func (a *App) Endpoint() []string {
if a.instance != nil {
return a.instance.Endpoints
}
return nil
}
type appKey struct{}
// NewContext returns a new Context that carries value. app 上下文信息
func NewContext(ctx context.Context, s AppInfo) context.Context {
return context.WithValue(ctx, appKey{}, s)
}
// FromContext returns the Transport value stored in ctx, if any. 获得app 上下文信息
func FromContext(ctx context.Context) (s AppInfo, ok bool) {
s, ok = ctx.Value(appKey{}).(AppInfo)
return
}