Skip to content

Commit

Permalink
chore: initiate grow preemptively (#3267)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 4, 2024
1 parent 2bf4451 commit 62f5483
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
ec = op_manager_->Stash(id, value_sv);
} else if (auto bin = bins_->Stash(dbid, key, value_sv); bin) {
id = bin->first;
ec = op_manager_->Stash(bin->first, bin->second);
ec = op_manager_->Stash(id, bin->second);
}

if (ec) {
Expand Down
14 changes: 12 additions & 2 deletions src/server/tiering/disk_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {

// If we've run out of space, block and grow as much as needed
if (offset < 0) {
// TODO: To introduce asynchronous call that starts resizing before we reach this step.
// Right now we do it synchronously as well (see Grow(256MB) call.)
RETURN_ON_ERR(Grow(-offset));

offset = alloc_.Malloc(bytes.size());
Expand All @@ -163,6 +165,11 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
backing_file_->WriteFixedAsync(buf.bytes, offset, *buf.buf_idx, std::move(io_cb));
else
backing_file_->WriteAsync(buf.bytes, offset, std::move(io_cb));
if (alloc_.allocated_bytes() > (size_ * 0.85) && !grow_pending_) {
auto ec = Grow(265_MB);
LOG_IF(ERROR, ec) << "Could not call grow :" << ec.message();
return ec;
}
return {};
}

Expand All @@ -176,9 +183,12 @@ std::error_code DiskStorage::Grow(off_t grow_size) {
if (off_t(alloc_.capacity()) + grow_size > max_size_)
return std::make_error_code(std::errc::no_space_on_device);

if (std::exchange(grow_pending_, true))
if (std::exchange(grow_pending_, true)) {
// TODO: to introduce future like semantics where multiple flow can block on the
// ongoing Grow operation.
LOG(WARNING) << "Concurrent grow request detected ";
return std::make_error_code(std::errc::operation_in_progress);

}
auto err = DoFiberCall(&SubmitEntry::PrepFallocate, backing_file_->fd(), 0, size_, grow_size);
grow_pending_ = false;
RETURN_ON_ERR(err);
Expand Down
1 change: 1 addition & 0 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) {
ProcessStashed(Borrowed(id), version, segment, ec);
};

// May block due to blocking call to Grow.
auto ec = storage_.Stash(buf_view, std::move(io_cb));
if (ec)
pending_stash_ver_.erase(ToOwned(id_ref));
Expand Down

0 comments on commit 62f5483

Please sign in to comment.