Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Inner join running out of memory #636

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 114 additions & 136 deletions src/runtime/local/kernels/InnerJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,86 +19,117 @@
// Helper functions
// ****************************************************************************

/**
* Add the values from the frame to the final result.
* Iterate trough all the columns of the frame and add each row
* that is present in the hit vector.
*
* @param from the source frame
* @param hit the hit vector for current frame
* @param col_idx_res the current colum in the result frame
*
* @result result the out frame
*/
template<typename VTCol>
void innerJoinSetValue(
DenseMatrix<VTCol> * res,
const DenseMatrix<VTCol> * arg,
const int64_t targetRow,
const int64_t fromRow,
DCTX(ctx)
void innerJoinSet(
Frame * result,
const Frame * from,
const std::vector<int> *hit,
int64_t * col_idx_res

){
const VTCol argValue = arg->get(fromRow, 0);
res->set(targetRow, 0, argValue);
auto numCols = from->getNumCols();

for (size_t idx_c = 0; idx_c < numCols; idx_c++) {

auto fromCol = from->getColumn<VTCol>(idx_c);
auto resCol = result->getColumn<VTCol>(*col_idx_res);
int resRowIdx = 0;
for (auto rowIdx : *hit) {
auto value = fromCol->get(rowIdx, 0);
resCol->set(resRowIdx, 0, value);
resRowIdx++;
}

(*col_idx_res)++;
}
}

/**
* Determine the intersection of the two frames. Create one vector for
* each frame containing the row index that should be added in the final
* result.
*
* @param lhs left frame
* @param rhs right frame
* @param lhsOn label of the left frame
* @param rhsOn label of the right frame
*
* @result lHit vector for the left frame
* @result rHit vector for the right frame
*/
template<typename VTCol>
void innerJoinSet(
ValueTypeCode vtcType,
Frame *&res,
const Frame * arg,
const int64_t toRow,
const int64_t toCol,
const int64_t fromRow,
const int64_t fromCol,
DCTX(ctx)
void innerJoinIntersection(
std::vector<int> * lHit,
std::vector<int> * rHit,
const Frame * lhs,
const Frame * rhs,
const char * lhsOn,
const char * rhsOn
) {
if(vtcType == ValueTypeUtils::codeFor<VTCol>){
innerJoinSetValue<VTCol>(
res->getColumn<VTCol>(toCol),
arg->getColumn<VTCol>(fromCol),
toRow,
fromRow,
ctx
);

auto const lCol = lhs->getColumn<VTCol>(lhsOn);
auto const rCol = rhs->getColumn<VTCol>(rhsOn);

auto const numRowLhs = lhs->getNumRows();
auto const numRowRhs = rhs->getNumRows();

for (size_t row_idx_l = 0; row_idx_l < numRowLhs; row_idx_l++) {
for (size_t row_idx_r = 0; row_idx_r < numRowRhs; row_idx_r++) {
if (lCol->get(row_idx_l, 0) == rCol->get(row_idx_r, 0)) {
lHit->push_back(row_idx_l);
rHit->push_back(row_idx_r);
}
}
}
}

template<typename VTLhs, typename VTRhs>
bool innerJoinEqual(
// results
Frame *& res,
// arguments
const DenseMatrix<VTLhs> * argLhs,
const DenseMatrix<VTRhs> * argRhs,
const int64_t targetLhs,
const int64_t targetRhs,
// context
DCTX(ctx)
){
const VTLhs l = argLhs->get(targetLhs, 0);
const VTRhs r = argRhs->get(targetRhs, 0);
return l == r;
}
/**
* Helper function to combine the innerJoinIntersection and
* innerJoinSet to perform the full inner join.
*
* @param lhs the left frame
* @param rhs the right frame
* @param lhsOn label of the left frame
* @param rhsOn label of the right frame
*
* @result res the final frame
* @result row_idx_res number of rows of the final frame
*/
template<typename VTCol>
void computeInnerJoin(
Frame * res,
const Frame * lhs,
const Frame * rhs,
const char * lhsOn,
const char * rhsOn,
int64_t *row_idx_res
) {

std::vector<int> lHit;
std::vector<int> rHit;

int64_t col_idx_res = 0;

innerJoinIntersection<VTCol>(&lHit, &rHit, lhs, rhs, lhsOn, rhsOn);
innerJoinSet<VTCol>(res, lhs, &lHit, &col_idx_res);
innerJoinSet<VTCol>(res, rhs, &rHit, &col_idx_res);

*(row_idx_res) = lHit.size();

template<typename VTLhs, typename VTRhs>
bool innerJoinProbeIf(
// value type known only at run-time
ValueTypeCode vtcLhs,
ValueTypeCode vtcRhs,
// results
Frame *& res,
// input frames
const Frame * lhs, const Frame * rhs,
// input column names
const char * lhsOn, const char * rhsOn,
// input rows
const int64_t targetL, const int64_t targetR,
// context
DCTX(ctx)
){
if(vtcLhs == ValueTypeUtils::codeFor<VTLhs> && vtcRhs == ValueTypeUtils::codeFor<VTRhs>) {
return innerJoinEqual<VTLhs, VTRhs>(
res,
lhs->getColumn<VTLhs>(lhsOn),
rhs->getColumn<VTRhs>(rhsOn),
targetL,
targetR,
ctx
);
}
return false;
}


// ****************************************************************************
// Convenience function
// ****************************************************************************
Expand All @@ -120,7 +151,7 @@ void innerJoin(
// Perhaps check if res already allocated.
const size_t numRowRhs = rhs->getNumRows();
const size_t numRowLhs = lhs->getNumRows();
const size_t totalRows = numRowRhs * numRowLhs;
const size_t totalRows = (numRowRhs < numRowLhs) ? numRowLhs : numRowRhs;
const size_t numColRhs = rhs->getNumCols();
const size_t numColLhs = lhs->getNumCols();
const size_t totalCols = numColRhs + numColLhs;
Expand Down Expand Up @@ -148,77 +179,24 @@ void innerJoin(
// Creating Result Frame
res = DataObjectFactory::create<Frame>(totalRows, totalCols, schema, newlabels, false);

for(size_t row_idx_l = 0; row_idx_l < numRowLhs; row_idx_l++){
for(size_t row_idx_r = 0; row_idx_r < numRowRhs; row_idx_r++){
col_idx_res = 0;
//PROBE ROWS
bool hit = false;
hit = hit || innerJoinProbeIf<int64_t, int64_t>(
vtcLhsOn, vtcRhsOn,
res,
lhs, rhs,
lhsOn, rhsOn,
row_idx_l, row_idx_r,
ctx);
hit = hit || innerJoinProbeIf<double, double>(
vtcLhsOn, vtcRhsOn,
res,
lhs, rhs,
lhsOn, rhsOn,
row_idx_l, row_idx_r,
ctx);
if(hit){
for(size_t idx_c = 0; idx_c < numColLhs; idx_c++){
innerJoinSet<int64_t>(
schema[col_idx_res],
res,
lhs,
row_idx_res,
col_idx_res,
row_idx_l,
idx_c,
ctx
);
innerJoinSet<double>(
schema[col_idx_res],
res,
lhs,
row_idx_res,
col_idx_res,
row_idx_l,
idx_c,
ctx
);
col_idx_res++;

if (vtcLhsOn == vtcRhsOn) {
col_idx_res = 0;
switch (vtcLhsOn) {
case ValueTypeCode::SI64: {
computeInnerJoin<int64_t>(res, lhs, rhs, lhsOn, rhsOn, &row_idx_res);
break;
}
for(size_t idx_c = 0; idx_c < numColRhs; idx_c++){
innerJoinSet<int64_t>(
schema[col_idx_res],
res,
rhs,
row_idx_res,
col_idx_res,
row_idx_r,
idx_c,
ctx
);

innerJoinSet<double>(
schema[col_idx_res],
res,
rhs,
row_idx_res,
col_idx_res,
row_idx_r,
idx_c,
ctx
);
col_idx_res++;
case ValueTypeCode::F64: {
computeInnerJoin<double>(res, lhs, rhs, lhsOn, rhsOn, &row_idx_res);
break;
}
default: {
break;
}
row_idx_res++;
}
}
}

res->shrinkNumRows(row_idx_res);
}
#endif //SRC_RUNTIME_LOCAL_KERNELS_INNERJOIN_H
Loading