Skip to content

Commit

Permalink
Add Queue.SendAndReturnID method
Browse files Browse the repository at this point in the history
The id is useful to interact with the message without receiving it first.

See #42
  • Loading branch information
markuswustenberg committed May 24, 2024
1 parent 5e48a13 commit 118fd39
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
26 changes: 22 additions & 4 deletions goqite.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,35 @@ func (q *Queue) Send(ctx context.Context, m Message) error {

// SendTx is like Send, but within an existing transaction.
func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error {
_, err := q.SendAndReturnIDTx(ctx, tx, m)
return err
}

// SendAndReturnID is like Send, but also returns the message ID.
func (q *Queue) SendAndReturnID(ctx context.Context, m Message) (ID, error) {
var id ID
err := internalsql.InTx(q.db, func(tx *sql.Tx) error {
var err error
id, err = q.SendAndReturnIDTx(ctx, tx, m)
return err
})
return id, err
}

// SendAndReturnIDTx is like SendAndReturnID, but within an existing transaction.
func (q *Queue) SendAndReturnIDTx(ctx context.Context, tx *sql.Tx, m Message) (ID, error) {
if m.Delay < 0 {
panic("delay cannot be negative")
}

timeout := time.Now().Add(m.Delay).Format(rfc3339Milli)

_, err := tx.ExecContext(ctx, `insert into goqite (queue, body, timeout) values (?, ?, ?)`, q.name, m.Body, timeout)
if err != nil {
return err
var id ID
query := `insert into goqite (queue, body, timeout) values (?, ?, ?) returning id`
if err := tx.QueryRowContext(ctx, query, q.name, m.Body, timeout).Scan(&id); err != nil {
return "", err

Check warning on line 117 in goqite.go

View check run for this annotation

Codecov / codecov/patch

goqite.go#L117

Added line #L117 was not covered by tests
}
return nil
return id, nil
}

// Receive a Message from the queue, or nil if there is none.
Expand Down
17 changes: 17 additions & 0 deletions goqite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,23 @@ func TestQueue_Receive(t *testing.T) {
})
}

func TestQueue_SendAndReturnID(t *testing.T) {
t.Run("returns the message ID", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{}, ":memory:")

m := goqite.Message{
Body: []byte("yo"),
}

id, err := q.SendAndReturnID(context.Background(), m)
is.NotError(t, err)
is.Equal(t, 34, len(id))

err = q.Delete(context.Background(), id)
is.NotError(t, err)
})
}

func TestQueue_Extend(t *testing.T) {
t.Run("does not receive a message that has had the timeout extended", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond}, ":memory:")
Expand Down

0 comments on commit 118fd39

Please sign in to comment.