Skip to content

Commit

Permalink
WIP(x/net/http/client): Implement BodyChunk
Browse files Browse the repository at this point in the history
  • Loading branch information
spongehah committed Sep 12, 2024
1 parent 0d8cc27 commit e55b261
Show file tree
Hide file tree
Showing 16 changed files with 893 additions and 784 deletions.
29 changes: 29 additions & 0 deletions x/net/http/_demo/chunked/chunked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"fmt"
"io"

"github.com/goplus/llgoexamples/x/net/http"
)

func main() {
resp, err := http.Get("http://localhost:8080/chunked")
if err != nil {
fmt.Println(err)
return
}
defer resp.Body.Close()
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(body))
}
6 changes: 5 additions & 1 deletion x/net/http/_demo/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ func main() {
}
defer resp.Body.Close()
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
6 changes: 5 additions & 1 deletion x/net/http/_demo/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func main() {
}
defer resp.Body.Close()
fmt.Println(resp.Status)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
println(err.Error())
Expand Down
6 changes: 5 additions & 1 deletion x/net/http/_demo/maxConnsPerHost/maxConnsPerHost.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func main() {
defer resp.Body.Close()
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
fmt.Println(resp.Proto)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
5 changes: 5 additions & 0 deletions x/net/http/_demo/post/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ func main() {
}
defer resp.Body.Close()
fmt.Println(resp.Status)
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
6 changes: 5 additions & 1 deletion x/net/http/_demo/redirect/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ func main() {
defer resp.Body.Close()
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
fmt.Println(resp.Proto)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
12 changes: 10 additions & 2 deletions x/net/http/_demo/reuseConn/reuseConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ func main() {
return
}
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand All @@ -31,7 +35,11 @@ func main() {
return
}
fmt.Println(resp.Status, "read bytes: ", resp.ContentLength)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
body, err = io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
42 changes: 42 additions & 0 deletions x/net/http/_demo/server/chunkedServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"fmt"
"net/http"
)

func chunkedHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "text/plain")

flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}

sentence := "This is a chunked encoded response. It will be sent in multiple parts. Note the delay between each section."

words := []string{}
start := 0
for i, r := range sentence {
if r == '。' || r == ',' || i == len(sentence)-1 {
words = append(words, sentence[start:i+1])
start = i + 1
}
}

for _, word := range words {
fmt.Fprintf(w, "%s", word)
flusher.Flush()
}
}

func main() {
http.HandleFunc("/chunked", chunkedHandler)
fmt.Println("Starting server on :8080")
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Printf("Error starting server: %s\n", err)
}
}
8 changes: 6 additions & 2 deletions x/net/http/_demo/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func main() {
url := "http://httpbin.org/post"
//url := "http://localhost:8080"
filePath := "/Users/spongehah/go/src/llgo/x/net/http/_demo/upload/example.txt" // Replace with your file path
filePath := "/Users/spongehah/Documents/code/GOPATH/src/llgo/x/net/http/_demo/upload/example.txt" // Replace with your file path
//filePath := "/Users/spongehah/Downloads/xiaoshuo.txt" // Replace with your file path

file, err := os.Open(filePath)
Expand All @@ -36,7 +36,11 @@ func main() {
}
defer resp.Body.Close()
fmt.Println("Status:", resp.Status)
resp.PrintHeaders()
for key, values := range resp.Header {
for _, value := range values {
fmt.Printf("%s: %s\n", key, value)
}
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
Expand Down
104 changes: 104 additions & 0 deletions x/net/http/bodyChunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package http

import (
"errors"
"io"
"sync"

"github.com/goplus/llgo/c/libuv"
)

type onceError struct {
sync.Mutex
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err
}

func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err
}

func newBodyChunk(asyncHandle *libuv.Async) *bodyChunk {
return &bodyChunk{
readCh: make(chan []byte, 1),
done: make(chan struct{}),
asyncHandle: asyncHandle,
}
}

type bodyChunk struct {
chunk []byte
readCh chan []byte
asyncHandle *libuv.Async

once sync.Once
done chan struct{}

rerr onceError
}

var (
errClosedBodyChunk = errors.New("bodyChunk: read/write on closed body")
)

func (bc *bodyChunk) Read(p []byte) (n int, err error) {
for n < len(p) {
if len(bc.chunk) == 0 {
select {
case chunk, ok := <-bc.readCh:
if !ok {
if n > 0 {
return n, nil
}
return 0, bc.readCloseError()
}
bc.chunk = chunk
bc.asyncHandle.Send()
case <-bc.done:
if n > 0 {
return n, nil
}
return 0, io.EOF
}
}

copied := copy(p[n:], bc.chunk)
n += copied
bc.chunk = bc.chunk[copied:]
}

return n, nil
}

func (bc *bodyChunk) Close() error {
return bc.closeRead(nil)
}

func (bc *bodyChunk) readCloseError() error {
if rerr := bc.rerr.Load(); rerr != nil {
return rerr
}
return errClosedBodyChunk
}

func (bc *bodyChunk) closeRead(err error) error {
if err == nil {
err = io.EOF
}
bc.rerr.Store(err)
bc.once.Do(func() {
close(bc.done)
})
//close(bc.done)
return nil
}
3 changes: 1 addition & 2 deletions x/net/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, d
forkReq()
}

// TODO(spongehah) timeout(send)
// TODO(spongehah) tmp timeout(send)
//stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
req.timeoutch = make(chan struct{}, 1)
req.deadline = deadline
Expand Down Expand Up @@ -490,7 +490,6 @@ func knownRoundTripperImpl(rt RoundTripper, req *Request) bool {
return knownRoundTripperImpl(altRT, req)
}
return true
// TODO(spongehah) http2
//case *http2Transport, http2noDialH2RoundTripper:
// return true
}
Expand Down
Loading

0 comments on commit e55b261

Please sign in to comment.