Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow upgrading Postgres with timescaleDB extension. #70

Merged
merged 4 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ Probably, it does not make sense to use this project with large databases. Howev
## Supported Databases

| Database | Image | Status | Upgrade Support |
|-------------|--------------|:------:|:---------------:|
| postgres | >= 12-alpine | beta | ✅ |
| rethinkdb | >= 2.4.0 | beta | ❌ |
| ETCD | >= 3.5 | alpha | ❌ |
| meilisearch | >= 1.2.0 | alpha | ✅ |
| redis | >= 6.0 | alpha | ❌ |
| keydb | >= 6.0 | alpha | ❌ |
| ----------- | ------------ | :----: | :-------------: |
| postgres | >= 12-alpine | beta | ✅ |
| rethinkdb | >= 2.4.0 | beta | ❌ |
| ETCD | >= 3.5 | alpha | ❌ |
| meilisearch | >= 1.2.0 | alpha | ✅ |
| redis | >= 6.0 | alpha | ❌ |
| keydb | >= 6.0 | alpha | ❌ |

Postgres also supports updates when using the TimescaleDB extension. Please consider the integration test for supported upgrade paths.

## Database Upgrades

Expand Down
83 changes: 82 additions & 1 deletion cmd/internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"go.uber.org/zap"

_ "github.com/lib/pq"
)

