Skip to content

Commit

Permalink
- fix corner cases when /var/lib/clickhouse/access already broken, fix
Browse files Browse the repository at this point in the history
…#977

- finish migrate from `apex/log` to `rs/zerolog`, fix #624, thanks @rdmrcv
  • Loading branch information
Slach committed Aug 7, 2024
1 parent 524857b commit 76d4fee
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 98 deletions.
5 changes: 5 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# v2.5.26
BUG FIXES
- fix corner cases when /var/lib/clickhouse/access already broken, fix [977](https://github.com/Altinity/clickhouse-backup/issues/977)
- finish migrate from `apex/log` to `rs/zerolog`, fix [624](https://github.com/Altinity/clickhouse-backup/issues/624), thanks @rdmrcv

# v2.5.25
BUG FIXES
- fix corner cases for wrong parsing RBAC name, during resolve conflict for complex multi line RBAC objects, fix [976](https://github.com/Altinity/clickhouse-backup/issues/976)
Expand Down
20 changes: 8 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,20 @@ require (
github.com/prometheus/client_golang v1.19.1
github.com/puzpuzpuz/xsync v1.5.2
github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.9.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.54
github.com/urfave/cli v1.22.15
github.com/xyproto/gionice v1.3.0
github.com/yargevad/filepathx v1.0.0
golang.org/x/crypto v0.25.0
golang.org/x/crypto v0.26.0
golang.org/x/mod v0.18.0
golang.org/x/sync v0.7.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.17.0
google.golang.org/api v0.190.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/apex/log v1.9.0
github.com/rs/zerolog v1.33.0
golang.org/x/text v0.16.0
)

require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.7.3 // indirect
Expand Down Expand Up @@ -129,10 +125,10 @@ require (
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
Expand Down
59 changes: 15 additions & 44 deletions go.sum

Large diffs are not rendered by default.

71 changes: 40 additions & 31 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"context"
"encoding/json"
"fmt"
apexLog "github.com/apex/log"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"io"
"io/fs"
"net/url"
Expand All @@ -21,7 +21,6 @@ import (

"github.com/mattn/go-shellwords"
recursiveCopy "github.com/otiai10/copy"
"github.com/rs/zerolog/log"
"github.com/yargevad/filepathx"
"golang.org/x/sync/errgroup"
"golang.org/x/text/cases"
Expand Down Expand Up @@ -517,10 +516,10 @@ func (b *Backuper) resolveRBACConflictIfExist(ctx context.Context, sql string, a
if detectErr != nil {
return detectErr
}
if isExists, existsRBACType, existsRBACObjectId := b.isRBACExists(ctx, kind, name, accessPath, version, k, replicatedUserDirectories); isExists {
if isExists, existsRBACType, existsRBACObjectIds := b.isRBACExists(ctx, kind, name, accessPath, version, k, replicatedUserDirectories); isExists {
log.Warn().Msgf("RBAC object kind=%s, name=%s already present, will %s", kind, name, b.cfg.General.RBACConflictResolution)
if b.cfg.General.RBACConflictResolution == "recreate" || dropExists {
if dropErr := b.dropExistsRBAC(ctx, kind, name, accessPath, existsRBACType, existsRBACObjectId, k); dropErr != nil {
if dropErr := b.dropExistsRBAC(ctx, kind, name, accessPath, existsRBACType, existsRBACObjectIds, k); dropErr != nil {
return dropErr
}
return nil
Expand All @@ -532,7 +531,7 @@ func (b *Backuper) resolveRBACConflictIfExist(ctx context.Context, sql string, a
return nil
}

func (b *Backuper) isRBACExists(ctx context.Context, kind string, name string, accessPath string, version int, k *keeper.Keeper, replicatedUserDirectories []clickhouse.UserDirectory) (bool, string, string) {
func (b *Backuper) isRBACExists(ctx context.Context, kind string, name string, accessPath string, version int, k *keeper.Keeper, replicatedUserDirectories []clickhouse.UserDirectory) (bool, string, []string) {
//search in sql system.users, system.quotas, system.row_policies, system.roles, system.settings_profiles
if version > 22003000 {
var rbacSystemTableNames = map[string]string{
Expand All @@ -545,18 +544,17 @@ func (b *Backuper) isRBACExists(ctx context.Context, kind string, name string, a
systemTable, systemTableExists := rbacSystemTableNames[kind]
if !systemTableExists {
log.Error().Msgf("unsupported RBAC object kind: %s", kind)
return false, "", ""
return false, "", nil
}
isRBACExistsSQL := fmt.Sprintf("SELECT toString(id) AS id, name FROM `system`.`%s` WHERE name=? LIMIT 1", systemTable)
existsRBACRow := make([]clickhouse.RBACObject, 0)
if err := b.ch.SelectContext(ctx, &existsRBACRow, isRBACExistsSQL, name); err != nil {
log.Warn().Msgf("RBAC object resolve failed, check SQL GRANTS or <access_management> settings for user which you use to connect to clickhouse-server, kind: %s, name: %s, error: %v", kind, name, err)
return false, "", ""
return false, "", nil
}
if len(existsRBACRow) == 0 {
return false, "", ""
if len(existsRBACRow) != 0 {
return true, "sql", []string{existsRBACRow[0].Id}
}
return true, "sql", existsRBACRow[0].Id
}

checkRBACExists := func(sql string) bool {
Expand All @@ -573,37 +571,40 @@ func (b *Backuper) isRBACExists(ctx context.Context, kind string, name string, a

// search in local user directory
if sqlFiles, globErr := filepath.Glob(path.Join(accessPath, "*.sql")); globErr == nil {
existsRBACObjectIds := []string{}
for _, f := range sqlFiles {
sql, readErr := os.ReadFile(f)
if readErr != nil {
log.Warn().Msgf("read %s error: %v", f, readErr)
continue
}
if checkRBACExists(string(sql)) {
return true, "local", strings.TrimSuffix(filepath.Base(f), filepath.Ext(f))
existsRBACObjectIds = append(existsRBACObjectIds, strings.TrimSuffix(filepath.Base(f), filepath.Ext(f)))
}
}
if len(existsRBACObjectIds) > 0 {
return true, "local", existsRBACObjectIds
}
} else {
log.Warn().Msgf("access/*.sql error: %v", globErr)
}

//search in keeper replicated user directory
if k != nil && len(replicatedUserDirectories) > 0 {
var existsObjectIds []string
for _, userDirectory := range replicatedUserDirectories {
replicatedAccessPath, getAccessErr := k.GetReplicatedAccessPath(userDirectory.Name)
if getAccessErr != nil {
log.Warn().Msgf("b.isRBACExists -> k.GetReplicatedAccessPath error: %v", getAccessErr)
continue
}
isExists := false
existsObjectId := ""
walkErr := k.Walk(replicatedAccessPath, "uuid", true, func(node keeper.DumpNode) (bool, error) {
if node.Value == "" {
return false, nil
}
if checkRBACExists(node.Value) {
isExists = true
existsObjectId = strings.TrimPrefix(node.Path, path.Join(replicatedAccessPath, "uuid")+"/")
existsObjectId := strings.TrimPrefix(node.Path, path.Join(replicatedAccessPath, "uuid")+"/")
existsObjectIds = append(existsObjectIds, existsObjectId)
return true, nil
}
return false, nil
Expand All @@ -612,18 +613,18 @@ func (b *Backuper) isRBACExists(ctx context.Context, kind string, name string, a
log.Warn().Msgf("b.isRBACExists -> k.Walk error: %v", walkErr)
continue
}
if isExists {
return true, userDirectory.Name, existsObjectId
if len(existsObjectIds) > 0 {
return true, userDirectory.Name, existsObjectIds
}
}
}
return false, "", ""
return false, "", nil
}

// https://github.com/Altinity/clickhouse-backup/issues/930
var needQuoteRBACRE = regexp.MustCompile(`[^0-9a-zA-Z_]`)

func (b *Backuper) dropExistsRBAC(ctx context.Context, kind string, name string, accessPath string, rbacType, rbacObjectId string, k *keeper.Keeper) error {
func (b *Backuper) dropExistsRBAC(ctx context.Context, kind string, name string, accessPath string, rbacType string, rbacObjectIds []string, k *keeper.Keeper) error {
//sql
if rbacType == "sql" {
// https://github.com/Altinity/clickhouse-backup/issues/930
Expand All @@ -635,7 +636,12 @@ func (b *Backuper) dropExistsRBAC(ctx context.Context, kind string, name string,
}
//local
if rbacType == "local" {
return os.Remove(path.Join(accessPath, rbacObjectId+".sql"))
for _, rbacObjectId := range rbacObjectIds {
if err := os.Remove(path.Join(accessPath, rbacObjectId+".sql")); err != nil {
return err
}
}
return nil
}
//keeper
var keeperPrefixesRBAC = map[string]string{
Expand All @@ -653,12 +659,15 @@ func (b *Backuper) dropExistsRBAC(ctx context.Context, kind string, name string,
if err != nil {
return fmt.Errorf("b.dropExistsRBAC -> k.GetReplicatedAccessPath error: %v", err)
}
deletedNodes := []string{
path.Join(prefix, "uuid", rbacObjectId),
deletedNodes := make([]string, len(rbacObjectIds))
for i := range rbacObjectIds {
deletedNodes[i] = path.Join(prefix, "uuid", rbacObjectIds[i])
}
walkErr := k.Walk(prefix, keeperRBACTypePrefix, true, func(node keeper.DumpNode) (bool, error) {
if node.Value == rbacObjectId {
deletedNodes = append(deletedNodes, node.Path)
for _, rbacObjectId := range rbacObjectIds {
if node.Value == rbacObjectId {
deletedNodes = append(deletedNodes, node.Path)
}
}
return false, nil
})
Expand Down Expand Up @@ -1332,10 +1341,10 @@ func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName str
}

func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName string, backupMetadata metadata.BackupMetadata, backupTable metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk) (int64, error) {
log := apexLog.WithFields(apexLog.Fields{
logger := log.With().Fields(map[string]interface{}{
"operation": "downloadObjectDiskParts",
"table": fmt.Sprintf("%s.%s", backupTable.Database, backupTable.Table),
})
}).Logger()
size := int64(0)
dbAndTableDir := path.Join(common.TablePathEncode(backupTable.Database), common.TablePathEncode(backupTable.Table))
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -1393,7 +1402,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
// copy from required backup for required data parts, https://github.com/Altinity/clickhouse-backup/issues/865
if part.Required && backupMetadata.RequiredBackup != "" {
var findRecursiveErr error
srcBackupName, srcDiskName, findRecursiveErr = b.findObjectDiskPartRecursive(ctx, backupMetadata, backupTable, part, diskName, log)
srcBackupName, srcDiskName, findRecursiveErr = b.findObjectDiskPartRecursive(ctx, backupMetadata, backupTable, part, diskName, logger)
if findRecursiveErr != nil {
return 0, findRecursiveErr
}
Expand All @@ -1420,7 +1429,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if objMeta.RefCount > 0 || objMeta.ReadOnly {
objMeta.RefCount = 0
objMeta.ReadOnly = false
log.Debugf("%s %#v set RefCount=0 and ReadOnly=0", fPath, objMeta.StorageObjects)
logger.Debug().Msgf("%s %#v set RefCount=0 and ReadOnly=0", fPath, objMeta.StorageObjects)
if writeMetaErr := object_disk.WriteMetadataToFile(objMeta, fPath); writeMetaErr != nil {
return fmt.Errorf("%s: object_disk.WriteMetadataToFile return error: %v", fPath, writeMetaErr)
}
Expand Down Expand Up @@ -1460,14 +1469,14 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
return 0, fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
}
log.WithField("disk", diskName).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data downloaded")
logger.Info().Str("disk", diskName).Str("duration", utils.HumanizeDuration(time.Since(start))).Str("size", utils.FormatBytes(uint64(size))).Msg("object_disk data downloaded")
}
}

return size, nil
}

func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metadata.BackupMetadata, table metadata.TableMetadata, part metadata.Part, diskName string, log *apexLog.Entry) (string, string, error) {
func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metadata.BackupMetadata, table metadata.TableMetadata, part metadata.Part, diskName string, logger zerolog.Logger) (string, string, error) {
if !part.Required {
return backup.BackupName, diskName, nil
}
Expand All @@ -1485,7 +1494,7 @@ func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metad
for _, requiredPart := range parts {
if requiredPart.Name == part.Name {
if requiredPart.Required {
return b.findObjectDiskPartRecursive(ctx, *requiredBackup, *requiredTable, requiredPart, requiredDiskName, log)
return b.findObjectDiskPartRecursive(ctx, *requiredBackup, *requiredTable, requiredPart, requiredDiskName, logger)
}
return requiredBackup.BackupName, requiredDiskName, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/config_darwin.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package config

import (
"github.com/apex/log"
"syscall"
"github.com/rs/zerolog/log"
"os"
"syscall"
)

func (cfg *Config) SetPriority() error {
var executable string
if err := syscall.Setpriority(0, 0, cfg.General.CPUNicePriority); err != nil {
executable, err = os.Executable()
if err != nil {
log.Warnf("can't get current executable path: %v", err)
log.Warn().Msgf("can't get current executable path: %v", err)
}
log.Warnf("can't set CPU priority %s, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.CPUNicePriority, err, executable)
log.Warn().Msgf("can't set CPU priority %s, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.CPUNicePriority, err, executable)
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/config/config_linux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package config

import (
"github.com/apex/log"
"github.com/rs/zerolog/log"
"github.com/xyproto/gionice"
"os"
)
Expand All @@ -13,17 +13,17 @@ func (cfg *Config) SetPriority() error {
var nicePriority gionice.PriClass
executable, err = os.Executable()
if err != nil {
log.Warnf("can't get current executable path: %v", err)
log.Warn().Msgf("can't get current executable path: %v", err)
}
if nicePriority, err = gionice.Parse(cfg.General.IONicePriority); err != nil {
return err
}
if err = gionice.SetIDPri(0, nicePriority, 7, gionice.IOPRIO_WHO_PGRP); err != nil {
log.Warnf("can't set i/o priority %s, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.IONicePriority, err, executable)
log.Warn().Msgf("can't set i/o priority %s, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.IONicePriority, err, executable)
}
}
if err = gionice.SetNicePri(0, gionice.PRIO_PROCESS, cfg.General.CPUNicePriority); err != nil {
log.Warnf("can't set CPU priority %v, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.CPUNicePriority, err, executable)
log.Warn().Msgf("can't set CPU priority %v, error: %v, use `sudo setcap cap_sys_nice+ep %s` to fix it", cfg.General.CPUNicePriority, err, executable)
}
return nil
}
5 changes: 2 additions & 3 deletions pkg/metadata/table_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package metadata

import (
"encoding/json"
apexLog "github.com/apex/log"
"github.com/rs/zerolog/log"
"os"
"path"
)
Expand Down Expand Up @@ -51,14 +51,13 @@ func (tm *TableMetadata) Save(location string, metadataOnly bool) (uint64, error
}

func (tm *TableMetadata) Load(location string) (uint64, error) {
log := apexLog.WithField("logger", "metadata.Load")
data, err := os.ReadFile(location)
if err != nil {
return 0, err
}
if err := json.Unmarshal(data, tm); err != nil {
return 0, err
}
log.Debugf("success %s", location)
log.Debug().Msgf("success %s", location)
return uint64(len(data)), nil
}

0 comments on commit 76d4fee

Please sign in to comment.