-
Notifications
You must be signed in to change notification settings - Fork 19
/
connpool.go
137 lines (107 loc) · 2.54 KB
/
connpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package mongonet
import "crypto/tls"
import "fmt"
import "net"
import "sync"
import "sync/atomic"
import "time"
// PooledConnection is a network connection handed out by a ConnectionPool.
//
// The field order is relied on by positional composite literals elsewhere in
// this file, so fields must not be reordered without updating those literals.
type PooledConnection struct {
	// conn is the underlying network connection.
	conn net.Conn

	// lastUsedUnix is the Unix time, in seconds, at which the connection was
	// last handed back through ConnectionPool.Put. ConnectionPool.Get compares
	// it against the pool's idle timeout. It is zero for a connection that has
	// never been returned.
	lastUsedUnix int64

	// pool is the ConnectionPool that Close returns this connection to.
	pool *ConnectionPool

	// closed is set by ConnectionPool.Put and cleared by ConnectionPool.Get
	// when a pooled connection is reused; Put panics if it is already set.
	closed bool

	// bad, when true, makes ConnectionPool.Put close the underlying connection
	// instead of keeping it for reuse.
	bad bool
}
// Close hands the connection back to its owning pool via ConnectionPool.Put.
//
// This does not necessarily close the underlying network connection: Put
// keeps the connection for reuse unless it is marked bad. Put panics if the
// connection has already been returned.
func (pc *PooledConnection) Close() {
	owner := pc.pool
	owner.Put(pc)
}
// ---
// ConnectionHook is a callback that receives a network connection and reports
// an error. ConnectionPool.Get runs the pool's hook, when one is set, on every
// newly dialed connection before handing it out; a non-nil error makes Get
// fail with that error.
type ConnectionHook func(net.Conn) error
// ConnectionPool keeps idle connections to a single address so they can be
// reused instead of re-dialed.
//
// The field order is relied on by the positional composite literal in
// NewConnectionPool, so fields must not be reordered without updating it.
type ConnectionPool struct {
	// address is the "tcp" address that new connections are dialed to.
	address string

	// ssl selects tls.Dial instead of net.Dial for new connections.
	ssl bool

	// timeoutSeconds is the idle limit: a pooled connection whose last use is
	// this many seconds or more in the past is closed rather than reused.
	timeoutSeconds int64

	// trace enables the output written by Trace.
	trace bool

	// pool holds the idle connections. It is used as a stack (Put appends,
	// rawGet pops from the end) and is guarded by poolMutex.
	pool []*PooledConnection

	// poolMutex guards pool.
	poolMutex sync.Mutex

	// totalCreated counts connections created by Get. It is read and written
	// with sync/atomic.
	// NOTE(review): 64-bit atomic operations need 8-byte alignment on 32-bit
	// platforms, which this field's position does not obviously guarantee —
	// confirm the supported targets before relying on it there.
	totalCreated int64

	// postCreateHook, when non-nil, is run by Get on every newly dialed
	// connection.
	postCreateHook ConnectionHook
}
// NewConnectionPool returns an empty pool that dials address on demand.
//
// ssl selects TLS for new connections. hook may be nil; when set, it is run on
// every newly dialed connection. The pool starts with tracing disabled, a
// created-connection count of zero and an idle timeout of 3600 seconds.
func NewConnectionPool(address string, ssl bool, hook func(net.Conn) error) *ConnectionPool {
	const defaultIdleTimeoutSeconds = 3600

	return &ConnectionPool{
		address:        address,
		ssl:            ssl,
		timeoutSeconds: defaultIdleTimeoutSeconds,
		trace:          false,
		pool:           []*PooledConnection{},
		poolMutex:      sync.Mutex{},
		totalCreated:   0,
		postCreateHook: hook,
	}
}
// Trace writes s to standard output when tracing is enabled on the pool and
// does nothing otherwise.
//
// s is printed verbatim. It is deliberately not used as a format string, so a
// message containing '%' is not reinterpreted as formatting verbs.
func (cp *ConnectionPool) Trace(s string) {
	if cp.trace {
		fmt.Print(s)
	}
}
// LoadTotalCreated atomically reads the number of connections the pool has
// created so far.
func (cp *ConnectionPool) LoadTotalCreated() int64 {
	counter := &cp.totalCreated
	return atomic.LoadInt64(counter)
}
// CurrentInPool reports how many idle connections are sitting in the pool at
// the moment of the call. The count is taken while holding the pool mutex.
func (cp *ConnectionPool) CurrentInPool() int {
	cp.poolMutex.Lock()
	idle := len(cp.pool)
	cp.poolMutex.Unlock()
	return idle
}
// rawGet removes and returns the most recently pooled connection, or nil when
// the pool is empty. It performs no freshness check; that is left to the
// caller. The pool mutex is held for the duration of the call.
func (cp *ConnectionPool) rawGet() *PooledConnection {
	cp.poolMutex.Lock()
	defer cp.poolMutex.Unlock()

	last := len(cp.pool) - 1
	if last < 0 {
		return nil
	}

	ret := cp.pool[last]
	// Clear the vacated slot so the slice's backing array does not keep the
	// connection reachable after it has been handed out or discarded.
	cp.pool[last] = nil
	cp.pool = cp.pool[:last]
	return ret
}
// Get returns a connection to the pool's address, reusing an idle pooled
// connection when a fresh enough one exists and dialing a new one otherwise.
//
// Pooled connections are tried most-recently-returned first. A pooled
// connection whose last use is timeoutSeconds or more in the past is closed
// and discarded instead of being handed out.
//
// A newly dialed connection is passed to postCreateHook when one is set. If
// the hook fails, the connection is closed and the hook's error is returned.
// totalCreated is incremented only for connections that were dialed
// successfully and accepted by the hook.
//
// On error the returned *PooledConnection is an empty placeholder whose pool
// field is nil; it must not be used, and calling Close on it will panic.
func (cp *ConnectionPool) Get() (*PooledConnection, error) {
	cp.Trace("ConnectionPool::Get\n")

	for {
		conn := cp.rawGet()
		if conn == nil {
			break
		}

		// Reuse only connections that have been idle for less than the
		// configured timeout.
		if time.Now().Unix()-conn.lastUsedUnix < cp.timeoutSeconds {
			conn.closed = false
			return conn, nil
		}

		// Idle for too long: close it (best effort, the error is not
		// actionable here) and try the next pooled connection.
		conn.conn.Close()
	}

	var err error
	var newConn net.Conn
	if cp.ssl {
		// NOTE(review): InsecureSkipVerify disables certificate and host name
		// verification, so the TLS peer is not authenticated. Behaviour is
		// kept as-is for existing callers; consider making it configurable.
		tlsConfig := &tls.Config{}
		tlsConfig.InsecureSkipVerify = true
		newConn, err = tls.Dial("tcp", cp.address, tlsConfig)
	} else {
		newConn, err = net.Dial("tcp", cp.address)
	}
	if err != nil {
		return &PooledConnection{}, err
	}

	if cp.postCreateHook != nil {
		if err = cp.postCreateHook(newConn); err != nil {
			// The connection never reaches the caller or the pool, so it must
			// be closed here or the socket leaks.
			newConn.Close()
			return &PooledConnection{}, err
		}
	}

	atomic.AddInt64(&cp.totalCreated, 1)
	return &PooledConnection{newConn, 0, cp, false, false}, nil
}
// Put takes a connection back from its user.
//
// The connection is stamped with the current Unix time and marked closed. A
// connection flagged bad has its underlying network connection closed and is
// dropped; any other connection is appended to the pool for reuse.
//
// Put panics if conn is already marked closed, i.e. it was returned twice
// without being handed out again in between.
func (cp *ConnectionPool) Put(conn *PooledConnection) {
	cp.Trace("ConnectionPool::Put\n")

	if conn.closed {
		panic("closing a connection twice")
	}

	conn.closed = true
	conn.lastUsedUnix = time.Now().Unix()

	if !conn.bad {
		// Healthy connection: keep it for the next Get.
		cp.poolMutex.Lock()
		cp.pool = append(cp.pool, conn)
		cp.poolMutex.Unlock()
		return
	}

	// Bad connections are never pooled.
	conn.conn.Close()
}