Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change output of M2 groupby aggregation from a single double column into a structs column #9899

Draft
wants to merge 1 commit into
base: branch-22.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ std::unique_ptr<Base> make_mean_aggregation();
* deviation across multiple discrete sets. See
* `https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm` for more
* detail.
*
* The results of M2 aggregation are structs whose last member is the computed M2 value. The
* first two members store the results of the `COUNT_VALID` and `MEAN` aggregations that were
* used to compute that M2 value.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_m2_aggregation();
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1138,10 +1138,10 @@ struct target_type_impl<Source, k, std::enable_if_t<is_chrono<Source>() && is_su
using type = Source;
};

// Always use `double` for M2
// Always use struct for M2
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::M2> {
using type = double;
using type = struct_view;
};

// Always use `double` for VARIANCE
Expand Down
12 changes: 10 additions & 2 deletions cpp/src/groupby/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ namespace {
/**
* @brief Factory to construct empty result columns.
*
* Adds special handling for COLLECT_LIST/COLLECT_SET, because:
* Adds special handling for COLLECT_LIST/COLLECT_SET/M2/MERGE_M2, because:
* 1. `make_empty_column()` does not support construction of nested columns.
* 2. Empty lists need empty child columns, to persist type information.
* 2. Empty lists and structs need empty child columns, to persist type information.
*/
struct empty_column_constructor {
column_view values;
Expand All @@ -117,6 +117,14 @@ struct empty_column_constructor {
0, make_empty_column(type_to_id<offset_type>()), empty_like(values), 0, {});
}

if constexpr (k == aggregation::Kind::M2 || k == aggregation::Kind::MERGE_M2) {
std::vector<std::unique_ptr<column>> child_columns;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
I'm wondering whether this can be phrased differently, as you have suggested before:

auto begin = cudf::make_counting_transform_iterator(0, [](auto i){ return make_empty_column(FLOAT64); });
return make_structs_column(0, std::vector<std::unique_ptr<column>>(begin, begin+4), 0, {});

for (size_type i = 0; i < 3; ++i) {
child_columns.emplace_back(make_empty_column(type_id::FLOAT64));
}
return make_structs_column(0, std::move(child_columns), 0, {});
}

// If `values` is LIST typed, and the aggregation results match the type,
// construct empty results based on `values`.
// Most generally, this applies if input type matches output type.
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,19 @@ void aggregate_result_functor::operator()<aggregation::M2>(aggregation const& ag
{
if (cache.has_result(values, agg)) return;

auto const mean_agg = make_mean_aggregation();
auto const count_agg = make_count_aggregation();
auto const mean_agg = make_mean_aggregation();
operator()<aggregation::COUNT_VALID>(*count_agg);
operator()<aggregation::MEAN>(*mean_agg);
auto const mean_result = cache.get_result(values, *mean_agg);

auto const count_result = cache.get_result(values, *count_agg);
auto const mean_result = cache.get_result(values, *mean_agg);

cache.add_result(
values,
agg,
detail::group_m2(get_grouped_values(), mean_result, helper.group_labels(stream), stream, mr));
detail::group_m2(
get_grouped_values(), count_result, mean_result, helper.group_labels(stream), stream, mr));
};

template <>
Expand Down
75 changes: 46 additions & 29 deletions cpp/src/groupby/sort/group_m2.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/dictionary/detail/iterator.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/utilities/span.hpp>
Expand All @@ -35,44 +36,43 @@ namespace groupby {
namespace detail {
namespace {

template <typename ResultType, typename Iterator>
template <typename FloatType, typename Iterator>
struct m2_transform {
column_device_view const d_values;
Iterator const values_iter;
ResultType const* d_means;
Iterator const d_values_iter;
FloatType const* d_means;
size_type const* d_group_labels;

__device__ ResultType operator()(size_type const idx) const noexcept
__device__ FloatType operator()(size_type const idx) const noexcept
{
if (d_values.is_null(idx)) { return 0.0; }

auto const x = static_cast<ResultType>(values_iter[idx]);
auto const x = static_cast<FloatType>(d_values_iter[idx]);
auto const group_idx = d_group_labels[idx];
auto const mean = d_means[group_idx];
auto const diff = x - mean;
return diff * diff;
}
};

template <typename ResultType, typename Iterator>
void compute_m2_fn(column_device_view const& values,
Iterator values_iter,
template <typename FloatType, typename Iterator>
void compute_m2_fn(column_device_view const& d_values,
Iterator d_values_iter,
cudf::device_span<size_type const> group_labels,
ResultType const* d_means,
ResultType* d_result,
FloatType const* d_means,
FloatType* d_m2s,
rmm::cuda_stream_view stream)
{
auto const var_iter = cudf::detail::make_counting_transform_iterator(
size_type{0},
m2_transform<ResultType, decltype(values_iter)>{
values, values_iter, d_means, group_labels.data()});
m2_transform<FloatType, Iterator>{d_values, d_values_iter, d_means, group_labels.data()});

thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.begin(),
group_labels.end(),
var_iter,
thrust::make_discard_iterator(),
d_result);
d_m2s);
}

struct m2_functor {
Expand All @@ -84,34 +84,35 @@ struct m2_functor {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using result_type = cudf::detail::target_type_t<T, aggregation::Kind::M2>;
auto result = make_numeric_column(data_type(type_to_id<result_type>()),
group_means.size(),
mask_state::UNALLOCATED,
stream,
mr);
// Output double type for m2 values.
using float_type = id_to_type<type_id::FLOAT64>;
CUDF_EXPECTS(group_means.type().id() == type_to_id<float_type>(),
"Input `group_means` column must have double type.");

auto m2s = make_numeric_column(
data_type(type_to_id<float_type>()), group_means.size(), mask_state::UNALLOCATED, stream, mr);

auto const values_dv_ptr = column_device_view::create(values, stream);
auto const d_values = *values_dv_ptr;
auto const d_means = group_means.data<result_type>();
auto const d_result = result->mutable_view().data<result_type>();
auto const d_means = group_means.data<float_type>();
auto const d_m2s = m2s->mutable_view().data<float_type>();

if (!cudf::is_dictionary(values.type())) {
auto const values_iter = d_values.begin<T>();
compute_m2_fn(d_values, values_iter, group_labels, d_means, d_result, stream);
auto const d_values_iter = d_values.template begin<T>();
compute_m2_fn(d_values, d_values_iter, group_labels, d_means, d_m2s, stream);
} else {
auto const values_iter =
auto const d_values_iter =
cudf::dictionary::detail::make_dictionary_iterator<T>(*values_dv_ptr);
compute_m2_fn(d_values, values_iter, group_labels, d_means, d_result, stream);
compute_m2_fn(d_values, d_values_iter, group_labels, d_means, d_m2s, stream);
}

// The M2 column should have the same bitmask as the means column.
if (group_means.nullable()) {
result->set_null_mask(cudf::detail::copy_bitmask(group_means, stream, mr),
group_means.null_count());
m2s->set_null_mask(cudf::detail::copy_bitmask(group_means, stream, mr),
group_means.null_count());
}

return result;
return m2s;
}

template <typename T, typename... Args>
Expand All @@ -124,6 +125,7 @@ struct m2_functor {
} // namespace

std::unique_ptr<column> group_m2(column_view const& values,
column_view const& group_counts,
column_view const& group_means,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
Expand All @@ -133,7 +135,22 @@ std::unique_ptr<column> group_m2(column_view const& values,
? dictionary_column_view(values).keys().type()
: values.type();

return type_dispatcher(values_type, m2_functor{}, values, group_means, group_labels, stream, mr);
// Firstly compute m2 values.
auto m2s =
type_dispatcher(values_type, m2_functor{}, values, group_means, group_labels, stream, mr);

// Then build the output structs column having double members (count, mean, m2).
std::vector<std::unique_ptr<column>> output_members;
output_members.emplace_back(cudf::detail::cast(group_counts, group_means.type(), stream, mr));
output_members.emplace_back(std::make_unique<column>(group_means, stream, mr));
output_members.emplace_back(std::move(m2s));

return make_structs_column(group_counts.size(),
std::move(output_members),
0,
rmm::device_buffer{0, stream, mr},
stream,
mr);
}

} // namespace detail
Expand Down
72 changes: 35 additions & 37 deletions cpp/src/groupby/sort/group_merge_m2.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,30 @@ namespace {
/**
* @brief Struct to store partial results for merging.
*/
template <class result_type>
template <class float_type>
struct partial_result {
size_type count;
result_type mean;
result_type M2;
float_type count;
float_type mean;
float_type M2;
};

/**
* @brief Functor to accumulate (merge) all partial results corresponding to the same key into a
* final result stored in a member variable. It performs merging for the partial results of
* `COUNT_VALID`, `MEAN`, and `M2` at the same time.
*/
template <class result_type>
template <class float_type>
struct accumulate_fn {
partial_result<result_type> merge_vals;
partial_result<float_type> merge_vals;

void __device__ operator()(partial_result<result_type> const& partial_vals) noexcept
void __device__ operator()(partial_result<float_type> const& partial_vals) noexcept
{
if (partial_vals.count == 0) { return; }

auto const n_ab = merge_vals.count + partial_vals.count;
auto const delta = partial_vals.mean - merge_vals.mean;
merge_vals.M2 += partial_vals.M2 + (delta * delta) *
static_cast<result_type>(merge_vals.count) *
static_cast<result_type>(partial_vals.count) / n_ab;
merge_vals.M2 +=
partial_vals.M2 + (delta * delta) * merge_vals.count * partial_vals.count / n_ab;
merge_vals.mean =
(merge_vals.mean * merge_vals.count + partial_vals.mean * partial_vals.count) / n_ab;
merge_vals.count = n_ab;
Expand All @@ -71,12 +70,12 @@ struct accumulate_fn {
* @brief Functor to merge partial results of `COUNT_VALID`, `MEAN`, and `M2` aggregations
* for a given group (key) index.
*/
template <class result_type>
template <class float_type>
struct merge_fn {
size_type const* const d_offsets;
size_type const* const d_counts;
result_type const* const d_means;
result_type const* const d_M2s;
float_type const* const d_counts;
float_type const* const d_means;
float_type const* const d_M2s;

auto __device__ operator()(size_type const group_idx) noexcept
{
Expand All @@ -85,7 +84,7 @@ struct merge_fn {
// This case should never happen, because all groups are non-empty as the results of
// aggregation. Here we just make sure we cover this case.
if (start_idx == end_idx) {
return thrust::make_tuple(size_type{0}, result_type{0}, result_type{0}, int8_t{0});
return thrust::make_tuple(float_type{0}, float_type{0}, float_type{0}, int8_t{0});
}

// If `(n = d_counts[idx]) > 0` then `d_means[idx] != null` and `d_M2s[idx] != null`.
Expand All @@ -96,13 +95,13 @@ struct merge_fn {
auto get_partial_result = [&] __device__(size_type idx) {
{
auto const n = d_counts[idx];
return n > 0 ? partial_result<result_type>{n, d_means[idx], d_M2s[idx]}
: partial_result<result_type>{size_type{0}, result_type{0}, result_type{0}};
return n > 0 ? partial_result<float_type>{n, d_means[idx], d_M2s[idx]}
: partial_result<float_type>{float_type{0}, float_type{0}, float_type{0}};
};
};

// Firstly, store tuple(count, mean, M2) of the first partial result in an accumulator.
auto accumulator = accumulate_fn<result_type>{get_partial_result(start_idx)};
auto accumulator = accumulate_fn<float_type>{get_partial_result(start_idx)};

// Then, accumulate (merge) the remaining partial results into that accumulator.
for (auto idx = start_idx + 1; idx < end_idx; ++idx) {
Expand Down Expand Up @@ -133,42 +132,41 @@ std::unique_ptr<column> group_merge_m2(column_view const& values,
CUDF_EXPECTS(values.num_children() == 3,
"Input to `group_merge_m2` must be a structs column having 3 children columns.");

using result_type = id_to_type<type_id::FLOAT64>;
static_assert(
std::is_same_v<cudf::detail::target_type_t<result_type, aggregation::Kind::M2>, result_type>);
CUDF_EXPECTS(values.child(0).type().id() == type_id::INT32 &&
values.child(1).type().id() == type_to_id<result_type>() &&
values.child(2).type().id() == type_to_id<result_type>(),
"Input to `group_merge_m2` must be a structs column having children columns "
"containing tuples of (M2_value, mean, valid_count).");
// Double type must be used for all components (count, mean, m2).
using float_type = id_to_type<type_id::FLOAT64>;
CUDF_EXPECTS(values.child(0).type().id() == type_to_id<float_type>() &&
values.child(1).type().id() == type_to_id<float_type>() &&
values.child(2).type().id() == type_to_id<float_type>(),
"Input to `group_merge_m2` must be a structs column having children "
"containing tuples of double values (valid_count, mean, m2).");

auto result_counts = make_numeric_column(
data_type(type_to_id<size_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
data_type(type_to_id<float_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
auto result_means = make_numeric_column(
data_type(type_to_id<result_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
data_type(type_to_id<float_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
auto result_M2s = make_numeric_column(
data_type(type_to_id<result_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
data_type(type_to_id<float_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
auto validities = rmm::device_uvector<int8_t>(num_groups, stream);

// Perform merging for all the aggregations. Their output (and their validity data) are written
// out concurrently through an output zip iterator.
using iterator_tuple = thrust::tuple<size_type*, result_type*, result_type*, int8_t*>;
using iterator_tuple = thrust::tuple<float_type*, float_type*, float_type*, int8_t*>;
using output_iterator = thrust::zip_iterator<iterator_tuple>;
auto const out_iter =
output_iterator{thrust::make_tuple(result_counts->mutable_view().template data<size_type>(),
result_means->mutable_view().template data<result_type>(),
result_M2s->mutable_view().template data<result_type>(),
output_iterator{thrust::make_tuple(result_counts->mutable_view().template data<float_type>(),
result_means->mutable_view().template data<float_type>(),
result_M2s->mutable_view().template data<float_type>(),
validities.begin())};

auto const count_valid = values.child(0);
auto const mean_values = values.child(1);
auto const M2_values = values.child(2);
auto const iter = thrust::make_counting_iterator<size_type>(0);

auto const fn = merge_fn<result_type>{group_offsets.begin(),
count_valid.template begin<size_type>(),
mean_values.template begin<result_type>(),
M2_values.template begin<result_type>()};
auto const fn = merge_fn<float_type>{group_offsets.begin(),
count_valid.template begin<float_type>(),
mean_values.template begin<float_type>(),
M2_values.template begin<float_type>()};
thrust::transform(rmm::exec_policy(stream), iter, iter + num_groups, out_iter, fn);

// Generate bitmask for the output.
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,14 @@ std::unique_ptr<column> group_count_all(cudf::device_span<size_type const> group
* @endcode
*
* @param values Grouped values to compute M2 values
* @param group_counts Number of non-null values in each group.
* @param group_means Pre-computed groupwise MEAN
* @param group_labels ID of the group that each corresponding value in @p values belongs to
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*/
std::unique_ptr<column> group_m2(column_view const& values,
column_view const& group_counts,
column_view const& group_means,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
Expand Down
Loading