Skip to content

Commit

Permalink
Enable s3 native copy for scenario: from s3 to s3.
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Nov 14, 2023
1 parent 945fbb3 commit 1aea818
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 10 deletions.
22 changes: 17 additions & 5 deletions src/Disks/ObjectStorages/DiskObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}

DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk)
{
return std::make_shared<MultipleDisksObjectStorageTransaction>(
*object_storage,
*metadata_storage,
*to_disk.getObjectStorage(),
*to_disk.getMetadataStorage(),
send_metadata ? metadata_helper.get() : nullptr);
}


DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_key_prefix_,
Expand Down Expand Up @@ -177,12 +188,13 @@ void DiskObjectStorage::copyFile( /// NOLINT
const ReadSettings & read_settings,
const WriteSettings & write_settings)
{
if (this == &to_disk)
if (getDataSourceDescription().sameKind(to_disk.getDataSourceDescription()))
{
/// It may use s3-server-side copy
auto transaction = createObjectStorageTransaction();
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
/// It may use s3-server-side copy
auto & to_disk_object_storage = dynamic_cast<DiskObjectStorage &>(to_disk);
auto transaction = createObjectStorageTransactionToAnotherDisk(to_disk_object_storage);
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/Disks/ObjectStorages/DiskObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
/// Create actual disk object storage transaction for operations
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
DiskTransactionPtr createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk);

String getReadResourceName() const;
String getWriteResourceName() const;
Expand Down
38 changes: 35 additions & 3 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
, metadata_helper(metadata_helper_)
{}


DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_transaction_)
, metadata_helper(metadata_helper_)
{}

MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage_,
IMetadataStorage& destination_metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction())
, destination_object_storage(destination_object_storage_)
, destination_metadata_storage(destination_metadata_storage_)
{}

namespace
{
/// Operation which affects only metadata. Simplest way to
Expand Down Expand Up @@ -485,10 +508,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
std::string to_path;

StoredObjects created_objects;
IObjectStorage& destination_object_storage;

CopyFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage & destination_object_storage_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const std::string & from_path_,
Expand All @@ -498,6 +523,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
, write_settings(write_settings_)
, from_path(from_path_)
, to_path(to_path_)
, destination_object_storage(destination_object_storage_)
{}

std::string getInfoForLog() const override
Expand All @@ -515,7 +541,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
auto object_key = object_storage.generateObjectKeyForPath(to_path);
auto object_to = StoredObject(object_key.serialize());

object_storage.copyObject(object_from, object_to, read_settings, write_settings);
object_storage.copyObjectToAnotherObjectStorage(object_from, object_to,read_settings,write_settings, destination_object_storage);

tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size);

Expand All @@ -526,7 +552,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
void undo() override
{
for (const auto & object : created_objects)
object_storage.removeObject(object);
destination_object_storage.removeObject(object);
}

void finalize() override
Expand Down Expand Up @@ -859,7 +885,13 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path));
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
}

void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
}

void DiskObjectStorageTransaction::commit()
Expand Down
25 changes: 23 additions & 2 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ using DiskObjectStorageOperations = std::vector<DiskObjectStorageOperation>;
///
/// If something wrong happen on step 1 or 2 reverts all applied operations.
/// If finalize failed -- nothing is reverted, garbage is left in blob storage.
struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
{
private:
protected:
IObjectStorage & object_storage;
IMetadataStorage & metadata_storage;

Expand All @@ -63,6 +63,12 @@ struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable

DiskObjectStorageOperations operations_to_execute;

DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_);

public:
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
Expand Down Expand Up @@ -118,6 +124,21 @@ struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable
void createHardLink(const std::string & src_path, const std::string & dst_path) override;
};

struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTransaction, std::enable_shared_from_this<MultipleDisksObjectStorageTransaction>
{
IObjectStorage& destination_object_storage;
IMetadataStorage& destination_metadata_storage;

MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage,
IMetadataStorage& destination_metadata_storage,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);

void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
};

using DiskObjectStorageTransactionPtr = std::shared_ptr<DiskObjectStorageTransaction>;

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@
<allowed_disk>disk_s3</allowed_disk>
<allowed_disk>disk_s3_plain</allowed_disk>
<allowed_disk>disk_s3_cache</allowed_disk>
<allowed_disk>disk_s3_other_bucket</allowed_disk>
</backups>
</clickhouse>
21 changes: 21 additions & 0 deletions tests/integration/test_backup_restore_s3/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ def test_backup_to_disk(storage_policy, to_disk):
check_backup_and_restore(storage_policy, backup_destination)


@pytest.mark.parametrize(
"storage_policy, to_disk",
[
pytest.param(
"policy_s3",
"disk_s3_other_bucket",
id="from_s3_to_s3",
),
],
)
def test_backup_from_s3_to_s3_disk_native_copy(storage_policy, to_disk):
backup_name = new_backup_name()
backup_destination = f"Disk('{to_disk}', '{backup_name}')"
(backup_events, restore_events) = check_backup_and_restore(
storage_policy, backup_destination
)

assert backup_events["S3CopyObject"] > 0
assert restore_events["S3CopyObject"] > 0


def test_backup_to_s3():
storage_policy = "default"
backup_name = new_backup_name()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ s3_plain_native_copy
Single operation copy has completed.
s3_plain_no_native_copy
Single part upload has completed.
copy from s3_plain_native_copy to s3_plain_another
Single operation copy has completed.
15 changes: 15 additions & 0 deletions tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,20 @@ function run_test_for_disk()
clickhouse-disks -C "$config" --disk "$disk" remove $CLICKHOUSE_DATABASE/test.copy
}

function run_test_copy_from_s3_to_s3(){
local disk_src=$1 && shift
local disk_dest=$1 && shift

echo "copy from $disk_src to $disk_dest"
clickhouse-disks -C "$config" --disk "$disk_src" write --input "$config" $CLICKHOUSE_DATABASE/test

clickhouse-disks -C "$config" --log-level test copy --disk-from "$disk_src" --disk-to "$disk_dest" $CLICKHOUSE_DATABASE/test $CLICKHOUSE_DATABASE/test.copy |& {
grep -o -e "Single part upload has completed." -e "Single operation copy has completed."
}
clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy/test
clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy
}

run_test_for_disk s3_plain_native_copy
run_test_for_disk s3_plain_no_native_copy
run_test_copy_from_s3_to_s3 s3_plain_native_copy s3_plain_another
7 changes: 7 additions & 0 deletions tests/queries/0_stateless/02802_clickhouse_disks_s3_copy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
<secret_access_key>clickhouse</secret_access_key>
<s3_allow_native_copy>true</s3_allow_native_copy>
</s3_plain_native_copy>
<s3_plain_another>
<type>s3_plain</type>
<endpoint>http://localhost:11111/test/clickhouse-disks/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<s3_allow_native_copy>true</s3_allow_native_copy>
</s3_plain_another>

<s3_plain_no_native_copy>
<type>s3_plain</type>
Expand Down

0 comments on commit 1aea818

Please sign in to comment.