Skip to content

Commit

Permalink
Merge pull request #124 from adjust/drain
Browse files Browse the repository at this point in the history
Add queue.Drain and redisClient.RPop method
  • Loading branch information
psampaz authored Sep 1, 2022
2 parents 09ba460 + 795b0d4 commit 4630247
Show file tree
Hide file tree
Showing 17 changed files with 99 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Let's take a look at how to use rmq.
Of course you need to import rmq wherever you want to use it.

```go
import "github.com/adjust/rmq/v4"
import "github.com/adjust/rmq/v5"
```

### Connection
Expand Down
2 changes: 1 addition & 1 deletion example/batch_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"syscall"
"time"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"time"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"syscall"
"time"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"net/http"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"time"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"log"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/returner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"math"

"github.com/adjust/rmq/v4"
"github.com/adjust/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/adjust/rmq/v4
module github.com/adjust/rmq/v5

go 1.13

Expand Down
11 changes: 0 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redis/v8 v8.3.2 h1:1bJscgN2yGtKLW6MsTRosa2LHyeq94j0hnNAgRZzj/M=
github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
Expand All @@ -24,7 +22,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand All @@ -50,28 +47,23 @@ go.opentelemetry.io/otel v0.13.0 h1:2isEnyzjjJZq6r2EKMsFj4TxiQiexsM04AVhwbR/oBA=
go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e h1:N7DeIrjYszNmSW409R3frPPwglRwMkXSBzwVbkOjLLA=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -85,15 +77,12 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
21 changes: 21 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Queue interface {
ReturnUnacked(max int64) (int64, error)
ReturnRejected(max int64) (int64, error)
Destroy() (readyCount, rejectedCount int64, err error)
Drain(count int64) ([]string, error)

// internals
// used in cleaner
Expand Down Expand Up @@ -459,6 +460,26 @@ func (queue *redisQueue) move(from, to string, max int64) (n int64, error error)
return n, nil
}

// Drain removes and returns 'count' elements from the queue. In case of an error,
// Drain return all elements removed until the error occurred and the error itself.
func (queue *redisQueue) Drain(count int64) ([]string, error) {
var (
n int64
err error
)
out := make([]string, 0, count)

for n = 0; n < count; n++ {
val, err := queue.redisClient.RPop(queue.readyKey)
if err != nil {
return out, err
}
out = append(out, val)
}

return out, err
}

// Destroy purges and removes the queue from the list of queues
func (queue *redisQueue) Destroy() (readyCount, rejectedCount int64, err error) {
readyCount, err = queue.PurgeReady()
Expand Down
22 changes: 22 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,3 +750,25 @@ func Test_jitteredDuration(t *testing.T) {
assert.GreaterOrEqual(t, int64(110*time.Millisecond), int64(d))
}
}

func TestQueueDrain(t *testing.T) {
connection, err := OpenConnection("drain-connection", "tcp", "localhost:6379", 1, nil)
assert.NoError(t, err)
require.NotNil(t, connection)

queue, err := connection.OpenQueue("drain-queue")
assert.NoError(t, err)

for x := 0; x < 100; x++ {
queue.Publish(fmt.Sprintf("%d", x))
}

eventuallyReady(t, queue, 100)

for x := 1; x <= 10; x++ {
values, err := queue.Drain(10)
assert.NoError(t, err)
assert.Equal(t, 10, len(values))
eventuallyReady(t, queue, int64(100-x*10))
}
}
1 change: 1 addition & 0 deletions redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type RedisClient interface {
LRem(key string, count int64, value string) (affected int64, err error)
LTrim(key string, start, stop int64) error
RPopLPush(source, destination string) (value string, err error)
RPop(key string) (value string, err error)

// sets
SAdd(key, value string) (total int64, err error)
Expand Down
4 changes: 4 additions & 0 deletions redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error {
return wrapper.rawClient.LTrim(unusedContext, key, int64(start), int64(stop)).Err()
}

func (wrapper RedisWrapper) RPop(key string) (value string, err error) {
return wrapper.rawClient.RPop(unusedContext, key).Result()
}

func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error) {
value, err = wrapper.rawClient.RPopLPush(unusedContext, source, destination).Result()
// println("RPopLPush", source, destination, value, err)
Expand Down
1 change: 1 addition & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported
func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeRejected() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) Destroy() (int64, int64, error) { panic(errorNotSupported) }
func (*TestQueue) Drain(count int64) ([]string, error) { panic(errorNotSupported) }
func (*TestQueue) closeInStaleConnection() error { panic(errorNotSupported) }
func (*TestQueue) readyCount() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) unackedCount() (int64, error) { panic(errorNotSupported) }
Expand Down
25 changes: 24 additions & 1 deletion test_redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type TestRedisClient struct {
store sync.Map
ttl sync.Map
mx sync.Mutex
mx sync.Mutex
}

// NewTestRedisClient returns a NewTestRedisClient
Expand Down Expand Up @@ -130,6 +130,29 @@ func (client *TestRedisClient) LPush(key string, values ...string) (total int64,
return int64(len(newList)), nil
}

// RPop removes and returns one value from the tail of the list stored at key.
// When key holds a value that is not a list, an error is returned.
func (client *TestRedisClient) RPop(key string) (value string, err error) {

client.mx.Lock()
defer client.mx.Unlock()

list, err := client.findList(key)
// not a list
if err != nil {
return "", ErrorNotFound
}
// list is empty
if len(list) == 0 {
return "", ErrorNotFound
}

// Remove the last element of source (tail)
client.storeList(key, list[0:len(list)-1])

return list[len(list)-1], nil
}

// LLen returns the length of the list stored at key.
// If key does not exist, it is interpreted as an empty list and 0 is returned.
// An error is returned when the value stored at key is not a list.
Expand Down
17 changes: 17 additions & 0 deletions test_redis_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,20 @@ func TestTestRedisClient_LPush_Len(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(6), total)
}

func TestTestRedisClient_RPop(t *testing.T) {
client := NewTestRedisClient()
key := "list-key"

total, err := client.LPush(key, "1", "2", "3")
assert.NoError(t, err)
assert.Equal(t, int64(3), total)

value, err := client.RPop(key)
assert.NoError(t, err)
assert.Equal(t, "3", value)

total, err = client.LLen(key)
assert.NoError(t, err)
assert.Equal(t, int64(2), total)
}

0 comments on commit 4630247

Please sign in to comment.