Skip to content

Commit

Permalink
refactor(x/net/http): Implement requestBody logic
Browse files Browse the repository at this point in the history
Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Sep 10, 2024
1 parent 94af890 commit 75c69d5
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 157 deletions.
13 changes: 7 additions & 6 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Request struct {
timeout time.Duration
}

func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request, error) {
func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
println("readRequest called")
req := Request{
Header: make(Header),
Expand Down Expand Up @@ -135,16 +135,17 @@ func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request

body := hyperReq.Body()
if body != nil {
fmt.Println("Body is not nil!!!!!!!")
requestBody := newRequestBody()
conn.requestBody = requestBody
req.Body = requestBody
//task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil)
task := body.Data()
taskID := taskGetBody
taskData := taskData {
hyperBody: body,
body: nil,
conn: conn,
taskData := taskData{
hyperBody: body,
body: nil,
conn: conn,
hyperTaskID: taskID,
}
task.SetUserdata(c.Pointer(&taskData), nil)
Expand All @@ -163,7 +164,7 @@ func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request
return nil, fmt.Errorf("failed to get request body")
}

defer hyperReq.Free()
hyperReq.Free()

return &req, nil
}
Expand Down
122 changes: 122 additions & 0 deletions x/net/http/request_body.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package http

import (
"errors"
"fmt"
"io"
"sync"
)

type onceError struct {
sync.Mutex
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err
}

func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err
}

type requestBody struct {
chunk []byte
readCh chan []byte
readyCh chan struct{}

once sync.Once
done chan struct{}

rerr onceError
}

var (
ErrClosedRequestBody = errors.New("request body: read/write on closed body")
)

func newRequestBody() *requestBody {
return &requestBody{
readCh: make(chan []byte, 1),
readyCh: make(chan struct{}, 1),
done: make(chan struct{}),
}
}

func (rb *requestBody) Read(p []byte) (n int, err error) {
fmt.Println("RequestBody Read called")
select {
case <-rb.done:
fmt.Println("Read done")
return 0, rb.rerr.Load()
default:
}

if len(rb.chunk) > 0 {
fmt.Println("Read remaining chunk")
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
if len(rb.chunk) > 0 {
return
}
}

fmt.Println("readyCh waiting")
select {
case rb.readyCh <- struct{}{}:
fmt.Println("readyCh signaled")
default:
fmt.Println("readyCh skipped (channel full)")
}

select {
case rb.chunk = <-rb.readCh:
fmt.Printf("Read chunk received: %s\n", string(rb.chunk))
case <-rb.done:
return 0, rb.rerr.Load()
default:
if len(rb.chunk) == 0 {
fmt.Println("Read ended")
return 0, io.EOF
}
}
fmt.Printf("Read chunk received: %s\n", string(rb.chunk))
if len(rb.chunk) == 0 {
fmt.Println("Read ended")
return 0, io.EOF
}
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
fmt.Printf("Read chunk copied: %d bytes\n", n)
return
}

func (rb *requestBody) readCloseError() error {
if rerr := rb.rerr.Load(); rerr != nil {
return rerr
}
return ErrClosedRequestBody
}

func (rb *requestBody) closeRead(err error) error {
fmt.Println("closeRead called")
if err == nil {
err = ErrClosedRequestBody
}
rb.rerr.Store(err)
rb.once.Do(func() {
close(rb.done)
})
return nil
}

func (rb *requestBody) Close() error {
return rb.closeRead(nil)
}
16 changes: 7 additions & 9 deletions x/net/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type taskData struct {
hyperBody *hyper.Body
body *body
conn *conn
hyperTaskID
hyperTaskID hyperTaskID
}

type hyperTaskID int
Expand All @@ -39,8 +39,6 @@ const (
taskGetBody
)



var DefaultChunkSize uintptr = 8192

func newResponse(request *Request, channel *hyper.ResponseChannel) *response {
Expand Down Expand Up @@ -95,12 +93,12 @@ func (r *response) WriteHeader(statusCode int) {
}

//debug
// fmt.Printf("< HTTP/1.1 %d\n", statusCode)
// for key, values := range r.header {
// for _, value := range values {
// fmt.Printf("< %s: %s\n", key, value)
// }
// }
fmt.Printf("< HTTP/1.1 %d\n", statusCode)
for key, values := range r.header {
for _, value := range values {
fmt.Printf("< %s: %s\n", key, value)
}
}
}

func (r *response) finalize() error {
Expand Down
Loading

0 comments on commit 75c69d5

Please sign in to comment.