From 7d0214018cf4319b1fa920d3d39a74afcdde1a29 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 13 May 2024 12:46:42 +0000 Subject: [PATCH] Enable truncate operation for object storage disks. --- src/Common/FailPoint.cpp | 3 +- src/Disks/DiskEncryptedTransaction.h | 7 +++ src/Disks/FakeDiskTransaction.h | 11 ++++ src/Disks/IDiskTransaction.h | 3 ++ .../ObjectStorages/DiskObjectStorage.cpp | 8 +++ src/Disks/ObjectStorages/DiskObjectStorage.h | 2 + .../DiskObjectStorageMetadata.cpp | 13 +++++ .../DiskObjectStorageMetadata.h | 1 + .../DiskObjectStorageTransaction.cpp | 52 +++++++++++++++++++ .../DiskObjectStorageTransaction.h | 2 + src/Disks/ObjectStorages/IMetadataStorage.h | 14 +++++ .../MetadataStorageFromDisk.cpp | 8 +++ .../ObjectStorages/MetadataStorageFromDisk.h | 2 +- ...taStorageFromDiskTransactionOperations.cpp | 37 +++++++++++++ ...dataStorageFromDiskTransactionOperations.h | 30 +++++++++++ src/Storages/StorageStripeLog.cpp | 11 ++++ .../test_log_family_s3/configs/minio.xml | 12 ----- .../configs/storage_configuration.xml | 34 ++++++++++++ tests/integration/test_log_family_s3/test.py | 38 +++++++++++++- 19 files changed, 273 insertions(+), 15 deletions(-) delete mode 100644 tests/integration/test_log_family_s3/configs/minio.xml create mode 100644 tests/integration/test_log_family_s3/configs/storage_configuration.xml diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index 2434c6004ad7..942ef7bc8688 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -54,7 +54,8 @@ static struct InitFiu PAUSEABLE_ONCE(finish_set_quorum_failed_parts) \ PAUSEABLE_ONCE(finish_clean_quorum_failed_parts) \ PAUSEABLE(dummy_pausable_failpoint) \ - ONCE(execute_query_calling_empty_set_result_func_on_exception) + ONCE(execute_query_calling_empty_set_result_func_on_exception)\ + REGULAR(stripe_log_sink_write_fallpoint) namespace FailPoints { diff --git a/src/Disks/DiskEncryptedTransaction.h b/src/Disks/DiskEncryptedTransaction.h index 
6cb2941cc112..0d440b8eafda 100644 --- a/src/Disks/DiskEncryptedTransaction.h +++ b/src/Disks/DiskEncryptedTransaction.h @@ -244,6 +244,13 @@ class DiskEncryptedTransaction : public IDiskTransaction return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings); } + /// Truncate file to the target size. + void truncateFile(const std::string & src_path, size_t target_size) override + { + auto wrapped_path = wrappedPath(src_path); + delegate_transaction->truncateFile(wrapped_path, target_size); + } + private: diff --git a/src/Disks/FakeDiskTransaction.h b/src/Disks/FakeDiskTransaction.h index f83642eee568..65a42481e70f 100644 --- a/src/Disks/FakeDiskTransaction.h +++ b/src/Disks/FakeDiskTransaction.h @@ -2,10 +2,16 @@ #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + /// Fake disk transaction implementation. /// Just execute all operations immediately, commit is noop operation. /// No support for atomicity and rollback. @@ -134,6 +140,11 @@ struct FakeDiskTransaction final : public IDiskTransaction disk.createHardLink(src_path, dst_path); } + void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented"); + } + private: IDisk & disk; }; diff --git a/src/Disks/IDiskTransaction.h b/src/Disks/IDiskTransaction.h index 7df1b71eb2b0..1cc043da9c2f 100644 --- a/src/Disks/IDiskTransaction.h +++ b/src/Disks/IDiskTransaction.h @@ -123,6 +123,9 @@ struct IDiskTransaction : private boost::noncopyable /// Create hardlink from `src_path` to `dst_path`. virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0; + + /// Truncate file to the target size. 
+ virtual void truncateFile(const std::string & src_path, size_t target_size) = 0; }; using DiskTransactionPtr = std::shared_ptr; diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index c43845116ddd..1543910066e2 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -132,6 +132,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat transaction->commit(); } +void DiskObjectStorage::truncateFile(const String & path, size_t size) +{ + LOG_TEST(log, "Truncate file operation {} to size : {}", path, size); + auto transaction = createObjectStorageTransaction(); + transaction->truncateFile(path, size); + transaction->commit(); +} + void DiskObjectStorage::copyFile( /// NOLINT const String & from_file_path, IDisk & to_disk, diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index 88c5e3203b86..b1b44bccb093 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -84,6 +84,8 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper; void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override; + void truncateFile(const String & path, size_t size) override; + MetadataStoragePtr getMetadataStorage() override { return metadata_storage; } UInt32 getRefCount(const String & path) const override; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp index 19b8b51384fb..44854633d65e 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp @@ -15,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_FORMAT; + extern const int LOGICAL_ERROR; } void 
DiskObjectStorageMetadata::deserialize(ReadBuffer & buf) @@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size) keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}}); } +ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject() +{ + if (keys_with_meta.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path); + + ObjectKeyWithMetadata object = std::move(keys_with_meta.back()); + keys_with_meta.pop_back(); + total_size -= object.metadata.size_bytes; + + return object; +} + bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting() { #ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h index 729d93af10d3..4f45f5b7ddf3 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h @@ -52,6 +52,7 @@ struct DiskObjectStorageMetadata void addObject(ObjectStorageKey key, size_t size); + ObjectKeyWithMetadata popLastObject(); void deserialize(ReadBuffer & buf); void deserializeFromString(const std::string & data); diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp index d25add625e89..1df0cd92b32f 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp @@ -559,6 +559,51 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation } }; +struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation +{ + std::string path; + size_t size; + + TruncateFileOperationOutcomePtr truncate_outcome; + + TruncateFileObjectStorageOperation( + IObjectStorage & object_storage_, + IMetadataStorage & metadata_storage_, + const std::string & path_, + size_t size_) + : 
IDiskObjectStorageOperation(object_storage_, metadata_storage_) + , path(path_) + , size(size_) + {} + + std::string getInfoForLog() const override + { + return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size); + } + + void execute(MetadataTransactionPtr tx) override + { + if (metadata_storage.exists(path)) + { + if (!metadata_storage.isFile(path)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path); + + truncate_outcome = tx->truncateFile(path, size); + } + } + + void undo() override + { + /// Nothing to roll back here: execute() only records the truncation in the metadata transaction; objects are removed in finalize(). + } + /// Removes the objects cut off by the truncation. `truncate_outcome` stays unset when execute() found nothing at `path`, so it is checked before use. + void finalize() override + { + if (truncate_outcome && !truncate_outcome->objects_to_remove.empty()) + object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove); + } +}; + } void DiskObjectStorageTransaction::createDirectory(const std::string & path) @@ -598,6 +643,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri })); } +void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size) +{ + operations_to_execute.emplace_back( + std::make_unique(object_storage, metadata_storage, path, size) + ); +} + void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path) { auto operation = std::make_unique(object_storage, metadata_storage, from_path, to_path); diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h index 67044751b840..23f66990d540 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h @@ -92,6 +92,8 @@ struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_share void createFile(const String & path) override; + void truncateFile(const String & path, size_t size) override; + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override; /// writeFile is a difficult 
function for transactions. diff --git a/src/Disks/ObjectStorages/IMetadataStorage.h b/src/Disks/ObjectStorages/IMetadataStorage.h index f95db2e1eee2..feb8707e71bc 100644 --- a/src/Disks/ObjectStorages/IMetadataStorage.h +++ b/src/Disks/ObjectStorages/IMetadataStorage.h @@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome UInt32 num_hardlinks = std::numeric_limits::max(); }; +struct TruncateFileOperationOutcome +{ + StoredObjects objects_to_remove; +}; + + using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr; +using TruncateFileOperationOutcomePtr = std::shared_ptr; + /// Tries to provide some "transactions" interface, which allow /// to execute (commit) operations simultaneously. We don't provide @@ -143,6 +151,12 @@ class IMetadataTransaction : private boost::noncopyable return nullptr; } + virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */) + { + throwNotImplemented(); + return nullptr; + } + virtual ~IMetadataTransaction() = default; private: diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp index 9b9c4eb388cb..a6570e58d8a6 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp @@ -339,4 +339,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink return result; } +TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size) +{ + auto operation = std::make_unique(path, target_size, metadata_storage, *metadata_storage.getDisk()); + auto result = operation->outcome; + addOperation(std::move(operation)); + return result; +} + } diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h index 7059d8e9a6a8..046de76e7628 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h +++ 
b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -135,7 +135,7 @@ class MetadataStorageFromDiskTransaction final : public IMetadataTransaction UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override; - + TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override; }; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp index 1357acdfc66d..f2d7a1fe9dda 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp @@ -4,9 +4,12 @@ #include #include #include +#include +#include #include #include #include +#include namespace fs = std::filesystem; @@ -14,6 +17,11 @@ namespace fs = std::filesystem; namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static std::string getTempFileName(const std::string & dir) { return fs::path(dir) / getRandomASCIIString(32); @@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo() outcome->num_hardlinks++; } +void TruncateMetadataFileOperation::execute(std::unique_lock & metadata_lock) +{ + if (metadata_storage.exists(path)) + { + auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); + while (metadata->getTotalSizeBytes() > target_size) + { + auto object_key_with_metadata = metadata->popLastObject(); + outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes); + } + if (metadata->getTotalSizeBytes() != target_size) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size); + } + LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size()); + + write_operation = std::make_unique(path, disk, 
metadata->serializeToString()); + + write_operation->execute(metadata_lock); + } +} + +void TruncateMetadataFileOperation::undo() +{ + if (write_operation) + write_operation->undo(); +} + + void SetReadonlyFileOperation::execute(std::unique_lock & metadata_lock) { auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h index e8fda177b950..c1ad2882d193 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h @@ -292,4 +292,34 @@ struct SetReadonlyFileOperation final : public IMetadataOperation std::unique_ptr write_operation; }; +struct TruncateMetadataFileOperation final : public IMetadataOperation +{ + const TruncateFileOperationOutcomePtr outcome = std::make_shared(); + + TruncateMetadataFileOperation( + const std::string & path_, + size_t target_size_, + const MetadataStorageFromDisk & metadata_storage_, + IDisk & disk_) + : path(path_) + , target_size(target_size_) + , metadata_storage(metadata_storage_) + , disk(disk_) + { + } + + void execute(std::unique_lock & metadata_lock) override; + + void undo() override; + +private: + std::string path; + size_t target_size; + + const MetadataStorageFromDisk & metadata_storage; + IDisk & disk; + + std::unique_ptr write_operation; +}; + } diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index f47eeb609181..4f5e7ba3f4d3 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -53,8 +54,13 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; extern const int CANNOT_RESTORE_TABLE; extern const int NOT_IMPLEMENTED; + extern const int FAULT_INJECTED; } +namespace FailPoints +{ + extern const char 
stripe_log_sink_write_fallpoint[]; +} /// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading, /// because we read ranges of data that do not change. @@ -234,6 +240,11 @@ class StripeLogSink final : public SinkToStorage /// Save the new indices. storage.saveIndices(lock); + /// An exception might occur while saving the file sizes (S3::TooManyRequests, for example). + fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint, + { + throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StripeLog table"); + }); /// Save the new file sizes. storage.saveFileSizes(lock); diff --git a/tests/integration/test_log_family_s3/configs/minio.xml b/tests/integration/test_log_family_s3/configs/minio.xml deleted file mode 100644 index 58771d6b2842..000000000000 --- a/tests/integration/test_log_family_s3/configs/minio.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - s3 - http://minio1:9001/root/data/ - minio - minio123 - - - - diff --git a/tests/integration/test_log_family_s3/configs/storage_configuration.xml b/tests/integration/test_log_family_s3/configs/storage_configuration.xml new file mode 100644 index 000000000000..d479a59b1977 --- /dev/null +++ b/tests/integration/test_log_family_s3/configs/storage_configuration.xml @@ -0,0 +1,34 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + true + + 1 + 0 + 1 + 20000 + + + + + +
+ s3_no_retries +
+
+
+
+
+
diff --git a/tests/integration/test_log_family_s3/test.py b/tests/integration/test_log_family_s3/test.py index bed379d098bd..ed84bdf48e60 100644 --- a/tests/integration/test_log_family_s3/test.py +++ b/tests/integration/test_log_family_s3/test.py @@ -11,7 +11,7 @@ def cluster(): cluster = ClickHouseCluster(__file__) cluster.add_instance( "node", - main_configs=["configs/minio.xml", "configs/ssl.xml"], + main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"], with_minio=True, ) logging.info("Starting cluster...") @@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i assert_objects_count(cluster, 0) finally: node.query("DROP TABLE s3_test") + + +# Imitate the case when an error occurs while inserting into the table. +# For example, S3::TooManyRequests. +# In that case the data file may be updated, but not the sizes file. +# So, because of the exception, the data file must be truncated to undo the insert query. +# See FileChecker::repair(). +def test_stripe_log_truncate(cluster): + node = cluster.instances["node"] + + node.query( + """ + CREATE TABLE stripe_table ( + a int + ) ENGINE = StripeLog() + SETTINGS storage_policy='s3_no_retries' + """ + ) + + node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint") + node.query( + """ + INSERT INTO stripe_table SELECT number FROM numbers(10) + """, + ignore_error=True, + ) + node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint") + assert node.query("SELECT count(*) FROM stripe_table") == "0\n" + node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)") + assert node.query("SELECT count(*) FROM stripe_table") == "10\n" + + # Make sure that everything is okay with the table after it is detached and attached again. + node.query("DETACH TABLE stripe_table") + node.query("ATTACH TABLE stripe_table") + + assert node.query("DROP TABLE stripe_table") == ""