Skip to content

Streaming Rows from SQL Databases

Viktor Nikolaiev edited this page Jun 19, 2024 · 1 revision

This code demonstrates how to stream rows from an SQL query into a rill stream of models. This approach allows processing SQL rows concurrently and on the fly without loading all rows into memory at once.

The generic, reusable StreamSQLRows function takes an *sql.Rows (obtained from an SQL query) and a mapping function f that converts the current row into an object of type T. It returns a stream of objects of type T.

Here's an example of how to use StreamSQLRows to stream users from an SQL database. For demonstration purposes, it uses ramsql instead of a real database.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/proullon/ramsql/driver"

	"github.com/destel/rill"
)

// StreamSQLRows converts an *sql.Rows into a stream of values of type T.
//
// For every row, the user-provided function f is called to scan the current
// row and build a T. Each result is sent on the returned channel wrapped in a
// rill.Try. If f fails, its error is sent and streaming stops. After the last
// row, any error reported by rows.Err or rows.Close is sent as well.
//
// The rows are always closed and the returned channel is always closed once
// streaming ends.
func StreamSQLRows[T any](rows *sql.Rows, f func(*sql.Rows) (T, error)) <-chan rill.Try[T] {
	results := make(chan rill.Try[T])

	// fail delivers err to the consumer, paired with the zero value of T.
	fail := func(err error) {
		var empty T
		results <- rill.Wrap(empty, err)
	}

	go func() {
		// Deferred calls run in reverse order: the channel is closed first,
		// then the rows (a no-op if they were already closed below).
		defer rows.Close()
		defer close(results)

		for rows.Next() {
			value, scanErr := f(rows)
			if scanErr != nil {
				fail(scanErr)
				return
			}
			results <- rill.Wrap(value, nil)
		}

		// Iteration ended: report a failed iteration, then a failed close.
		if iterErr := rows.Err(); iterErr != nil {
			fail(iterErr)
		}
		if closeErr := rows.Close(); closeErr != nil {
			fail(closeErr)
		}
	}()

	return results
}

// initDB initializes a test database with sample data.
//
// It opens the "ramsql" database named "testdb", creates the users table and
// inserts 50 rows with ids 0..49 and usernames "user_<id>".
//
// On success the caller owns the returned handle and should close it. On
// failure the returned *sql.DB is nil and any handle that was opened has
// already been closed, so a half-initialized database is never leaked.
func initDB() (*sql.DB, error) {
	db, err := sql.Open("ramsql", "testdb")
	if err != nil {
		return nil, fmt.Errorf("opening database: %w", err)
	}

	if err := createSampleUsers(db); err != nil {
		// The setup error is the one worth reporting; a close failure on an
		// already unusable handle would only obscure it.
		_ = db.Close()
		return nil, err
	}

	return db, nil
}

// createSampleUsers creates the users table in db and fills it with 50 rows.
func createSampleUsers(db *sql.DB) error {
	_, err := db.Exec("CREATE TABLE users (id INT, username VARCHAR(255), PRIMARY KEY (username))")
	if err != nil {
		return fmt.Errorf("creating users table: %w", err)
	}

	for i := 0; i < 50; i++ {
		_, err := db.Exec(`INSERT INTO users (id, username) VALUES (?, ?)`, i, fmt.Sprintf("user_%d", i))
		if err != nil {
			return fmt.Errorf("inserting user %d: %w", i, err)
		}
	}

	return nil
}

// User represents a user record from the database.
// In this program it is filled by scanning the columns selected by
// "SELECT id, username FROM users", in that order.
type User struct {
	// ID receives the value of the id column.
	ID       int
	// Username receives the value of the username column.
	Username string
}

// main runs the demo: it initializes the sample database, queries all users,
// streams the result rows through StreamSQLRows and prints every user.
// Any error is printed and ends the program.
func main() {
	db, err := initDB()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	// Release the database handle when the demo finishes, including on the
	// early-return error paths below.
	defer db.Close()

	rows, err := db.Query("SELECT id, username FROM users")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	// Stream users from the query result. StreamSQLRows closes rows itself,
	// so they are not closed here.
	users := StreamSQLRows(rows, func(rows *sql.Rows) (User, error) {
		var u User
		err := rows.Scan(&u.ID, &u.Username)
		return u, err
	})

	// Print each streamed user; the second argument (1) is the concurrency
	// level passed to rill.ForEach.
	err = rill.ForEach(users, 1, func(user User) error {
		fmt.Printf("%+v\n", user)
		return nil
	})
	if err != nil {
		fmt.Println("Error:", err)
	}
}
Clone this wiki locally