Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: memory record batch reader #157

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ jobs:

- name: Build c++
run: cd cpp && make

- name: Run cpp tests
run: cd cpp/build/Release/test && ./milvus_test

- name: Run tests
run: cd go && make && make test
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ typedef void* CStatus;
typedef void* CRecordBatch;
typedef void* CFileSystem;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);
/**
* Open a memory record batch reader and expose it as an Arrow C stream.
*
* The implementation builds a milvus_storage::MemRecordBatchReader for the
* given path and schema and exports it into `out` through the Arrow C stream
* interface.
*
* @param path NUL-terminated path of the file to read.
* @param schema Arrow C data interface schema describing the expected data;
* it is imported with arrow::ImportSchema.
* @param buffer_size Buffer size forwarded to the underlying reader.
* @param out Output stream that receives the exported record batch reader.
* @return 0 on success; if exporting the reader fails, the Arrow status code
* cast to int. NOTE(review): the return value for a file system creation
* failure is not visible in this view -- confirm in reader_c.cpp.
*/
int OpenMemRecordBatchReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
struct ArrowArrayStream* out);

#ifdef __cplusplus
}
Expand Down
88 changes: 88 additions & 0 deletions cpp/include/milvus-storage/packed/async_mem_record_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>
#include <arrow/api.h>
#include "common/config.h"
#include <arrow/filesystem/filesystem.h>
#include <parquet/arrow/reader.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include "packed/mem_record_reader.h"

namespace milvus_storage {

/**
* @brief AsyncMemRecordBatchReader class encapsulates logic for reading row groups in parallel.
*
* The file system is held through a reference member, so the object passed to
* the constructor is expected to outlive this reader.
*/
class AsyncMemRecordBatchReader {
public:
/**
* @brief Constructor for AsyncMemRecordBatchReader.
*
* @param fs The Arrow file system instance.
* @param path The file path to be read.
* @param schema The schema of the data.
* @param total_buffer_size The total buffer size for reading.
*/
AsyncMemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
int64_t total_buffer_size);

/**
* @brief Executes the parallel reading of row groups.
*
* @return arrow::Status indicating success or failure of the operation.
*/
arrow::Status Execute();

/**
* @brief Access the readers after execution.
*
* @return A vector of shared pointers to MemRecordBatchReader instances.
* The vector is returned by value, so each call yields a copy.
*/
const std::vector<std::shared_ptr<MemRecordBatchReader>> Readers();

/**
* @brief Access the results after execution.
*
* @return A shared pointer to an arrow::Table.
* NOTE(review): presumably assembled from the record batches collected in
* results_ -- confirm against the implementation.
*/
const std::shared_ptr<arrow::Table> Table();

private:
// Held by reference (not owned).
arrow::fs::FileSystem& fs_;
std::string path_;
std::shared_ptr<arrow::Schema> schema_;
size_t total_row_groups_;
int64_t total_buffer_size_;
int64_t cpu_thread_pool_size_;
// One MemRecordBatchReader per batch of row groups -- presumably filled by Execute(); TODO confirm.
std::vector<std::shared_ptr<MemRecordBatchReader>> readers_;
// Record batches grouped in an outer vector -- presumably one inner vector per reader; TODO confirm.
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> results_;

/**
* @brief Splits row groups into multiple batches.
*
* @param batch_size Number of row groups per batch.
* @return A vector of batches, where each batch is a pair (row_group_offset, num_row_groups).
*/
std::vector<std::pair<size_t, size_t>> CreateBatches(size_t batch_size) const;
};

} // namespace milvus_storage
91 changes: 91 additions & 0 deletions cpp/include/milvus-storage/packed/mem_record_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>
#include <arrow/api.h>
#include "common/config.h"
#include <arrow/filesystem/filesystem.h>
#include <parquet/arrow/reader.h>
#include <arrow/status.h>
#include <arrow/table.h>

namespace milvus_storage {

/**
* @brief A record batch reader for reading a single Parquet file with memory constraints.
*/
class MemRecordBatchReader : public arrow::RecordBatchReader {
public:
/**
* @brief MemRecordBatchReader reads num of row groups starting from row_group_offset with memory constraints.
*
* @param fs The Arrow filesystem interface.
* @param path Path to the Parquet file.
* @param schema Expected schema of the Parquet file.
* @param buffer_size Memory limit for reading row groups.
* @param row_group_offset The starting row group index to read.
* @param row_group_num The number of row groups to read.
*/
MemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE,
const size_t row_group_offset = 0,
const size_t row_group_num = std::numeric_limits<size_t>::max());

/**
* @brief Returns the schema of the Parquet file.
*
* @return A shared pointer to the Arrow schema.
*/
std::shared_ptr<arrow::Schema> schema() const;

/**
* @brief Reads the next record batch from the file.
*
* @param out A shared pointer to the output record batch.
* @return Arrow Status indicating success or failure.
*/
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out);

/**
* @brief Closes the reader and releases resources.
*
* @return Arrow Status indicating success or failure.
*/
arrow::Status Close();

private:
std::shared_ptr<arrow::Schema> schema_;
std::unique_ptr<parquet::arrow::FileReader> file_reader_;
size_t current_row_group_ = 0;
size_t read_count_ = 0;

int64_t buffer_size_;
std::vector<size_t> row_group_sizes_;
size_t row_group_offset_;

void Initialize(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size,
const size_t row_group_offset,
const size_t row_group_num);
};

} // namespace milvus_storage
33 changes: 27 additions & 6 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,44 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
// Test only
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

/**
* @brief PackedRecordBatchReader is responsible for reading and deserializing data from multiple Parquet files
* into arrow RecordBatch under the given memory limit.
*
* @param fs The Arrow filesystem interface.
* @param paths Parquet file paths to read.
* @param schema Expected arrow schema reading from the Parquet files.
* @param column_offsets The list of original column indices and their path indices.
* @param needed_columns The set of columns needed to be read.
* @param buffer_size Memory limit for reading.
*/
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

/**
* @brief Returns the schema of the RecordBatch being read.
*
* @return A shared pointer to the Arrow schema.
*/
std::shared_ptr<arrow::Schema> schema() const override;

/**
* @brief Reads the next record batch from the file.
*
* @param batch Pointer to the shared pointer that receives the output record batch.
* @return Arrow Status indicating success or failure.
*/
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

/**
* @brief Closes the reader and releases resources.
*
* @return Arrow Status indicating success or failure.
*/
arrow::Status Close() override;

private:
Expand Down
15 changes: 9 additions & 6 deletions cpp/src/packed/reader_c.cpp → cpp/src/c/reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "packed/reader_c.h"
#include "common/log.h"
#include "packed/reader.h"
#include "c/reader_c.h"
#include "packed/mem_record_reader.h"
#include "filesystem/fs.h"
#include "common/config.h"
#include "common/log.h"

#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/status.h>
#include <iostream>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
int OpenMemRecordBatchReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
Expand All @@ -36,11 +39,11 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto reader = std::make_shared<milvus_storage::MemRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
LOG_STORAGE_ERROR_ << "Error exporting file record batch reader" << status.ToString();
return static_cast<int>(status.code());
}
return 0;
Expand Down
Loading
Loading