Skip to content

Commit

Permalink
Add jobs.Runner (#38)
Browse files Browse the repository at this point in the history
This adds a new package `jobs` which provides an abstraction to run
background jobs, backed by the queue.
  • Loading branch information
markuswustenberg authored May 3, 2024
2 parents 9abc8f8 + b907807 commit 78755b2
Show file tree
Hide file tree
Showing 10 changed files with 676 additions and 55 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Made in 🇩🇰 by [maragu](https://www.maragu.dk/), maker of [online Go course
- Messages are sent to and received from the queue, and are guaranteed to not be redelivered before a timeout occurs.
- Support for multiple queues in one table.
- Message timeouts can be extended, to support e.g. long-running tasks.
- A job runner abstraction is provided on top of the queue, for your background tasks.
- A simple HTTP handler is provided for your convenience.
- No non-test dependencies. Bring your own SQLite driver.

Expand Down Expand Up @@ -45,6 +46,8 @@ func main() {
if err != nil {
log.Fatalln(err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

if err := goqite.Setup(context.Background(), db); err != nil {
log.Fatalln(err)
Expand Down
62 changes: 62 additions & 0 deletions docs/examples/jobs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"

_ "github.com/mattn/go-sqlite3"

"github.com/maragudk/goqite"
"github.com/maragudk/goqite/jobs"
)

func main() {
log := slog.Default()

// Setup the db and goqite schema.
db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
if err != nil {
log.Info("Error opening db", "error", err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

if err := goqite.Setup(context.Background(), db); err != nil {
log.Info("Error in setup", "error", err)
}

// Make a new queue for the jobs. You can have as many of these as you like, just name them differently.
q := goqite.New(goqite.NewOpts{
DB: db,
Name: "jobs",
})

// Make a job runner with a job limit of 1 and a short message poll interval.
r := jobs.NewRunner(jobs.NewRunnerOpts{
Limit: 1,
Log: slog.Default(),
PollInterval: 10 * time.Millisecond,
Queue: q,
})

// Register our "print" job.
r.Register("print", func(ctx context.Context, m []byte) error {
fmt.Println(string(m))
return nil
})

// Create a "print" job with a message.
if err := jobs.Create(context.Background(), q, "print", []byte("Yo")); err != nil {
log.Info("Error creating job", "error", err)
}

// Stop the job runner after a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
defer cancel()

// Start the job runner and see the job run.
r.Start(ctx)
}
2 changes: 2 additions & 0 deletions docs/example.go → docs/examples/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func main() {
if err != nil {
log.Fatalln(err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

if err := goqite.Setup(context.Background(), db); err != nil {
log.Fatalln(err)
Expand Down
74 changes: 72 additions & 2 deletions docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ <h1>goqite</h1>
<pre><code class="language-bash">$ go get github.com/maragudk/goqite</code></pre>
<p><a href="https://github.com/maragudk/goqite">See goqite on Github</a></p>

<h2>Example</h2>
<h2>Examples</h2>

<h3>Queue</h3>

<pre><code class="language-go">package main

Expand All @@ -49,6 +51,8 @@ <h2>Example</h2>
if err != nil {
log.Fatalln(err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

if err := goqite.Setup(context.Background(), db); err != nil {
log.Fatalln(err)
Expand Down Expand Up @@ -92,11 +96,77 @@ <h2>Example</h2>
log.Fatalln(err)
}
}
</code></pre>

<h3>Jobs</h3>

<pre><code class="language-go">package main

import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"

_ "github.com/mattn/go-sqlite3"

"github.com/maragudk/goqite"
"github.com/maragudk/goqite/jobs"
)

func main() {
log := slog.Default()

// Setup the db and goqite schema.
db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
if err != nil {
log.Info("Error opening db", "error", err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)

if err := goqite.Setup(context.Background(), db); err != nil {
log.Info("Error in setup", "error", err)
}

// Make a new queue for the jobs. You can have as many of these as you like, just name them differently.
q := goqite.New(goqite.NewOpts{
DB: db,
Name: "jobs",
})

// Make a job runner with a job limit of 1 and a short message poll interval.
r := jobs.NewRunner(jobs.NewRunnerOpts{
Limit: 1,
Log: slog.Default(),
PollInterval: 10 * time.Millisecond,
Queue: q,
})

// Register our "print" job.
r.Register("print", func(ctx context.Context, m []byte) error {
fmt.Println(string(m))
return nil
})

// Create a "print" job with a message.
if err := jobs.Create(context.Background(), q, "print", []byte("Yo")); err != nil {
log.Info("Error creating job", "error", err)
}

// Stop the job runner after a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
defer cancel()

// Start the job runner and see the job run.
r.Start(ctx)
}
</code></pre>
</div>
</div>

<footer class="prose prose-sm prose-purple"><p>made in 🇩🇰 by <a href="https://www.maragu.dk">maragu</a>, maker of <a href="https://www.golang.dk">online Go courses</a></p></footer></div>
<footer class="prose prose-sm prose-purple mt-8"><p>made in 🇩🇰 by <a href="https://www.maragu.dk">maragu</a>, maker of <a href="https://www.golang.dk">online Go courses</a></p></footer></div>
</div>
</body>
</html>
61 changes: 8 additions & 53 deletions goqite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"database/sql"
_ "embed"
"errors"
"fmt"
"time"

internalsql "github.com/maragudk/goqite/internal/sql"
)

//go:embed schema.sql
Expand All @@ -18,13 +19,8 @@ var schema string
// zeros removed.
const rfc3339Milli = "2006-01-02T15:04:05.000Z07:00"

type logger interface {
Println(v ...any)
}

type NewOpts struct {
DB *sql.DB
Log logger
MaxReceive int // Max receive count for messages before they cannot be received anymore.
Name string
Timeout time.Duration // Default timeout for messages before they can be re-received.
Expand All @@ -44,10 +40,6 @@ func New(opts NewOpts) *Queue {
panic("name cannot be empty")
}

if opts.Log == nil {
opts.Log = &discardLogger{}
}

if opts.MaxReceive < 0 {
panic("max receive cannot be negative")
}
Expand All @@ -67,15 +59,13 @@ func New(opts NewOpts) *Queue {
return &Queue{
db: opts.DB,
name: opts.Name,
log: opts.Log,
maxReceive: opts.MaxReceive,
timeout: opts.Timeout,
}
}

type Queue struct {
db *sql.DB
log logger
maxReceive int
name string
timeout time.Duration
Expand All @@ -91,7 +81,7 @@ type Message struct {

// Send a Message to the queue with an optional delay.
func (q *Queue) Send(ctx context.Context, m Message) error {
return q.inTx(func(tx *sql.Tx) error {
return internalsql.InTx(q.db, func(tx *sql.Tx) error {
return q.SendTx(ctx, tx, m)
})
}
Expand All @@ -114,7 +104,7 @@ func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error {
// Receive a Message from the queue, or nil if there is none.
func (q *Queue) Receive(ctx context.Context) (*Message, error) {
var m *Message
err := q.inTx(func(tx *sql.Tx) error {
err := internalsql.InTx(q.db, func(tx *sql.Tx) error {
var err error
m, err = q.ReceiveTx(ctx, tx)
return err
Expand Down Expand Up @@ -154,8 +144,8 @@ func (q *Queue) ReceiveTx(ctx context.Context, tx *sql.Tx) (*Message, error) {
return &m, nil
}

// ReceiveAndWait for a Message from the queue or the context is cancelled.
// If the context is cancelled, the error will be non-nil. See context.Context.Err.
// ReceiveAndWait for a Message from the queue, polling at the given interval, until the context is cancelled.
// If the context is cancelled, the error will be non-nil. See [context.Context.Err].
func (q *Queue) ReceiveAndWait(ctx context.Context, interval time.Duration) (*Message, error) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand All @@ -178,7 +168,7 @@ func (q *Queue) ReceiveAndWait(ctx context.Context, interval time.Duration) (*Me

// Extend a Message timeout by the given delay from now.
func (q *Queue) Extend(ctx context.Context, id ID, delay time.Duration) error {
return q.inTx(func(tx *sql.Tx) error {
return internalsql.InTx(q.db, func(tx *sql.Tx) error {
return q.ExtendTx(ctx, tx, id, delay)
})
}
Expand All @@ -197,7 +187,7 @@ func (q *Queue) ExtendTx(ctx context.Context, tx *sql.Tx, id ID, delay time.Dura

// Delete a Message from the queue by id.
func (q *Queue) Delete(ctx context.Context, id ID) error {
return q.inTx(func(tx *sql.Tx) error {
return internalsql.InTx(q.db, func(tx *sql.Tx) error {
return q.DeleteTx(ctx, tx, id)
})
}
Expand All @@ -208,41 +198,6 @@ func (q *Queue) DeleteTx(ctx context.Context, tx *sql.Tx, id ID) error {
return err
}

func (q *Queue) inTx(cb func(*sql.Tx) error) (err error) {
tx, txErr := q.db.Begin()
if txErr != nil {
return fmt.Errorf("cannot start tx: %w", txErr)
}

defer func() {
if rec := recover(); rec != nil {
err = rollback(tx, nil)
panic(rec)
}
}()

if err := cb(tx); err != nil {
return rollback(tx, err)
}

if txErr := tx.Commit(); txErr != nil {
return fmt.Errorf("cannot commit tx: %w", txErr)
}

return nil
}

func rollback(tx *sql.Tx, err error) error {
if txErr := tx.Rollback(); txErr != nil {
return fmt.Errorf("cannot roll back tx after error (tx error: %v), original error: %w", txErr, err)
}
return err
}

type discardLogger struct{}

func (l *discardLogger) Println(v ...any) {}

// Setup the queue in the database.
func Setup(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, schema)
Expand Down
37 changes: 37 additions & 0 deletions internal/sql/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sql

import (
"database/sql"
"fmt"
)

func InTx(db *sql.DB, cb func(*sql.Tx) error) (err error) {
tx, txErr := db.Begin()
if txErr != nil {
return fmt.Errorf("cannot start tx: %w", txErr)
}

defer func() {
if rec := recover(); rec != nil {
err = rollback(tx, nil)
panic(rec)
}
}()

if err := cb(tx); err != nil {
return rollback(tx, err)
}

if txErr := tx.Commit(); txErr != nil {
return fmt.Errorf("cannot commit tx: %w", txErr)
}

return nil
}

func rollback(tx *sql.Tx, err error) error {
if txErr := tx.Rollback(); txErr != nil {
return fmt.Errorf("cannot roll back tx after error (tx error: %v), original error: %w", txErr, err)
}
return err
}
15 changes: 15 additions & 0 deletions internal/testing/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
create table goqite (
id text primary key default ('m_' || lower(hex(randomblob(16)))),
created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
queue text not null,
body blob not null,
timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
received integer not null default 0
) strict;

create trigger goqite_updated_timestamp after update on goqite begin
update goqite set updated = strftime('%Y-%m-%dT%H:%M:%fZ') where id = old.id;
end;

create index goqite_queue_created_idx on goqite (queue, created);
Loading

0 comments on commit 78755b2

Please sign in to comment.