Skip to content

Commit

Permalink
feat: support rdb persistence (#42)
Browse files Browse the repository at this point in the history
* feat: abstract encoder for all data structures

* feat: support rdb persistance

* feat: support save command

---------

Co-authored-by: guangzhixu <[email protected]>
  • Loading branch information
xgzlucario and guangzhixu authored Nov 8, 2024
1 parent 4971cae commit d392d29
Show file tree
Hide file tree
Showing 41 changed files with 1,377 additions and 1,187 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ coverage.*
*.aof
.idea/
redis
rotom
rotom
bench/
*.rdb
12 changes: 5 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
run:
go run .

run-gc:
GODEBUG=gctrace=1 go run .

test-cover:
rm -f *.aof
go test ./... -race -coverprofile=coverage.txt -covermode=atomic
make clean
go test . -race -coverprofile=coverage.txt -covermode=atomic
go tool cover -html=coverage.txt -o coverage.html
rm coverage.txt
rm -f *.aof

fuzz-test:
go test -fuzz=FuzzRESPReader
Expand All @@ -20,6 +15,9 @@ pprof:
heap:
go tool pprof http://192.168.1.6:6060/debug/pprof/heap

clean:
rm -f coverage.* *.aof *.rdb

bench:
go test -bench . -benchmem

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This is rotom, a high performance, low latency tiny Redis Server written in Go.
1. Implements the AeLoop single-threaded event loop from Redis using the epoll network model.
2. Compatible with the Redis RESP protocol, allowing any Redis client to connect to rotom.
3. Implements data structures such as dict, list, map, zipmap, set, zipset, and zset.
4. Supports AOF.
4. Supports RDB & AOF.
5. Supports 20+ commonly used commands.
6. Supports execute lua scripts.

Expand Down Expand Up @@ -109,7 +109,7 @@ Alternatively, you can run it in a container. First, build the Docker image by r

```
REPOSITORY TAG IMAGE ID CREATED SIZE
rotom latest 22f42ce9ae0e 8 seconds ago 20.5MB
rotom latest e93cf2060e5f 13 seconds ago 40.2MB
```

Then, start the container:
Expand Down
4 changes: 2 additions & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
1. 基于 epoll 网络模型,还原了 Redis 中的 AeLoop 单线程事件循环
2. 兼容 Redis RESP 协议,你可以使用任何 redis 客户端连接 rotom
3. 实现了 dict, list, map, zipmap, set, zipset, zset 数据结构
4. AOF 支持
4. RDB 和 AOF 持久化支持
5. 支持 20 多种常用命令
6. 支持执行 lua 脚本

Expand Down Expand Up @@ -109,7 +109,7 @@ $ go run .

```
REPOSITORY TAG IMAGE ID CREATED SIZE
rotom latest 22f42ce9ae0e 8 seconds ago 20.5MB
rotom latest e93cf2060e5f 13 seconds ago 40.2MB
```

然后启动容器:
Expand Down
14 changes: 6 additions & 8 deletions ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package main

import (
"errors"
"github.com/xgzlucario/rotom/internal/dict"
"golang.org/x/sys/unix"
"time"
)

type TeType int

const (
AE_NORMAL TeType = iota + 1
AE_ONCE
AeNormal TeType = iota + 1
AeOnce
)

type FileProc func(loop *AeLoop, fd int, extra interface{})
Expand All @@ -37,7 +37,6 @@ type AeLoop struct {
TimeEvents *AeTimeEvent
fileEventFd int
timeEventNextId int
stop bool

events []*AeFileEvent // file events cache
}
Expand Down Expand Up @@ -95,7 +94,7 @@ func (loop *AeLoop) ModDetach(fd int) {
}

func GetMsTime() int64 {
return dict.GetNanoTime() / 1e6
return time.Now().UnixMilli()
}

func (loop *AeLoop) AddTimeEvent(mask TeType, interval int64, proc TimeProc, extra interface{}) int {
Expand Down Expand Up @@ -141,7 +140,6 @@ func AeLoopCreate() (*AeLoop, error) {
FileEvents: make(map[int]*AeFileEvent),
fileEventFd: epollFd,
timeEventNextId: 1,
stop: false,
events: make([]*AeFileEvent, 128), // pre alloc
}, nil
}
Expand Down Expand Up @@ -206,7 +204,7 @@ retry:
func (loop *AeLoop) AeProcess(tes []*AeTimeEvent, fes []*AeFileEvent) {
for _, te := range tes {
te.proc(loop, te.id, te.extra)
if te.mask == AE_ONCE {
if te.mask == AeOnce {
loop.RemoveTimeEvent(te.id)
} else {
te.when = GetMsTime() + te.interval
Expand All @@ -218,7 +216,7 @@ func (loop *AeLoop) AeProcess(tes []*AeTimeEvent, fes []*AeFileEvent) {
}

func (loop *AeLoop) AeMain() {
for !loop.stop {
for {
loop.AeProcess(loop.AeWait())
}
}
147 changes: 73 additions & 74 deletions aof.go
Original file line number Diff line number Diff line change
@@ -1,74 +1,73 @@
package main

import (
"bytes"
"io"
"os"

"github.com/tidwall/mmap"
)

const (
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
)

// Aof manages an append-only file system for storing data.
type Aof struct {
filePath string
file *os.File
buf *bytes.Buffer
}

func NewAof(path string) (*Aof, error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
return &Aof{
file: fd,
filePath: path,
buf: bytes.NewBuffer(make([]byte, 0, KB)),
}, nil
}

func (aof *Aof) Close() error {
return aof.file.Close()
}

func (aof *Aof) Write(buf []byte) (int, error) {
return aof.buf.Write(buf)
}

func (aof *Aof) Flush() error {
aof.buf.WriteTo(aof.file)
return aof.file.Sync()
}

func (aof *Aof) Read(fn func(args []RESP)) error {
// Read file data by mmap.
data, err := mmap.Open(aof.filePath, false)
if len(data) == 0 {
return nil
}
if err != nil {
return err
}

// Iterate over the records in the file, applying the function to each.
reader := NewReader(data)
argsBuf := make([]RESP, 8)
for {
args, _, err := reader.ReadNextCommand(argsBuf)
if err != nil {
if err == io.EOF {
break
}
return err
}
fn(args)
}

return nil
}
package main

import (
"bytes"
"github.com/xgzlucario/rotom/internal/resp"
"io"
"os"

"github.com/tidwall/mmap"
)

const (
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
)

// Aof manages an append-only file system for storing data.
type Aof struct {
file *os.File
buf *bytes.Buffer
}

func NewAof(path string) (*Aof, error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
return &Aof{
file: fd,
buf: bytes.NewBuffer(make([]byte, 0, KB)),
}, nil
}

func (a *Aof) Close() error {
return a.file.Close()
}

func (a *Aof) Write(buf []byte) (int, error) {
return a.buf.Write(buf)
}

func (a *Aof) Flush() error {
_, _ = a.buf.WriteTo(a.file)
return a.file.Sync()
}

func (a *Aof) Read(fn func(args []resp.RESP)) error {
// Read file data by mmap.
data, err := mmap.MapFile(a.file, false)
if len(data) == 0 {
return nil
}
if err != nil {
return err
}

// Iterate over the records in the file, applying the function to each.
reader := resp.NewReader(data)
argsBuf := make([]resp.RESP, 8)
for {
args, _, err := reader.ReadNextCommand(argsBuf)
if err != nil {
if err == io.EOF {
break
}
return err
}
fn(args)
}

return nil
}
19 changes: 12 additions & 7 deletions aof_test.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,55 @@
package main

import (
"github.com/xgzlucario/rotom/internal/resp"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAof(t *testing.T) {
ast := assert.New(t)
setCommand := []byte("*3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
cmdStr := []byte("*3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")

t.Run("write", func(t *testing.T) {
aof, err := NewAof("test.aof")
ast.Nil(err)
defer aof.Close()

_ = aof.Flush()
_, _ = aof.Write(setCommand)
_, _ = aof.Write(cmdStr)
_ = aof.Flush()
})

t.Run("read", func(t *testing.T) {
aof, err := NewAof("test.aof")
ast.Nil(err)

_ = aof.Read(func(args []RESP) {
_ = aof.Read(func(args []resp.RESP) {
// SET foo bar
ast.Equal(len(args), 3)
ast.Equal(args[0].ToString(), "set")
ast.Equal(args[1].ToString(), "foo")
ast.Equal(args[2].ToString(), "bar")
})

defer aof.Close()
})

t.Run("read-err-content", func(t *testing.T) {
aof, _ := NewAof("LICENSE")
err := aof.Read(func(args []resp.RESP) {})
ast.NotNil(err)
})

t.Run("empty-aof", func(t *testing.T) {
aof, _ := NewAof("not-exist.aof")
defer aof.Close()

_ = aof.Read(func(args []RESP) {
_ = aof.Read(func(args []resp.RESP) {
panic("should not call")
})
})

t.Run("read-wrong-file", func(t *testing.T) {
t.Run("read-err-fileType", func(t *testing.T) {
_, err := NewAof("internal")
ast.NotNil(err)
})
Expand Down
Loading

0 comments on commit d392d29

Please sign in to comment.