Skip to content

Commit

Permalink
refactor(x/net/http): Multiple fixes & adapt bodyStream struct
Browse files Browse the repository at this point in the history
Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Sep 20, 2024
1 parent 6fdc040 commit 4bf50fe
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 88 deletions.
147 changes: 143 additions & 4 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"net/url"
"strings"
"time"

"github.com/goplus/llgo/c/libuv"
"golang.org/x/net/idna"
"unsafe"

"github.com/goplus/llgo/c"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgoexamples/rust/hyper"
"golang.org/x/net/idna"
)

type Request struct {
Expand Down Expand Up @@ -230,7 +230,7 @@ func (r *Request) ProtoAtLeast(major, minor int) bool {
// extraHeaders may be nil
// waitForContinue may be nil
// always closes body
func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hyper.Executor) (err error) {
func (r *Request) write(client *hyper.ClientConn, taskData *clientTaskData, exec *hyper.Executor) (err error) {
//trace := httptrace.ContextClientTrace(r.Context())
//if trace != nil && trace.WroteRequest != nil {
// defer func() {
Expand Down Expand Up @@ -488,3 +488,142 @@ func valueOrDefault(value, def string) string {
}
return def
}

func readRequest(executor *hyper.Executor, hyperReq *hyper.Request, requestNotifyHandle *libuv.Async, remoteAddr string) (*Request, error) {
println("[debug] readRequest called")
req := Request{
Header: make(Header),
Body: nil,
}
req.RemoteAddr = remoteAddr

headers := hyperReq.Headers()
if headers != nil {
headers.Foreach(addHeader, unsafe.Pointer(&req))
} else {
return nil, fmt.Errorf("failed to get request headers")
}

var host string
for key, values := range req.Header {
if strings.EqualFold(key, "Host") {
if len(values) > 0 {
host = values[0]
break
}
}

}

method := make([]byte, 32)
methodLen := unsafe.Sizeof(method)
if err := hyperReq.Method(&method[0], &methodLen); err != hyper.OK {

Check failure on line 520 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.Method undefined (type *hyper.Request has no field or method Method)

Check failure on line 520 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.Method undefined (type *hyper.Request has no field or method Method)

Check warning on line 520 in x/net/http/request.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/request.go#L520

hyperReq.Method undefined (type *hyper.Request has no field or method Method)
return nil, fmt.Errorf("failed to get method: %v", err)
}

methodStr := string(method[:methodLen])

var scheme, authority, pathAndQuery [1024]byte
schemeLen, authorityLen, pathAndQueryLen := unsafe.Sizeof(scheme), unsafe.Sizeof(authority), unsafe.Sizeof(pathAndQuery)
uriResult := hyperReq.URIParts(&scheme[0], &schemeLen, &authority[0], &authorityLen, &pathAndQuery[0], &pathAndQueryLen)

Check failure on line 528 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.URIParts undefined (type *hyper.Request has no field or method URIParts)

Check failure on line 528 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.URIParts undefined (type *hyper.Request has no field or method URIParts)

Check warning on line 528 in x/net/http/request.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/request.go#L528

hyperReq.URIParts undefined (type *hyper.Request has no field or method URIParts)
if uriResult != hyper.OK {
return nil, fmt.Errorf("failed to get URI parts: %v", uriResult)
}

var schemeStr, authorityStr, pathAndQueryStr string
if schemeLen == 0 {
schemeStr = "http"
} else {
schemeStr = string(scheme[:schemeLen])
}

if authorityLen == 0 {
authorityStr = host
} else {
authorityStr = string(authority[:authorityLen])
}

if pathAndQueryLen == 0 {
return nil, fmt.Errorf("failed to get URI path and query: %v", uriResult)
} else {
pathAndQueryStr = string(pathAndQuery[:pathAndQueryLen])
}
req.Host = authorityStr
req.Method = methodStr
req.RequestURI = pathAndQueryStr

var proto string
var protoMajor, protoMinor int
version := hyperReq.Version()

Check failure on line 557 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.Version undefined (type *hyper.Request has no field or method Version)

Check failure on line 557 in x/net/http/request.go

View workflow job for this annotation

GitHub Actions / build

hyperReq.Version undefined (type *hyper.Request has no field or method Version)

Check warning on line 557 in x/net/http/request.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/request.go#L557

hyperReq.Version undefined (type *hyper.Request has no field or method Version)
switch version {
case hyper.HTTPVersion10:
proto = "HTTP/1.0"
protoMajor = 1
protoMinor = 0
case hyper.HTTPVersion11:
proto = "HTTP/1.1"
protoMajor = 1
protoMinor = 1
case hyper.HTTPVersion2:
proto = "HTTP/2.0"
protoMajor = 2
protoMinor = 0
case hyper.HTTPVersionNone:
proto = "HTTP/0.0"
protoMajor = 0
protoMinor = 0
default:
return nil, fmt.Errorf("unknown HTTP version: %d", version)
}
req.Proto = proto
req.ProtoMajor = protoMajor
req.ProtoMinor = protoMinor

urlStr := fmt.Sprintf("%s://%s%s", schemeStr, authorityStr, pathAndQueryStr)
url, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
req.URL = url

body := hyperReq.Body()
if body != nil {
taskFlag := getBodyTask

bodyStream := newBodyStream(requestNotifyHandle)
req.Body = bodyStream

taskData := taskData{
hyperBody: body,
responseBody: nil,
bodyStream: bodyStream,
taskFlag: taskFlag,
executor: executor,
}

requestNotifyHandle.SetData(c.Pointer(&taskData))
fmt.Println("[debug] async task set")

} else {
return nil, fmt.Errorf("failed to get request body")
}

//hyperReq.Free()

return &req, nil
}

func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, valueLen uintptr) c.Int {
req := (*Request)(data)
key := string(unsafe.Slice(name, nameLen))
val := string(unsafe.Slice(value, valueLen))
values := strings.Split(val, ",")
if len(values) > 1 {
for _, v := range values {
req.Header.Add(key, strings.TrimSpace(v))
}
} else {
req.Header.Add(key, val)
}
return hyper.IterContinue
}
11 changes: 4 additions & 7 deletions x/net/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type responseBodyRaw struct {
type taskData struct {
hyperBody *hyper.Body
responseBody *responseBodyRaw
requestBody *requestBody
bodyStream *bodyStream
executor *hyper.Executor
taskFlag taskFlag
}
Expand All @@ -44,8 +44,6 @@ const (
getBodyTask
)

var DefaultChunkSize uintptr = 8192

func newResponse(hyperChannel *hyper.ResponseChannel) *response {

Check failure on line 47 in x/net/http/response.go

View workflow job for this annotation

GitHub Actions / build

undefined: hyper.ResponseChannel

Check failure on line 47 in x/net/http/response.go

View workflow job for this annotation

GitHub Actions / build

undefined: hyper.ResponseChannel

Check warning on line 47 in x/net/http/response.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/response.go#L47

undefined: hyper.ResponseChannel
fmt.Printf("[debug] newResponse called\n")

Expand Down Expand Up @@ -136,7 +134,7 @@ func (r *response) finalize() error {
taskData := &taskData{
hyperBody: body,
responseBody: &bodyData,
requestBody: nil,
bodyStream: nil,
executor: nil,
taskFlag: setBodyTask,
}
Expand Down Expand Up @@ -194,7 +192,6 @@ func setBodyDataFunc(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf)
return hyper.PollError
}


type Response struct {
Status string // e.g. "200 OK"
StatusCode int // e.g. 200
Expand Down Expand Up @@ -234,7 +231,7 @@ func (r *Response) Cookies() []*Cookie {
return readSetCookies(r.Header)
}

func (r *Response) checkRespBody(taskData *taskData) (needContinue bool) {
func (r *Response) checkRespBody(taskData *clientTaskData) (needContinue bool) {
pc := taskData.pc
bodyWritable := r.bodyIsWritable()
hasBody := taskData.req.Method != "HEAD" && r.ContentLength != 0
Expand Down Expand Up @@ -282,7 +279,7 @@ func (r *Response) checkRespBody(taskData *taskData) (needContinue bool) {
return false
}

func (r *Response) wrapRespBody(taskData *taskData) {
func (r *Response) wrapRespBody(taskData *clientTaskData) {
body := &bodyEOFSignal{
body: r.Body,
earlyCloseFn: func() error {
Expand Down
35 changes: 20 additions & 15 deletions x/net/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ import (
// well read them)
const maxPostHandlerReadBytes = 256 << 10

// _SC_NPROCESSORS_ONLN is the number of processors on the system
const _SC_NPROCESSORS_ONLN c.Int = 58
var cpuCount int

// DefaultChunkSize is the default chunk size for reading and writing data
var DefaultChunkSize uintptr = 8192

// cpuCount is the number of processors on the system
var cpuCount int

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
Expand All @@ -44,8 +49,8 @@ type ResponseWriter interface {
}

type Server struct {
Addr string
Handler Handler
Addr string
Handler Handler
isShutdown atomic.Bool

eventLoop []*eventLoop
Expand Down Expand Up @@ -80,7 +85,7 @@ type serviceUserdata struct {
asyncHandle *libuv.Async
host [128]c.Char
port [8]c.Char
executor *hyper.Executor
executor *hyper.Executor
}

func NewServer(addr string) *Server {
Expand Down Expand Up @@ -130,7 +135,7 @@ func newEventLoop() (*eventLoop, error) {
el.uvLoop.SetData(unsafe.Pointer(el))

if r := libuv.InitTcpEx(el.uvLoop, &el.uvServer, cnet.AF_INET); r != 0 {
return nil, fmt.Errorf("failed to init TCP: %v", libuv.Strerror(libuv.Errno(r)))
return nil, fmt.Errorf("failed to init TCP: %s", c.GoString(libuv.Strerror(libuv.Errno(r))))
}

return el, nil
Expand All @@ -139,19 +144,19 @@ func newEventLoop() (*eventLoop, error) {
func (el *eventLoop) run(host string, port int) error {
var sockaddr cnet.SockaddrIn
if r := libuv.Ip4Addr(c.AllocaCStr(host), c.Int(port), &sockaddr); r != 0 {
return fmt.Errorf("failed to create IP address: %v", libuv.Strerror(libuv.Errno(r)))
return fmt.Errorf("failed to create IP address: %s", c.GoString(libuv.Strerror(libuv.Errno(r))))
}

if err := setReuseAddr(&el.uvServer); err != nil {
return fmt.Errorf("failed to set SO_REUSEADDR: %v", err)
return fmt.Errorf("failed to set SO_REUSEADDR: %s", err)
}

if r := el.uvServer.Bind((*cnet.SockAddr)(unsafe.Pointer(&sockaddr)), 0); r != 0 {
return fmt.Errorf("failed to bind: %v", libuv.Strerror(libuv.Errno(r)))
return fmt.Errorf("failed to bind: %s", c.GoString(libuv.Strerror(libuv.Errno(r))))
}

if err := (*libuv.Stream)(&el.uvServer).Listen(128, onNewConnection); err != 0 {
return fmt.Errorf("failed to listen: %v", err)
if r := (*libuv.Stream)(&el.uvServer).Listen(128, onNewConnection); r != 0 {
return fmt.Errorf("failed to listen: %s", c.GoString(libuv.Strerror(libuv.Errno(r))))
}

if r := libuv.InitIdle(el.uvLoop, &el.idleHandle); r != 0 {
Expand Down Expand Up @@ -314,7 +319,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {

r := libuv.PollInit(el.uvLoop, &conn.pollHandle, libuv.OsFd(conn.stream.GetIoWatcherFd()))
if r < 0 {
fmt.Fprintf(os.Stderr, "uv_poll_init error: %s\n", libuv.Strerror(libuv.Errno(r)))
fmt.Fprintf(os.Stderr, "uv_poll_init error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(r))))
(*libuv.Handle)(unsafe.Pointer(&conn.stream)).Close(nil)
return
}
Expand Down Expand Up @@ -482,8 +487,8 @@ func handleGetBodyTask(hyperTaskType hyper.TaskReturnType, task *hyper.Task, pay
handleTaskBuffer(task, payload)
case hyper.TaskEmpty:
fmt.Println("[debug] Get body task closing request body")
if payload.requestBody != nil {
payload.requestBody.Close()
if payload.bodyStream != nil {
payload.bodyStream.Close()
}
task.Free()
}
Expand Down Expand Up @@ -513,7 +518,7 @@ func handleTaskError(task *hyper.Task) {
func handleTaskBuffer(task *hyper.Task, payload *taskData) {
buf := (*hyper.Buf)(task.Value())
bytes := unsafe.Slice(buf.Bytes(), buf.Len())
payload.requestBody.readCh <- bytes
payload.bodyStream.readCh <- bytes
fmt.Printf("[debug] Task get body writing to bodyWriter: %s\n", string(bytes))
buf.Free()
task.Free()
Expand Down Expand Up @@ -669,7 +674,7 @@ func updateConnRegistrations(conn *conn) bool {
fmt.Printf("[debug] Starting poll with events: %d\n", events)
r := conn.pollHandle.Start(events, onPoll)
if r < 0 {
fmt.Fprintf(os.Stderr, "uv_poll_start error: %s\n", libuv.Strerror(libuv.Errno(r)))
fmt.Fprintf(os.Stderr, "uv_poll_start error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(r))))
return false
}
return true
Expand Down
Loading

0 comments on commit 4bf50fe

Please sign in to comment.