Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BulkLoad/Dump implementation #11842

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion documentation/sphinx/source/bulkdump.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ManagementAPI provides following interfaces to do the operations:
1. Submit a job: submitBulkDumpJob(BulkDumpState job); // For generating the input job metadata, see the point 4.
2. Clear a job: clearBulkDumpJob();
3. Enable the feature: setBulkDumpMode(int mode); // Set mode = 1 to enable; Set mode = 0 to disable.
4. BulkDump job metadata is generated by newBulkDumpTaskLocalSST(KeyRange range, std::string remoteRoot); // Will include more APIs to generate the metadata as the funcationality expands (sp of functionality).
4. BulkDump job metadata is generated by newBulkDumpJobLocalSST(KeyRange range, std::string remoteRoot); // Will include more APIs to generate the metadata as the funcationality expands (sp of functionality).

Mechanisms
==========
Expand Down
4 changes: 2 additions & 2 deletions fdbcli/BulkDumpCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ ACTOR Future<UID> bulkDumpCommandActor(Reference<IClusterConnectionRecord> clust
}
std::string remoteRoot = tokens[4].toString();
KeyRange range = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
bulkDumpJob = newBulkDumpTaskLocalSST(range, remoteRoot);
bulkDumpJob = newBulkDumpJobLocalSST(range, remoteRoot);
wait(submitBulkDumpJob(cx, bulkDumpJob));
return bulkDumpJob.getJobId();

Expand All @@ -127,7 +127,7 @@ ACTOR Future<UID> bulkDumpCommandActor(Reference<IClusterConnectionRecord> clust
}
std::string remoteRoot = tokens[4].toString();
KeyRange range = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
bulkDumpJob = newBulkDumpTaskBlobstoreSST(range, remoteRoot);
bulkDumpJob = newBulkDumpJobBlobstoreSST(range, remoteRoot);
wait(submitBulkDumpJob(cx, bulkDumpJob));
return bulkDumpJob.getJobId();

Expand Down
54 changes: 31 additions & 23 deletions fdbcli/BulkLoadCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,29 @@

