Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client interface #144

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* text=auto eol=lf
46 changes: 46 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,49 @@
_*
*.out
*~

.idea
.vscode

# Build
build
dist

.DS_Store
*.[56789ao]
*.a[56789o]
*.so
*.pyc
._*
.nfs.*
[56789a].out
*~
*.orig
*.rej
*.exe
.*.swp
*.cgo*.go
*.cgo*.c
_cgo_*
_obj
_test
_testmain.go

*.exe~
*.dll
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
vendor/
.env
config.json
*.bak
.env.bak

tmp
112 changes: 69 additions & 43 deletions memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,38 @@ var (
// New returns a memcache client using the provided server(s)
// with equal weight. If a server is listed multiple times,
// it gets a proportional amount of weight.
func New(server ...string) *Client {
func New(server ...string) Client {
ss := new(ServerList)
ss.SetServers(server...)
return NewFromSelector(ss)
}

// NewFromSelector returns a new Client using the provided ServerSelector.
func NewFromSelector(ss ServerSelector) *Client {
return &Client{selector: ss}
func NewFromSelector(ss ServerSelector) Client {
return &client{selector: ss}
}

type Client interface {
FlushAll() error
Get(key string) (item *Item, err error)
Touch(key string, seconds int32) (err error)
GetMulti(keys []string) (map[string]*Item, error)
Set(item *Item) error
Add(item *Item) error
Replace(item *Item) error
CompareAndSwap(item *Item) error
Delete(key string) error
DeleteAll() error
Ping() error
Increment(key string, delta uint64) (newValue uint64, err error)
Decrement(key string, delta uint64) (newValue uint64, err error)
GetConn(addr net.Addr) (*conn, error)
OnItem(item *Item, fn func(*bufio.ReadWriter, *Item) error) error
}

// Client is a memcache client.
// It is safe for unlocked use by multiple concurrent goroutines.
type Client struct {
type client struct {
// Timeout specifies the socket read/write timeout.
// If zero, DefaultTimeout is used.
Timeout time.Duration
Expand Down Expand Up @@ -176,7 +194,7 @@ type conn struct {
nc net.Conn
rw *bufio.ReadWriter
addr net.Addr
c *Client
c *client
}

// release returns this connection back to the client's free pool
Expand All @@ -200,7 +218,7 @@ func (cn *conn) condRelease(err *error) {
}
}

func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
func (c *client) putFreeConn(addr net.Addr, cn *conn) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil {
Expand All @@ -214,7 +232,7 @@ func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
c.freeconn[addr.String()] = append(freelist, cn)
}

func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
func (c *client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil {
Expand All @@ -229,14 +247,14 @@ func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
return cn, true
}

func (c *Client) netTimeout() time.Duration {
func (c *client) netTimeout() time.Duration {
if c.Timeout != 0 {
return c.Timeout
}
return DefaultTimeout
}

func (c *Client) maxIdleConns() int {
func (c *client) maxIdleConns() int {
if c.MaxIdleConns > 0 {
return c.MaxIdleConns
}
Expand All @@ -254,7 +272,7 @@ func (cte *ConnectTimeoutError) Error() string {
return "memcache: connect timeout to " + cte.Addr.String()
}

func (c *Client) dial(addr net.Addr) (net.Conn, error) {
func (c *client) dial(addr net.Addr) (net.Conn, error) {
type connError struct {
cn net.Conn
err error
Expand All @@ -272,7 +290,11 @@ func (c *Client) dial(addr net.Addr) (net.Conn, error) {
return nil, err
}

func (c *Client) getConn(addr net.Addr) (*conn, error) {
func (c *client) GetConn(addr net.Addr) (*conn, error) {
return c.getConn(addr)
}

func (c *client) getConn(addr net.Addr) (*conn, error) {
cn, ok := c.getFreeConn(addr)
if ok {
cn.extendDeadline()
Expand All @@ -292,7 +314,11 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
return cn, nil
}

func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
func (c *client) OnItem(item *Item, fn func(*bufio.ReadWriter, *Item) error) error {
return c.onItem(item, fn)
}

func (c *client) onItem(item *Item, fn func(*bufio.ReadWriter, *Item) error) error {
addr, err := c.selector.PickServer(item.Key)
if err != nil {
return err
Expand All @@ -302,19 +328,19 @@ func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) e
return err
}
defer cn.condRelease(&err)
if err = fn(c, cn.rw, item); err != nil {
if err = fn(cn.rw, item); err != nil {
return err
}
return nil
}

func (c *Client) FlushAll() error {
func (c *client) FlushAll() error {
return c.selector.Each(c.flushAllFromAddr)
}

// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
func (c *Client) Get(key string) (item *Item, err error) {
func (c *client) Get(key string) (item *Item, err error) {
err = c.withKeyAddr(key, func(addr net.Addr) error {
return c.getFromAddr(addr, []string{key}, func(it *Item) { item = it })
})
Expand All @@ -329,13 +355,13 @@ func (c *Client) Get(key string) (item *Item, err error) {
// into the future at which time the item will expire. Zero means the item has
// no expiration time. ErrCacheMiss is returned if the key is not in the cache.
// The key must be at most 250 bytes in length.
func (c *Client) Touch(key string, seconds int32) (err error) {
func (c *client) Touch(key string, seconds int32) (err error) {
return c.withKeyAddr(key, func(addr net.Addr) error {
return c.touchFromAddr(addr, []string{key}, seconds)
})
}

func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
func (c *client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
if !legalKey(key) {
return ErrMalformedKey
}
Expand All @@ -346,7 +372,7 @@ func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
return fn(addr)
}

func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (err error) {
func (c *client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (err error) {
cn, err := c.getConn(addr)
if err != nil {
return err
Expand All @@ -355,13 +381,13 @@ func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (er
return fn(cn.rw)
}

func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {
func (c *client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {
return c.withKeyAddr(key, func(addr net.Addr) error {
return c.withAddrRw(addr, fn)
})
}

func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error {
func (c *client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
return err
Expand All @@ -377,7 +403,7 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error
}

// flushAllFromAddr send the flush_all command to the given addr
func (c *Client) flushAllFromAddr(addr net.Addr) error {
func (c *client) flushAllFromAddr(addr net.Addr) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil {
return err
Expand All @@ -400,7 +426,7 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error {
}

// ping sends the version command to the given addr
func (c *Client) ping(addr net.Addr) error {
func (c *client) ping(addr net.Addr) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
return err
Expand All @@ -423,7 +449,7 @@ func (c *Client) ping(addr net.Addr) error {
})
}

func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
func (c *client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
for _, key := range keys {
if _, err := fmt.Fprintf(rw, "touch %s %d\r\n", key, expiration); err != nil {
Expand Down Expand Up @@ -453,7 +479,7 @@ func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) e
// items may have fewer elements than the input slice, due to memcache
// cache misses. Each key must be at most 250 bytes in length.
// If no error is returned, the returned map will also be non-nil.
func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
func (c *client) GetMulti(keys []string) (map[string]*Item, error) {
var lk sync.Mutex
m := make(map[string]*Item)
addItemToMap := func(it *Item) {
Expand Down Expand Up @@ -538,31 +564,31 @@ func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
}

// Set writes the given item, unconditionally.
func (c *Client) Set(item *Item) error {
return c.onItem(item, (*Client).set)
func (c *client) Set(item *Item) error {
return c.onItem(item, c.set)
}

func (c *Client) set(rw *bufio.ReadWriter, item *Item) error {
func (c *client) set(rw *bufio.ReadWriter, item *Item) error {
return c.populateOne(rw, "set", item)
}

// Add writes the given item, if no value already exists for its
// key. ErrNotStored is returned if that condition is not met.
func (c *Client) Add(item *Item) error {
return c.onItem(item, (*Client).add)
func (c *client) Add(item *Item) error {
return c.onItem(item, c.add)
}

func (c *Client) add(rw *bufio.ReadWriter, item *Item) error {
func (c *client) add(rw *bufio.ReadWriter, item *Item) error {
return c.populateOne(rw, "add", item)
}

// Replace writes the given item, but only if the server *does*
// already hold data for this key
func (c *Client) Replace(item *Item) error {
return c.onItem(item, (*Client).replace)
func (c *client) Replace(item *Item) error {
return c.onItem(item, c.replace)
}

func (c *Client) replace(rw *bufio.ReadWriter, item *Item) error {
func (c *client) replace(rw *bufio.ReadWriter, item *Item) error {
return c.populateOne(rw, "replace", item)
}

Expand All @@ -573,15 +599,15 @@ func (c *Client) replace(rw *bufio.ReadWriter, item *Item) error {
// is returned if the value was modified in between the
// calls. ErrNotStored is returned if the value was evicted in between
// the calls.
func (c *Client) CompareAndSwap(item *Item) error {
return c.onItem(item, (*Client).cas)
func (c *client) CompareAndSwap(item *Item) error {
return c.onItem(item, c.cas)
}

func (c *Client) cas(rw *bufio.ReadWriter, item *Item) error {
func (c *client) cas(rw *bufio.ReadWriter, item *Item) error {
return c.populateOne(rw, "cas", item)
}

func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
func (c *client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
if !legalKey(item.Key) {
return ErrMalformedKey
}
Expand Down Expand Up @@ -656,22 +682,22 @@ func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...in

// Delete deletes the item with the provided key. The error ErrCacheMiss is
// returned if the item didn't already exist in the cache.
func (c *Client) Delete(key string) error {
func (c *client) Delete(key string) error {
return c.withKeyRw(key, func(rw *bufio.ReadWriter) error {
return writeExpectf(rw, resultDeleted, "delete %s\r\n", key)
})
}

// DeleteAll deletes all items in the cache.
func (c *Client) DeleteAll() error {
func (c *client) DeleteAll() error {
return c.withKeyRw("", func(rw *bufio.ReadWriter) error {
return writeExpectf(rw, resultDeleted, "flush_all\r\n")
})
}

// Ping checks all instances if they are alive. Returns error if any
// of them is down.
func (c *Client) Ping() error {
func (c *client) Ping() error {
return c.selector.Each(c.ping)
}

Expand All @@ -680,7 +706,7 @@ func (c *Client) Ping() error {
// didn't exist in memcached the error is ErrCacheMiss. The value in
// memcached must be an decimal number, or an error will be returned.
// On 64-bit overflow, the new value wraps around.
func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error) {
func (c *client) Increment(key string, delta uint64) (newValue uint64, err error) {
return c.incrDecr("incr", key, delta)
}

Expand All @@ -690,11 +716,11 @@ func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error
// memcached must be an decimal number, or an error will be returned.
// On underflow, the new value is capped at zero and does not wrap
// around.
func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error) {
func (c *client) Decrement(key string, delta uint64) (newValue uint64, err error) {
return c.incrDecr("decr", key, delta)
}

func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
func (c *client) incrDecr(verb, key string, delta uint64) (uint64, error) {
var val uint64
err := c.withKeyRw(key, func(rw *bufio.ReadWriter) error {
line, err := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta)
Expand Down
Loading