Skip to content

Commit

Permalink
refactor(x/net/http): Temporarily fix pipe write stuck
Browse files Browse the repository at this point in the history
Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Sep 5, 2024
1 parent f03b73f commit 3f94d3f
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 83 deletions.
32 changes: 25 additions & 7 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
if body != nil {
req.Body, conn.bodyWriter = io.Pipe()
task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil)
taskData := taskData {
body: nil,
conn: conn,
hyperTaskID: taskGetBody,
}
task.SetUserdata(c.Pointer(&taskData), nil)
if task != nil {
r := conn.executor.Push(task)
if r != hyper.OK {
Expand Down Expand Up @@ -175,20 +181,32 @@ func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, va
func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int {
fmt.Printf("getBodyChunk called\n")
writer := (*io.PipeWriter)(userdata)
if writer == nil {
fmt.Printf("writer is nil\n")
return hyper.IterBreak
}
buf := chunk.Bytes()
len := chunk.Len()
bytes := unsafe.Slice(buf, len)
//debug
fmt.Printf("Writing %d bytes to response body\n", len)
fmt.Printf("Body chunk: %s\n", string(bytes))

_, err := writer.Write(bytes)
fmt.Printf("Body chunk written\n")
if err != nil {
fmt.Println("Error writing to response body:", err)
writer.Close()
return hyper.IterBreak
}
go func() {
count, err := writer.Write(bytes)
fmt.Printf("Body chunk written: %d bytes\n", count)
if err != nil {
fmt.Println("Error writing to response body:", err)
writer.Close()
}
}()
// count, err := writer.Write(bytes)
// fmt.Printf("Body chunk written: %d bytes\n", count)
// if err != nil {
// fmt.Println("Error writing to response body:", err)
// writer.Close()
// return hyper.IterBreak
// }

return hyper.IterContinue
}
35 changes: 32 additions & 3 deletions x/net/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type response struct {
body []byte
channel *hyper.ResponseChannel
resp *hyper.Response
request *Request
}

type body struct {
Expand All @@ -24,13 +25,29 @@ type body struct {
readLen uintptr
}

type taskData struct {
body *body
conn *conn
hyperTaskID
}

type hyperTaskID int

const (
taskSetBody hyperTaskID = iota
taskGetBody
)



var DefaultChunkSize uintptr = 8192

func newResponse(channel *hyper.ResponseChannel) *response {
func newResponse(request *Request, channel *hyper.ResponseChannel) *response {
fmt.Printf("newResponse called\n")
resp := response{
header: make(Header),
channel: channel,
request: request,
}
return &resp
}
Expand Down Expand Up @@ -90,6 +107,12 @@ func (r *response) WriteHeader(statusCode int) {

func (r *response) finalize() error {
fmt.Printf("finalize called\n")
err := r.request.Body.Close()
if err != nil {
return err
}
fmt.Printf("request body closed\n")

if !r.written {
r.WriteHeader(200)
}
Expand All @@ -105,8 +128,13 @@ func (r *response) finalize() error {
if body == nil {
return fmt.Errorf("failed to create body")
}
taskData := taskData{
body: &bodyData,
conn: nil,
hyperTaskID: taskSetBody,
}
body.SetDataFunc(setBodyDataFunc)
body.SetUserdata(unsafe.Pointer(&bodyData), nil)
body.SetUserdata(unsafe.Pointer(&taskData), nil)
fmt.Printf("bodyData userdata set\n")

fmt.Printf("bodyData set\n")
Expand All @@ -124,12 +152,13 @@ func (r *response) finalize() error {

func setBodyDataFunc(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.Int {
fmt.Printf("setBodyDataFunc called\n")
body := (*body)(userdata)
body := (*taskData)(userdata).body

if body.len > 0 {
//debug
fmt.Println("<")
fmt.Printf("%s", string(body.data))
fmt.Println("")

if body.len > DefaultChunkSize {
*chunk = hyper.CopyBuf(&body.data[body.readLen], DefaultChunkSize)
Expand Down
Loading

0 comments on commit 3f94d3f

Please sign in to comment.