From 1aea8186c7d842df872b0450ad30fbb803ee3147 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 14 Nov 2023 13:47:17 +0000 Subject: [PATCH] Enable s3 native copy for scenario: from s3 to s3. --- .../ObjectStorages/DiskObjectStorage.cpp | 22 ++++++++--- src/Disks/ObjectStorages/DiskObjectStorage.h | 1 + .../DiskObjectStorageTransaction.cpp | 38 +++++++++++++++++-- .../DiskObjectStorageTransaction.h | 25 +++++++++++- .../configs/disk_s3.xml | 1 + .../test_backup_restore_s3/test.py | 21 ++++++++++ .../02802_clickhouse_disks_s3_copy.reference | 2 + .../02802_clickhouse_disks_s3_copy.sh | 15 ++++++++ .../02802_clickhouse_disks_s3_copy.xml | 7 ++++ 9 files changed, 122 insertions(+), 10 deletions(-) diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index c1f053be7c67..965407c615a1 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -46,6 +46,17 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction() send_metadata ? metadata_helper.get() : nullptr); } +DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk) +{ + return std::make_shared( + *object_storage, + *metadata_storage, + *to_disk.getObjectStorage(), + *to_disk.getMetadataStorage(), + send_metadata ? metadata_helper.get() : nullptr); +} + + DiskObjectStorage::DiskObjectStorage( const String & name_, const String & object_key_prefix_, @@ -177,12 +188,13 @@ void DiskObjectStorage::copyFile( /// NOLINT const ReadSettings & read_settings, const WriteSettings & write_settings) { - if (this == &to_disk) + if (getDataSourceDescription().sameKind(to_disk.getDataSourceDescription())) { - /// It may use s3-server-side copy - auto transaction = createObjectStorageTransaction(); - transaction->copyFile(from_file_path, to_file_path); - transaction->commit(); + /// It may use s3-server-side copy + auto & to_disk_object_storage = dynamic_cast(to_disk); + auto transaction = createObjectStorageTransactionToAnotherDisk(to_disk_object_storage); + transaction->copyFile(from_file_path, to_file_path); + transaction->commit(); } else { diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index 66d1b02aea7b..4bcde9691526 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -220,6 +220,7 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper; /// Create actual disk object storage transaction for operations /// execution. DiskTransactionPtr createObjectStorageTransaction(); + DiskTransactionPtr createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk); String getReadResourceName() const; String getWriteResourceName() const; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp index 25de89a95484..4f1b31951b8d 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp @@ -38,6 +38,29 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction( , metadata_helper(metadata_helper_) {} + +DiskObjectStorageTransaction::DiskObjectStorageTransaction( + IObjectStorage & object_storage_, + IMetadataStorage & metadata_storage_, + DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_, + MetadataTransactionPtr metadata_transaction_) + : object_storage(object_storage_) + , metadata_storage(metadata_storage_) + , metadata_transaction(metadata_transaction_) + , metadata_helper(metadata_helper_) +{} + +MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction( + IObjectStorage & object_storage_, + IMetadataStorage & metadata_storage_, + IObjectStorage& destination_object_storage_, + IMetadataStorage& destination_metadata_storage_, + DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_) + : DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction()) + , destination_object_storage(destination_object_storage_) + , destination_metadata_storage(destination_metadata_storage_) +{} + namespace { /// Operation which affects only metadata. Simplest way to @@ -485,10 +508,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation std::string to_path; StoredObjects created_objects; + IObjectStorage& destination_object_storage; CopyFileObjectStorageOperation( IObjectStorage & object_storage_, IMetadataStorage & metadata_storage_, + IObjectStorage & destination_object_storage_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const std::string & from_path_, @@ -498,6 +523,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation , write_settings(write_settings_) , from_path(from_path_) , to_path(to_path_) + , destination_object_storage(destination_object_storage_) {} std::string getInfoForLog() const override @@ -515,7 +541,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation auto object_key = object_storage.generateObjectKeyForPath(to_path); auto object_to = StoredObject(object_key.serialize()); - object_storage.copyObject(object_from, object_to, read_settings, write_settings); + object_storage.copyObjectToAnotherObjectStorage(object_from, object_to,read_settings,write_settings, destination_object_storage); tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size); @@ -526,7 +552,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation void undo() override { for (const auto & object : created_objects) - object_storage.removeObject(object); + destination_object_storage.removeObject(object); } void finalize() override @@ -859,7 +885,13 @@ void DiskObjectStorageTransaction::createFile(const std::string & path) void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) { operations_to_execute.emplace_back( - std::make_unique(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path)); + std::make_unique(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path)); +} + +void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) +{ + operations_to_execute.emplace_back( + std::make_unique(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path)); } void DiskObjectStorageTransaction::commit() diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h index 4b62a41e161a..67044751b840 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h @@ -50,9 +50,9 @@ using DiskObjectStorageOperations = std::vector; /// /// If something wrong happen on step 1 or 2 reverts all applied operations. /// If finalize failed -- nothing is reverted, garbage is left in blob storage. -struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable_shared_from_this +struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_shared_from_this { -private: +protected: IObjectStorage & object_storage; IMetadataStorage & metadata_storage; @@ -63,6 +63,12 @@ struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable DiskObjectStorageOperations operations_to_execute; + DiskObjectStorageTransaction( + IObjectStorage & object_storage_, + IMetadataStorage & metadata_storage_, + DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_, + MetadataTransactionPtr metadata_transaction_); + public: DiskObjectStorageTransaction( IObjectStorage & object_storage_, @@ -118,6 +124,21 @@ struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable void createHardLink(const std::string & src_path, const std::string & dst_path) override; }; +struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTransaction, std::enable_shared_from_this +{ + IObjectStorage& destination_object_storage; + IMetadataStorage& destination_metadata_storage; + + MultipleDisksObjectStorageTransaction( + IObjectStorage & object_storage_, + IMetadataStorage & metadata_storage_, + IObjectStorage& destination_object_storage, + IMetadataStorage& destination_metadata_storage, + DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_); + + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override; +}; + using DiskObjectStorageTransactionPtr = std::shared_ptr; } diff --git a/tests/integration/test_backup_restore_s3/configs/disk_s3.xml b/tests/integration/test_backup_restore_s3/configs/disk_s3.xml index d635e39e13f8..45a1e17b0396 100644 --- a/tests/integration/test_backup_restore_s3/configs/disk_s3.xml +++ b/tests/integration/test_backup_restore_s3/configs/disk_s3.xml @@ -58,5 +58,6 @@ disk_s3 disk_s3_plain disk_s3_cache + disk_s3_other_bucket diff --git a/tests/integration/test_backup_restore_s3/test.py b/tests/integration/test_backup_restore_s3/test.py index f8ec39d240b4..60640c6bac9c 100644 --- a/tests/integration/test_backup_restore_s3/test.py +++ b/tests/integration/test_backup_restore_s3/test.py @@ -173,6 +173,27 @@ def test_backup_to_disk(storage_policy, to_disk): check_backup_and_restore(storage_policy, backup_destination) +@pytest.mark.parametrize( + "storage_policy, to_disk", + [ + pytest.param( + "policy_s3", + "disk_s3_other_bucket", + id="from_s3_to_s3", + ), + ], +) +def test_backup_from_s3_to_s3_disk_native_copy(storage_policy, to_disk): + backup_name = new_backup_name() + backup_destination = f"Disk('{to_disk}', '{backup_name}')" + (backup_events, restore_events) = check_backup_and_restore( + storage_policy, backup_destination + ) + + assert backup_events["S3CopyObject"] > 0 + assert restore_events["S3CopyObject"] > 0 + + def test_backup_to_s3(): storage_policy = "default" backup_name = new_backup_name() diff --git a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.reference b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.reference index 96860a2f90a4..8da02f5bef6a 100644 --- a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.reference +++ b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.reference @@ -2,3 +2,5 @@ s3_plain_native_copy Single operation copy has completed. s3_plain_no_native_copy Single part upload has completed. +copy from s3_plain_native_copy to s3_plain_another +Single operation copy has completed. diff --git a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.sh b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.sh index 33321607728e..2b9e5296a05e 100755 --- a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.sh +++ b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.sh @@ -24,5 +24,20 @@ function run_test_for_disk() clickhouse-disks -C "$config" --disk "$disk" remove $CLICKHOUSE_DATABASE/test.copy } +function run_test_copy_from_s3_to_s3(){ + local disk_src=$1 && shift + local disk_dest=$1 && shift + + echo "copy from $disk_src to $disk_dest" + clickhouse-disks -C "$config" --disk "$disk_src" write --input "$config" $CLICKHOUSE_DATABASE/test + + clickhouse-disks -C "$config" --log-level test copy --disk-from "$disk_src" --disk-to "$disk_dest" $CLICKHOUSE_DATABASE/test $CLICKHOUSE_DATABASE/test.copy |& { + grep -o -e "Single part upload has completed." -e "Single operation copy has completed." + } + clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy/test + clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy +} + run_test_for_disk s3_plain_native_copy run_test_for_disk s3_plain_no_native_copy +run_test_copy_from_s3_to_s3 s3_plain_native_copy s3_plain_another diff --git a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.xml b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.xml index d4235a709035..4b45815a125f 100644 --- a/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.xml +++ b/tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.xml @@ -8,6 +8,13 @@ clickhouse true + + s3_plain + http://localhost:11111/test/clickhouse-disks/ + clickhouse + clickhouse + true + s3_plain