namespace fdb_cli {

ACTOR Future<Void> getBulkLoadStateByRange(Database cx,
KeyRange rangeToRead,
size_t countLimit,
Optional<BulkLoadPhase> phase) {
ACTOR Future<Void> getBulkLoadTaskStateByRange(Database cx,
KeyRange rangeToRead,
size_t countLimit,
Optional<BulkLoadPhase> phase) {
try {
std::vector<BulkLoadState> res = wait(getValidBulkLoadTasksWithinRange(cx, rangeToRead, countLimit, phase));
std::vector<BulkLoadTaskState> res = wait(getValidBulkLoadTasksWithinRange(cx, rangeToRead, countLimit, phase));
int64_t finishCount = 0;
int64_t unfinishedCount = 0;
for (const auto& bulkLoadState : res) {
if (bulkLoadState.phase == BulkLoadPhase::Complete) {
fmt::println("[Complete]: {}", bulkLoadState.toString());
for (const auto& bulkLoadTaskState : res) {
if (bulkLoadTaskState.phase == BulkLoadPhase::Complete) {
fmt::println("[Complete]: {}", bulkLoadTaskState.toString());
++finishCount;
} else if (bulkLoadState.phase == BulkLoadPhase::Running) {
fmt::println("[Running]: {}", bulkLoadState.toString());
} else if (bulkLoadTaskState.phase == BulkLoadPhase::Running) {
fmt::println("[Running]: {}", bulkLoadTaskState.toString());
++unfinishedCount;
} else if (bulkLoadState.phase == BulkLoadPhase::Triggered) {
fmt::println("[Triggered]: {}", bulkLoadState.toString());
} else if (bulkLoadTaskState.phase == BulkLoadPhase::Triggered) {
fmt::println("[Triggered]: {}", bulkLoadTaskState.toString());
++unfinishedCount;
} else if (bulkLoadState.phase == BulkLoadPhase::Submitted) {
fmt::println("[Submitted] {}", bulkLoadState.toString());
} else if (bulkLoadTaskState.phase == BulkLoadPhase::Submitted) {
fmt::println("[Submitted] {}", bulkLoadTaskState.toString());
++unfinishedCount;
} else if (bulkLoadState.phase == BulkLoadPhase::Acknowledged) {
fmt::println("[Acknowledge] {}", bulkLoadState.toString());
} else if (bulkLoadTaskState.phase == BulkLoadPhase::Acknowledged) {
fmt::println("[Acknowledge] {}", bulkLoadTaskState.toString());
++finishCount;
} else {
UNREACHABLE();
Expand Down Expand Up @@ -92,7 +92,6 @@ ACTOR Future<UID> bulkLoadCommandActor(Reference<IClusterConnectionRecord> clust
printUsage(tokens[0]);
return UID();
}

} else if (tokencmp(tokens[1], "acknowledge")) {
// Acknowledge any completed bulk loading task and clear the corresponding metadata
if (tokens.size() != 5) {
Expand All @@ -111,7 +110,9 @@ ACTOR Future<UID> bulkLoadCommandActor(Reference<IClusterConnectionRecord> clust
return taskId;

} else if (tokencmp(tokens[1], "local")) {
// Generate spec of bulk loading local files and submit the bulk loading task
// Generate spec of bulk loading local files and submit the bulk loading task.
// This is used for testing of bulkload task engine.
// Therefore, some information of manifest is ignored.
if (tokens.size() < 7) {
printUsage(tokens[0]);
return UID();
Expand All @@ -125,16 +126,23 @@ ACTOR Future<UID> bulkLoadCommandActor(Reference<IClusterConnectionRecord> clust
}
std::string folder = tokens[4].toString();
std::string dataFile = tokens[5].toString();
std::string byteSampleFile = tokens[6].toString(); // TODO(BulkLoad): reject if the input bytes sampling file is
// not same as the configuration as FDB cluster
std::string byteSampleFile = tokens[6].toString();
KeyRange range = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
state BulkLoadState bulkLoadTask = newBulkLoadTaskLocalSST(range, folder, dataFile, byteSampleFile);
BulkLoadFileSet fileSet =
BulkLoadFileSet(folder, "", generateEmptyManifestFileName(), dataFile, byteSampleFile);
state BulkLoadTaskState bulkLoadTask =
newBulkLoadTaskLocalSST(deterministicRandom()->randomUniqueID(),
range,
fileSet,
BulkLoadByteSampleSetting(0, "hashlittle2", 0, 0, 0), // We fake it here
/*snapshotVersion=*/invalidVersion,
/*checksum=*/"",
/*bytes=*/-1);
wait(submitBulkLoadTask(cx, bulkLoadTask));
return bulkLoadTask.getTaskId();

} else if (tokencmp(tokens[1], "status")) {
// Get progress of existing bulk loading tasks intersecting the input range
// TODO(BulkLoad): check status by ID
if (tokens.size() < 6) {
printUsage(tokens[0]);
return UID();
Expand Down Expand Up @@ -166,7 +174,7 @@ ACTOR Future<UID> bulkLoadCommandActor(Reference<IClusterConnectionRecord> clust
return UID();
}
int countLimit = std::stoi(tokens[5].toString());
wait(getBulkLoadStateByRange(cx, range, countLimit, phase));
wait(getBulkLoadTaskStateByRange(cx, range, countLimit, phase));
return UID();

} else {
Expand Down
10 changes: 4 additions & 6 deletions fdbclient/BulkDumping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@

#include "fdbclient/BulkDumping.h"

BulkDumpState newBulkDumpTaskLocalSST(const KeyRange& range, const std::string& remoteRoot) {
return BulkDumpState(
range, BulkDumpFileType::SST, BulkDumpTransportMethod::CP, BulkDumpExportMethod::File, remoteRoot);
BulkDumpState newBulkDumpJobLocalSST(const KeyRange& range, const std::string& remoteRoot) {
return BulkDumpState(range, BulkLoadType::SST, BulkLoadTransportMethod::CP, remoteRoot);
}

BulkDumpState newBulkDumpTaskBlobstoreSST(const KeyRange& range, const std::string& remoteRoot) {
return BulkDumpState(
range, BulkDumpFileType::SST, BulkDumpTransportMethod::BLOBSTORE, BulkDumpExportMethod::File, remoteRoot);
BulkDumpState newBulkDumpJobBlobstoreSST(const KeyRange& range, const std::string& remoteRoot) {
return BulkDumpState(range, BulkLoadType::SST, BulkLoadTransportMethod::BLOBSTORE, remoteRoot);
}
76 changes: 63 additions & 13 deletions fdbclient/BulkLoading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,67 @@

#include "fdbclient/BulkLoading.h"

BulkLoadState newBulkLoadTaskLocalSST(KeyRange range,
std::string folder,
std::string dataFile,
std::string bytesSampleFile) {
std::unordered_set<std::string> dataFiles;
dataFiles.insert(dataFile);
return BulkLoadState(range,
BulkLoadType::SST,
BulkLoadTransportMethod::CP,
BulkLoadInjectMethod::File,
folder,
dataFiles,
bytesSampleFile);
const int bulkLoadJobManifestFileFormatVersion = 1;

std::string generateBulkLoadJobManifestFileName() {
return "job-manifest.txt";
}

std::string generateRandomBulkLoadDataFileName() {
return deterministicRandom()->randomUniqueID().toString() + "-data.sst";
}

std::string generateRandomBulkLoadBytesSampleFileName() {
return deterministicRandom()->randomUniqueID().toString() + "-bytesample.sst";
}

std::string generateEmptyManifestFileName() {
return "manifest-empty.sst";
}

// Generate the bulkload job manifest file. Here is an example:
// Row 0: [FormatVersion]: 1, [ManifestCount]: 3, [RootPath]: "/tmp";
// Row 1: "", "01", 100, 9000, "range1", "manifest1.txt"
// Row 2: "01", "02 ff", 200, 0, "range2", "manifest2.txt"
// Row 3: "02 ff", "ff", 300, 8100, "range3", "manifest3.txt"
// In this example, the job manifest file is in the format of version 1.
// The file contains three ranges: "" ~ "\x01", "\x01" ~ "\x02\xff", and "\x02\xff" ~ "\xff".
// For the first range, the data version is at 100, the data size is 9KB, the manifest file path is
// "/tmp/range1/manifest1.txt". For the second range, the data version is at 200, the data size is 0 indicating this is
// an empty range. The manifest file path is "/tmp/range2/manifest2.txt". For the third range, the data version is at
// 300, the data size is 8.1KB, the manifest file path is "/tmp/range1/manifest3.txt".
std::string generateBulkLoadJobManifestFileContent(const std::map<Key, BulkLoadManifest>& manifests) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put an example of the content generated as a comment on this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice suggestion! Thanks!

std::string root = "";
std::string content;
for (const auto& [beginKey, manifest] : manifests) {
if (root.empty()) {
root = manifest.getRootPath();
} else {
ASSERT(manifest.getRootPath() == root);
}
content = content + manifest.generateEntryInJobManifest() + "\n";
}
std::string head = "[FormatVersion]: " + std::to_string(bulkLoadJobManifestFileFormatVersion) +
", [ManifestCount]: " + std::to_string(manifests.size()) + ", [RootPath]: " + root + "\n";
return head + content;
}

// For submitting a task manually (for testing)
BulkLoadTaskState newBulkLoadTaskLocalSST(const UID& jobId,
const KeyRange& range,
const BulkLoadFileSet& fileSet,
const BulkLoadByteSampleSetting& byteSampleSetting,
Version snapshotVersion,
const std::string& checksum,
int64_t bytes) {
BulkLoadManifest manifest(fileSet,
range.begin,
range.end,
snapshotVersion,
checksum,
bytes,
byteSampleSetting,
BulkLoadType::SST,
BulkLoadTransportMethod::CP);
return BulkLoadTaskState(jobId, manifest);
}
Loading