From 0d8cc277f2688593442b82f3f9f317bd4b9271b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E8=8B=B1=E6=9D=B0?= <2635879218@qq.com> Date: Wed, 4 Sep 2024 18:09:04 +0800 Subject: [PATCH] WIP(x/net/http/client): Implement IdleConnPool --- x/net/http/_demo/get/get.go | 2 +- x/net/http/_demo/headers/headers.go | 2 +- .../_demo/maxConnsPerHost/maxConnsPerHost.go | 2 +- x/net/http/_demo/post/post.go | 2 +- x/net/http/_demo/postform/postform.go | 2 - x/net/http/_demo/redirect/redirect.go | 2 +- x/net/http/_demo/reuseConn/reuseConn.go | 42 + x/net/http/_demo/timeout/timeout.go | 12 +- x/net/http/client.go | 8 +- x/net/http/request.go | 73 +- x/net/http/transport.go | 974 +++++++++++++----- x/net/http/util.go | 2 +- 12 files changed, 832 insertions(+), 291 deletions(-) create mode 100644 x/net/http/_demo/reuseConn/reuseConn.go diff --git a/x/net/http/_demo/get/get.go b/x/net/http/_demo/get/get.go index 6bc5b06..6e91bd4 100644 --- a/x/net/http/_demo/get/get.go +++ b/x/net/http/_demo/get/get.go @@ -13,6 +13,7 @@ func main() { fmt.Println(err) return } + defer resp.Body.Close() fmt.Println(resp.Status, "read bytes: ", resp.ContentLength) resp.PrintHeaders() body, err := io.ReadAll(resp.Body) @@ -21,5 +22,4 @@ func main() { return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/headers/headers.go b/x/net/http/_demo/headers/headers.go index aa2e5d6..5538923 100644 --- a/x/net/http/_demo/headers/headers.go +++ b/x/net/http/_demo/headers/headers.go @@ -36,6 +36,7 @@ func main() { println(err.Error()) return } + defer resp.Body.Close() fmt.Println(resp.Status) resp.PrintHeaders() body, err := io.ReadAll(resp.Body) @@ -44,5 +45,4 @@ func main() { return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/maxConnsPerHost/maxConnsPerHost.go b/x/net/http/_demo/maxConnsPerHost/maxConnsPerHost.go index 882bdc1..5662251 100644 --- a/x/net/http/_demo/maxConnsPerHost/maxConnsPerHost.go +++ b/x/net/http/_demo/maxConnsPerHost/maxConnsPerHost.go @@ -19,6 +19,7 @@ func main() { fmt.Println(err) return } + defer resp.Body.Close() fmt.Println(resp.Status, "read bytes: ", resp.ContentLength) fmt.Println(resp.Proto) resp.PrintHeaders() @@ -28,5 +29,4 @@ func main() { return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/post/post.go b/x/net/http/_demo/post/post.go index f169dfc..fd756b3 100644 --- a/x/net/http/_demo/post/post.go +++ b/x/net/http/_demo/post/post.go @@ -15,6 +15,7 @@ func main() { fmt.Println(err) return } + defer resp.Body.Close() fmt.Println(resp.Status) body, err := io.ReadAll(resp.Body) if err != nil { @@ -22,5 +23,4 @@ func main() { return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/postform/postform.go b/x/net/http/_demo/postform/postform.go index eae4d6e..232c15d 100644 --- a/x/net/http/_demo/postform/postform.go +++ b/x/net/http/_demo/postform/postform.go @@ -20,12 +20,10 @@ func main() { return } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) if err != nil { fmt.Println(err) return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/redirect/redirect.go b/x/net/http/_demo/redirect/redirect.go index e4fdb92..f189255 100644 --- a/x/net/http/_demo/redirect/redirect.go +++ b/x/net/http/_demo/redirect/redirect.go @@ -13,6 +13,7 @@ func main() { fmt.Println(err) return } + defer resp.Body.Close() fmt.Println(resp.Status, "read bytes: ", resp.ContentLength) fmt.Println(resp.Proto) resp.PrintHeaders() @@ 
-22,5 +23,4 @@ func main() { return } fmt.Println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/_demo/reuseConn/reuseConn.go b/x/net/http/_demo/reuseConn/reuseConn.go new file mode 100644 index 0000000..bb460ce --- /dev/null +++ b/x/net/http/_demo/reuseConn/reuseConn.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "io" + + "github.com/goplus/llgoexamples/x/net/http" +) + +func main() { + // Send request first time + resp, err := http.Get("https://www.baidu.com") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(resp.Status, "read bytes: ", resp.ContentLength) + resp.PrintHeaders() + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(string(body)) + resp.Body.Close() + + // Send request second time + resp, err = http.Get("https://www.baidu.com") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(resp.Status, "read bytes: ", resp.ContentLength) + resp.PrintHeaders() + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(string(body)) + resp.Body.Close() +} diff --git a/x/net/http/_demo/timeout/timeout.go b/x/net/http/_demo/timeout/timeout.go index 62b2c9d..a6930b1 100644 --- a/x/net/http/_demo/timeout/timeout.go +++ b/x/net/http/_demo/timeout/timeout.go @@ -10,24 +10,24 @@ import ( func main() { client := &http.Client{ - //Timeout: time.Millisecond, // Set a small timeout to ensure it will time out - Timeout: time.Second * 5, + Timeout: time.Millisecond, // Set a small timeout to ensure it will time out + //Timeout: time.Second, } req, err := http.NewRequest("GET", "https://www.baidu.com", nil) if err != nil { - fmt.Println(err.Error()) + fmt.Println(err) return } resp, err := client.Do(req) if err != nil { - fmt.Println(err.Error()) + fmt.Println(err) return } + defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - fmt.Println(err.Error()) + fmt.Println(err) return } println(string(body)) - defer resp.Body.Close() } diff --git a/x/net/http/client.go b/x/net/http/client.go index d56f5f2..002397a 100644 --- a/x/net/http/client.go +++ b/x/net/http/client.go @@ -241,7 +241,6 @@ func (c *Client) do(req *Request) (retres *Response, reterr error) { // didTimeout is non-nil only if err != nil. 
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { - // TODO(spongehah) cookie(c.send) if c.Jar != nil { for _, cookie := range c.Jar.Cookies(req.URL) { req.AddCookie(cookie) @@ -309,13 +308,15 @@ func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, d } // TODO(spongehah) timeout(send) + //stopTimer, didTimeout := setRequestCancel(req, rt, deadline) + req.timeoutch = make(chan struct{}, 1) req.deadline = deadline if deadline.IsZero() { didTimeout = alwaysFalse + defer close(req.timeoutch) } else { didTimeout = func() bool { return req.timer.GetDueIn() == 0 } } - //stopTimer, didTimeout := setRequestCancel(req, rt, deadline) resp, err = rt.RoundTrip(req) if err != nil { @@ -488,7 +490,7 @@ func knownRoundTripperImpl(rt RoundTripper, req *Request) bool { return knownRoundTripperImpl(altRT, req) } return true - // TODO(spongehah) + // TODO(spongehah) http2 //case *http2Transport, http2noDialH2RoundTripper: // return true } diff --git a/x/net/http/request.go b/x/net/http/request.go index 658b033..c5146ed 100644 --- a/x/net/http/request.go +++ b/x/net/http/request.go @@ -50,6 +50,30 @@ type Request struct { const defaultChunkSize = 8192 +// NOTE: This is not intended to reflect the actual Go version being used. +// It was changed at the time of Go 1.1 release because the former User-Agent +// had ended up blocked by some intrusion detection systems. +// See https://codereview.appspot.com/7532043. +const defaultUserAgent = "Go-http-client/1.1" + +// errMissingHost is returned by Write when there is no Host or URL present in +// the Request. +var errMissingHost = errors.New("http: Request.Write on Request with no Host or URL set") + +// Headers that Request.Write handles itself and should be skipped. +var reqWriteExcludeHeader = map[string]bool{ + "Host": true, // not in Header map anyway + "User-Agent": true, + "Content-Length": true, + "Transfer-Encoding": true, + "Trailer": true, +} + +// requestBodyReadError wraps an error from (*Request).write to indicate +// that the error came from a Read call on the Request.Body. +// This error type should not escape the net/http package to users. +type requestBodyReadError struct{ error } + // NewRequest wraps NewRequestWithContext using context.Background. func NewRequest(method, url string, body io.Reader) (*Request, error) { return NewRequestWithContext(context.Background(), method, url, body) @@ -188,6 +212,22 @@ func (r *Request) closeBody() error { return r.Body.Close() } +func (r *Request) isReplayable() bool { + if r.Body == nil || r.Body == NoBody || r.GetBody != nil { + switch valueOrDefault(r.Method, "GET") { + case "GET", "HEAD", "OPTIONS", "TRACE": + return true + } + // The Idempotency-Key, while non-standard, is widely used to + // mean a POST or other request is idempotent. See + // https://golang.org/issue/19943#issuecomment-421092421 + if r.Header.has("Idempotency-Key") || r.Header.has("X-Idempotency-Key") { + return true + } + } + return false +} + // Context returns the request's context. To change the context, use // Clone or WithContext. // @@ -240,25 +280,6 @@ func (r *Request) ProtoAtLeast(major, minor int) bool { r.ProtoMajor == major && r.ProtoMinor >= minor } -// errMissingHost is returned by Write when there is no Host or URL present in -// the Request. -var errMissingHost = errors.New("http: Request.Write on Request with no Host or URL set") - -// NOTE: This is not intended to reflect the actual Go version being used.
-// It was changed at the time of Go 1.1 release because the former User-Agent -// had ended up blocked by some intrusion detection systems. -// See https://codereview.appspot.com/7532043. -const defaultUserAgent = "Go-http-client/1.1" - -// Headers that Request.Write handles itself and should be skipped. -var reqWriteExcludeHeader = map[string]bool{ - "Host": true, // not in Header map anyway - "User-Agent": true, - "Content-Length": true, - "Transfer-Encoding": true, - "Trailer": true, -} - // extraHeaders may be nil // waitForContinue may be nil // always closes body @@ -279,7 +300,6 @@ func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hype } // Send it! sendTask := client.Send(hyperReq) - taskData.taskId = read sendTask.SetUserdata(c.Pointer(taskData)) sendRes := exec.Push(sendTask) if sendRes != hyper.OK { @@ -482,11 +502,6 @@ func readCookies(h Header, filter string) []*Cookie { return cookies } -// requestBodyReadError wraps an error from (*Request).write to indicate -// that the error came from a Read call on the Request.Body. -// This error type should not escape the net/http package to users. -type requestBodyReadError struct{ error } - func idnaASCII(v string) (string, error) { // TODO: Consider removing this check after verifying performance is okay. // Right now punycode verification, length checks, context checks, and the @@ -519,3 +534,11 @@ func removeZone(host string) string { } return host[:j] + host[i:] } + +// Return value if nonempty, def otherwise. +func valueOrDefault(value, def string) string { + if value != "" { + return value + } + return def +} diff --git a/x/net/http/transport.go b/x/net/http/transport.go index 062cbe9..44d721d 100644 --- a/x/net/http/transport.go +++ b/x/net/http/transport.go @@ -2,6 +2,7 @@ package http import ( "compress/gzip" + "container/list" "context" "errors" "fmt" @@ -17,8 +18,8 @@ import ( "github.com/goplus/llgo/c/libuv" cnet "github.com/goplus/llgo/c/net" "github.com/goplus/llgo/c/syscall" - "github.com/goplus/llgo/x/net" "github.com/goplus/llgoexamples/rust/hyper" + "github.com/goplus/llgoexamples/x/net" ) // DefaultTransport is the default implementation of Transport and is @@ -28,7 +29,9 @@ import ( // and NO_PROXY (or the lowercase versions thereof). var DefaultTransport RoundTripper = &Transport{ //Proxy: ProxyFromEnvironment, - Proxy: nil, + Proxy: nil, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, } // DefaultMaxIdleConnsPerHost is the default value of Transport's @@ -37,6 +40,12 @@ const DefaultMaxIdleConnsPerHost = 2 const debugSwitch = true type Transport struct { + idleMu sync.Mutex + closeIdle bool // user has requested to close all idle conns + idleConn map[connectMethodKey][]*persistConn // most recently used at end + idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns + idleLRU connLRU + altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme reqMu sync.Mutex reqCanceler map[cancelKey]func(error) @@ -63,6 +72,15 @@ type Transport struct { // uncompressed. DisableCompression bool + // MaxIdleConns controls the maximum number of idle (keep-alive) + // connections across all hosts. Zero means no limit. + MaxIdleConns int + + // MaxIdleConnsPerHost, if non-zero, controls the maximum idle + // (keep-alive) connections to keep per-host. If zero, + // DefaultMaxIdleConnsPerHost is used. 
+ MaxIdleConnsPerHost int + // MaxConnsPerHost optionally limits the total number of // connections per host, including connections in the dialing, // active, and idle states. On limit violation, dials will block. @@ -70,9 +88,16 @@ type Transport struct { // Zero means no limit. MaxConnsPerHost int + // IdleConnTimeout is the maximum amount of time an idle + // (keep-alive) connection will remain idle before closing + // itself. + // Zero means no limit. + IdleConnTimeout time.Duration + // libuv and hyper related loopInitOnce sync.Once loop *libuv.Loop + async *libuv.Async exec *hyper.Executor } @@ -181,14 +206,258 @@ func (tr *transportRequest) setError(err error) { tr.mu.Unlock() } -func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) { - cm.targetScheme = treq.URL.Scheme - cm.targetAddr = canonicalAddr(treq.URL) - if t.Proxy != nil { - cm.proxyURL, err = t.Proxy(treq.Request) +func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { + if err := t.tryPutIdleConn(pconn); err != nil { + pconn.close(err) + } +} + +func (t *Transport) maxIdleConnsPerHost() int { + if v := t.MaxIdleConnsPerHost; v != 0 { + return v + } + return DefaultMaxIdleConnsPerHost +} + +// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting +// a new request. +// If pconn is no longer needed or not in a good state, tryPutIdleConn returns +// an error explaining why it wasn't registered. +// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that. +func (t *Transport) tryPutIdleConn(pconn *persistConn) error { + if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { + return errKeepAlivesDisabled + } + if pconn.isBroken() { + return errConnBroken + } + pconn.markReused() + + t.idleMu.Lock() + defer t.idleMu.Unlock() + + // HTTP/2 (pconn.alt != nil) connections do not come out of the idle list, + // because multiple goroutines can use them simultaneously. + // If this is an HTTP/2 connection being “returned,” we're done. + if pconn.alt != nil && t.idleLRU.m[pconn] != nil { + return nil + } + + // Deliver pconn to goroutine waiting for idle connection, if any. + // (They may be actively dialing, but this conn is ready first. + // Chrome calls this socket late binding. + // See https://www.chromium.org/developers/design-documents/network-stack#TOC-Connection-Management.) + key := pconn.cacheKey + if q, ok := t.idleConnWait[key]; ok { + done := false + if pconn.alt == nil { + // HTTP/1. + // Loop over the waiting list until we find a w that isn't done already, and hand it pconn. + for q.len() > 0 { + w := q.popFront() + if w.tryDeliver(pconn, nil) { + done = true + break + } + } + } else { + // HTTP/2. + // Can hand the same pconn to everyone in the waiting list, + // and we still won't be done: we want to put it in the idle + // list unconditionally, for any future clients too. 
+ for q.len() > 0 { + w := q.popFront() + w.tryDeliver(pconn, nil) + } + } + if q.len() == 0 { + delete(t.idleConnWait, key) + } else { + t.idleConnWait[key] = q + } + if done { + return nil + } + } + + if t.closeIdle { + return errCloseIdle + } + if t.idleConn == nil { + t.idleConn = make(map[connectMethodKey][]*persistConn) + } + idles := t.idleConn[key] + if len(idles) >= t.maxIdleConnsPerHost() { + return errTooManyIdleHost + } + for _, exist := range idles { + if exist == pconn { + log.Fatalf("dup idle pconn %p in freelist", pconn) + } + } + t.idleConn[key] = append(idles, pconn) + t.idleLRU.add(pconn) + if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { + oldest := t.idleLRU.removeOldest() + oldest.close(errTooManyIdle) + t.removeIdleConnLocked(oldest) + } + + // Set idle timer, but only for HTTP/1 (pconn.alt == nil). + // The HTTP/2 implementation manages the idle timer itself + // (see idleConnTimeout in h2_bundle.go). + idleConnTimeout := uint64(t.IdleConnTimeout.Milliseconds()) + if t.IdleConnTimeout > 0 && pconn.alt == nil { + if pconn.idleTimer != nil { + pconn.idleTimer.Start(onIdleConnTimeout, idleConnTimeout, 0) + } else { + pconn.idleTimer = &libuv.Timer{} + libuv.InitTimer(t.loop, pconn.idleTimer) + (*libuv.Handle)(c.Pointer(pconn.idleTimer)).SetData(c.Pointer(pconn)) + pconn.idleTimer.Start(onIdleConnTimeout, idleConnTimeout, 0) + } + } + pconn.idleAt = time.Now() + return nil +} + +func onIdleConnTimeout(timer *libuv.Timer) { + pconn := (*persistConn)((*libuv.Handle)(c.Pointer(timer)).GetData()) + isClose := pconn.closeConnIfStillIdle() + if isClose { + timer.Stop() + } else { + timer.Start(onIdleConnTimeout, 0, 0) } - cm.onlyH1 = treq.requiresHTTP1() - return cm, err +} + +// queueForIdleConn queues w to receive the next idle connection for w.cm. +// As an optimization hint to the caller, queueForIdleConn reports whether +// it successfully delivered an already-idle connection. +func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { + if t.DisableKeepAlives { + return false + } + + t.idleMu.Lock() + defer t.idleMu.Unlock() + + // Stop closing connections that become idle - we might want one. + // (That is, undo the effect of t.CloseIdleConnections.) + t.closeIdle = false + + if w == nil { + // Happens in test hook. + return false + } + + // If IdleConnTimeout is set, calculate the oldest + // persistConn.idleAt time we're willing to use a cached idle + // conn. + var oldTime time.Time + if t.IdleConnTimeout > 0 { + oldTime = time.Now().Add(-t.IdleConnTimeout) + } + // Look for most recently-used idle connection. + if list, ok := t.idleConn[w.key]; ok { + stop := false + delivered := false + for len(list) > 0 && !stop { + pconn := list[len(list)-1] + + // See whether this connection has been idle too long, considering + // only the wall time (the Round(0)), in case this is a laptop or VM + // coming out of suspend with previously cached idle connections. + tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime) + if tooOld { + // Async cleanup. Launch in its own goroutine (as if a + // time.AfterFunc called it); it acquires idleMu, which we're + // holding, and does a synchronous net.Conn.Close. + pconn.closeConnIfStillIdleLocked() + } + if pconn.isBroken() || tooOld { + // If either persistConn.readLoop has marked the connection + // broken, but Transport.removeIdleConn has not yet removed it + // from the idle list, or if this persistConn is too old (it was + // idle too long), then ignore it and look for another. 
In both + // cases it's already in the process of being closed. + list = list[:len(list)-1] + continue + } + delivered = w.tryDeliver(pconn, nil) + if delivered { + if pconn.alt != nil { + // HTTP/2: multiple clients can share pconn. + // Leave it in the list. + } else { + // HTTP/1: only one client can use pconn. + // Remove it from the list. + t.idleLRU.remove(pconn) + list = list[:len(list)-1] + } + } + stop = true + } + if len(list) > 0 { + t.idleConn[w.key] = list + } else { + delete(t.idleConn, w.key) + } + if stop { + return delivered + } + } + + // Register to receive next connection that becomes idle. + if t.idleConnWait == nil { + t.idleConnWait = make(map[connectMethodKey]wantConnQueue) + } + q := t.idleConnWait[w.key] + q.cleanFront() + q.pushBack(w) + t.idleConnWait[w.key] = q + return false +} + +// removeIdleConn marks pconn as dead. +func (t *Transport) removeIdleConn(pconn *persistConn) bool { + t.idleMu.Lock() + defer t.idleMu.Unlock() + return t.removeIdleConnLocked(pconn) +} + +// t.idleMu must be held. +func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool { + if pconn.idleTimer != nil { + pconn.idleTimer.Stop() + (*libuv.Handle)(c.Pointer(pconn.idleTimer)).Close(nil) + } + t.idleLRU.remove(pconn) + key := pconn.cacheKey + pconns := t.idleConn[key] + var removed bool + switch len(pconns) { + case 0: + // Nothing + case 1: + if pconns[0] == pconn { + delete(t.idleConn, key) + removed = true + } + default: + for i, v := range pconns { + if v != pconn { + continue + } + // Slide down, keeping most recently-used + // conns at the end. + copy(pconns[i:], pconns[i+1:]) + t.idleConn[key] = pconns[:len(pconns)-1] + removed = true + break + } + } + return removed } func (t *Transport) setReqCanceler(key cancelKey, fn func(error)) { @@ -223,6 +492,16 @@ func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool { return true } +func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) { + cm.targetScheme = treq.URL.Scheme + cm.targetAddr = canonicalAddr(treq.URL) + if t.Proxy != nil { + cm.proxyURL, err = t.Proxy(treq.Request) + } + cm.onlyH1 = treq.requiresHTTP1() + return cm, err +} + // alternateRoundTripper returns the alternate RoundTripper to use // for this request if the Request's URL scheme requires one, // or nil for the normal case of using the Transport. @@ -286,6 +565,9 @@ func (t *Transport) closeLocked(err error) { if t.loop != nil { t.loop.Close() } + if t.async != nil { + t.async.Close(nil) + } if t.exec != nil { t.exec.Free() } @@ -308,13 +590,12 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { defer println("RoundTrip end") } t.loopInitOnce.Do(func() { + println("init loop") t.loop = libuv.LoopNew() + t.async = &libuv.Async{} t.exec = hyper.NewExecutor() - //idle := &libuv.Idle{} - //libuv.InitIdle(t.loop, idle) - //(*libuv.Handle)(c.Pointer(idle)).SetData(c.Pointer(t)) - //idle.Start(readWriteLoop) + t.loop.Async(t.async, nil) checker := &libuv.Check{} libuv.InitCheck(t.loop, checker) @@ -330,7 +611,6 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { // Only the first request will initialize the timer if req.timer == nil && !req.deadline.IsZero() { req.timer = &libuv.Timer{} - req.timeoutch = make(chan struct{}, 1) libuv.InitTimer(t.loop, req.timer) ch := &timeoutData{ timeoutch: req.timeoutch, @@ -473,9 +753,10 @@ func (t *Transport) doRoundTrip(req *Request) (*Response, error) { var resp *Response if pconn.alt != nil { // HTTP/2 path. 
- t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest + t.setReqCanceler(cancelKey, nil) // HTTP/2 not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { + // HTTP/1.X path. resp, err = pconn.roundTrip(treq) } @@ -485,8 +766,35 @@ func (t *Transport) doRoundTrip(req *Request) (*Response, error) { } // Failed. Clean up and determine whether to retry. - // TODO(spongehah) Retry & ConnPool(t.doRoundTrip) - return nil, err + // TODO(spongehah) ConnPool(t.doRoundTrip) + if http2isNoCachedConnError(err) { + if t.removeIdleConn(pconn) { + t.decConnsPerHost(pconn.cacheKey) + } + } else if !pconn.shouldRetryRequest(req, err) { + // Issue 16465: return underlying net.Conn.Read error from peek, + // as we've historically done. + if e, ok := err.(nothingWrittenError); ok { + err = e.error + } + if e, ok := err.(transportReadFromServerError); ok { + err = e.err + } + if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose { + // Issue 49621: Close the request body if pconn.roundTrip + // didn't do so already. This can happen if the pconn + // write loop exits without reading the write request. + req.closeBody() + } + return nil, err + } + testHookRoundTripRetried() + + // Rewind the body if we're able to. + req, err = rewindBody(req) + if err != nil { + return nil, err + } } } @@ -507,7 +815,6 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi key: cm.key(), //ctx: ctx, timeoutch: treq.timeoutch, - ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } @@ -518,20 +825,21 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi }() // TODO(spongehah) ConnPool(t.getConn) - //// Queue for idle connection. - //if delivered := t.queueForIdleConn(w); delivered { - // pc := w.pc - // // Trace only for HTTP/1. - // // HTTP/2 calls trace.GotConn itself. - // if pc.alt == nil && trace != nil && trace.GotConn != nil { - // trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) - // } - // // set request canceler to some non-nil function so we - // // can detect whether it was cleared between now and when - // // we enter roundTrip - // t.setReqCanceler(treq.cancelKey, func(error) {}) - // return pc, nil - //} + // Queue for idle connection. + if delivered := t.queueForIdleConn(w); delivered { + pc := w.pc + // Trace only for HTTP/1. + // HTTP/2 calls trace.GotConn itself. + // TODO(spongehah) trace(t.getConn) + //if pc.alt == nil && trace != nil && trace.GotConn != nil { + // trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) + //} + // set request canceler to some non-nil function so we + // can detect whether it was cleared between now and when + // we enter roundTrip + t.setReqCanceler(treq.cancelKey, func(error) {}) + return pc, nil + } cancelc := make(chan error, 1) t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err }) @@ -539,52 +847,36 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi // Queue for permission to dial. t.queueForDial(w) - // Wait for completion or cancellation. - select { - case <-w.ready: - // Trace success but only for HTTP/1. - // HTTP/2 calls trace.GotConn itself. 
- //if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { - // trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) - //} - if w.err != nil { - // If the request has been canceled, that's probably - // what caused w.err; if so, prefer to return the - // cancellation error (see golang.org/issue/16049). - select { - // TODO(spongehah) cancel(t.getConn) - //case <-req.Cancel: - // return nil, errRequestCanceledConn - //case <-req.Context().Done(): - // return nil, req.Context().Err() - case <-req.timeoutch: - return nil, errors.New("timeout: req.Context().Err()") - case err := <-cancelc: - if err == errRequestCanceled { - err = errRequestCanceledConn - } - return nil, err - default: - // return below + // Trace success but only for HTTP/1. + // HTTP/2 calls trace.GotConn itself. + //if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { + // trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) + //} + if w.err != nil { + // If the request has been canceled, that's probably + // what caused w.err; if so, prefer to return the + // cancellation error (see golang.org/issue/16049). + select { + // TODO(spongehah) timeout(t.getConn) + //case <-req.Cancel: + // return nil, errRequestCanceledConn + //case <-req.Context().Done(): + // return nil, req.Context().Err() + case <-req.timeoutch: + if debugSwitch { + println("getConn: timeoutch") } + return nil, errors.New("timeout: req.Context().Err()") + case err := <-cancelc: + if err == errRequestCanceled { + err = errRequestCanceledConn + } + return nil, err + default: + // return below } - return w.pc, w.err - // TODO(spongehah) cancel(t.getConn) - //case <-req.Cancel: - // return nil, errRequestCanceledConn - //case <-req.Context().Done(): - // return nil, - case <-req.timeoutch: - if debugSwitch { - println("getConn: timeoutch") - } - return nil, errors.New("timeout: req.Context().Err()\n") - case err := <-cancelc: - if err == errRequestCanceled { - err = errRequestCanceledConn - } - return nil, err } + return w.pc, w.err } // queueForDial queues w to wait for permission to begin dialing. @@ -597,7 +889,7 @@ func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() if t.MaxConnsPerHost <= 0 { - go t.dialConnFor(w) + t.dialConnFor(w) return } @@ -609,7 +901,7 @@ func (t *Transport) queueForDial(w *wantConn) { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 - go t.dialConnFor(w) + t.dialConnFor(w) return } @@ -633,17 +925,16 @@ func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() pc, err := t.dialConn(w.timeoutch, w.cm) - w.tryDeliver(pc, err) // TODO(spongehah) ConnPool(t.dialConnFor) - //delivered := w.tryDeliver(pc, err) - // Handle undelivered or shareable connections - //if err == nil && (!delivered || pc.alt != nil) { - // // pconn was not passed to w, - // // or it is HTTP/2 and can be shared. - // // Add to the idle connection pool. - // t.putOrCloseIdleConn(pc) - //} - + delivered := w.tryDeliver(pc, err) + // If the connection was successfully established but was not passed to w, + // or is a shareable HTTP/2 connection + if err == nil && (!delivered || pc.alt != nil) { + // pconn was not passed to w, + // or it is HTTP/2 and can be shared. + // Add to the idle connection pool. + t.putOrCloseIdleConn(pc) + } // If an error occurs during the dialing process, the connection count for that host is decreased. 
// This ensures that the connection count remains accurate even in cases where the dial attempt fails. if err != nil { @@ -676,7 +967,7 @@ func (t *Transport) decConnsPerHost(key connectMethodKey) { for q.len() > 0 { w := q.popFront() if w.waiting() { - go t.dialConnFor(w) + t.dialConnFor(w) done = true break } @@ -708,6 +999,7 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * } select { case <-timeoutch: + err = errors.New("[t.dialConn] request timeout") return default: } @@ -716,6 +1008,7 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * cacheKey: cm.key(), closech: make(chan struct{}, 1), writeLoopDone: make(chan struct{}, 1), + alive: true, } //trace := httptrace.ContextClientTrace(ctx) @@ -754,7 +1047,7 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * // } //} else { //conn, err := t.dial(ctx, "tcp", cm.addr()) - conn, err := t.dial(timeoutch, cm.addr()) + conn, err := t.dial(cm.addr()) if err != nil { return nil, err } @@ -811,14 +1104,15 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * select { case <-timeoutch: - conn.Close() - return + err = errors.New("[t.dialConn] request timeout") + pconn.close(err) + return nil, err default: } return pconn, nil } -func (t *Transport) dial(timeoutch chan struct{}, addr string) (*connData, error) { +func (t *Transport) dial(addr string) (*connData, error) { if debugSwitch { println("dial start") defer println("dial end") @@ -862,7 +1156,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err testHookEnterRoundTrip() if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) { // TODO(spongehah) ConnPool(pc.roundTrip) - //pc.t.putOrCloseIdleConn(pc) + pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.mu.Lock() @@ -925,16 +1219,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err writeErrCh := make(chan error, 1) resc := make(chan responseAndError, 1) - // Hookup the IO - hyperIo := newIoWithConnReadWrite(pc.conn) - // We need an executor generally to poll futures - // Prepare client options - opts := hyper.NewClientConnOptions() - opts.Exec(pc.t.exec) - // send the handshake - handshakeTask := hyper.Handshake(hyperIo, opts) taskData := &taskData{ - taskId: write, req: req, pc: pc, addedGzip: requestedGzip, @@ -942,14 +1227,32 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err callerGone: gone, resc: resc, } - handshakeTask.SetUserdata(c.Pointer(taskData)) - // Send the request to readWriteLoop(). - // Let's wait for the handshake to finish... - pc.t.exec.Push(handshakeTask) - async := &libuv.Async{} - pc.t.loop.Async(async, asyncCb) - async.Send() + if pc.client == nil && !pc.isReused() { + println("first") + // Hookup the IO + hyperIo := newIoWithConnReadWrite(pc.conn) + // We need an executor generally to poll futures + // Prepare client options + opts := hyper.NewClientConnOptions() + opts.Exec(pc.t.exec) + // send the handshake + handshakeTask := hyper.Handshake(hyperIo, opts) + taskData.taskId = handshake + handshakeTask.SetUserdata(c.Pointer(taskData)) + // Send the request to readWriteLoop(). + pc.t.exec.Push(handshakeTask) + } else { + println("second") + taskData.taskId = read + err = req.write(pc.client, taskData, pc.t.exec) + if err != nil { + writeErrCh <- err + } + } + + // Wake up libuv. 
Loop + pc.t.async.Send() //var respHeaderTimer <-chan time.Time //cancelChan := req.Request.Cancel @@ -1003,7 +1306,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil - // TODO(spongehah) cancel(pc.roundTrip) + // TODO(spongehah) timeout(pc.roundTrip) //case <-cancelChan: // canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled) // cancelChan = nil @@ -1022,20 +1325,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err } } -func asyncCb(async *libuv.Async) { - println("async called") -} - // readWriteLoop handles the main I/O loop for a persistent connection. // It processes incoming requests, sends them to the server, and handles responses. -func readWriteLoop(idle *libuv.Check) { - println("polling") - t := (*Transport)((*libuv.Handle)(c.Pointer(idle)).GetData()) +func readWriteLoop(checker *libuv.Check) { + t := (*Transport)((*libuv.Handle)(c.Pointer(checker)).GetData()) // Read this once, before loop starts. (to avoid races in tests) - testHookMu.Lock() - testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead - testHookMu.Unlock() + //testHookMu.Lock() + //testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead + //testHookMu.Unlock() const debugReadWriteLoop = true // Debug switch provided for developers @@ -1043,6 +1341,9 @@ func readWriteLoop(idle *libuv.Check) { // Poll all ready tasks and act on them... for { task := t.exec.Poll() + if debugSwitch { + println("polling") + } if task == nil { return } @@ -1057,28 +1358,51 @@ func readWriteLoop(idle *libuv.Check) { println("taskId: ", taskId) } switch taskId { - case write: + case handshake: if debugReadWriteLoop { println("write") } + err := checkTaskType(task, handshake) + if err != nil { + taskData.writeErrCh <- err + task.Free() + continue + } + + pc := taskData.pc select { - case <-taskData.pc.closech: + case <-pc.closech: task.Free() continue default: } - err := checkTaskType(task, write) - client := (*hyper.ClientConn)(task.Value()) + pc.client = (*hyper.ClientConn)(task.Value()) task.Free() - if err == nil { - // TODO(spongehah) Proxy(writeLoop) - err = taskData.req.Request.write(client, taskData, t.exec) + // TODO(spongehah) Proxy(writeLoop) + taskData.taskId = read + err = taskData.req.Request.write(pc.client, taskData, t.exec) + + if err != nil { + //pc.writeErrCh <- err // to the body reader, which might recycle us + taskData.writeErrCh <- err // to the roundTrip function + pc.close(err) + continue + } + + if debugReadWriteLoop { + println("write end") + } + case read: + if debugReadWriteLoop { + println("read") } - // For this request, no longer need the client - client.Free() + + pc := taskData.pc + + err := checkTaskType(task, read) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's @@ -1093,59 +1417,48 @@ func readWriteLoop(idle *libuv.Check) { if err != nil { //pc.writeErrCh <- err // to the body reader, which might recycle us taskData.writeErrCh <- err // to the roundTrip function - taskData.pc.close(err) + pc.close(err) continue } - if debugReadWriteLoop { - println("write end") - } - case read: - if debugReadWriteLoop { - println("read") - } - - if taskData.pc.closeErr == nil { - taskData.pc.closeErr = errReadLoopExiting + if pc.closeErr == nil { + pc.closeErr = errReadLoopExiting } // TODO(spongehah) ConnPool(readWriteLoop) - //if taskData.pc.tryPutIdleConn == nil { - // 
//taskData.pc.tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { - // // if err := pc.t.tryPutIdleConn(pc); err != nil { - // // closeErr = err - // // if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { - // // trace.PutIdleConn(err) - // // } - // // return false - // // } - // // if trace != nil && trace.PutIdleConn != nil { - // // trace.PutIdleConn(nil) - // // } - // // return true - // //} - //} + if pc.tryPutIdleConn == nil { + pc.tryPutIdleConn = func() bool { + if err := pc.t.tryPutIdleConn(pc); err != nil { + pc.closeErr = err + // TODO(spongehah) trace(readWriteLoop) + //if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { + // trace.PutIdleConn(err) + //} + return false + } + //if trace != nil && trace.PutIdleConn != nil { + // trace.PutIdleConn(nil) + //} + return true + } + } - err := checkTaskType(task, read) + // Take the results + hyperResp := (*hyper.Response)(task.Value()) + task.Free() - taskData.pc.mu.Lock() - if taskData.pc.numExpectedResponses == 0 { - taskData.pc.closeLocked(errServerClosedIdle) - taskData.pc.mu.Unlock() + pc.mu.Lock() + if pc.numExpectedResponses == 0 { + pc.readLoopPeekFailLocked(hyperResp, err) + pc.mu.Unlock() // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) continue } - taskData.pc.mu.Unlock() + pc.mu.Unlock() //trace := httptrace.ContextClientTrace(rc.req.Context()) - // Take the results - hyperResp := (*hyper.Response)(task.Value()) - task.Free() - var resp *Response var respBody *hyper.Body if err == nil { @@ -1155,7 +1468,7 @@ func readWriteLoop(idle *libuv.Check) { respBody = hyperResp.Body() } else { err = transportReadFromServerError{err} - taskData.pc.closeErr = err + pc.closeErr = err } // No longer need the response @@ -1166,21 +1479,17 @@ func readWriteLoop(idle *libuv.Check) { case taskData.resc <- responseAndError{err: err}: case <-taskData.callerGone: // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) continue } // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) continue } - taskData.pc.mu.Lock() - taskData.pc.numExpectedResponses-- - taskData.pc.mu.Unlock() + pc.mu.Lock() + pc.numExpectedResponses-- + pc.mu.Unlock() bodyWritable := resp.bodyIsWritable() hasBody := taskData.req.Method != "HEAD" && resp.ContentLength != 0 @@ -1189,46 +1498,43 @@ func readWriteLoop(idle *libuv.Check) { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. - taskData.pc.alive = false + pc.alive = false } if !hasBody || bodyWritable { - //replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) - t.replaceReqCanceler(taskData.req.cancelKey, nil) + replaced := pc.t.replaceReqCanceler(taskData.req.cancelKey, nil) // TODO(spongehah) ConnPool(readWriteLoop) - //// Put the idle conn back into the pool before we send the response - //// so if they process it quickly and make another request, they'll - //// get this same conn. But we use the unbuffered channel 'rc' - //// to guarantee that persistConn.roundTrip got out of its select - //// potentially waiting for this persistConn to close. 
- //taskData.pc.alive = taskData.pc.alive && + // Put the idle conn back into the pool before we send the response + // so if they process it quickly and make another request, they'll + // get this same conn. But we use the unbuffered channel 'rc' + // to guarantee that persistConn.roundTrip got out of its select + // potentially waiting for this persistConn to close. + pc.alive = pc.alive && + replaced && pc.tryPutIdleConn() + //pc.alive = pc.alive && // !pc.sawEOF && // pc.wroteRequest() && - // replaced && tryPutIdleConn(trace) + // replaced && pc.tryPutIdleConn() if bodyWritable { - taskData.pc.closeErr = errCallerOwnsConn + pc.closeErr = errCallerOwnsConn } select { case taskData.resc <- responseAndError{res: resp}: case <-taskData.callerGone: // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) continue } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) - testHookReadLoopBeforeNextRead() - if taskData.pc.alive == false { + //testHookReadLoopBeforeNextRead() + if pc.alive == false { // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) } continue } @@ -1242,7 +1548,7 @@ func readWriteLoop(idle *libuv.Check) { fn: func(err error) error { isEOF := err == io.EOF if !isEOF { - if cerr := taskData.pc.canceled(); cerr != nil { + if cerr := pc.canceled(); cerr != nil { return cerr } } @@ -1265,24 +1571,22 @@ func readWriteLoop(idle *libuv.Check) { taskData.taskId = readDone bodyForeachTask.SetUserdata(c.Pointer(taskData)) t.exec.Push(bodyForeachTask) - (*timeoutData)((*libuv.Handle)(c.Pointer(taskData.req.timer)).GetData()).taskData = taskData + if taskData.req.timer != nil { + (*timeoutData)((*libuv.Handle)(c.Pointer(taskData.req.timer)).GetData()).taskData = taskData + } // TODO(spongehah) select blocking(readWriteLoop) //select { //case taskData.resc <- responseAndError{res: resp}: //case <-taskData.callerGone: // // defer - // taskData.pc.close(taskData.pc.closeErr) - // // TODO(spongehah) ConnPool(readWriteLoop) - // //t.removeIdleConn(pc) + // readLoopDefer(pc, t) // continue //} select { case <-taskData.callerGone: // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) continue default: } @@ -1301,45 +1605,48 @@ func readWriteLoop(idle *libuv.Check) { } checkTaskType(task, readDone) - //bodyEOF := task.Type() == hyper.TaskEmpty + bodyEOF := task.Type() == hyper.TaskEmpty // free the task task.Free() - t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool + pc := taskData.pc + + replaced := t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool // TODO(spongehah) ConnPool(readWriteLoop) - //taskData.pc.alive = taskData.pc.alive && + pc.alive = pc.alive && + bodyEOF && + replaced && pc.tryPutIdleConn() + //pc.alive = pc.alive && // bodyEOF && // !pc.sawEOF && // pc.wroteRequest() && // replaced && tryPutIdleConn(trace) - // TODO(spongehah) cancel(pc.readWriteLoop) + // TODO(spongehah) timeout(t.readWriteLoop) //case <-rw.rc.req.Cancel: - // taskData.pc.alive = false + // pc.alive = false // pc.t.CancelRequest(rw.rc.req) //case <-rw.rc.req.Context().Done(): - // taskData.pc.alive = false + // pc.alive = false // 
pc.t.cancelRequest(rw.rc.cancelKey, rw.rc.req.Context().Err()) - //case <-taskData.pc.closech: - // taskData.pc.alive = false + //case <-pc.closech: + // pc.alive = false //} - select { - case <-taskData.req.timeoutch: - continue - case <-taskData.pc.closech: - taskData.pc.alive = false - default: - } + //select { + //case <-taskData.req.timeoutch: + // continue + //case <-pc.closech: + // pc.alive = false + //default: + //} - if taskData.pc.alive == false { + if pc.alive == false { // defer - taskData.pc.close(taskData.pc.closeErr) - // TODO(spongehah) ConnPool(readWriteLoop) - //t.removeIdleConn(pc) + readLoopDefer(pc, t) } - testHookReadLoopBeforeNextRead() + //testHookReadLoopBeforeNextRead() if debugReadWriteLoop { println("readDone end") } @@ -1350,6 +1657,12 @@ func readWriteLoop(idle *libuv.Check) { } } +func readLoopDefer(pc *persistConn, t *Transport) { + pc.close(pc.closeErr) + // TODO(spongehah) ConnPool(readLoopDefer) + t.removeIdleConn(pc) +} + // ---------------------------------------------------------- type taskData struct { @@ -1374,6 +1687,9 @@ type connData struct { } func (conn *connData) Close() error { + if conn == nil { + return nil + } if conn.ReadWaker != nil { conn.ReadWaker.Free() conn.ReadWaker = nil @@ -1535,9 +1851,7 @@ func onTimeout(timer *libuv.Timer) { pc.alive = false pc.t.cancelRequest(taskData.req.cancelKey, errors.New("timeout: req.Context().Err()")) // defer - pc.close(pc.closeErr) - // TODO(spongehah) ConnPool(onTimeout) - //t.removeIdleConn(pc) + readLoopDefer(pc, pc.t) } } @@ -1555,7 +1869,7 @@ type taskId c.Int const ( notSet taskId = iota - write + handshake read readDone ) @@ -1563,13 +1877,13 @@ const ( // checkTaskType checks the task type func checkTaskType(task *hyper.Task, curTaskId taskId) error { switch curTaskId { - case write: + case handshake: if task.Type() == hyper.TaskError { - log.Printf("[readWriteLoop::write]handshake task error!\n") + log.Printf("[readWriteLoop::handshake]handshake task error!\n") return fail((*hyper.Error)(task.Value())) } if task.Type() != hyper.TaskClientConn { - return fmt.Errorf("[readWriteLoop::write]unexpected task type\n") + return fmt.Errorf("[readWriteLoop::handshake]unexpected task type\n") } return nil case read: @@ -1746,6 +2060,10 @@ type persistConn struct { writeLoopDone chan struct{} // closed when readWriteLoop ends + // Both guarded by Transport.idleMu: + idleAt time.Time // time it last become idle + idleTimer *libuv.Timer // holding an onIdleConnTimeout to close it + mu sync.Mutex // guards following fields numExpectedResponses int closed error // set non-nil when conn is closed, before closech is closed @@ -1754,11 +2072,14 @@ type persistConn struct { // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) + reused bool // whether conn has had successful request/response and is being reused. mutateHeaderFunc func(Header) // other - alive bool // Replace the alive in readLoop - closeErr error // Replace the closeErr in readLoop + alive bool // Replace the alive in readLoop + closeErr error // Replace the closeErr in readLoop + tryPutIdleConn func() bool // Replace the tryPutIdleConn in readLoop + client *hyper.ClientConn } func (pc *persistConn) cancelRequest(err error) { @@ -1779,7 +2100,18 @@ func (pc *persistConn) close(err error) { pc.closeLocked(err) } +// markReused marks this connection as having been successfully used for a +// request and response. 
+func (pc *persistConn) markReused() { + pc.mu.Lock() + pc.reused = true + pc.mu.Unlock() +} + func (pc *persistConn) closeLocked(err error) { + if debugSwitch { + println("pc closed") + } if err == nil { panic("nil error") } @@ -1795,6 +2127,7 @@ } close(pc.closech) close(pc.writeLoopDone) + pc.client.Free() } } pc.mutateHeaderFunc = nil @@ -1866,6 +2199,14 @@ func (pc *persistConn) canceled() error { return pc.canceledErr } +// isReused reports whether this connection has been used before. +func (pc *persistConn) isReused() bool { + pc.mu.Lock() + r := pc.reused + pc.mu.Unlock() + return r +} + // isBroken reports whether this connection is in a known broken state. func (pc *persistConn) isBroken() bool { pc.mu.Lock() @@ -1874,6 +2215,107 @@ return b } +// shouldRetryRequest reports whether we should retry sending a failed +// HTTP request on a new connection. The non-nil input error is the +// error from roundTrip. +func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool { + if http2isNoCachedConnError(err) { + // Issue 16582: if the user started a bunch of + // requests at once, they can all pick the same conn + // and violate the server's max concurrent streams. + // Instead, match the HTTP/1 behavior for now and dial + // again to get a new TCP connection, rather than failing + // this request. + return true + } + if err == errMissingHost { + // User error. + return false + } + if !pc.isReused() { + // This was a fresh connection. There's no reason the server + // should've hung up on us. + // + // Also, if we retried now, we could loop forever + // creating new connections and retrying if the server + // is just hanging up on us because it doesn't like + // our request (as opposed to sending an error). + return false + } + if _, ok := err.(nothingWrittenError); ok { + // We never wrote anything, so it's safe to retry, if there's no body or we + // can "rewind" the body with GetBody. + return req.outgoingLength() == 0 || req.GetBody != nil + } + if !req.isReplayable() { + // Don't retry non-idempotent requests. + return false + } + if _, ok := err.(transportReadFromServerError); ok { + // We got some non-EOF net.Conn.Read failure reading + // the 1st response byte from the server. + return true + } + if err == errServerClosedIdle { + // The server replied with io.EOF while we were trying to + // read the response. Probably an unfortunately timed keep-alive + // timeout, just as the client was writing a request. + return true + } + return false // conservatively +} + +// closeConnIfStillIdle closes the connection if it's still sitting idle. +// This is what's called by the persistConn's idleTimer, via the +// onIdleConnTimeout libuv callback. +func (pc *persistConn) closeConnIfStillIdle() bool { + t := pc.t + isLock := t.idleMu.TryLock() + if isLock { + defer t.idleMu.Unlock() + pc.closeConnIfStillIdleLocked() + return true + } + return false +} + +func (pc *persistConn) closeConnIfStillIdleLocked() { + t := pc.t + if _, ok := t.idleLRU.m[pc]; !ok { + // Not idle.
+ return + } + t.removeIdleConnLocked(pc) + pc.close(errIdleConnTimeout) +} + +func (pc *persistConn) readLoopPeekFailLocked(resp *hyper.Response, err error) { + if pc.closed != nil { + return + } + if is408Message(resp) { + pc.closeLocked(errServerClosedIdle) + return + } + pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", err)) +} + +func is408Message(resp *hyper.Response) bool { + httpVersion := int(resp.Version()) + if httpVersion != 10 && httpVersion != 11 { + return false + } + return resp.Status() == 408 +} + +// isNoCachedConnError reports whether err is of type noCachedConnError +// or its equivalent renamed type in net/http2's h2_bundle.go. Both types +// may coexist in the same running program. +func http2isNoCachedConnError(err error) bool { // h2_bundle.go + _, ok := err.(interface{ IsHTTP2NoCachedConnError() }) + return ok +} + // connectMethod is the map key (in its String form) for keeping persistent // TCP connections alive for subsequent HTTP requests. // @@ -1967,7 +2409,8 @@ type wantConn struct { key connectMethodKey // cm.key() ctx context.Context // context for dial timeoutch chan struct{} // tmp timeout to replace ctx - ready chan struct{} // closed when pc, err pair is delivered + ready bool + //ready chan struct{} // closed when pc, err pair is delivered // hooks for testing to know when dials are done // beforeDial is called in the getConn goroutine when the dial is queued. @@ -1985,25 +2428,24 @@ func (w *wantConn) cancel(t *Transport, err error) { w.mu.Lock() if w.pc == nil && w.err == nil { - close(w.ready) // catch misbehavior in future delivery + w.ready = true // mark as no longer waiting } - //pc := w.pc + pc := w.pc w.pc = nil w.err = err w.mu.Unlock() // TODO(spongehah) ConnPool(w.cancel) - //if pc != nil { - // t.putOrCloseIdleConn(pc) - //} + if pc != nil { + t.putOrCloseIdleConn(pc) + } } // waiting reports whether w is still waiting for an answer (connection or error). func (w *wantConn) waiting() bool { - select { - case <-w.ready: + if w.ready { return false - default: + } else { return true } } @@ -2022,12 +2464,7 @@ func (w *wantConn) tryDeliver(pc *persistConn, err error) bool { if w.pc == nil && w.err == nil { panic("net/http: internal error: misuse of tryDeliver") } - select { - case <-w.timeoutch: - pc.close(errors.New("request timeout: dialConn timeout")) - default: - } - close(w.ready) + w.ready = true return true } @@ -2200,3 +2637,42 @@ func (gz *gzipReader) Read(p []byte) (n int, err error) { func (gz *gzipReader) Close() error { return gz.body.Close() } + +type connLRU struct { + ll *list.List // list.Element.Value type is of *persistConn + m map[*persistConn]*list.Element +} + +// add adds pc to the head of the linked list. +func (cl *connLRU) add(pc *persistConn) { + if cl.ll == nil { + cl.ll = list.New() + cl.m = make(map[*persistConn]*list.Element) + } + ele := cl.ll.PushFront(pc) + if _, ok := cl.m[pc]; ok { + panic("persistConn was already in LRU") + } + cl.m[pc] = ele +} + +func (cl *connLRU) removeOldest() *persistConn { + ele := cl.ll.Back() + pc := ele.Value.(*persistConn) + cl.ll.Remove(ele) + delete(cl.m, pc) + return pc +} + +// remove removes pc from cl. +func (cl *connLRU) remove(pc *persistConn) { + if ele, ok := cl.m[pc]; ok { + cl.ll.Remove(ele) + delete(cl.m, pc) + } +} + +// len returns the number of items in the cache.
+func (cl *connLRU) len() int { + return len(cl.m) +} diff --git a/x/net/http/util.go b/x/net/http/util.go index bec22a8..bfd9fc3 100644 --- a/x/net/http/util.go +++ b/x/net/http/util.go @@ -7,7 +7,7 @@ import ( "golang.org/x/net/idna" - "github.com/goplus/llgo/x/net" + "github.com/goplus/llgoexamples/x/net" ) /**
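The idle pool wired in above is what the new _demo/reuseConn exercises: two sequential GETs against the same host, where the second request should come out of the pool (queueForIdleConn) instead of a fresh dial. Below is a minimal tuning sketch of the knobs this patch declares (MaxIdleConns, MaxIdleConnsPerHost, IdleConnTimeout); it assumes Client exposes a settable Transport field, as in the net/http API this port mirrors, and the values are purely illustrative, not defaults:

package main

import (
	"fmt"
	"io"
	"time"

	"github.com/goplus/llgoexamples/x/net/http"
)

func main() {
	// Assumption: Client.Transport accepts a user-constructed *Transport,
	// mirroring net/http; the field names below are the ones this patch adds.
	client := &http.Client{
		Transport: &http.Transport{
			MaxIdleConns:        100,              // cap across all hosts; 0 means no limit
			MaxIdleConnsPerHost: 2,                // per-host cap; 0 means DefaultMaxIdleConnsPerHost
			IdleConnTimeout:     90 * time.Second, // idle conns are closed after this
		},
	}

	// Two sequential requests to the same host: the first dials, the
	// second should be handed a pooled connection by queueForIdleConn.
	for i := 0; i < 2; i++ {
		req, err := http.NewRequest("GET", "https://www.baidu.com", nil)
		if err != nil {
			fmt.Println(err)
			return
		}
		resp, err := client.Do(req)
		if err != nil {
			fmt.Println(err)
			return
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close() // drain and close so the conn can return to the pool
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(resp.Status, "read bytes:", len(body))
	}
}

As in net/http, a connection is only eligible for reuse (tryPutIdleConn) once the response body has been fully consumed and closed, which is why the demo reads and closes each body before issuing the next request.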