Skip to content

Commit

Permalink
Enable truncate operation for object storage disks.
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed May 13, 2024
1 parent a84d0c1 commit 7d02140
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static struct InitFiu
PAUSEABLE_ONCE(finish_set_quorum_failed_parts) \
PAUSEABLE_ONCE(finish_clean_quorum_failed_parts) \
PAUSEABLE(dummy_pausable_failpoint) \
ONCE(execute_query_calling_empty_set_result_func_on_exception)
ONCE(execute_query_calling_empty_set_result_func_on_exception)\
REGULAR(stripe_log_sink_write_fallpoint)

namespace FailPoints
{
Expand Down
7 changes: 7 additions & 0 deletions src/Disks/DiskEncryptedTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ class DiskEncryptedTransaction : public IDiskTransaction
return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings);
}

/// Truncate file to the target size.
void truncateFile(const std::string & src_path, size_t target_size) override
{
auto wrapped_path = wrappedPath(src_path);
delegate_transaction->truncateFile(wrapped_path, target_size);
}


private:

Expand Down
11 changes: 11 additions & 0 deletions src/Disks/FakeDiskTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

#include <Disks/IDiskTransaction.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

/// Fake disk transaction implementation.
/// Just execute all operations immediately, commit is noop operation.
/// No support for atomicity and rollback.
Expand Down Expand Up @@ -134,6 +140,11 @@ struct FakeDiskTransaction final : public IDiskTransaction
disk.createHardLink(src_path, dst_path);
}

void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation is not implemented");
}

private:
IDisk & disk;
};
Expand Down
3 changes: 3 additions & 0 deletions src/Disks/IDiskTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ struct IDiskTransaction : private boost::noncopyable

/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;

/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
};

using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;
Expand Down
8 changes: 8 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}

void DiskObjectStorage::truncateFile(const String & path, size_t size)
{
LOG_TEST(log, "Truncate file operation {} to size : {}", path, size);
auto transaction = createObjectStorageTransaction();
transaction->truncateFile(path, size);
transaction->commit();
}

void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;

void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;

void truncateFile(const String & path, size_t size) override;

MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }

UInt32 getRefCount(const String & path) const override;
Expand Down
13 changes: 13 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}

void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
Expand Down Expand Up @@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
}

ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()
{
if (keys_with_meta.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path);

ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
keys_with_meta.pop_back();
total_size -= object.metadata.size_bytes;

return object;
}

bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
Expand Down
1 change: 1 addition & 0 deletions src/Disks/ObjectStorages/DiskObjectStorageMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct DiskObjectStorageMetadata

void addObject(ObjectStorageKey key, size_t size);

ObjectKeyWithMetadata popLastObject();

void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);
Expand Down
52 changes: 52 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,51 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
}
};

struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
size_t size;

TruncateFileOperationOutcomePtr truncate_outcome;

TruncateFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
size_t size_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, size(size_)
{}

std::string getInfoForLog() const override
{
return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
}

void execute(MetadataTransactionPtr tx) override
{
if (metadata_storage.exists(path))
{
if (!metadata_storage.isFile(path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);

truncate_outcome = tx->truncateFile(path,size);
}
}

void undo() override
{

}

void finalize() override
{
if (!truncate_outcome->objects_to_remove.empty())
object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
}
};

}

void DiskObjectStorageTransaction::createDirectory(const std::string & path)
Expand Down Expand Up @@ -598,6 +643,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
}));
}

void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
{
operations_to_execute.emplace_back(
std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
);
}

void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
{
auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_share

void createFile(const String & path) override;

void truncateFile(const String & path, size_t size) override;

void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;

/// writeFile is a difficult function for transactions.
Expand Down
14 changes: 14 additions & 0 deletions src/Disks/ObjectStorages/IMetadataStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
};

struct TruncateFileOperationOutcome
{
StoredObjects objects_to_remove;
};


using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;


/// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide
Expand Down Expand Up @@ -143,6 +151,12 @@ class IMetadataTransaction : private boost::noncopyable
return nullptr;
}

virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
{
throwNotImplemented();
return nullptr;
}

virtual ~IMetadataTransaction() = default;

private:
Expand Down
8 changes: 8 additions & 0 deletions src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
return result;
}

TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
{
auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
auto result = operation->outcome;
addOperation(std::move(operation));
return result;
}

}
2 changes: 1 addition & 1 deletion src/Disks/ObjectStorages/MetadataStorageFromDisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class MetadataStorageFromDiskTransaction final : public IMetadataTransaction

UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;


TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <optional>
#include <ranges>
#include <filesystem>
#include <utility>

namespace fs = std::filesystem;


namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

static std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString(32);
Expand Down Expand Up @@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo()
outcome->num_hardlinks++;
}

void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
if (metadata_storage.exists(path))
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
while (metadata->getTotalSizeBytes() > target_size)
{
auto object_key_with_metadata = metadata->popLastObject();
outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
}
if (metadata->getTotalSizeBytes() != target_size)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
}
LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());

write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());

write_operation->execute(metadata_lock);
}
}

void TruncateMetadataFileOperation::undo()
{
if (write_operation)
write_operation->undo();
}


void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,34 @@ struct SetReadonlyFileOperation final : public IMetadataOperation
std::unique_ptr<WriteFileOperation> write_operation;
};

struct TruncateMetadataFileOperation final : public IMetadataOperation
{
const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();

TruncateMetadataFileOperation(
const std::string & path_,
size_t target_size_,
const MetadataStorageFromDisk & metadata_storage_,
IDisk & disk_)
: path(path_)
, target_size(target_size_)
, metadata_storage(metadata_storage_)
, disk(disk_)
{
}

void execute(std::unique_lock<SharedMutex> & metadata_lock) override;

void undo() override;

private:
std::string path;
size_t target_size;

const MetadataStorageFromDisk & metadata_storage;
IDisk & disk;

std::unique_ptr<WriteFileOperation> write_operation;
};

}
11 changes: 11 additions & 0 deletions src/Storages/StorageStripeLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>

#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h>
Expand Down Expand Up @@ -53,8 +54,13 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int FAULT_INJECTED;
}

namespace FailPoints
{
extern const char stripe_log_sink_write_fallpoint[];
}

/// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
/// because we read ranges of data that do not change.
Expand Down Expand Up @@ -234,6 +240,11 @@ class StripeLogSink final : public SinkToStorage
/// Save the new indices.
storage.saveIndices(lock);

// While executing save file sizes the exception might occurs. S3::TooManyRequests for example.
fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
{
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StipeLog table");
});
/// Save the new file sizes.
storage.saveFileSizes(lock);

Expand Down
12 changes: 0 additions & 12 deletions tests/integration/test_log_family_s3/configs/minio.xml

This file was deleted.

Loading

0 comments on commit 7d02140

Please sign in to comment.