Skip to content

Commit

Permalink
modify some other strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Oct 14, 2024
1 parent 51d39eb commit 906250a
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 11 deletions.
11 changes: 11 additions & 0 deletions src/rpp/rpp/observers/details/disposable_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,15 @@ namespace rpp::details::observers

static void dispose() {}
};

struct locally_disposable_strategy
{
static void add(const rpp::disposable_wrapper&) {}

bool is_disposed() const noexcept { return state; }

void dispose() const { state = true; }

mutable bool state{};
};
} // namespace rpp::details::observers
5 changes: 5 additions & 0 deletions src/rpp/rpp/observers/details/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ namespace rpp::details::observers
*/
struct none_disposable_strategy;

/**
* @brief Just bool over is_disposed/dispose logic with no any add logic
*/
struct locally_disposable_strategy;

/**
* @brief Dynamic disposable logic based on pre-allocated vector
*/
Expand Down
3 changes: 3 additions & 0 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy_base
{

concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state, rpp::composite_disposable_wrapper refcounted)
: state{std::move(state)}
, refcounted{std::move(refcounted)}
Expand Down Expand Up @@ -138,6 +139,8 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

using base = concat_observer_strategy_base<TObservable, TObserver>;

using base::concat_observer_strategy_base;
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace rpp::operators::details
template<rpp::constraint::observer Observer, typename Worker, bool ClearOnError>
struct delay_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<delay_state<Observer, Worker>> state{};

Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace rpp::operators::details
template<typename TState>
struct combining_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<TState> state{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver>
struct merge_observer_base_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

merge_observer_base_strategy(std::shared_ptr<merge_state<TObserver>>&& state)
: m_state{std::move(state)}
{
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver, typename TObservable>
struct retry_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -57,7 +56,6 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
if (state->count == 0)
{
state->observer.on_error(err);
Expand All @@ -74,7 +72,6 @@ namespace rpp::operators::details

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

Expand All @@ -83,7 +80,7 @@ namespace rpp::operators::details
state->disposable.add(d);
}

bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); }
bool is_disposed() const { return state->disposable.is_disposed(); }
};

template<rpp::constraint::observer TObserver, typename TObservable>
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/with_latest_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ namespace rpp::operators::details
template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
struct with_latest_from_inner_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<with_latest_from_state<Observer, TSelector, RestArgs...>> state{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ namespace rpp::details
template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
struct concat_source_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -65,17 +64,15 @@ namespace rpp::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
state->observer.on_error(err);
}

void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); }

bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); }
bool is_disposed() const { return state->disposable.is_disposed(); }

void on_completed() const
{
locally_disposed = true;
state->disposable.clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
Expand Down
1 change: 1 addition & 0 deletions src/tests/rpp/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ TEST_CASE("delay is not disposing early")
d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d.value());
obs.on_completed();
CHECK(obs.is_disposed());
})
| rpp::ops::delay(std::chrono::seconds{1}, scheduler)
| rpp::ops::subscribe(mock);
Expand Down
1 change: 1 addition & 0 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ void test_operator_over_observable_with_disposable(auto&& op)
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
obs.on_completed();
CHECK(obs.is_disposed());
CHECK(d.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
}
Expand Down

0 comments on commit 906250a

Please sign in to comment.