From 0a0e7e83430677cbb383809dc1746075bfa735a4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 20 Aug 2023 15:43:50 +0300 Subject: [PATCH] Add with_lastest_from (#408) --- src/benchmarks/benchmarks.cpp | 23 +- src/examples/rpp/doxygen/with_latest_from.cpp | 33 +++ src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/details/utils.hpp | 47 ++++ src/rpp/rpp/operators/fwd.hpp | 10 +- src/rpp/rpp/operators/merge.hpp | 26 +- src/rpp/rpp/operators/with_latest_from.hpp | 234 ++++++++++++++++++ src/rpp/rpp/utils/functors.hpp | 7 + src/rpp/rpp/utils/tuple.hpp | 28 ++- src/tests/rpp/test_with_lastest_from.cpp | 182 ++++++++++++++ 10 files changed, 570 insertions(+), 21 deletions(-) create mode 100644 src/examples/rpp/doxygen/with_latest_from.cpp create mode 100644 src/rpp/rpp/operators/details/utils.hpp create mode 100644 src/rpp/rpp/operators/with_latest_from.hpp create mode 100644 src/tests/rpp/test_with_lastest_from.cpp diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index 59b4bc8a4..4313f888a 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -14,11 +14,11 @@ #define BENCHMARK(NAME) bench.context("benchmark_title", NAME); #define SECTION(NAME) bench.context("benchmark_name", NAME); -#define TEST_RPP(ACTION) bench.context("source", "rpp").run(ACTION); +#define TEST_RPP(...) bench.context("source", "rpp").run(__VA_ARGS__) #ifdef RPP_BUILD_RXCPP - #define TEST_RXCPP(ACTION) bench.context("source", "rxcpp").run(ACTION); + #define TEST_RXCPP(...) bench.context("source", "rxcpp").run(__VA_ARGS__) #else - #define TEST_RXCPP(ACTION) + #define TEST_RXCPP(...) #endif char const* json() noexcept { @@ -227,6 +227,23 @@ int main(int argc, char* argv[]) // NOLINT | rxcpp::operators::subscribe([](int v){ ankerl::nanobench::doNotOptimizeAway(v); }); }); } + SECTION("create(1) + with_latest_from(create(2)) + subscribe") + { + TEST_RPP([&]() + { + rpp::source::create([](const auto& obs){ obs.on_next(1); }) + | rpp::operators::with_latest_from(rpp::source::create([](const auto& obs){ obs.on_next(2); })) + | rpp::operators::subscribe([](const std::tuple& v){ ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + // doesn't work due to tuple issues in rxcpp =C + // TEST_RXCPP([&]() + // { + // rxcpp::observable<>::create([](const auto& obs){obs.on_next(1);}) + // | rxcpp::operators::with_latest_from(rpp::utils::pack_to_tuple{}, rxcpp::observable<>::create([](const auto& obs){obs.on_next(2);})) + // | rxcpp::operators::subscribe>([](const std::tuple& v){ ankerl::nanobench::doNotOptimizeAway(v); }); + // }); + } } BENCHMARK("Conditional Operators") diff --git a/src/examples/rpp/doxygen/with_latest_from.cpp b/src/examples/rpp/doxygen/with_latest_from.cpp new file mode 100644 index 000000000..7961a0047 --- /dev/null +++ b/src/examples/rpp/doxygen/with_latest_from.cpp @@ -0,0 +1,33 @@ +#include +#include + +/** + * @example with_latest_from.cpp + **/ +int main() +{ + //! [with_latest_from] + rpp::source::just(1, 2, 3, 4, 5, 6) + | rpp::operators::with_latest_from(rpp::source::just(3, 4, 5)) + | rpp::operators::subscribe([](std::tuple v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; }); + // Output: 1:3 2:4 3:5 4:5 5:5 6:5 + + std::cout << std::endl; + + rpp::source::just(1, 2, 3) + | rpp::operators::with_latest_from(rpp::source::just(3, 4, 5, 6, 7, 8)) + | rpp::operators::subscribe([](std::tuple v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; }); + // Output: 1:3 2:4 3:5 + //! [with_latest_from] + + std::cout << std::endl; + + //! [with_latest_from custom selector] + rpp::source::just(1, 2, 3, 4) + | rpp::operators::with_latest_from([](int left, int right) { return left + right; }, + rpp::source::just(3, 4, 5)) + | rpp::operators::subscribe([](int v) { std::cout << v << " "; }); + // Output: 4 6 8 9 + //! [with_latest_from custom selector] + return 0; +} diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index b5a0327ec..5d959e24d 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -68,6 +68,7 @@ */ #include +#include /** * @defgroup utility_operators Utility Operators diff --git a/src/rpp/rpp/operators/details/utils.hpp b/src/rpp/rpp/operators/details/utils.hpp new file mode 100644 index 000000000..3e6084793 --- /dev/null +++ b/src/rpp/rpp/operators/details/utils.hpp @@ -0,0 +1,47 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +namespace rpp::operators::details +{ +template +struct value_with_mutex +{ + value_with_mutex() = default; + explicit value_with_mutex(const T& v) : value{v} {} + explicit value_with_mutex(T&& v) : value{std::move(v)} {} + + T value{}; + std::mutex mutex{}; +}; + +template +class pointer_under_lock +{ +public: + explicit pointer_under_lock(value_with_mutex& value) + : pointer_under_lock{value.value, value.mutex} + {} + + pointer_under_lock(T& val, std::mutex& mutex) : m_ptr{&val}, m_lock{mutex} {} + + T* operator->() { return m_ptr; } + const T* operator->() const { return m_ptr; } + +private: + T* m_ptr; + std::scoped_lock m_lock; +}; + + +} // namespace rpp::operators::details \ No newline at end of file diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index b11b2e040..ba0aaf290 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -84,7 +84,15 @@ template auto take_while(Fn&& predicate); auto take(size_t count); -} + +template + requires(!utils::is_not_template_callable || + std::invocable, utils::extract_observable_type_t...>) +auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables); + +template +auto with_latest_from(TObservable&& observable, TObservables&&... observables); +} // namespace rpp::operators namespace rpp { diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index 0ac16e1e8..2d6fdabbd 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -16,6 +16,8 @@ #include #include +#include + #include #include #include @@ -29,17 +31,14 @@ class merge_disposable final : public composite_disposable public: merge_disposable(TObserver&& observer) : m_observer(std::move(observer)) {} - std::lock_guard lock_guard() { return std::lock_guard{m_mutex}; } - void increment_on_completed() { m_on_completed_needed.fetch_add(1, std::memory_order_relaxed); } bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order_relaxed) == 1; } - TObserver& get_observer() { return m_observer; } + pointer_under_lock get_observer_under_lock() { return pointer_under_lock{m_observer}; } private: - TObserver m_observer; - std::mutex m_mutex{}; - std::atomic_size_t m_on_completed_needed{1}; + value_with_mutex m_observer{}; + std::atomic_size_t m_on_completed_needed{1}; }; template @@ -60,15 +59,13 @@ struct merge_observer_base_strategy bool is_disposed() const { - return m_disposable->is_disposed() || m_disposable->get_observer().is_disposed(); + return m_disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { m_disposable->dispose(); - - auto lock = m_disposable->lock_guard(); - m_disposable->get_observer().on_error(err); + m_disposable->get_observer_under_lock()->on_error(err); } void on_completed() const @@ -78,9 +75,7 @@ struct merge_observer_base_strategy std::atomic_thread_fence(std::memory_order_acquire); m_disposable->dispose(); - - auto lock = m_disposable->lock_guard(); - m_disposable->get_observer().on_completed(); + m_disposable->get_observer_under_lock()->on_completed(); } } @@ -96,8 +91,7 @@ struct merge_observer_inner_strategy final : public merge_observer_base_strategy template void on_next(T&& v) const { - auto lock = merge_observer_base_strategy::m_disposable->lock_guard(); - merge_observer_base_strategy::m_disposable->get_observer().on_next(std::forward(v)); + merge_observer_base_strategy::m_disposable->get_observer_under_lock()->on_next(std::forward(v)); } }; @@ -108,7 +102,7 @@ class merge_observer_strategy final : public merge_observer_base_strategy{std::make_shared>(std::move(observer))} { - merge_observer_base_strategy::m_disposable->get_observer().set_upstream(disposable_wrapper::from_weak(merge_observer_base_strategy::m_disposable)); + merge_observer_base_strategy::m_disposable->get_observer_under_lock()->set_upstream(disposable_wrapper::from_weak(merge_observer_base_strategy::m_disposable)); } template diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp new file mode 100644 index 000000000..f06776373 --- /dev/null +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -0,0 +1,234 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include + + +namespace rpp::operators::details +{ +template +class with_latest_from_disposable final : public composite_disposable +{ +public: + explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) + : observer_with_mutex{std::move(observer)}, selector{selector} + { + } + + pointer_under_lock get_observer_under_lock() { return pointer_under_lock{observer_with_mutex}; } + rpp::utils::tuple>...>& get_values() { return values; } + const TSelector& get_selector() const { return selector; } + +private: + value_with_mutex observer_with_mutex{}; + rpp::utils::tuple>...> values{}; + TSelector selector; +}; + +template +struct with_latest_from_inner_observer_strategy +{ + std::shared_ptr> disposable{}; + + void set_upstream(const rpp::disposable_wrapper& d) const + { + disposable->add(d.get_original()); + } + + bool is_disposed() const + { + return disposable->is_disposed(); + } + + template + void on_next(T&& v) const + { + auto& [value, mutex] = disposable->get_values().template get(); + std::scoped_lock lock{mutex}; + value.emplace(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + disposable->dispose(); + disposable->get_observer_under_lock()->on_error(err); + } + + static constexpr rpp::utils::empty_function_t<> on_completed{}; +}; + +template + requires std::invocable...> +class with_latest_from_observer_strategy +{ + using Result = std::invoke_result_t...>; + using Disposable = with_latest_from_disposable...>; +public: + using DisposableStrategyToUseWithThis = rpp::details::none_disposable_strategy; + + template + with_latest_from_observer_strategy(Observer&& observer, const TSelector& selector, const TObservables&... observables, std::index_sequence) + : m_disposable{std::make_shared(std::move(observer), selector)} + { + m_disposable->get_observer_under_lock()->set_upstream(rpp::disposable_wrapper::from_weak(m_disposable)); + (observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{m_disposable}),...); + } + + void set_upstream(const rpp::disposable_wrapper& d) const + { + m_disposable->add(d.get_original()); + } + + bool is_disposed() const + { + return m_disposable->is_disposed(); + } + + template + void on_next(T&& v) const + { + auto result = m_disposable->get_values().apply([&](value_with_mutex>>&... vals) -> std::optional + { + auto lock = std::scoped_lock{vals.mutex...}; + + if ((vals.value.has_value() && ...)) + return m_disposable->get_selector()(rpp::utils::as_const(std::forward(v)), rpp::utils::as_const(vals.value.value())...); + return std::nullopt; + }); + + if (result.has_value()) + m_disposable->get_observer_under_lock()->on_next(std::move(result).value()); + } + + void on_error(const std::exception_ptr& err) const + { + m_disposable->dispose(); + m_disposable->get_observer_under_lock()->on_error(err); + } + + void on_completed() const + { + m_disposable->dispose(); + m_disposable->get_observer_under_lock()->on_completed(); + } + +private: + std::shared_ptr m_disposable{}; +}; + +template +struct with_latest_from_t +{ + RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; + RPP_NO_UNIQUE_ADDRESS TSelector selector; + + template + requires std::invocable...> + using ResultValue = std::invoke_result_t...>; + + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also uses them + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); + } + +private: + template + static void subscribe_impl(Observer&& observer, const observable_chain_strategy& observable_strategy, const TSelector& selector, const TObservables&... observables) + { + using ExpectedValue = typename observable_chain_strategy::ValueType; + + observable_strategy.subscribe(rpp::observer, TSelector, ExpectedValue, TObservables...>>{std::forward(observer), selector, observables..., std::index_sequence_for{}}); + } +}; +} + +namespace rpp::operators +{ +/** + * @brief Combines latest emissions from observables with emission from current observable when it sends new value via applying selector + * + * @marble with_latest_from_custom_selector + { + source observable : +------1 -2 -3 -| + source other_observable : +-5-6-7- -- 8- -| + operator "with_latest_from: x,y =>std::pair{x,y}" : +------{1,5}-{2,7}-{3,8}-| + } + * + * @details Actually this operator just keeps last values from all other observables and combines them together with each new emission from original observable + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from "others" copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param selector is applied to current emission of current observable and latests emissions from observables + * @param observables are observables whose emissions would be combined when current observable sends new value + * @warning #include + * + * @par Examples + * @snippet with_latest_from.cpp with_latest_from custom selector + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/combinelatest.html + */ +template + requires(!utils::is_not_template_callable || + std::invocable, utils::extract_observable_type_t...>) +auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables) +{ + return details::with_latest_from_t, std::decay_t, std::decay_t...>{rpp::utils::tuple{std::forward(observable), std::forward(observables)...}, std::forward(selector)}; +} + +/** + * @brief Combines latest emissions from observables with emission from current observable when it sends new value via making tuple + * + * @marble with_latest_from + { + source observable : +------1 -2 -3 -| + source other_observable : +-5-6-7- -- 8- -| + operator "with_latest_from: make_tuple" : +------{1,5}-{2,7}-{3,8}-| + } + * + * @warn Selector is just packing values to tuple in this case + * + * @par Performance notes: + * - 1 heap allocation for disposable + * - each value from "others" copied/moved to internal storage + * - mutex acquired every time value obtained + * + * @param observables are observables whose emissions would be combined when current observable sends new value + * @warning #include + * + * @par Examples + * @snippet with_latest_from.cpp with_latest_from + * + * @ingroup combining_operators + * @see https://reactivex.io/documentation/operators/combinelatest.html + */ +template +auto with_latest_from(TObservable&& observable, TObservables&&... observables) +{ + return with_latest_from(rpp::utils::pack_to_tuple{}, std::forward(observable), std::forward(observables)...); +} +} // namespace rpp::operators diff --git a/src/rpp/rpp/utils/functors.hpp b/src/rpp/rpp/utils/functors.hpp index 0bb40f5dc..84b8f60c5 100644 --- a/src/rpp/rpp/utils/functors.hpp +++ b/src/rpp/rpp/utils/functors.hpp @@ -12,6 +12,8 @@ #include +#include + namespace rpp::utils { template struct overloaded : Ts... { using Ts::operator()...; }; @@ -59,4 +61,9 @@ struct less template bool operator()(const T& l, const T& r) const { return l < r; } }; + +struct pack_to_tuple +{ + auto operator()(auto&& ...vals) const { return std::make_tuple(std::forward(vals)...); } +}; } // namespace rpp::utils \ No newline at end of file diff --git a/src/rpp/rpp/utils/tuple.hpp b/src/rpp/rpp/utils/tuple.hpp index 25e448e4d..3da79a242 100644 --- a/src/rpp/rpp/utils/tuple.hpp +++ b/src/rpp/rpp/utils/tuple.hpp @@ -22,6 +22,7 @@ template class tuple_leaf { public: + tuple_leaf() = default; tuple_leaf(const T& value) : m_value{value} {} tuple_leaf(T&& value) : m_value{std::move(value)} {} @@ -29,7 +30,7 @@ class tuple_leaf T& get() { return m_value; } private: - RPP_NO_UNIQUE_ADDRESS T m_value; + RPP_NO_UNIQUE_ADDRESS T m_value{}; }; template @@ -39,6 +40,8 @@ template class tuple_impl, Args...> : private tuple_leaf... { public: + tuple_impl() = default; + template ...TArgs> requires(!rpp::constraint::variadic_decayed_same_as, Args...>, TArgs...>) tuple_impl(TArgs&&...args) @@ -56,6 +59,29 @@ class tuple_impl, Args...> : private tuple_leaf< { return std::forward(callable)(std::forward(args)..., static_cast*>(this)->get()...); } + + template + requires (I < sizeof...(Args)) + const auto& get() const + { + return static_cast>*>(this)->get(); + } + + template + requires (I < sizeof...(Args)) + auto& get() + { + return static_cast>*>(this)->get(); + } + +private: + template + consteval static T type_at_index(const tuple_leaf&); + +public: + template + requires (I < sizeof...(Args)) + using type_at_index_t = decltype(type_at_index(std::declval())); }; } diff --git a/src/tests/rpp/test_with_lastest_from.cpp b/src/tests/rpp/test_with_lastest_from.cpp new file mode 100644 index 000000000..83088cfa6 --- /dev/null +++ b/src/tests/rpp/test_with_lastest_from.cpp @@ -0,0 +1,182 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include + +#include + +#include "mock_observer.hpp" + + +TEST_CASE("with_latest_from combines observables") +{ + auto obs_1 = rpp::source::just(1); + auto obs_2 = rpp::source::just(2.2); + SECTION("subscribe on it via with_latest_from") + { + auto mock = mock_observer_strategy>{}; + obs_1 | rpp::ops::with_latest_from(obs_2) | rpp::ops::subscribe(mock.get_observer()); + SECTION("obtain tuple of values") + { + CHECK(mock.get_received_values() == std::vector{ std::tuple{1, 2.2} }); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_completed_count() == 1); + } + } + SECTION("subscribe on it via with_latest_from with custom selector") + { + auto mock = mock_observer_strategy{}; + obs_1 | rpp::ops::with_latest_from([](int left, double right) { return left + right; }, obs_2) | rpp::ops::subscribe(mock.get_observer()); + SECTION("obtain values") + { + CHECK(mock.get_received_values() == std::vector{1+2.2}); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_completed_count() == 1); + } + } +} + +TEST_CASE("with_latest_from reacts only on main root but sends last value from others") +{ + SECTION("subjects and subscribe on it with_latest_from") + { + auto subj_1 = rpp::subjects::publish_subject{}; + auto subj_2 = rpp::subjects::publish_subject{}; + auto mock = mock_observer_strategy>{}; + subj_1.get_observable() | rpp::ops::with_latest_from(subj_2.get_observable()) | rpp::ops::subscribe(mock.get_observer()); + SECTION("send only first subject sends value") + { + subj_1.get_observer().on_next(1); + SECTION("No values to observer") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + SECTION("send only second subject sends value") + { + subj_2.get_observer().on_next(1); + SECTION("No values to observer") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + SECTION("send first subject sends value SECTION combine called with last second value") + { + subj_2.get_observer().on_next(1); + subj_2.get_observer().on_next(2); + subj_2.get_observer().on_next(3); + subj_1.get_observer().on_next(4); + SECTION("obtain last combintaion") + { + CHECK(mock.get_received_values() == std::vector{ std::tuple{4, 3} }); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + SECTION("second sends values again") + { + subj_2.get_observer().on_next(5); + SECTION("nothing new happens") + { + CHECK(mock.get_received_values() == std::vector{ std::tuple{4, 3} }); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + SECTION("second completes") + { + subj_2.get_observer().on_completed(); + SECTION("nothing happens") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + SECTION("second errors") + { + subj_2.get_observer().on_error({}); + SECTION("error obtained") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + SECTION("first completes") + { + subj_1.get_observer().on_completed(); + SECTION("observer completed") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + SECTION("first errors") + { + subj_1.get_observer().on_error({}); + SECTION("error obtained") + { + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +TEST_CASE("with_latest_from handles race condition", "[with_latest_from]") +{ + SECTION("source observable in current thread pairs with error in other thread") + { + std::atomic_bool on_error_called{false}; + auto subject = rpp::subjects::publish_subject{}; + + SECTION("subscribe on it") + { + SECTION("on_error can't interleave with on_next") + { + std::thread th{}; + auto source = rpp::subjects::publish_subject{}; + + source.get_observable() + | rpp::ops::with_latest_from(subject.get_observable()) + | rpp::ops::subscribe([&](auto&&) + { + CHECK(!on_error_called); + th = std::thread{[&] + { + subject.get_observer().on_error(std::exception_ptr{}); + }}; + std::this_thread::sleep_for(std::chrono::seconds{1}); + CHECK(!on_error_called); + }, + [&](auto) { on_error_called = true; }); + + subject.get_observer().on_next(2); + source.get_observer().on_next(1); + + th.join(); + + CHECK(on_error_called); + } + } + } +} \ No newline at end of file