Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix #713
Slach committed Aug 3, 2023
1 parent 109421c commit 6bfab10
Showing 7 changed files with 92 additions and 32 deletions.
3 changes: 2 additions & 1 deletion ChangeLog.md
@@ -4,7 +4,8 @@ IMPROVEMENTS
- Implementation blacklist for table engines during backup / download / upload / restore [537](https://github.com/Altinity/clickhouse-backup/issues/537)
- restore RBAC / configs, refactoring restart clickhouse-server via `sql:SYSTEM SHUTDOWN` or `exec:systemctl restart clickhouse-server`, add `--rbac-only` and `--configs-only` options to `create`, `upload`, `download`, `restore` command. fix [706](https://github.com/Altinity/clickhouse-backup/issues/706)
- Backup/Restore RBAC related objects from Zookeeper via direct connection to zookeeper/keeper, fix [604](https://github.com/Altinity/clickhouse-backup/issues/604)
- Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values none (no sharding), table (table granularity), database (database granularity), first-replica (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
- Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
- Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)

BUG FIXES
- fix possible create backup failures during UNFREEZE not exists tables, affected 2.2.7+ version, fix [704](https://github.com/Altinity/clickhouse-backup/issues/704)
2 changes: 2 additions & 0 deletions pkg/backup/backuper.go
@@ -14,6 +14,8 @@ import (
apexLog "github.com/apex/log"
)

const DirectoryFormat = "directory"

var errShardOperationUnsupported = errors.New("sharded operations are not supported")

// versioner is an interface for determining the version of Clickhouse
59 changes: 46 additions & 13 deletions pkg/backup/download.go
@@ -14,8 +14,10 @@ import (
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/eapache/go-resiliency/retrier"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -159,7 +161,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [

if !schemaOnly && !b.cfg.General.DownloadByPart && remoteBackup.RequiredBackup != "" {
err := b.Download(remoteBackup.RequiredBackup, tablePattern, partitions, schemaOnly, b.resume, commandId)
if err != nil && err != ErrBackupIsAlreadyExists {
if err != nil && !errors.Is(err, ErrBackupIsAlreadyExists) {
return err
}
}
@@ -330,7 +332,7 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
metadataFiles := map[string]string{}
remoteMedataPrefix := path.Join(backupName, "metadata", common.TablePathEncode(tableTitle.Database), common.TablePathEncode(tableTitle.Table))
metadataFiles[fmt.Sprintf("%s.json", remoteMedataPrefix)] = path.Join(b.DefaultDataPath, "backup", backupName, "metadata", common.TablePathEncode(tableTitle.Database), fmt.Sprintf("%s.json", common.TablePathEncode(tableTitle.Table)))
partitionsIdMap := make(map[metadata.TableTitle]common.EmptyMap, 0)
partitionsIdMap := make(map[metadata.TableTitle]common.EmptyMap)
if b.isEmbedded {
metadataFiles[fmt.Sprintf("%s.sql", remoteMedataPrefix)] = path.Join(b.EmbeddedBackupDataPath, backupName, "metadata", common.TablePathEncode(tableTitle.Database), fmt.Sprintf("%s.sql", common.TablePathEncode(tableTitle.Table)))
metadataFiles[fmt.Sprintf("%s.json", remoteMedataPrefix)] = path.Join(b.EmbeddedBackupDataPath, backupName, "metadata", common.TablePathEncode(tableTitle.Database), fmt.Sprintf("%s.json", common.TablePathEncode(tableTitle.Table)))
@@ -425,28 +427,59 @@ func (b *Backuper) downloadConfigData(ctx context.Context, remoteBackup storage.

func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup storage.Backup, prefix string) (uint64, error) {
log := b.log.WithField("logger", "downloadBackupRelatedDir")
archiveFile := fmt.Sprintf("%s.%s", prefix, b.cfg.GetArchiveExtension())
remoteFile := path.Join(remoteBackup.BackupName, archiveFile)

localDir := path.Join(b.DefaultDataPath, "backup", remoteBackup.BackupName, prefix)

if remoteBackup.DataFormat != DirectoryFormat {
prefix = fmt.Sprintf("%s.%s", prefix, b.cfg.GetArchiveExtension())
}
remoteSource := path.Join(remoteBackup.BackupName, prefix)

if b.resume {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteSource); isProcessed {
return uint64(processedSize), nil
}
}
localDir := path.Join(b.DefaultDataPath, "backup", remoteBackup.BackupName, prefix)
remoteFileInfo, err := b.dst.StatFile(ctx, remoteFile)

if remoteBackup.DataFormat == DirectoryFormat {
if err := b.dst.DownloadPath(ctx, 0, remoteSource, localDir, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
return 0, err
}
downloadedBytes := int64(0)
if _, err := os.Stat(localDir); err != nil && os.IsNotExist(err) {
return 0, nil
}
if err := filepath.Walk(localDir, func(fPath string, fInfo fs.FileInfo, err error) error {
if err != nil {
return err
}
if fInfo.IsDir() {
return nil
}
downloadedBytes += fInfo.Size()
return nil
}); err != nil {
return 0, err
}
if b.resume {
b.resumableState.AppendToState(remoteSource, downloadedBytes)
}
return uint64(downloadedBytes), nil
}
remoteFileInfo, err := b.dst.StatFile(ctx, remoteSource)
if err != nil {
log.Debugf("%s not exists on remote storage, skip download", remoteFile)
log.Debugf("%s not exists on remote storage, skip download", remoteSource)
return 0, nil
}
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
err = retry.RunCtx(ctx, func(ctx context.Context) error {
return b.dst.DownloadCompressedStream(ctx, remoteFile, localDir)
return b.dst.DownloadCompressedStream(ctx, remoteSource, localDir)
})
if err != nil {
return 0, err
}
if b.resume {
b.resumableState.AppendToState(remoteFile, remoteFileInfo.Size())
b.resumableState.AppendToState(remoteSource, remoteFileInfo.Size())
}
return uint64(remoteFileInfo.Size()), nil
}
@@ -458,9 +491,9 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
g, dataCtx := errgroup.WithContext(ctx)

if remoteBackup.DataFormat != "directory" {
if remoteBackup.DataFormat != DirectoryFormat {
capacity := 0
downloadOffset := make(map[string]int, 0)
downloadOffset := make(map[string]int)
for disk := range table.Files {
capacity += len(table.Files[disk])
downloadOffset[disk] = 0
Expand Down Expand Up @@ -781,7 +814,7 @@ func (b *Backuper) findDiffOnePart(ctx context.Context, requiredBackup *metadata
log.Debugf("start")
tableRemoteFiles := make(map[string]string)
// find same disk and part name archive
if requiredBackup.DataFormat != "directory" {
if requiredBackup.DataFormat != DirectoryFormat {
if tableRemoteFile, tableLocalDir, err := b.findDiffOnePartArchive(ctx, requiredBackup, table, localDisk, remoteDisk, part); err == nil {
tableRemoteFiles[tableRemoteFile] = tableLocalDir
return tableRemoteFiles, nil, true
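The new `directory` branch in downloadBackupRelatedDir above derives the downloaded size by walking the local directory after `DownloadPath`, instead of stat-ing a single archive file. A self-contained sketch of that size-accounting pattern (standalone code with a hypothetical path, not the commit's Backuper API):

```go
package main

import (
	"fmt"
	"io/fs"
	"log"
	"path/filepath"
)

// dirSize sums the sizes of all regular files under root, mirroring how the
// directory-format download accounts for transferred bytes.
func dirSize(root string) (int64, error) {
	var total int64
	err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil // directories themselves contribute no bytes
		}
		total += info.Size()
		return nil
	})
	return total, err
}

func main() {
	// hypothetical local path for an uncompressed "access" (RBAC) backup dir
	size, err := dirSize("/var/lib/clickhouse/backup/example/access")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("downloaded %d bytes\n", size)
}
```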
51 changes: 36 additions & 15 deletions pkg/backup/upload.go
@@ -204,7 +204,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
if b.cfg.GetCompressionFormat() != "none" {
backupMetadata.DataFormat = b.cfg.GetCompressionFormat()
} else {
backupMetadata.DataFormat = "directory"
backupMetadata.DataFormat = DirectoryFormat
}
newBackupMetadataBody, err := json.MarshalIndent(backupMetadata, "", "\t")
if err != nil {
@@ -419,24 +419,31 @@ func (b *Backuper) validateUploadParams(ctx context.Context, backupName string,
func (b *Backuper) uploadConfigData(ctx context.Context, backupName string) (uint64, error) {
configBackupPath := path.Join(b.DefaultDataPath, "backup", backupName, "configs")
configFilesGlobPattern := path.Join(configBackupPath, "**/*.*")
if b.cfg.GetCompressionFormat() == "none" {
remoteConfigsDir := path.Join(backupName, "configs")
return b.uploadBackupRelatedDir(ctx, configBackupPath, configFilesGlobPattern, remoteConfigsDir)
}
remoteConfigsArchive := path.Join(backupName, fmt.Sprintf("configs.%s", b.cfg.GetArchiveExtension()))
return b.uploadAndArchiveBackupRelatedDir(ctx, configBackupPath, configFilesGlobPattern, remoteConfigsArchive)

return b.uploadBackupRelatedDir(ctx, configBackupPath, configFilesGlobPattern, remoteConfigsArchive)
}

func (b *Backuper) uploadRBACData(ctx context.Context, backupName string) (uint64, error) {
rbacBackupPath := path.Join(b.DefaultDataPath, "backup", backupName, "access")
accessFilesGlobPattern := path.Join(rbacBackupPath, "*.*")
if b.cfg.GetCompressionFormat() == "none" {
remoteRBACDir := path.Join(backupName, "access")
return b.uploadBackupRelatedDir(ctx, rbacBackupPath, accessFilesGlobPattern, remoteRBACDir)
}
remoteRBACArchive := path.Join(backupName, fmt.Sprintf("access.%s", b.cfg.GetArchiveExtension()))
return b.uploadAndArchiveBackupRelatedDir(ctx, rbacBackupPath, accessFilesGlobPattern, remoteRBACArchive)
return b.uploadBackupRelatedDir(ctx, rbacBackupPath, accessFilesGlobPattern, remoteRBACArchive)
}

func (b *Backuper) uploadAndArchiveBackupRelatedDir(ctx context.Context, localBackupRelatedDir, localFilesGlobPattern, remoteFile string) (uint64, error) {
func (b *Backuper) uploadBackupRelatedDir(ctx context.Context, localBackupRelatedDir, localFilesGlobPattern, destinationRemote string) (uint64, error) {
if _, err := os.Stat(localBackupRelatedDir); os.IsNotExist(err) {
return 0, nil
}
if b.resume {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(destinationRemote); isProcessed {
return uint64(processedSize), nil
}
}
@@ -445,24 +452,38 @@ func (b *Backuper) uploadAndArchiveBackupRelatedDir(ctx context.Context, localBa
if localFiles, err = filepathx.Glob(localFilesGlobPattern); err != nil || localFiles == nil || len(localFiles) == 0 {
return 0, fmt.Errorf("list %s return list=%v with err=%v", localFilesGlobPattern, localFiles, err)
}
for i := range localFiles {
localFiles[i] = strings.Replace(localFiles[i], localBackupRelatedDir, "", 1)
}

for i := 0; i < len(localFiles); i++ {
if fileInfo, err := os.Stat(localFiles[i]); err == nil && fileInfo.IsDir() {
localFiles = append(localFiles[:i], localFiles[i+1:]...)
i--
} else {
localFiles[i] = strings.Replace(localFiles[i], localBackupRelatedDir, "", 1)
}
}
if b.cfg.GetCompressionFormat() == "none" {
remoteUploadedBytes := int64(0)
if remoteUploadedBytes, err = b.dst.UploadPath(ctx, 0, localBackupRelatedDir, localFiles, destinationRemote, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
return 0, fmt.Errorf("can't RBAC or config upload %s: %v", destinationRemote, err)
}
if b.resume {
b.resumableState.AppendToState(destinationRemote, remoteUploadedBytes)
}
return uint64(remoteUploadedBytes), nil
}
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
err = retry.RunCtx(ctx, func(ctx context.Context) error {
return b.dst.UploadCompressedStream(ctx, localBackupRelatedDir, localFiles, remoteFile)
return b.dst.UploadCompressedStream(ctx, localBackupRelatedDir, localFiles, destinationRemote)
})

if err != nil {
return 0, fmt.Errorf("can't RBAC or config upload: %v", err)
return 0, fmt.Errorf("can't RBAC or config upload compressed %s: %v", destinationRemote, err)
}
remoteUploaded, err := b.dst.StatFile(ctx, remoteFile)
remoteUploaded, err := b.dst.StatFile(ctx, destinationRemote)
if err != nil {
return 0, fmt.Errorf("can't check uploaded remoteFile: %s, error: %v", remoteFile, err)
return 0, fmt.Errorf("can't check uploaded destinationRemote: %s, error: %v", destinationRemote, err)
}
if b.resume {
b.resumableState.AppendToState(remoteFile, remoteUploaded.Size())
b.resumableState.AppendToState(destinationRemote, remoteUploaded.Size())
}
return uint64(remoteUploaded.Size()), nil
}
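On the upload side, uploadBackupRelatedDir now drops directories from the glob result and strips the local prefix before handing the file list to either `UploadPath` (the `compression_format: none` case) or `UploadCompressedStream`. A minimal standalone sketch of that list-preparation step (illustrative names; the project itself uses filepathx.Glob to support `**` patterns, while plain filepath.Glob here matches a single level):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// relativeFiles expands a glob, skips directories, and returns matches
// relative to baseDir — the shape both upload paths expect.
func relativeFiles(baseDir, pattern string) ([]string, error) {
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}
	files := make([]string, 0, len(matches))
	for _, m := range matches {
		info, err := os.Stat(m)
		if err != nil || info.IsDir() {
			continue // keep only regular, readable files
		}
		files = append(files, strings.TrimPrefix(m, baseDir))
	}
	return files, nil
}

func main() {
	// hypothetical local layout for an uncompressed "configs" backup
	base := "/var/lib/clickhouse/backup/example/configs"
	files, err := relativeFiles(base, filepath.Join(base, "*", "*.xml"))
	if err != nil {
		fmt.Println("glob error:", err)
		return
	}
	fmt.Println(files)
}
```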
4 changes: 3 additions & 1 deletion test/integration/docker-compose.yml
@@ -139,7 +139,7 @@ services:
ports:
- "7171:7171"
# for delve debugger
- "40001:40001"
# - "40001:40001"
networks:
- clickhouse-backup
depends_on:
@@ -203,6 +203,8 @@ services:
ports:
- "8123:8123"
- "9000:9000"
# for delve debugger
- "40001:40001"
networks:
- clickhouse-backup
links:
1 change: 1 addition & 0 deletions test/integration/install_delve.sh
@@ -1,4 +1,5 @@
# golang
rm -rf /etc/apt/sources.list.d/clickhouse.list
apt-get update && apt-get install -y software-properties-common
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 52B59B1571A79DBC054901C0F6BC817356A3D45E
add-apt-repository -y ppa:longsleep/golang-backports
4 changes: 2 additions & 2 deletions test/integration/integration_test.go
@@ -1102,7 +1102,7 @@ func TestDoRestoreRBAC(t *testing.T) {

// TestDoRestoreConfigs - require direct access to `/etc/clickhouse-backup/`, so executed inside `clickhouse` container
func TestDoRestoreConfigs(t *testing.T) {
if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "1.1.54391") == -1 {
if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "1.1.54391") < 0 {
t.Skipf("Test skipped, users.d is not available for %s version", os.Getenv("CLICKHOUSE_VERSION"))
}
ch := &TestClickHouse{}
@@ -1116,7 +1116,7 @@ func TestDoRestoreConfigs(t *testing.T) {

r.NoError(dockerExec("clickhouse", "clickhouse-backup", "create", "--configs", "--configs-only", "test_configs_backup"))
ch.queryWithNoError(r, "DROP TABLE IF EXISTS default.test_configs")
r.NoError(dockerExec("clickhouse", "bash", "-xec", "ALLOW_EMPTY_BACKUPS=1 clickhouse-backup upload test_configs_backup"))
r.NoError(dockerExec("clickhouse", "bash", "-xec", "S3_COMPRESSION_FORMAT=none ALLOW_EMPTY_BACKUPS=1 clickhouse-backup upload test_configs_backup"))
r.NoError(dockerExec("clickhouse", "clickhouse-backup", "delete", "local", "test_configs_backup"))

ch.chbackend.Close()
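Two things change in this test: the upload command now sets `S3_COMPRESSION_FORMAT=none`, exercising the new uncompressed path end to end, and the version guard now tests the comparator's sign (`< 0`) rather than the exact value `-1`. A tiny illustration of why sign-only checks are the safer convention for three-way comparators (hypothetical comparator, not the test suite's compareVersion helper):

```go
package main

import "fmt"

// compareInts is an illustrative three-way comparator: like C's strcmp, it
// only guarantees the sign of its result, not an exact -1 or +1.
func compareInts(a, b int) int {
	return a - b
}

func main() {
	fmt.Println(compareInts(3, 7) == -1) // false: brittle, assumes an exact -1
	fmt.Println(compareInts(3, 7) < 0)   // true: sign-only check stays correct
}
```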
