Skip to content

Commit

Permalink
refactor(x/net/http/demo): Implement requestBody logic and intergrate…
Browse files Browse the repository at this point in the history
… with main loop

Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Sep 11, 2024
1 parent 75c69d5 commit 0250377
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 209 deletions.
18 changes: 9 additions & 9 deletions x/net/http/_demo/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
)

func echoHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("echoHandler called\n")
fmt.Printf("> %s %s HTTP/%d.%d\n", r.Method, r.RequestURI, r.ProtoMajor, r.ProtoMinor)
fmt.Printf("[debug] echoHandler called\n")
fmt.Printf(">> %s %s HTTP/%d.%d\n", r.Method, r.RequestURI, r.ProtoMajor, r.ProtoMinor)
for key, values := range r.Header {
for _, value := range values {
fmt.Printf("> %s: %s\n", key, value)
fmt.Printf(">> %s: %s\n", key, value)
}
}
fmt.Printf("URL: %s\n", r.URL.String())
fmt.Printf("RemoteAddr: %s\n", r.RemoteAddr)
fmt.Printf(">> URL: %s\n", r.URL.String())
fmt.Printf(">> RemoteAddr: %s\n", r.RemoteAddr)
// fmt.Println("ContentLength: %d", r.ContentLength)
// fmt.Println("TransferEncoding: %s", r.TransferEncoding)

Expand All @@ -25,7 +25,8 @@ func echoHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Error reading request body", http.StatusInternalServerError)
return
}

defer r.Body.Close()

// var body []byte
// buffer := make([]byte, 1024)
// for {
Expand All @@ -40,9 +41,8 @@ func echoHandler(w http.ResponseWriter, r *http.Request) {
// }
// }


r.Body.Close()
fmt.Println("body read")
fmt.Printf(">> Body: %s\n", string(body))
fmt.Println("[debug] body read done")
w.Header().Set("Content-Type", "text/plain")
w.Write(body)
}
Expand Down
39 changes: 7 additions & 32 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Request struct {
}

func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
println("readRequest called")
println("[debug] readRequest called")
req := Request{
Header: make(Header),
timeout: 0,
Expand Down Expand Up @@ -135,11 +135,6 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {

body := hyperReq.Body()
if body != nil {
fmt.Println("Body is not nil!!!!!!!")
requestBody := newRequestBody()
conn.requestBody = requestBody
req.Body = requestBody
//task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil)
task := body.Data()
taskID := taskGetBody
taskData := taskData{
Expand All @@ -149,6 +144,12 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
hyperTaskID: taskID,
}
task.SetUserdata(c.Pointer(&taskData), nil)
requestBody := newRequestBody(conn.asyncHandle)
conn.requestBody = requestBody
req.Body = requestBody

conn.asyncHandle.SetData(c.Pointer(&taskData))
fmt.Println("[debug] async task set")
if task != nil {
r := conn.executor.Push(task)
if r != hyper.OK {
Expand Down Expand Up @@ -183,29 +184,3 @@ func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, va
}
return hyper.IterContinue
}

// func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int {
// fmt.Printf("getBodyChunk called\n")
// writer := (*io.PipeWriter)(userdata)
// if writer == nil {
// fmt.Printf("writer is nil\n")
// return hyper.IterBreak
// }
// buf := chunk.Bytes()
// len := chunk.Len()
// bytes := unsafe.Slice(buf, len)
// //debug
// fmt.Printf("Writing %d bytes to response body\n", len)
// fmt.Printf("Body chunk: %s\n", string(bytes))

// go func() {
// count, err := writer.Write(bytes)
// fmt.Printf("Body chunk written: %d bytes\n", count)
// if err != nil {
// fmt.Println("Error writing to response body:", err)
// writer.Close()
// }
// }()

// return hyper.IterContinue
// }
77 changes: 31 additions & 46 deletions x/net/http/request_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"sync"

"github.com/goplus/llgo/c/libuv"

Check warning on line 9 in x/net/http/request_body.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/request_body.go#L9

could not import github.com/goplus/llgo/c/libuv (-: # github.com/goplus/llgo/c/libuv
)

type onceError struct {
Expand All @@ -28,9 +30,9 @@ func (a *onceError) Load() error {
}

type requestBody struct {
chunk []byte
readCh chan []byte
readyCh chan struct{}
chunk []byte
readCh chan []byte
asyncHandle *libuv.Async

once sync.Once
done chan struct{}
Expand All @@ -42,60 +44,42 @@ var (
ErrClosedRequestBody = errors.New("request body: read/write on closed body")
)

func newRequestBody() *requestBody {
func newRequestBody(asyncHandle *libuv.Async) *requestBody {
return &requestBody{
readCh: make(chan []byte, 1),
readyCh: make(chan struct{}, 1),
done: make(chan struct{}),
readCh: make(chan []byte, 1),
done: make(chan struct{}),
asyncHandle: asyncHandle,
}
}

func (rb *requestBody) Read(p []byte) (n int, err error) {
fmt.Println("RequestBody Read called")
select {
case <-rb.done:
fmt.Println("Read done")
return 0, rb.rerr.Load()
default:
}

fmt.Println("[debug] requestBody.Read called")
// If there are still unread chunks, read them first
if len(rb.chunk) > 0 {
fmt.Println("Read remaining chunk")
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
if len(rb.chunk) > 0 {
return
}
return n, nil
}

fmt.Println("readyCh waiting")
// Attempt to read a new chunk from a channel
select {
case rb.readyCh <- struct{}{}:
fmt.Println("readyCh signaled")
default:
fmt.Println("readyCh skipped (channel full)")
}

select {
case rb.chunk = <-rb.readCh:
fmt.Printf("Read chunk received: %s\n", string(rb.chunk))
case <-rb.done:
return 0, rb.rerr.Load()
default:
if len(rb.chunk) == 0 {
fmt.Println("Read ended")
return 0, io.EOF
case chunk, ok := <-rb.readCh:
if !ok {
// The channel has been closed, indicating that all data has been read
return 0, rb.readCloseError()
}
n = copy(p, chunk)
if n < len(chunk) {
// If the capacity of p is insufficient to hold the whole chunk, save the rest of the chunk
rb.chunk = chunk[n:]
}
fmt.Println("[debug] requestBody.Read async send")
rb.asyncHandle.Send()
return n, nil
case <-rb.done:
// If the done channel is closed, the read needs to be terminated
return 0, rb.readCloseError()
}
fmt.Printf("Read chunk received: %s\n", string(rb.chunk))
if len(rb.chunk) == 0 {
fmt.Println("Read ended")
return 0, io.EOF
}
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
fmt.Printf("Read chunk copied: %d bytes\n", n)
return
}

func (rb *requestBody) readCloseError() error {
Expand All @@ -106,14 +90,15 @@ func (rb *requestBody) readCloseError() error {
}

func (rb *requestBody) closeRead(err error) error {
fmt.Println("closeRead called")
fmt.Println("[debug] RequestBody closeRead called")
if err == nil {
err = ErrClosedRequestBody
err = io.EOF
}
rb.rerr.Store(err)
rb.once.Do(func() {
close(rb.done)
})
//close(rb.done)
return nil
}

Expand Down
Loading

0 comments on commit 0250377

Please sign in to comment.