Skip to content

Commit

Permalink
Cleanup queue unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 5, 2024
1 parent d93a714 commit ef9d9fe
Showing 1 changed file with 96 additions and 115 deletions.
211 changes: 96 additions & 115 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/redisx/assertredis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func getPool() *redis.Pool {
Expand Down Expand Up @@ -38,205 +40,184 @@ func getPool() *redis.Pool {
}

func TestLua(t *testing.T) {
assert := assert.New(t)
rp := getPool()
rc := rp.Get()
defer rc.Close()

// start our dethrottler
pool := getPool()
conn := pool.Get()
defer conn.Close()
quitter := make(chan bool)
wg := &sync.WaitGroup{}
StartDethrottler(pool, quitter, wg, "msgs")
StartDethrottler(rp, quitter, wg, "msgs")
defer close(quitter)

rate := 10

// add 20 messages with ids 0-19
for i := 0; i < 20; i++ {
err := PushOntoQueue(conn, "msgs", "chan1", rate, fmt.Sprintf(`[{"id":%d}]`, i), LowPriority)
assert.NoError(err)
err := PushOntoQueue(rc, "msgs", "chan1", rate, fmt.Sprintf(`[{"id":%d}]`, i), LowPriority)
require.NoError(t, err)
}

// get ourselves aligned with a second boundary
delay := time.Second*2 - time.Duration(time.Now().UnixNano()%int64(time.Second))
time.Sleep(delay)

conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
// mark chan1 as rate limited
rc.Do("SET", "rate_limit_bulk:chan1", "engaged")
rc.Do("EXPIRE", "rate_limit_bulk:chan1", 5)

// we have the rate limit set,
queue, value, err := PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be paused")
}
// popping shouldn't error or return a value
queue, value, err := PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, "", value)
assert.Equal(t, Retry, queue)

// When the redis paused key is remove, we get the values from bulk queue/low priority
conn.Do("DEL", "rate_limit_bulk:chan1")
// unmark chan1 as rate limited
rc.Do("DEL", "rate_limit_bulk:chan1")

// pop 10 items off
for i := 0; i < 10; i++ {
queue, value, err := PopFromQueue(conn, "msgs")
assert.NotEqual(queue, EmptyQueue)
assert.Equal(fmt.Sprintf(`{"id":%d}`, i), value)
assert.NoError(err)
queue, value, err := PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.NotEqual(t, queue, EmptyQueue)
assert.Equal(t, fmt.Sprintf(`{"id":%d}`, i), value)
}

// next value should be throttled
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, "", value)
assert.Equal(t, Retry, queue)

// check our redis state
count, err := redis.Int(conn.Do("ZCARD", "msgs:throttled"))
assert.NoError(err)
assert.Equal(1, count, "Expected chan1 to be throttled")

count, err = redis.Int(conn.Do("ZCARD", "msgs:active"))
assert.NoError(err)
assert.Equal(0, count, "Expected chan1 to not be active")
assertredis.ZCard(t, rc, "msgs:active", 0)
assertredis.ZCard(t, rc, "msgs:throttled", 1)

// adding more items shouldn't change that
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":30}]`, LowPriority)
assert.NoError(err)
err = PushOntoQueue(rc, "msgs", "chan1", rate, `[{"id":30}]`, LowPriority)
assert.NoError(t, err)

count, err = redis.Int(conn.Do("ZCARD", "msgs:throttled"))
assert.NoError(err)
assert.Equal(1, count, "Expected chan1 to be throttled")

count, err = redis.Int(conn.Do("ZCARD", "msgs:active"))
assert.NoError(err)
assert.Equal(0, count, "Expected chan1 to not be active")
assertredis.ZCard(t, rc, "msgs:active", 0)
assertredis.ZCard(t, rc, "msgs:throttled", 1)

// but if we wait, our next msg should be our highest priority
time.Sleep(time.Second)
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":31}]`, HighPriority)
assert.NoError(err)

err = PushOntoQueue(rc, "msgs", "chan1", rate, `[{"id":31}]`, HighPriority)
assert.NoError(t, err)

// make sure pause bulk key do not prevent use to get from the high priority queue
conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
rc.Do("SET", "rate_limit_bulk:chan1", "engaged")
rc.Do("EXPIRE", "rate_limit_bulk:chan1", 5)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
assert.Equal(WorkerToken("msgs:chan1|10"), queue)
assert.Equal(`{"id":31}`, value)
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, WorkerToken("msgs:chan1|10"), queue)
assert.Equal(t, `{"id":31}`, value)

// make sure paused is not present for more tests
conn.Do("DEL", "rate_limit_bulk:chan1")
rc.Do("DEL", "rate_limit_bulk:chan1")

