From 4bf50fee39f584241079d8ebbb712742932b49a4 Mon Sep 17 00:00:00 2001 From: hackerchai Date: Fri, 20 Sep 2024 16:00:31 +0800 Subject: [PATCH] refactor(x/net/http): Multiple fixes & adapt bodyStream struct Signed-off-by: hackerchai --- x/net/http/request.go | 147 ++++++++++++++++++++++++++++++++++++++-- x/net/http/response.go | 11 ++- x/net/http/server.go | 35 ++++++---- x/net/http/transport.go | 117 ++++++++++++++++---------------- x/net/http/util.go | 3 +- 5 files changed, 225 insertions(+), 88 deletions(-) diff --git a/x/net/http/request.go b/x/net/http/request.go index 37d6408..b049578 100644 --- a/x/net/http/request.go +++ b/x/net/http/request.go @@ -9,12 +9,12 @@ import ( "net/url" "strings" "time" - - "github.com/goplus/llgo/c/libuv" - "golang.org/x/net/idna" + "unsafe" "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/libuv" "github.com/goplus/llgoexamples/rust/hyper" + "golang.org/x/net/idna" ) type Request struct { @@ -230,7 +230,7 @@ func (r *Request) ProtoAtLeast(major, minor int) bool { // extraHeaders may be nil // waitForContinue may be nil // always closes body -func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hyper.Executor) (err error) { +func (r *Request) write(client *hyper.ClientConn, taskData *clientTaskData, exec *hyper.Executor) (err error) { //trace := httptrace.ContextClientTrace(r.Context()) //if trace != nil && trace.WroteRequest != nil { // defer func() { @@ -488,3 +488,142 @@ func valueOrDefault(value, def string) string { } return def } + +func readRequest(executor *hyper.Executor, hyperReq *hyper.Request, requestNotifyHandle *libuv.Async, remoteAddr string) (*Request, error) { + println("[debug] readRequest called") + req := Request{ + Header: make(Header), + Body: nil, + } + req.RemoteAddr = remoteAddr + + headers := hyperReq.Headers() + if headers != nil { + headers.Foreach(addHeader, unsafe.Pointer(&req)) + } else { + return nil, fmt.Errorf("failed to get request headers") + } + + var host string + for key, values := range req.Header { + if strings.EqualFold(key, "Host") { + if len(values) > 0 { + host = values[0] + break + } + } + + } + + method := make([]byte, 32) + methodLen := unsafe.Sizeof(method) + if err := hyperReq.Method(&method[0], &methodLen); err != hyper.OK { + return nil, fmt.Errorf("failed to get method: %v", err) + } + + methodStr := string(method[:methodLen]) + + var scheme, authority, pathAndQuery [1024]byte + schemeLen, authorityLen, pathAndQueryLen := unsafe.Sizeof(scheme), unsafe.Sizeof(authority), unsafe.Sizeof(pathAndQuery) + uriResult := hyperReq.URIParts(&scheme[0], &schemeLen, &authority[0], &authorityLen, &pathAndQuery[0], &pathAndQueryLen) + if uriResult != hyper.OK { + return nil, fmt.Errorf("failed to get URI parts: %v", uriResult) + } + + var schemeStr, authorityStr, pathAndQueryStr string + if schemeLen == 0 { + schemeStr = "http" + } else { + schemeStr = string(scheme[:schemeLen]) + } + + if authorityLen == 0 { + authorityStr = host + } else { + authorityStr = string(authority[:authorityLen]) + } + + if pathAndQueryLen == 0 { + return nil, fmt.Errorf("failed to get URI path and query: %v", uriResult) + } else { + pathAndQueryStr = string(pathAndQuery[:pathAndQueryLen]) + } + req.Host = authorityStr + req.Method = methodStr + req.RequestURI = pathAndQueryStr + + var proto string + var protoMajor, protoMinor int + version := hyperReq.Version() + switch version { + case hyper.HTTPVersion10: + proto = "HTTP/1.0" + protoMajor = 1 + protoMinor = 0 + case hyper.HTTPVersion11: + proto = "HTTP/1.1" + protoMajor = 1 + protoMinor = 1 + case hyper.HTTPVersion2: + proto = "HTTP/2.0" + protoMajor = 2 + protoMinor = 0 + case hyper.HTTPVersionNone: + proto = "HTTP/0.0" + protoMajor = 0 + protoMinor = 0 + default: + return nil, fmt.Errorf("unknown HTTP version: %d", version) + } + req.Proto = proto + req.ProtoMajor = protoMajor + req.ProtoMinor = protoMinor + + urlStr := fmt.Sprintf("%s://%s%s", schemeStr, authorityStr, pathAndQueryStr) + url, err := url.Parse(urlStr) + if err != nil { + return nil, err + } + req.URL = url + + body := hyperReq.Body() + if body != nil { + taskFlag := getBodyTask + + bodyStream := newBodyStream(requestNotifyHandle) + req.Body = bodyStream + + taskData := taskData{ + hyperBody: body, + responseBody: nil, + bodyStream: bodyStream, + taskFlag: taskFlag, + executor: executor, + } + + requestNotifyHandle.SetData(c.Pointer(&taskData)) + fmt.Println("[debug] async task set") + + } else { + return nil, fmt.Errorf("failed to get request body") + } + + //hyperReq.Free() + + return &req, nil +} + +func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, valueLen uintptr) c.Int { + req := (*Request)(data) + key := string(unsafe.Slice(name, nameLen)) + val := string(unsafe.Slice(value, valueLen)) + values := strings.Split(val, ",") + if len(values) > 1 { + for _, v := range values { + req.Header.Add(key, strings.TrimSpace(v)) + } + } else { + req.Header.Add(key, val) + } + return hyper.IterContinue +} diff --git a/x/net/http/response.go b/x/net/http/response.go index eceff38..a23de2e 100644 --- a/x/net/http/response.go +++ b/x/net/http/response.go @@ -32,7 +32,7 @@ type responseBodyRaw struct { type taskData struct { hyperBody *hyper.Body responseBody *responseBodyRaw - requestBody *requestBody + bodyStream *bodyStream executor *hyper.Executor taskFlag taskFlag } @@ -44,8 +44,6 @@ const ( getBodyTask ) -var DefaultChunkSize uintptr = 8192 - func newResponse(hyperChannel *hyper.ResponseChannel) *response { fmt.Printf("[debug] newResponse called\n") @@ -136,7 +134,7 @@ func (r *response) finalize() error { taskData := &taskData{ hyperBody: body, responseBody: &bodyData, - requestBody: nil, + bodyStream: nil, executor: nil, taskFlag: setBodyTask, } @@ -194,7 +192,6 @@ func setBodyDataFunc(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) return hyper.PollError } - type Response struct { Status string // e.g. "200 OK" StatusCode int // e.g. 200 @@ -234,7 +231,7 @@ func (r *Response) Cookies() []*Cookie { return readSetCookies(r.Header) } -func (r *Response) checkRespBody(taskData *taskData) (needContinue bool) { +func (r *Response) checkRespBody(taskData *clientTaskData) (needContinue bool) { pc := taskData.pc bodyWritable := r.bodyIsWritable() hasBody := taskData.req.Method != "HEAD" && r.ContentLength != 0 @@ -282,7 +279,7 @@ func (r *Response) checkRespBody(taskData *taskData) (needContinue bool) { return false } -func (r *Response) wrapRespBody(taskData *taskData) { +func (r *Response) wrapRespBody(taskData *clientTaskData) { body := &bodyEOFSignal{ body: r.Body, earlyCloseFn: func() error { diff --git a/x/net/http/server.go b/x/net/http/server.go index d530645..07a2ae7 100644 --- a/x/net/http/server.go +++ b/x/net/http/server.go @@ -29,9 +29,14 @@ import ( // well read them) const maxPostHandlerReadBytes = 256 << 10 +// _SC_NPROCESSORS_ONLN is the number of processors on the system const _SC_NPROCESSORS_ONLN c.Int = 58 -var cpuCount int +// DefaultChunkSize is the default chunk size for reading and writing data +var DefaultChunkSize uintptr = 8192 + +// cpuCount is the number of processors on the system +var cpuCount int type Handler interface { ServeHTTP(ResponseWriter, *Request) @@ -44,8 +49,8 @@ type ResponseWriter interface { } type Server struct { - Addr string - Handler Handler + Addr string + Handler Handler isShutdown atomic.Bool eventLoop []*eventLoop @@ -80,7 +85,7 @@ type serviceUserdata struct { asyncHandle *libuv.Async host [128]c.Char port [8]c.Char - executor *hyper.Executor + executor *hyper.Executor } func NewServer(addr string) *Server { @@ -130,7 +135,7 @@ func newEventLoop() (*eventLoop, error) { el.uvLoop.SetData(unsafe.Pointer(el)) if r := libuv.InitTcpEx(el.uvLoop, &el.uvServer, cnet.AF_INET); r != 0 { - return nil, fmt.Errorf("failed to init TCP: %v", libuv.Strerror(libuv.Errno(r))) + return nil, fmt.Errorf("failed to init TCP: %s", c.GoString(libuv.Strerror(libuv.Errno(r)))) } return el, nil @@ -139,19 +144,19 @@ func newEventLoop() (*eventLoop, error) { func (el *eventLoop) run(host string, port int) error { var sockaddr cnet.SockaddrIn if r := libuv.Ip4Addr(c.AllocaCStr(host), c.Int(port), &sockaddr); r != 0 { - return fmt.Errorf("failed to create IP address: %v", libuv.Strerror(libuv.Errno(r))) + return fmt.Errorf("failed to create IP address: %s", c.GoString(libuv.Strerror(libuv.Errno(r)))) } if err := setReuseAddr(&el.uvServer); err != nil { - return fmt.Errorf("failed to set SO_REUSEADDR: %v", err) + return fmt.Errorf("failed to set SO_REUSEADDR: %s", err) } if r := el.uvServer.Bind((*cnet.SockAddr)(unsafe.Pointer(&sockaddr)), 0); r != 0 { - return fmt.Errorf("failed to bind: %v", libuv.Strerror(libuv.Errno(r))) + return fmt.Errorf("failed to bind: %s", c.GoString(libuv.Strerror(libuv.Errno(r)))) } - if err := (*libuv.Stream)(&el.uvServer).Listen(128, onNewConnection); err != 0 { - return fmt.Errorf("failed to listen: %v", err) + if r := (*libuv.Stream)(&el.uvServer).Listen(128, onNewConnection); r != 0 { + return fmt.Errorf("failed to listen: %s", c.GoString(libuv.Strerror(libuv.Errno(r)))) } if r := libuv.InitIdle(el.uvLoop, &el.idleHandle); r != 0 { @@ -314,7 +319,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) { r := libuv.PollInit(el.uvLoop, &conn.pollHandle, libuv.OsFd(conn.stream.GetIoWatcherFd())) if r < 0 { - fmt.Fprintf(os.Stderr, "uv_poll_init error: %s\n", libuv.Strerror(libuv.Errno(r))) + fmt.Fprintf(os.Stderr, "uv_poll_init error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(r)))) (*libuv.Handle)(unsafe.Pointer(&conn.stream)).Close(nil) return } @@ -482,8 +487,8 @@ func handleGetBodyTask(hyperTaskType hyper.TaskReturnType, task *hyper.Task, pay handleTaskBuffer(task, payload) case hyper.TaskEmpty: fmt.Println("[debug] Get body task closing request body") - if payload.requestBody != nil { - payload.requestBody.Close() + if payload.bodyStream != nil { + payload.bodyStream.Close() } task.Free() } @@ -513,7 +518,7 @@ func handleTaskError(task *hyper.Task) { func handleTaskBuffer(task *hyper.Task, payload *taskData) { buf := (*hyper.Buf)(task.Value()) bytes := unsafe.Slice(buf.Bytes(), buf.Len()) - payload.requestBody.readCh <- bytes + payload.bodyStream.readCh <- bytes fmt.Printf("[debug] Task get body writing to bodyWriter: %s\n", string(bytes)) buf.Free() task.Free() @@ -669,7 +674,7 @@ func updateConnRegistrations(conn *conn) bool { fmt.Printf("[debug] Starting poll with events: %d\n", events) r := conn.pollHandle.Start(events, onPoll) if r < 0 { - fmt.Fprintf(os.Stderr, "uv_poll_start error: %s\n", libuv.Strerror(libuv.Errno(r))) + fmt.Fprintf(os.Stderr, "uv_poll_start error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(r)))) return false } return true diff --git a/x/net/http/transport.go b/x/net/http/transport.go index e47bd2a..1fc027f 100644 --- a/x/net/http/transport.go +++ b/x/net/http/transport.go @@ -36,7 +36,6 @@ var DefaultTransport RoundTripper = &Transport{ // DefaultMaxIdleConnsPerHost is the default value of Transport's // MaxIdleConnsPerHost. const DefaultMaxIdleConnsPerHost = 2 -const _SC_NPROCESSORS_ONLN c.Int = 58 // Debug switch provided for developers const ( @@ -98,7 +97,7 @@ type responseAndError struct { type timeoutData struct { timeoutch chan struct{} - taskData *taskData + clientTaskData *clientTaskData } type readTrackingBody struct { @@ -597,8 +596,6 @@ func getMilliseconds(deadline time.Time) uint64 { return uint64(milliseconds) } -var cpuCount int - func init() { cpuCount = int(c.Sysconf(_SC_NPROCESSORS_ONLN)) if cpuCount <= 0 { @@ -666,7 +663,7 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { libuv.InitTimer(eventLoop.loop, req.timer) ch := &timeoutData{ timeoutch: req.timeoutch, - taskData: nil, + clientTaskData: nil, } (*libuv.Handle)(c.Pointer(req.timer)).SetData(c.Pointer(ch)) @@ -1172,7 +1169,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err writeErrCh := make(chan error, 1) resc := make(chan responseAndError, 1) - taskData := &taskData{ + clientTaskData := &clientTaskData{ req: req, pc: pc, addedGzip: requestedGzip, @@ -1190,14 +1187,14 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err opts.Exec(pc.eventLoop.exec) // send the handshake handshakeTask := hyper.Handshake(hyperIo, opts) - taskData.taskId = handshake - handshakeTask.SetUserdata(c.Pointer(taskData), nil) + clientTaskData.taskId = handshake + handshakeTask.SetUserdata(c.Pointer(clientTaskData), nil) // Send the request to readWriteLoop(). pc.eventLoop.exec.Push(handshakeTask) //} else { // println("############### roundTrip: pc.client != nil") - // taskData.taskId = read - // err = req.write(pc.client, taskData, pc.eventLoop.exec) + // clientTaskData.taskId = read + // err = req.write(pc.client, clientTaskData, pc.eventLoop.exec) // if err != nil { // writeErrCh <- err // pc.close(err) @@ -1279,16 +1276,16 @@ func readWriteLoop(checker *libuv.Idle) { } func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { - taskData := (*taskData)(task.Userdata()) - if taskData == nil { + clientTaskData := (*clientTaskData)(task.Userdata()) + if clientTaskData == nil { // A background task for hyper_client completed... task.Free() return } var err error - pc := taskData.pc + pc := clientTaskData.pc // If original taskId is set, we need to check it - err = checkTaskType(task, taskData) + err = checkTaskType(task, clientTaskData) if err != nil { if debugSwitch { println("############### handleTask: checkTaskType err != nil") @@ -1296,7 +1293,7 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { closeAndRemoveIdleConn(pc, true) return } - switch taskData.taskId { + switch clientTaskData.taskId { case handshake: if debugReadWriteLoop { println("############### write") @@ -1314,12 +1311,12 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { task.Free() // TODO(hah) Proxy(writeLoop) - taskData.taskId = read - err = taskData.req.Request.write(pc.client, taskData, eventLoop.exec) + clientTaskData.taskId = read + err = clientTaskData.req.Request.write(pc.client, clientTaskData, eventLoop.exec) if err != nil { //pc.writeErrCh <- err // to the body reader, which might recycle us - taskData.writeErrCh <- err // to the roundTrip function + clientTaskData.writeErrCh <- err // to the roundTrip function if debugSwitch { println("############### handleTask: write err != nil") } @@ -1367,11 +1364,11 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { var resp *Response if err == nil { - pc.chunkAsync.SetData(c.Pointer(taskData)) - bc := newBodyChunk(pc.chunkAsync) - pc.bodyChunk = bc - resp, err = ReadResponse(bc, taskData.req.Request, hyperResp) - taskData.hyperBody = hyperResp.Body() + pc.chunkAsync.SetData(c.Pointer(clientTaskData)) + bc := newBodyStream(pc.chunkAsync) + pc.bodyStream = bc + resp, err = ReadResponse(bc, clientTaskData.req.Request, hyperResp) + clientTaskData.hyperBody = hyperResp.Body() } else { err = transportReadFromServerError{err} pc.closeErr = err @@ -1381,11 +1378,11 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { hyperResp.Free() if err != nil { - pc.bodyChunk.closeWithError(err) - taskData.closeHyperBody() + pc.bodyStream.closeWithError(err) + clientTaskData.closeHyperBody() select { - case taskData.resc <- responseAndError{err: err}: - case <-taskData.callerGone: + case clientTaskData.resc <- responseAndError{err: err}: + case <-clientTaskData.callerGone: if debugSwitch { println("############### handleTask read: callerGone") } @@ -1399,32 +1396,32 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { return } - taskData.taskId = readBodyChunk + clientTaskData.taskId = readBodyChunk - if !taskData.req.deadline.IsZero() { - (*timeoutData)((*libuv.Handle)(c.Pointer(taskData.req.timer)).GetData()).taskData = taskData + if !clientTaskData.req.deadline.IsZero() { + (*timeoutData)((*libuv.Handle)(c.Pointer(clientTaskData.req.timer)).GetData()).clientTaskData = clientTaskData } //pc.mu.Lock() pc.numExpectedResponses-- //pc.mu.Unlock() - needContinue := resp.checkRespBody(taskData) + needContinue := resp.checkRespBody(clientTaskData) if needContinue { return } - resp.wrapRespBody(taskData) + resp.wrapRespBody(clientTaskData) select { - case taskData.resc <- responseAndError{res: resp}: - case <-taskData.callerGone: + case clientTaskData.resc <- responseAndError{res: resp}: + case <-clientTaskData.callerGone: // defer if debugSwitch { println("############### handleTask read: callerGone 2") } - pc.bodyChunk.Close() - taskData.closeHyperBody() + pc.bodyStream.Close() + clientTaskData.closeHyperBody() closeAndRemoveIdleConn(pc, true) return } @@ -1446,7 +1443,7 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { chunk.Free() task.Free() // Write to the channel - pc.bodyChunk.readCh <- bytes + pc.bodyStream.readCh <- bytes if debugReadWriteLoop { println("############### readBodyChunk end [buf]") } @@ -1455,9 +1452,9 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { // taskType == taskEmpty (check in checkTaskType) task.Free() - pc.bodyChunk.closeWithError(io.EOF) - taskData.closeHyperBody() - replaced := pc.t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool + pc.bodyStream.closeWithError(io.EOF) + clientTaskData.closeHyperBody() + replaced := pc.t.replaceReqCanceler(clientTaskData.req.cancelKey, nil) // before pc might return to idle pool pc.alive = pc.alive && replaced && pc.tryPutIdleConn() @@ -1473,10 +1470,10 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { } func readyToRead(aysnc *libuv.Async) { - taskData := (*taskData)(aysnc.GetData()) - dataTask := taskData.hyperBody.Data() - dataTask.SetUserdata(c.Pointer(taskData), nil) - taskData.pc.eventLoop.exec.Push(dataTask) + clientTaskData := (*clientTaskData)(aysnc.GetData()) + dataTask := clientTaskData.hyperBody.Data() + dataTask.SetUserdata(c.Pointer(clientTaskData), nil) + clientTaskData.pc.eventLoop.exec.Push(dataTask) } // closeAndRemoveIdleConn Replace the defer function of readLoop in stdlib @@ -1504,7 +1501,7 @@ type connData struct { isClosing atomic.Bool } -type taskData struct { +type clientTaskData struct { taskId taskId req *transportRequest pc *persistConn @@ -1545,7 +1542,7 @@ func (conn *connData) Close() { } } -func (d *taskData) closeHyperBody() { +func (d *clientTaskData) closeHyperBody() { if d.hyperBody != nil { d.hyperBody.Free() d.hyperBody = nil @@ -1666,11 +1663,11 @@ func onTimeout(timer *libuv.Timer) { close(data.timeoutch) timer.Stop() - taskData := data.taskData - if taskData != nil { - pc := taskData.pc + clientTaskData := data.clientTaskData + if clientTaskData != nil { + pc := clientTaskData.pc pc.alive = false - pc.t.cancelRequest(taskData.req.cancelKey, errors.New("timeout: req.Context().Err()")) + pc.t.cancelRequest(clientTaskData.req.cancelKey, errors.New("timeout: req.Context().Err()")) closeAndRemoveIdleConn(pc, true) } } @@ -1685,8 +1682,8 @@ func newHyperIo(connData *connData) *hyper.Io { } // checkTaskType checks the task type -func checkTaskType(task *hyper.Task, taskData *taskData) (err error) { - curTaskId := taskData.taskId +func checkTaskType(task *hyper.Task, clientTaskData *clientTaskData) (err error) { + curTaskId := clientTaskData.taskId taskType := task.Type() if taskType == hyper.TaskError { err = fail((*hyper.Error)(task.Value()), curTaskId) @@ -1710,18 +1707,18 @@ func checkTaskType(task *hyper.Task, taskData *taskData) (err error) { if err != nil { task.Free() if curTaskId == handshake || curTaskId == read { - taskData.writeErrCh <- err + clientTaskData.writeErrCh <- err if debugSwitch { println("############### checkTaskType: writeErrCh") } - taskData.pc.close(err) + clientTaskData.pc.close(err) } - if taskData.pc.bodyChunk != nil { - taskData.pc.bodyChunk.Close() - taskData.pc.bodyChunk = nil + if clientTaskData.pc.bodyStream != nil { + clientTaskData.pc.bodyStream.Close() + clientTaskData.pc.bodyStream = nil } - taskData.closeHyperBody() - taskData.pc.alive = false + clientTaskData.closeHyperBody() + clientTaskData.pc.alive = false } return } @@ -1919,7 +1916,7 @@ type persistConn struct { closeErr error // Replace the closeErr in readLoop tryPutIdleConn func() bool // Replace the tryPutIdleConn in readLoop client *hyper.ClientConn // http long connection client handle - bodyChunk *bodyChunk // Implement non-blocking consumption of each responseBody chunk + bodyStream *bodyStream // Implement non-blocking consumption of each responseBody chunk chunkAsync *libuv.Async // Notifying that the received chunk has been read } diff --git a/x/net/http/util.go b/x/net/http/util.go index bfd9fc3..c7fc4dc 100644 --- a/x/net/http/util.go +++ b/x/net/http/util.go @@ -5,9 +5,8 @@ import ( "unicode" "unicode/utf8" - "golang.org/x/net/idna" - "github.com/goplus/llgoexamples/x/net" + "golang.org/x/net/idna" ) /**