Skip to content

Commit

Permalink
fix disposable destruction (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Aug 21, 2023
1 parent 6eecb02 commit b88b54b
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 25 deletions.
9 changes: 0 additions & 9 deletions src/rpp/rpp/operators/details/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,4 @@ class pointer_under_lock
T* m_ptr;
std::scoped_lock<std::mutex> m_lock;
};

template<typename T>
struct with_auto_dispose : public T
{
~with_auto_dispose() override
{
T::dispose();
}
};
} // namespace rpp::operators::details
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/group_by.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct group_by_observer_strategy
RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;

mutable std::map<TKey, subjects::publish_subject<Type>, KeyComparator> key_to_subject{};
std::shared_ptr<details::with_auto_dispose<refcount_disposable>> disposable = std::make_shared<details::with_auto_dispose<refcount_disposable>>();
std::shared_ptr<refcount_disposable> disposable = std::make_shared<refcount_disposable>();

void on_subscribe(rpp::constraint::observer auto& obs) const
{
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver>
class merge_disposable final : public with_auto_dispose<composite_disposable>
class merge_disposable final : public composite_disposable
{
public:
merge_disposable(TObserver&& observer) : m_observer(std::move(observer)) {}
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/with_latest_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
class with_latest_from_disposable final : public with_auto_dispose<composite_disposable>
class with_latest_from_disposable final : public composite_disposable
{
public:
explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector)
Expand Down
9 changes: 5 additions & 4 deletions src/tests/rpp/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,10 @@ TEST_CASE("merge doesn't produce extra copies")
TEST_CASE("merge disposes original disposable on disposing")
{
auto observable_disposable = std::make_shared<rpp::composite_disposable>();
auto observable = observable_with_disposable<int>(observable_disposable);

test_operator_with_disposable<int>(rpp::ops::merge_with(observable));
{
auto observable = observable_with_disposable<int>(observable_disposable);

CHECK(observable_disposable->is_disposed());
test_operator_with_disposable<int>(rpp::ops::merge_with(observable));
}
CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1);
}
9 changes: 5 additions & 4 deletions src/tests/rpp/test_with_lastest_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ TEST_CASE("with_latest_from handles race condition", "[with_latest_from]")
TEST_CASE("with_latest_from disposes original disposable on disposing")
{
auto observable_disposable = std::make_shared<rpp::composite_disposable>();
auto observable = observable_with_disposable<int>(observable_disposable);

test_operator_with_disposable<int>(rpp::ops::with_latest_from(observable));
{
auto observable = observable_with_disposable<int>(observable_disposable);

test_operator_with_disposable<int>(rpp::ops::with_latest_from(observable));
}

CHECK(observable_disposable->is_disposed());
CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1);
}
12 changes: 7 additions & 5 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ template<typename T>
void test_operator_over_observable_with_disposable(auto&& op)
{
auto observable_disposable = std::make_shared<rpp::composite_disposable>();
auto observable = observable_with_disposable<T>(observable_disposable);
{
auto observable = observable_with_disposable<T>(observable_disposable);

auto observer_disposable = std::make_shared<rpp::composite_disposable>();
op(observable) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [](const auto&){});
auto observer_disposable = std::make_shared<rpp::composite_disposable>();
op(observable) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [](const auto&){});

observer_disposable->dispose();
CHECK(observable_disposable->is_disposed());
observer_disposable->dispose();
}
CHECK(observable_disposable->is_disposed() || observable_disposable.use_count() == 1);
}

template<typename T>
Expand Down

