Skip to content

Commit

Permalink
opt: aof file write raw request buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Jun 3, 2024
1 parent 6aaa9b6 commit d7ec096
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 46 deletions.
6 changes: 3 additions & 3 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func (aof *Aof) Close() error {
return aof.file.Close()
}

func (aof *Aof) Write(value Value) error {
func (aof *Aof) Write(buf []byte) error {
aof.mu.Lock()
defer aof.mu.Unlock()
_, err := aof.buf.Write(value.Marshal())
_, err := aof.buf.Write(buf)
aof.mu.Unlock()
return err
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980/go.mod h1:zwEumjdcK
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731 h1:frRQxMZFCPWfoiWau4bPcYmNDGNVPLqM9nqnsp6Uakg=
github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731/go.mod h1:sPwGPAuvd9WdiONTmusXGNocqcY5L/J7+st1upAMlX8=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc h1:O9NuF4s+E/PvMIy+9IUZB9znFwUIXEWSstNjek6VpVg=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
Expand Down
8 changes: 3 additions & 5 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
func startup() {
config := &Config{Port: 20082}
if err := InitDB(config); err != nil {
log.Panicf("init db error: %v\n", err)
log.Panic("init db error:", err)
}
setLimit()
debug()
server.config = config
server.RunServe()
}
Expand All @@ -28,13 +26,13 @@ func TestHandler(t *testing.T) {
assert := assert.New(t)

go startup()
time.Sleep(time.Second / 10)
time.Sleep(time.Second / 2)

// wait for client starup
rdb := redis.NewClient(&redis.Options{
Addr: ":20082",
})
time.Sleep(time.Second / 10)
time.Sleep(time.Second / 2)

t.Run("ping", func(t *testing.T) {
res, _ := rdb.Ping(ctx).Result()
Expand Down
15 changes: 0 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"log"
"net/http"
_ "net/http/pprof"
"syscall"
)

func debug() {
Expand All @@ -20,21 +19,7 @@ func main() {
if err = InitDB(config); err != nil {
log.Panicf("init db error: %v\n", err)
}
setLimit()
debug()
server.config = config
server.RunServe()
}

func setLimit() {
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
panic(err)
}
rLimit.Cur = rLimit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
panic(err)
}

log.Printf("set cur fd limit: %d", rLimit.Cur)
}
35 changes: 15 additions & 20 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"fmt"
"io"
"log"
"net"
"os"
Expand Down Expand Up @@ -31,7 +32,7 @@ type DB struct {
type Server struct {
config *Config
epoller *epoll
workerpool *pool.Pool
workerPool *pool.Pool
}

type Command struct {
Expand Down Expand Up @@ -66,12 +67,6 @@ func lookupCommand(command string) (*Command, error) {
return nil, fmt.Errorf("invalid command: %s", command)
}

func (cmd *Command) writeAofFile(aof *Aof, args []Value) {
if cmd.aofNeed {
aof.Write(newArrayValue(args))
}
}

func (cmd *Command) processCommand(args []Value) Value {
// Check command arguments.
if len(args) < cmd.arity {
Expand Down Expand Up @@ -128,11 +123,12 @@ func (server *Server) RunServe() {
}
server.epoller = epoller

// Start goroutine workerpool.
server.workerpool = pool.New().WithMaxGoroutines(runtime.NumCPU())
// Start goroutine workerPool.
server.workerPool = pool.New().WithMaxGoroutines(runtime.NumCPU())

go func() {
var buf = make([]byte, 512)

for {
connections, err := epoller.Wait()
if err != nil {
Expand All @@ -144,18 +140,14 @@ func (server *Server) RunServe() {
if conn == nil {
break
}

if n, err := conn.Read(buf); err != nil {
if err := epoller.Remove(conn); err != nil {
log.Println("failed to remove:", err)
}
conn.Close()

} else {
// bench test
// _ = n
// pool.Go(func() {
// conn.Write(ValueOK.Marshal())
// })
server.handleConnection(buf[:n], conn)
}
}
Expand All @@ -179,6 +171,9 @@ func (server *Server) handleConnection(buf []byte, conn net.Conn) {
for {
value, err := resp.Read()
if err != nil {
if err != io.EOF {
log.Println("read resp error:", err)
}
return
}

Expand All @@ -196,17 +191,17 @@ func (server *Server) handleConnection(buf []byte, conn net.Conn) {
res = newErrValue(err)

} else {
// Write aof file if needed.
if server.config.AppendOnly {
cmd.writeAofFile(db.aof, value.array)
}

// Process command.
res = cmd.processCommand(value.array[1:])

// Write aof file after proccess success.
if server.config.AppendOnly && cmd.aofNeed && res.typ != ERROR {
db.aof.Write(buf)
}
}

// Async write result.
server.workerpool.Go(func() {
server.workerPool.Go(func() {
if _, err = conn.Write(res.Marshal()); err != nil {
log.Println("write reply error:", err)
}
Expand Down

0 comments on commit d7ec096

Please sign in to comment.