// should get next five bulk msgs fine
for i := 10; i < 15; i++ {
queue, value, err := PopFromQueue(conn, "msgs")
assert.NotEqual(queue, EmptyQueue)
assert.Equal(fmt.Sprintf(`{"id":%d}`, i), value)
assert.NoError(err)
queue, value, err := PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.NotEqual(t, queue, EmptyQueue)
assert.Equal(t, fmt.Sprintf(`{"id":%d}`, i), value)

}

// push on a compound message
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":32}, {"id":33}]`, HighPriority)
assert.NoError(err)
// push a multi-message batch for a single contact
err = PushOntoQueue(rc, "msgs", "chan1", rate, `[{"id":32}, {"id":33}]`, HighPriority)
assert.NoError(t, err)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
assert.Equal(WorkerToken("msgs:chan1|10"), queue)
assert.Equal(`{"id":32}`, value)
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, WorkerToken("msgs:chan1|10"), queue)
assert.Equal(t, `{"id":32}`, value)

// sleep a few seconds
time.Sleep(2 * time.Second)

// pop remaining bulk off
for i := 15; i < 20; i++ {
queue, value, err := PopFromQueue(conn, "msgs")
assert.NotEqual(queue, EmptyQueue)
assert.Equal(fmt.Sprintf(`{"id":%d}`, i), value)
assert.NoError(err)
queue, value, err := PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.NotEqual(t, queue, EmptyQueue)
assert.Equal(t, fmt.Sprintf(`{"id":%d}`, i), value)
}

// next should be 30
queue, value, err = PopFromQueue(conn, "msgs")
assert.NotEqual(queue, EmptyQueue)
assert.Equal(`{"id":30}`, value)
assert.NoError(err)
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.NotEqual(t, queue, EmptyQueue)
assert.Equal(t, `{"id":30}`, value)

// popping again should give us nothing since it is too soon to send 33
queue = Retry
for queue == Retry {
queue, value, err = PopFromQueue(conn, "msgs")
queue, value, err = PopFromQueue(rc, "msgs")
}
assert.NoError(err)
assert.Equal(EmptyQueue, queue)
assert.Empty(value)
assert.NoError(t, err)
assert.Equal(t, EmptyQueue, queue)
assert.Empty(t, value)

// but if we sleep 6 seconds should get it
time.Sleep(time.Second * 6)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
assert.Equal(WorkerToken("msgs:chan1|10"), queue)
assert.Equal(`{"id":33}`, value)
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, WorkerToken("msgs:chan1|10"), queue)
assert.Equal(t, `{"id":33}`, value)

// nothing should be left
queue = Retry
for queue == Retry {
queue, value, err = PopFromQueue(conn, "msgs")
queue, value, err = PopFromQueue(rc, "msgs")
}
assert.NoError(err)
assert.Equal(EmptyQueue, queue)
assert.Empty(value)
assert.NoError(t, err)
assert.Equal(t, EmptyQueue, queue)
assert.Empty(t, value)

err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority)
assert.NoError(err)
err = PushOntoQueue(rc, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority)
assert.NoError(t, err)

conn.Do("SET", "rate_limit:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit:chan1", 5)
rc.Do("SET", "rate_limit:chan1", "engaged")
rc.Do("EXPIRE", "rate_limit:chan1", 5)

// we have the rate limit set,
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
// we have the rate limit set
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}

time.Sleep(2 * time.Second)
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}

count, err = redis.Int(conn.Do("ZCARD", "msgs:throttled"))
assert.NoError(err)
assert.Equal(1, count, "Expected chan1 to be throttled")
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.Equal(t, "", value)
assert.Equal(t, Retry, queue)

count, err = redis.Int(conn.Do("ZCARD", "msgs:active"))
assert.NoError(err)
assert.Equal(0, count, "Expected chan1 to not be active")
assertredis.ZCard(t, rc, "msgs:active", 0)
assertredis.ZCard(t, rc, "msgs:throttled", 1)

// but if we wait for the rate limit to expire
time.Sleep(3 * time.Second)

// next should be 34
queue, value, err = PopFromQueue(conn, "msgs")
assert.NotEqual(queue, EmptyQueue)
assert.Equal(`{"id":34}`, value)
assert.NoError(err)
queue, value, err = PopFromQueue(rc, "msgs")
assert.NoError(t, err)
assert.NotEqual(t, queue, EmptyQueue)
assert.Equal(t, `{"id":34}`, value)

// nothing should be left
queue = Retry
for queue == Retry {
queue, value, err = PopFromQueue(conn, "msgs")
queue, value, err = PopFromQueue(rc, "msgs")
}
assert.NoError(err)
assert.Equal(EmptyQueue, queue)
assert.Empty(value)

assert.NoError(t, err)
assert.Equal(t, EmptyQueue, queue)
assert.Empty(t, value)
}

func TestThrottle(t *testing.T) {
Expand Down

0 comments on commit ef9d9fe

Please sign in to comment.