1 comment on commit b88b54b

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 722.29 ns 1.60 ns 1.60 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 720.32 ns 1.20 ns 1.20 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1868.89 ns 0.40 ns 0.40 ns 1.00
from array of 1 - create + subscribe + current_thread 2741.02 ns 37.13 ns 39.21 ns 0.95
concat_as_source of just(1 immediate) create + subscribe 5688.20 ns 0.40 ns 0.40 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 1523.16 ns 0.40 ns 0.40 ns 1.00
create+filter(true)+subscribe 872.30 ns 0.40 ns 0.40 ns 1.00
create(1,2)+skip(1)+subscribe 1243.19 ns 0.40 ns 0.40 ns 1.00
create(1,1,2)+distinct_until_changed()+subscribe 874.13 ns 0.40 ns 0.40 ns 1.00
create(1,2)+first()+subscribe 1796.37 ns 0.40 ns 0.40 ns 1.00
create(1,2)+last()+subscribe 1107.89 ns 0.40 ns 0.40 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 763.36 ns 0.81 ns 0.60 ns 1.34
current_thread scheduler create worker + schedule 955.07 ns 8.42 ns 6.82 ns 1.24
current_thread scheduler create worker + schedule + recursive schedule 2071.77 ns 87.03 ns 88.66 ns 0.98

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 891.80 ns 0.40 ns 0.40 ns 1.00
create+scan(10, std::plus)+subscribe 1044.24 ns 0.40 ns 0.40 ns 1.00
create+flat_map(just(v*2))+subscribe 3170.37 ns 141.16 ns 151.82 ns 0.93

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 925.08 ns - - 0.00
create+take_while(true)+subscribe 878.11 ns 0.40 ns 0.40 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 3129.38 ns 29.81 ns 30.06 ns 0.99

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 4283.24 ns 167.53 ns 181.23 ns 0.92
create(1) + merge_with(create(2)) + subscribe 6848.63 ns 168.01 ns 180.07 ns 0.93
create(1) + with_latest_from(create(2)) + subscribe - 131.97 ns 141.19 ns 0.93

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 113.67 ns 75.25 ns 74.72 ns 1.01

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3975.46 ns 108.94 ns 103.33 ns 1.05

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1088.01 ns 0.56 ns 1.13 ns 0.49
Subscribe empty callbacks to empty observable via pipe operator 1117.23 ns 0.56 ns 1.12 ns 0.50

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2441.15 ns 0.43 ns 0.44 ns 0.99
from array of 1 - create + subscribe + current_thread 3062.44 ns 98.86 ns 114.46 ns 0.86
concat_as_source of just(1 immediate) create + subscribe 7010.73 ns 0.28 ns 0.29 ns 0.95

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 2006.96 ns 4.17 ns 3.91 ns 1.07
create+filter(true)+subscribe 1130.70 ns 2.51 ns 2.78 ns 0.90
create(1,2)+skip(1)+subscribe 1863.54 ns 5.57 ns 5.46 ns 1.02
create(1,1,2)+distinct_until_changed()+subscribe 1176.61 ns 8.39 ns 8.39 ns 1.00
create(1,2)+first()+subscribe 2341.83 ns 5.87 ns 5.67 ns 1.04
create(1,2)+last()+subscribe 1468.05 ns 8.14 ns 7.69 ns 1.06

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 997.77 ns 0.56 ns 1.14 ns 0.49
current_thread scheduler create worker + schedule 1309.95 ns 9.46 ns 9.46 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 2367.85 ns 160.92 ns 177.99 ns 0.90

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1138.08 ns 2.51 ns 2.78 ns 0.90
create+scan(10, std::plus)+subscribe 1354.43 ns 5.01 ns 4.47 ns 1.12
create+flat_map(just(v*2))+subscribe 3435.87 ns 300.43 ns 308.17 ns 0.97

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1176.19 ns 3.35 ns 3.34 ns 1.00
create+take_while(true)+subscribe 1126.35 ns 2.79 ns 2.50 ns 1.11

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 3684.78 ns 90.56 ns 87.25 ns 1.04

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 4209.69 ns 358.79 ns 355.31 ns 1.01
create(1) + merge_with(create(2)) + subscribe 6894.72 ns 350.02 ns 353.93 ns 0.99
create(1) + with_latest_from(create(2)) + subscribe - 411.13 ns 428.62 ns 0.96

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 98.76 ns 89.55 ns 93.79 ns 0.95

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 4438.26 ns 230.83 ns 231.18 ns 1.00

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 342.21 ns 1.20 ns 1.11 ns 1.09
Subscribe empty callbacks to empty observable via pipe operator 342.37 ns 1.20 ns 1.14 ns 1.06

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 734.50 ns 0.41 ns 0.37 ns 1.09
from array of 1 - create + subscribe + current_thread 1048.30 ns 24.64 ns 31.33 ns 0.79
concat_as_source of just(1 immediate) create + subscribe 2491.02 ns 0.40 ns 0.37 ns 1.07

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 642.50 ns 0.40 ns 0.41 ns 0.97
create+filter(true)+subscribe 366.36 ns 0.40 ns 0.40 ns 1.01
create(1,2)+skip(1)+subscribe 566.32 ns 0.40 ns 0.36 ns 1.10
create(1,1,2)+distinct_until_changed()+subscribe 378.29 ns 0.40 ns 0.69 ns 0.58
create(1,2)+first()+subscribe 782.25 ns 0.40 ns 0.38 ns 1.04
create(1,2)+last()+subscribe 491.23 ns 0.40 ns 0.39 ns 1.04

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 271.54 ns 1.20 ns 1.08 ns 1.11
current_thread scheduler create worker + schedule 410.00 ns 5.83 ns 6.71 ns 0.87
current_thread scheduler create worker + schedule + recursive schedule 839.40 ns 67.15 ns 77.44 ns 0.87

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 368.01 ns 0.40 ns 0.40 ns 1.00
create+scan(10, std::plus)+subscribe 458.43 ns 0.80 ns 0.82 ns 0.98
create+flat_map(just(v*2))+subscribe 1615.30 ns 84.10 ns 116.34 ns 0.72

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 394.16 ns - - 0.00
create+take_while(true)+subscribe 368.93 ns 0.40 ns 0.39 ns 1.02

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 1743.87 ns 17.73 ns 22.32 ns 0.79

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 2099.40 ns 103.80 ns 148.43 ns 0.70
create(1) + merge_with(create(2)) + subscribe 3341.60 ns 89.98 ns 134.99 ns 0.67
create(1) + with_latest_from(create(2)) + subscribe - 87.26 ns 122.35 ns 0.71

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 38.96 ns 32.73 ns 32.76 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1621.81 ns 91.26 ns 111.06 ns 0.82

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 2407.67 ns 2.35 ns 2.20 ns 1.07
Subscribe empty callbacks to empty observable via pipe operator 2897.57 ns 2.39 ns 2.41 ns 0.99

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 4130.25 ns 10.83 ns 10.14 ns 1.07
from array of 1 - create + subscribe + current_thread 5241.63 ns 146.26 ns 151.58 ns 0.96
concat_as_source of just(1 immediate) create + subscribe 11085.58 ns 16.78 ns 16.05 ns 1.05

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 3099.73 ns 10.82 ns 11.44 ns 0.95
create+filter(true)+subscribe 1703.04 ns 9.82 ns 10.40 ns 0.94
create(1,2)+skip(1)+subscribe 2654.87 ns 13.39 ns 11.92 ns 1.12
create(1,1,2)+distinct_until_changed()+subscribe 2326.90 ns 20.36 ns 22.00 ns 0.93
create(1,2)+first()+subscribe 3618.55 ns 13.41 ns 14.45 ns 0.93
create(1,2)+last()+subscribe 2248.67 ns 16.83 ns 16.53 ns 1.02

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1439.89 ns 6.68 ns 6.54 ns 1.02
current_thread scheduler create worker + schedule 1767.88 ns 13.35 ns 14.17 ns 0.94
current_thread scheduler create worker + schedule + recursive schedule 3015.00 ns 215.08 ns 175.52 ns 1.23

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1718.26 ns 9.19 ns 9.57 ns 0.96
create+scan(10, std::plus)+subscribe 2018.70 ns 11.61 ns 12.08 ns 0.96
create+flat_map(just(v*2))+subscribe 5129.17 ns 379.58 ns 400.90 ns 0.95

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1919.69 ns 7.87 ns 8.82 ns 0.89
create+take_while(true)+subscribe 1727.23 ns 9.42 ns 8.55 ns 1.10

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 5654.63 ns 121.74 ns 122.25 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 6565.16 ns 424.80 ns 446.89 ns 0.95
create(1) + merge_with(create(2)) + subscribe 10285.29 ns 418.09 ns 445.97 ns 0.94
create(1) + with_latest_from(create(2)) + subscribe - 450.60 ns 480.47 ns 0.94

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 138.80 ns 107.71 ns 105.18 ns 1.02

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 7181.51 ns 379.49 ns 370.26 ns 1.02

Please sign in to comment.