const (
Expand Down Expand Up @@ -153,7 +155,7 @@ func (db *Postgres) Recover(ctx context.Context) error {
func (db *Postgres) Probe(ctx context.Context) error {
// TODO is postgres db OK ?
connString := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable", db.host, db.port, db.user, db.password)
var err error

dbc, err := sql.Open("postgres", connString)
if err != nil {
return fmt.Errorf("unable to open postgres connection %w", err)
Expand All @@ -164,5 +166,84 @@ func (db *Postgres) Probe(ctx context.Context) error {
if err != nil {
return fmt.Errorf("unable to ping postgres connection %w", err)
}

runsTimescaleDB, err := db.runningTimescaleDB(ctx, postgresConfigCmd)
if err == nil && runsTimescaleDB {
db.log.Infow("detected running timescaledb, running post-start hook to update timescaledb extension if necessary")

err = db.updateTimescaleDB(ctx, dbc)
if err != nil {
return fmt.Errorf("unable to update timescaledb: %w", err)
}
}

return nil
}

func (db *Postgres) updateTimescaleDB(ctx context.Context, dbc *sql.DB) error {
var (
databaseNames []string
)

databaseNameRows, err := dbc.QueryContext(ctx, "SELECT datname,datallowconn FROM pg_database")
if err != nil {
return fmt.Errorf("unable to get database names: %w", err)
}
defer databaseNameRows.Close()

for databaseNameRows.Next() {
var name string
var allowed bool
if err := databaseNameRows.Scan(&name, &allowed); err != nil {
return err
}

if allowed {
databaseNames = append(databaseNames, name)
}
}
if err := databaseNameRows.Err(); err != nil {
return err
}

for _, dbName := range databaseNames {
connString := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", db.host, db.port, db.user, db.password, dbName)
dbc2, err := sql.Open("postgres", connString)
if err != nil {
return fmt.Errorf("unable to open postgres connection %w", err)
}
defer dbc2.Close()

rows, err := dbc2.QueryContext(ctx, "SELECT extname FROM pg_extension")
if err != nil {
return fmt.Errorf("unable to get extensions: %w", err)
}
defer rows.Close()

for rows.Next() {
var extName string
if err := rows.Scan(&extName); err != nil {
return err
}

if extName != "timescaledb" {
continue
}

db.log.Infow("updating timescaledb extension", "db-name", dbName)

_, err = dbc2.ExecContext(ctx, "ALTER EXTENSION timescaledb UPDATE")
if err != nil {
return fmt.Errorf("unable to update extension: %w", err)
}

break
}

if err := rows.Err(); err != nil {
return err
}
}

return nil
}
52 changes: 51 additions & 1 deletion cmd/internal/database/postgres/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,24 @@ func (db *Postgres) Upgrade(ctx context.Context) error {
if err != nil {
return err
}
gid, err := strconv.Atoi(pgUser.Gid)
if err != nil {
return err
}

// remove /data/postgres-new if present
newDataDirTemp := path.Join("/data", "postgres-new")
newDataDirTemp := path.Join("/data", "postgres-new") // TODO: /data should not be hardcoded
err = os.RemoveAll(newDataDirTemp)
if err != nil {
db.log.Errorw("unable to remove new datadir, skipping upgrade", "error", err)
return nil
}

err = os.Chown("/data", uid, gid)
if err != nil {
return err
}

// initdb -D /data/postgres-new
cmd := exec.Command(postgresInitDBCmd, "-D", newDataDirTemp)
cmd.Stdout = os.Stdout
Expand Down Expand Up @@ -171,6 +180,24 @@ func (db *Postgres) Upgrade(ctx context.Context) error {
"--new-bindir", newPostgresBinDir,
"--link",
}

runsTimescaleDB, err := db.runningTimescaleDB(ctx, postgresConfigCmd)
if err != nil {
return err
}

if runsTimescaleDB {
// see https://github.com/timescale/timescaledb/issues/1844 and https://github.com/timescale/timescaledb/issues/4503#issuecomment-1860883843
db.log.Infow("running timescaledb, applying custom options for upgrade command")

// timescaledb libraries in this container are only compatible with the current postgres version
// do not load them anymore with the old postgresql server
pgUpgradeArgs = append(pgUpgradeArgs,
"--old-options", "-c shared_preload_libraries=''",
"--new-options", "-c timescaledb.restoring=on -c shared_preload_libraries=timescaledb",
)
}

cmd = exec.CommandContext(ctx, postgresUpgradeCmd, pgUpgradeArgs...) //nolint:gosec
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
Expand Down Expand Up @@ -255,6 +282,29 @@ func (db *Postgres) getBinDir(ctx context.Context, pgConfigCmd string) (string,
return strings.TrimSpace(string(out)), nil
}

func (db *Postgres) runningTimescaleDB(ctx context.Context, pgConfigCmd string) (bool, error) {
libDir, err := db.getLibDir(ctx, pgConfigCmd)
if err != nil {
return false, err
}

if _, err := os.Stat(path.Join(libDir, "timescaledb.so")); err == nil {
return true, nil
}

return false, nil
}

func (db *Postgres) getLibDir(ctx context.Context, pgConfigCmd string) (string, error) {
cmd := exec.CommandContext(ctx, pgConfigCmd, "--pkglibdir")
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("unable to figure out lib dir: %w", err)
}

return strings.TrimSpace(string(out)), nil
}

// copyPostgresBinaries is needed to save old postgres binaries for a later major upgrade
func (db *Postgres) copyPostgresBinaries(ctx context.Context, override bool) error {
binDir, err := db.getBinDir(ctx, postgresConfigCmd)
Expand Down
30 changes: 28 additions & 2 deletions cmd/internal/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func (c *CmdExecutor) ExecWithStreamingOutput(ctx context.Context, command strin

parts := strings.Fields(command)

cmd := exec.CommandContext(ctx, parts[0], parts[1:]...) // nolint:gosec
cmd := exec.Command(parts[0], parts[1:]...) // nolint:gosec

c.log.Debugw("running command", "command", cmd.Path, "args", cmd.Args)

Expand All @@ -61,5 +62,30 @@ func (c *CmdExecutor) ExecWithStreamingOutput(ctx context.Context, command strin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout

return cmd.Run()
err := cmd.Start()
if err != nil {
return err
}

go func() {
<-ctx.Done()

go func() {
time.Sleep(10 * time.Second)

c.log.Infow("force killing post-exec command now")
if err := cmd.Process.Signal(os.Kill); err != nil {
panic(err)
}
}()

c.log.Infow("sending sigint to post-exec command process")

err := cmd.Process.Signal(os.Interrupt)
if err != nil {
c.log.Errorw("unable to send interrupt to post-exec command", "error", err)
}
}()

return cmd.Wait()
}
104 changes: 104 additions & 0 deletions integration/postgres_timescaledb_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//go:build integration

package integration_test

import (
"context"
"testing"

"github.com/metal-stack/backup-restore-sidecar/pkg/generate/examples/examples"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

_ "github.com/lib/pq"
)

func Test_Postgres_TimescaleDB_Upgrade(t *testing.T) {
backingResources := examples.PostgresBackingResources(namespaceName(t))

modified := false

for _, r := range backingResources {
cm, ok := r.(*corev1.ConfigMap)
if !ok {
continue
}

if cm.Name != "backup-restore-sidecar-config-postgres" {
continue
}

cm.Data = map[string]string{
"config.yaml": `---
bind-addr: 0.0.0.0
db: postgres
db-data-directory: /data/postgres/
backup-provider: local
backup-cron-schedule: "*/1 * * * *"
object-prefix: postgres-test
compression-method: tar
post-exec-cmds:
- docker-entrypoint.sh postgres -c shared_preload_libraries=timescaledb
`}

modified = true
break
}

require.True(t, modified)

upgradeFlow(t, &upgradeFlowSpec{
flowSpec: flowSpec{
databaseType: examples.Postgres,
sts: examples.PostgresSts,
backingResources: func(namespace string) []client.Object {
return backingResources
},
addTestData: addTimescaleDbTestData,
verifyTestData: verifyPostgresTestData,
},
databaseImages: []string{
"timescale/timescaledb:2.11.2-pg12",
"timescale/timescaledb:2.11.2-pg15",
// it is allowed to skip a minor version
// "timescale/timescaledb:2.12.2-pg15",
"timescale/timescaledb:2.13.1-pg15",
"timescale/timescaledb:2.13.1-pg16",
},
})
}

func addTimescaleDbTestData(t *testing.T, ctx context.Context) {
db := newPostgresSession(t, ctx)
defer db.Close()

var (
createStmt = `

CREATE EXTENSION IF NOT EXISTS timescaledb;

CREATE TABLE IF NOT EXISTS backuprestore (
timestamp timestamp NOT NULL,
data text NOT NULL,
PRIMARY KEY(timestamp, data)
);
SELECT create_hypertable('backuprestore', 'timestamp', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE);

ALTER TABLE backuprestore SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'data',
timescaledb.compress_orderby = 'timestamp'
);
SELECT add_compression_policy('backuprestore', INTERVAL '1 days');

`
insertStmt = `INSERT INTO backuprestore("timestamp", "data") VALUES ('2024-01-01 12:00:00.000', 'I am precious');`
)

_, err := db.Exec(createStmt)
require.NoError(t, err)

_, err = db.Exec(insertStmt)
require.NoError(t, err)
}
Loading