Skip to content

Commit

Permalink
WIP(x/net/http/client): Mutiple eventLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
spongehah committed Sep 20, 2024
1 parent e55b261 commit a4e8631
Show file tree
Hide file tree
Showing 10 changed files with 525 additions and 561 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/goplus/llgoexamples
go 1.20

require (
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b
github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196
golang.org/x/net v0.28.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b h1:iC0vVA8F2DNJ9wVyHI9fP9U0nM+si3LSQJ1TtGftXyo=
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI=
github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196 h1:LckJktvgChf3x0eex+GT//JkYRj1uiT4uMLzyrg3ChU=
github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
Expand Down
43 changes: 43 additions & 0 deletions x/net/http/_demo/parallelRequest/parallelRequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"fmt"
"sync"

"github.com/goplus/llgoexamples/x/net/http"
)

func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get("http://www.baidu.com")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(id, ":", resp.Status)
//body, err := io.ReadAll(resp.Body)
//if err != nil {
// fmt.Println(err)
// return
//}
//fmt.Println(string(body))
resp.Body.Close()
}

func main() {
var wait sync.WaitGroup
for i := 0; i < 500; i++ {
wait.Add(1)
go worker(i, &wait)
}
wait.Wait()
fmt.Println("All done")

resp, err := http.Get("http://www.baidu.com")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(resp.Status)
resp.Body.Close()
}
84 changes: 30 additions & 54 deletions x/net/http/bodyChunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,49 @@ package http

import (
"errors"
"io"
"sync"

"github.com/goplus/llgo/c/libuv"
)

type onceError struct {
sync.Mutex
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err
}

func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err
}

func newBodyChunk(asyncHandle *libuv.Async) *bodyChunk {
return &bodyChunk{
readCh: make(chan []byte, 1),
done: make(chan struct{}),
asyncHandle: asyncHandle,
}
}

type bodyChunk struct {
chunk []byte
readCh chan []byte
asyncHandle *libuv.Async

once sync.Once
done chan struct{}

rerr onceError
rerr error
}

var (
errClosedBodyChunk = errors.New("bodyChunk: read/write on closed body")
)

func newBodyChunk(asyncHandle *libuv.Async) *bodyChunk {
return &bodyChunk{
readCh: make(chan []byte, 1),
done: make(chan struct{}),
asyncHandle: asyncHandle,
}
}

func (bc *bodyChunk) Read(p []byte) (n int, err error) {
select {
case <-bc.done:
err = bc.readCloseError()
return
default:
}

for n < len(p) {
if len(bc.chunk) == 0 {
bc.asyncHandle.Send()
select {
case chunk, ok := <-bc.readCh:
if !ok {
if n > 0 {
return n, nil
}
return 0, bc.readCloseError()
}
case chunk := <-bc.readCh:
bc.chunk = chunk
bc.asyncHandle.Send()
case <-bc.done:
if n > 0 {
return n, nil
}
return 0, io.EOF
err = bc.readCloseError()
return
}
}

Expand All @@ -77,28 +53,28 @@ func (bc *bodyChunk) Read(p []byte) (n int, err error) {
bc.chunk = bc.chunk[copied:]
}

return n, nil
return
}

func (bc *bodyChunk) Close() error {
return bc.closeRead(nil)
return bc.closeWithError(nil)
}

func (bc *bodyChunk) readCloseError() error {
if rerr := bc.rerr.Load(); rerr != nil {
if rerr := bc.rerr; rerr != nil {
return rerr
}
return errClosedBodyChunk
}

func (bc *bodyChunk) closeRead(err error) error {
func (bc *bodyChunk) closeWithError(err error) error {
if bc.rerr != nil {
return nil
}
if err == nil {
err = io.EOF
err = errClosedBodyChunk
}
bc.rerr.Store(err)
bc.once.Do(func() {
close(bc.done)
})
//close(bc.done)
bc.rerr = err
close(bc.done)
return nil
}
Loading

0 comments on commit a4e8631

Please sign in to comment.