diff --git a/x/net/http/request.go b/x/net/http/request.go index 58c1c82..3179e55 100644 --- a/x/net/http/request.go +++ b/x/net/http/request.go @@ -35,7 +35,7 @@ type Request struct { timeout time.Duration } -func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request, error) { +func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) { println("readRequest called") req := Request{ Header: make(Header), @@ -135,16 +135,17 @@ func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request body := hyperReq.Body() if body != nil { + fmt.Println("Body is not nil!!!!!!!") requestBody := newRequestBody() conn.requestBody = requestBody req.Body = requestBody //task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil) task := body.Data() taskID := taskGetBody - taskData := taskData { - hyperBody: body, - body: nil, - conn: conn, + taskData := taskData{ + hyperBody: body, + body: nil, + conn: conn, hyperTaskID: taskID, } task.SetUserdata(c.Pointer(&taskData), nil) @@ -163,7 +164,7 @@ func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request return nil, fmt.Errorf("failed to get request body") } - defer hyperReq.Free() + hyperReq.Free() return &req, nil } diff --git a/x/net/http/request_body.go b/x/net/http/request_body.go new file mode 100644 index 0000000..4a85ccd --- /dev/null +++ b/x/net/http/request_body.go @@ -0,0 +1,122 @@ +package http + +import ( + "errors" + "fmt" + "io" + "sync" +) + +type onceError struct { + sync.Mutex + err error +} + +func (a *onceError) Store(err error) { + a.Lock() + defer a.Unlock() + if a.err != nil { + return + } + a.err = err +} + +func (a *onceError) Load() error { + a.Lock() + defer a.Unlock() + return a.err +} + +type requestBody struct { + chunk []byte + readCh chan []byte + readyCh chan struct{} + + once sync.Once + done chan struct{} + + rerr onceError +} + +var ( + ErrClosedRequestBody = errors.New("request body: read/write on closed body") +) + +func newRequestBody() *requestBody { + return &requestBody{ + readCh: make(chan []byte, 1), + readyCh: make(chan struct{}, 1), + done: make(chan struct{}), + } +} + +func (rb *requestBody) Read(p []byte) (n int, err error) { + fmt.Println("RequestBody Read called") + select { + case <-rb.done: + fmt.Println("Read done") + return 0, rb.rerr.Load() + default: + } + + if len(rb.chunk) > 0 { + fmt.Println("Read remaining chunk") + n = copy(p, rb.chunk) + rb.chunk = rb.chunk[n:] + if len(rb.chunk) > 0 { + return + } + } + + fmt.Println("readyCh waiting") + select { + case rb.readyCh <- struct{}{}: + fmt.Println("readyCh signaled") + default: + fmt.Println("readyCh skipped (channel full)") + } + + select { + case rb.chunk = <-rb.readCh: + fmt.Printf("Read chunk received: %s\n", string(rb.chunk)) + case <-rb.done: + return 0, rb.rerr.Load() + default: + if len(rb.chunk) == 0 { + fmt.Println("Read ended") + return 0, io.EOF + } + } + fmt.Printf("Read chunk received: %s\n", string(rb.chunk)) + if len(rb.chunk) == 0 { + fmt.Println("Read ended") + return 0, io.EOF + } + n = copy(p, rb.chunk) + rb.chunk = rb.chunk[n:] + fmt.Printf("Read chunk copied: %d bytes\n", n) + return +} + +func (rb *requestBody) readCloseError() error { + if rerr := rb.rerr.Load(); rerr != nil { + return rerr + } + return ErrClosedRequestBody +} + +func (rb *requestBody) closeRead(err error) error { + fmt.Println("closeRead called") + if err == nil { + err = ErrClosedRequestBody + } + rb.rerr.Store(err) + rb.once.Do(func() { + close(rb.done) + }) + return nil +} + +func (rb *requestBody) Close() error { + return rb.closeRead(nil) +} diff --git a/x/net/http/response.go b/x/net/http/response.go index 4d150ba..0f3cb49 100644 --- a/x/net/http/response.go +++ b/x/net/http/response.go @@ -29,7 +29,7 @@ type taskData struct { hyperBody *hyper.Body body *body conn *conn - hyperTaskID + hyperTaskID hyperTaskID } type hyperTaskID int @@ -39,8 +39,6 @@ const ( taskGetBody ) - - var DefaultChunkSize uintptr = 8192 func newResponse(request *Request, channel *hyper.ResponseChannel) *response { @@ -95,12 +93,12 @@ func (r *response) WriteHeader(statusCode int) { } //debug - // fmt.Printf("< HTTP/1.1 %d\n", statusCode) - // for key, values := range r.header { - // for _, value := range values { - // fmt.Printf("< %s: %s\n", key, value) - // } - // } + fmt.Printf("< HTTP/1.1 %d\n", statusCode) + for key, values := range r.header { + for _, value := range values { + fmt.Printf("< %s: %s\n", key, value) + } + } } func (r *response) finalize() error { diff --git a/x/net/http/server.go b/x/net/http/server.go index 23ccdd1..0455f7d 100644 --- a/x/net/http/server.go +++ b/x/net/http/server.go @@ -16,7 +16,6 @@ import ( "github.com/goplus/llgo/c/syscall" "github.com/goplus/llgo/rust/hyper" "github.com/goplus/llgo/x/net" - "github.com/goplus/llgo/x/net/http/response_stream" ) type Handler interface { @@ -41,9 +40,6 @@ type Server struct { mu sync.Mutex activeConnections map[*conn]struct{} - - channels []<-chan struct{} - channelMutex sync.RWMutex } type conn struct { @@ -61,34 +57,6 @@ type conn struct { requestBody *requestBody } -type requestBody struct { - chunk []byte - readCh chan []byte - readToReadCh chan struct{} -} - -func newRequestBody() *requestBody { - return &requestBody{ - readCh: make(chan []byte, 1), - readToReadCh: make(chan struct{}, 1), - } -} - -func (rb *requestBody) Read(p []byte) (n int, err error) { - if len(rb.chunk) == 0 { - n = copy(p, rb.chunk) - rb.chunk = rb.chunk[n:] - if len(rb.chunk) > 0 { - return - } - } - rb.readToReadCh <- struct{}{} - rb.chunk = <-rb.readCh - n = copy(p, rb.chunk) - rb.chunk = rb.chunk[n:] - return -} - type serviceUserdata struct { host [128]c.Char port [8]c.Char @@ -336,55 +304,16 @@ func onIdle(handle *libuv.Idle) { //fmt.Println("onIdle called") srv := (*Server)((*libuv.Handle)(unsafe.Pointer(handle)).GetData()) for conn := range srv.activeConnections { - //fmt.Println("onIdle conn called") if conn.executor != nil { task := conn.executor.Poll() - select { - case <-conn.requestBody.readToReadCh: - fmt.Println("readToReadCh signaled") - payload := (*taskData)(task.Userdata()) - if payload == nil { - fmt.Println("taskData is nil") - task.Free() - return - } - - taskID := payload.hyperTaskID - - fmt.Printf("taskID: %d\n", taskID) - - fmt.Println("taskGetBody get body task") - getBodyTask := payload.hyperBody.Data() - getBodyTask.SetUserdata(c.Pointer(payload), nil) - if getBodyTask != nil { - fmt.Println("taskGetBody push get body task") - r := payload.conn.executor.Push(getBodyTask) - fmt.Printf("taskGetBody push get body task: %d\n", r) - if r != hyper.OK { - fmt.Printf("failed to push get body task: %d\n", r) - getBodyTask.Free() - } - } - default: - fmt.Println("readToReadCh not signaled") - } for task != nil { - srv.handleTask(task) + srv.handleTask(conn, task) task = conn.executor.Poll() + srv.handleRead(conn, task) } } } - srv.channelMutex.RLock() - for _, ch := range srv.channels { - select { - case <-ch: - fmt.Printf("Received signal from channel\n") - default: - } - } - srv.channelMutex.RUnlock() - if srv.shuttingDown() { fmt.Println("Shutdown initiated, cleaning up...") handle.Stop() @@ -399,7 +328,7 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h return } - req, err := userData.conn.readRequest(userData.server, hyperReq) + req, err := userData.conn.readRequest(hyperReq) if err != nil { fmt.Printf("Error creating request: %v\n", err) return @@ -418,82 +347,124 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h // res.finalize() } -func (srv *Server) handleTask(task *hyper.Task) { - - taskType := task.Type() - fmt.Printf("taskType: %d\n", taskType) +func (srv *Server) handleRead(conn *conn, task *hyper.Task) { payload := (*taskData)(task.Userdata()) if payload == nil { - fmt.Println("taskData is nil") - task.Free() + fmt.Println("taskData is nil, no need to handle read") return } - - taskID := payload.hyperTaskID - - fmt.Printf("taskID: %d\n", taskID) - - if taskID == taskGetBody { - fmt.Println("taskGetBody called") - if taskType == hyper.TaskError { - fmt.Println("taskGetBody error") - err := (*hyper.Error)(task.Value()) - fmt.Printf("error code: %d\n", err.Code()) - - var errbuf [256]byte - errlen := err.Print(&errbuf[0], unsafe.Sizeof(errbuf)) - fmt.Printf("details: %s\n", errbuf[:errlen]) - err.Free() - task.Free() - } - - if taskType == hyper.TaskBuf { - fmt.Println("taskGetBody write buf") - buf := (*hyper.Buf)(task.Value()) - bytes := unsafe.Slice(buf.Bytes(), buf.Len()) - payload.conn.requestBody.readCh <- bytes - fmt.Println("taskGetBody wrote to bodyWriter") - buf.Free() - task.Free() - fmt.Println("taskGetBody free task") - - fmt.Println("taskGetBody get body task") - getBodyTask := payload.hyperBody.Data() - getBodyTask.SetUserdata(c.Pointer(payload), nil) - if getBodyTask != nil { - fmt.Println("taskGetBody push get body task") - r := payload.conn.executor.Push(getBodyTask) - fmt.Printf("taskGetBody push get body task: %d\n", r) - if r != hyper.OK { - fmt.Printf("failed to push get body task: %d\n", r) - getBodyTask.Free() - } + + select { + case <-conn.requestBody.readyCh: + fmt.Println("readyCh signaled") + + fmt.Println("taskGetBody get body task form readyCh") + getBodyTask := payload.hyperBody.Data() + getBodyTask.SetUserdata(c.Pointer(payload), nil) + if getBodyTask != nil { + fmt.Println("taskGetBody push get body task") + r := payload.conn.executor.Push(getBodyTask) + fmt.Printf("taskGetBody push get body task: %d\n", r) + if r != hyper.OK { + fmt.Printf("failed to push get body task: %d\n", r) + getBodyTask.Free() } } + default: + fmt.Println("readToReadCh not signaled") + } +} - if taskType == hyper.TaskEmpty { - fmt.Println("taskGetBody close bodyWriter") - payload.conn.bodyWriter.Close() - fmt.Println("taskGetBody free task") - task.Free() - } - } else if taskID == taskSetBody { - fmt.Println("taskSetBody called") - if taskType == hyper.TaskError { - fmt.Println("taskSetBody error") - err := (*hyper.Error)(task.Value()) - fmt.Printf("error code: %d\n", err.Code()) - - var errbuf [256]byte - errlen := err.Print(&errbuf[0], unsafe.Sizeof(errbuf)) - fmt.Printf("details: %s\n", errbuf[:errlen]) - err.Free() - task.Free() - } +func (srv *Server) handleTask(conn *conn, task *hyper.Task) { + taskType := task.Type() + //debug + switch taskType { + case hyper.TaskEmpty: + fmt.Println("Task type: Empty") + case hyper.TaskBuf: + fmt.Println("Task type: Buffer") + case hyper.TaskError: + fmt.Println("Task type: Error") + case hyper.TaskServerconn: + fmt.Println("Task type: Serverconn") + default: + fmt.Println("Unknown task type") + } - if taskType == hyper.TaskEmpty { - fmt.Println("taskSetBody free task") - task.Free() + payload := (*taskData)(task.Userdata()) + if payload != nil { + taskID := payload.hyperTaskID + + // select { + // case <-conn.requestBody.readyCh: + // fmt.Println("readyCh recieved") + + // fmt.Println("taskGetBody get body task form readyCh") + // getBodyTask := payload.hyperBody.Data() + // getBodyTask.SetUserdata(c.Pointer(payload), nil) + // if getBodyTask != nil { + // fmt.Println("taskGetBody push get body task") + // r := payload.conn.executor.Push(getBodyTask) + // fmt.Printf("taskGetBody push get body task: %d\n", r) + // if r != hyper.OK { + // fmt.Printf("failed to push get body task: %d\n", r) + // getBodyTask.Free() + // } + // } + // default: + // fmt.Println("readyCh not recieved") + // } + + if taskID == taskGetBody { + fmt.Println("taskGetBody called") + if taskType == hyper.TaskError { + fmt.Println("taskGetBody error") + err := (*hyper.Error)(task.Value()) + fmt.Printf("error code: %d\n", err.Code()) + + var errbuf [256]byte + errlen := err.Print(&errbuf[0], unsafe.Sizeof(errbuf)) + fmt.Printf("details: %s\n", errbuf[:errlen]) + err.Free() + task.Free() + } + + if taskType == hyper.TaskBuf { + fmt.Println("taskGetBody write buf") + buf := (*hyper.Buf)(task.Value()) + bytes := unsafe.Slice(buf.Bytes(), buf.Len()) + fmt.Printf("taskGetBody writing to bodyWriter: %s\n", string(bytes)) + buf.Free() + task.Free() + fmt.Println("taskGetBody free task") + payload.conn.requestBody.readCh <- bytes + fmt.Println("taskGetBody wrote to bodyWriter") + } + + if taskType == hyper.TaskEmpty { + fmt.Println("taskGetBody close requestBody") + payload.conn.requestBody.Close() + fmt.Println("taskGetBody free task") + task.Free() + } + } else if taskID == taskSetBody { + fmt.Println("taskSetBody called") + if taskType == hyper.TaskError { + fmt.Println("taskSetBody error") + err := (*hyper.Error)(task.Value()) + fmt.Printf("error code: %d\n", err.Code()) + + var errbuf [256]byte + errlen := err.Print(&errbuf[0], unsafe.Sizeof(errbuf)) + fmt.Printf("details: %s\n", errbuf[:errlen]) + err.Free() + task.Free() + } + + if taskType == hyper.TaskEmpty { + fmt.Println("taskSetBody free task") + task.Free() + } } }