Skip to content

Commit

Permalink
Add with_lastest_from (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Aug 20, 2023
1 parent 74e541a commit 0a0e7e8
Show file tree
Hide file tree
Showing 10 changed files with 570 additions and 21 deletions.
23 changes: 20 additions & 3 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

#define BENCHMARK(NAME) bench.context("benchmark_title", NAME);
#define SECTION(NAME) bench.context("benchmark_name", NAME);
#define TEST_RPP(ACTION) bench.context("source", "rpp").run(ACTION);
#define TEST_RPP(...) bench.context("source", "rpp").run(__VA_ARGS__)
#ifdef RPP_BUILD_RXCPP
#define TEST_RXCPP(ACTION) bench.context("source", "rxcpp").run(ACTION);
#define TEST_RXCPP(...) bench.context("source", "rxcpp").run(__VA_ARGS__)
#else
#define TEST_RXCPP(ACTION)
#define TEST_RXCPP(...)
#endif

char const* json() noexcept {
Expand Down Expand Up @@ -227,6 +227,23 @@ int main(int argc, char* argv[]) // NOLINT
| rxcpp::operators::subscribe<int>([](int v){ ankerl::nanobench::doNotOptimizeAway(v); });
});
}
SECTION("create(1) + with_latest_from(create(2)) + subscribe")
{
TEST_RPP([&]()
{
rpp::source::create<int>([](const auto& obs){ obs.on_next(1); })
| rpp::operators::with_latest_from(rpp::source::create<int>([](const auto& obs){ obs.on_next(2); }))
| rpp::operators::subscribe([](const std::tuple<int,int>& v){ ankerl::nanobench::doNotOptimizeAway(v); });
});

// doesn't work due to tuple issues in rxcpp =C
// TEST_RXCPP([&]()
// {
// rxcpp::observable<>::create<int>([](const auto& obs){obs.on_next(1);})
// | rxcpp::operators::with_latest_from(rpp::utils::pack_to_tuple{}, rxcpp::observable<>::create<int>([](const auto& obs){obs.on_next(2);}))
// | rxcpp::operators::subscribe<std::tuple<int,int>>([](const std::tuple<int,int>& v){ ankerl::nanobench::doNotOptimizeAway(v); });
// });
}
}

BENCHMARK("Conditional Operators")
Expand Down
33 changes: 33 additions & 0 deletions src/examples/rpp/doxygen/with_latest_from.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <rpp/rpp.hpp>
#include <iostream>

/**
* @example with_latest_from.cpp
**/
int main()
{
//! [with_latest_from]
rpp::source::just(1, 2, 3, 4, 5, 6)
| rpp::operators::with_latest_from(rpp::source::just(3, 4, 5))
| rpp::operators::subscribe([](std::tuple<int,int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5 4:5 5:5 6:5

std::cout << std::endl;

rpp::source::just(1, 2, 3)
| rpp::operators::with_latest_from(rpp::source::just(3, 4, 5, 6, 7, 8))
| rpp::operators::subscribe([](std::tuple<int,int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5
//! [with_latest_from]

std::cout << std::endl;

//! [with_latest_from custom selector]
rpp::source::just(1, 2, 3, 4)
| rpp::operators::with_latest_from([](int left, int right) { return left + right; },
rpp::source::just(3, 4, 5))
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 4 6 8 9
//! [with_latest_from custom selector]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
*/

#include <rpp/operators/merge.hpp>
#include <rpp/operators/with_latest_from.hpp>

/**
* @defgroup utility_operators Utility Operators
Expand Down
47 changes: 47 additions & 0 deletions src/rpp/rpp/operators/details/utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <mutex>

namespace rpp::operators::details
{
template<typename T>
struct value_with_mutex
{
value_with_mutex() = default;
explicit value_with_mutex(const T& v) : value{v} {}
explicit value_with_mutex(T&& v) : value{std::move(v)} {}

T value{};
std::mutex mutex{};
};

template<typename T>
class pointer_under_lock
{
public:
explicit pointer_under_lock(value_with_mutex<T>& value)
: pointer_under_lock{value.value, value.mutex}
{}

pointer_under_lock(T& val, std::mutex& mutex) : m_ptr{&val}, m_lock{mutex} {}

T* operator->() { return m_ptr; }
const T* operator->() const { return m_ptr; }

private:
T* m_ptr;
std::scoped_lock<std::mutex> m_lock;
};


} // namespace rpp::operators::details
10 changes: 9 additions & 1 deletion src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,15 @@ template<typename Fn>
auto take_while(Fn&& predicate);

auto take(size_t count);
}

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires(!utils::is_not_template_callable<TSelector> ||
std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>)
auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables);

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto with_latest_from(TObservable&& observable, TObservables&&... observables);
} // namespace rpp::operators

namespace rpp
{
Expand Down
26 changes: 10 additions & 16 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/operators/details/utils.hpp>

#include <atomic>
#include <cstddef>
#include <mutex>
Expand All @@ -29,17 +31,14 @@ class merge_disposable final : public composite_disposable
public:
merge_disposable(TObserver&& observer) : m_observer(std::move(observer)) {}

std::lock_guard<std::mutex> lock_guard() { return std::lock_guard<std::mutex>{m_mutex}; }

void increment_on_completed() { m_on_completed_needed.fetch_add(1, std::memory_order_relaxed); }
bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order_relaxed) == 1; }

TObserver& get_observer() { return m_observer; }
pointer_under_lock<TObserver> get_observer_under_lock() { return pointer_under_lock{m_observer}; }

private:
TObserver m_observer;
std::mutex m_mutex{};
std::atomic_size_t m_on_completed_needed{1};
value_with_mutex<TObserver> m_observer{};
std::atomic_size_t m_on_completed_needed{1};
};

template<rpp::constraint::observer TObserver>
Expand All @@ -60,15 +59,13 @@ struct merge_observer_base_strategy

bool is_disposed() const
{
return m_disposable->is_disposed() || m_disposable->get_observer().is_disposed();
return m_disposable->is_disposed();
}

void on_error(const std::exception_ptr& err) const
{
m_disposable->dispose();

auto lock = m_disposable->lock_guard();
m_disposable->get_observer().on_error(err);
m_disposable->get_observer_under_lock()->on_error(err);
}

void on_completed() const
Expand All @@ -78,9 +75,7 @@ struct merge_observer_base_strategy
std::atomic_thread_fence(std::memory_order_acquire);

m_disposable->dispose();

auto lock = m_disposable->lock_guard();
m_disposable->get_observer().on_completed();
m_disposable->get_observer_under_lock()->on_completed();
}
}

Expand All @@ -96,8 +91,7 @@ struct merge_observer_inner_strategy final : public merge_observer_base_strategy
template<typename T>
void on_next(T&& v) const
{
auto lock = merge_observer_base_strategy<TObserver>::m_disposable->lock_guard();
merge_observer_base_strategy<TObserver>::m_disposable->get_observer().on_next(std::forward<T>(v));
merge_observer_base_strategy<TObserver>::m_disposable->get_observer_under_lock()->on_next(std::forward<T>(v));
}
};

Expand All @@ -108,7 +102,7 @@ class merge_observer_strategy final : public merge_observer_base_strategy<TObser
explicit merge_observer_strategy(TObserver&& observer)
: merge_observer_base_strategy<TObserver>{std::make_shared<merge_disposable<TObserver>>(std::move(observer))}
{
merge_observer_base_strategy<TObserver>::m_disposable->get_observer().set_upstream(disposable_wrapper::from_weak(merge_observer_base_strategy<TObserver>::m_disposable));
merge_observer_base_strategy<TObserver>::m_disposable->get_observer_under_lock()->set_upstream(disposable_wrapper::from_weak(merge_observer_base_strategy<TObserver>::m_disposable));
}

template<typename T>
Expand Down
Loading

1 comment on commit 0a0e7e8

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 680.90 ns 1.67 ns 1.61 ns 1.04
Subscribe empty callbacks to empty observable via pipe operator 683.75 ns 2.01 ns 1.21 ns 1.66

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1755.90 ns 0.33 ns 0.40 ns 0.83
from array of 1 - create + subscribe + current_thread 2566.76 ns 43.90 ns 38.30 ns 1.15
concat_as_source of just(1 immediate) create + subscribe 5414.00 ns 0.33 ns 0.42 ns 0.79

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 1384.93 ns 0.33 ns 0.40 ns 0.83
create+filter(true)+subscribe 828.67 ns 0.33 ns 0.42 ns 0.80
create(1,2)+skip(1)+subscribe 1162.80 ns 0.33 ns 0.40 ns 0.83
create(1,1,2)+distinct_until_changed()+subscribe 832.16 ns 0.33 ns 0.41 ns 0.83
create(1,2)+first()+subscribe 1686.51 ns 0.33 ns 0.42 ns 0.79
create(1,2)+last()+subscribe 1059.06 ns 0.33 ns 0.41 ns 0.81

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 699.19 ns 1.00 ns 0.83 ns 1.21
current_thread scheduler create worker + schedule 867.41 ns 8.68 ns 7.61 ns 1.14
current_thread scheduler create worker + schedule + recursive schedule 1911.72 ns 89.75 ns 87.55 ns 1.03

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 819.51 ns 0.33 ns 0.40 ns 0.83
create+scan(10, std::plus)+subscribe 980.78 ns 0.33 ns 0.40 ns 0.83
create+flat_map(just(v*2))+subscribe 3184.30 ns 150.84 ns 138.10 ns 1.09

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 860.65 ns - - 0.00
create+take_while(true)+subscribe 832.55 ns 0.33 ns 0.40 ns 0.83

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 3118.83 ns 33.75 ns 30.44 ns 1.11

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 4329.95 ns 177.23 ns 166.43 ns 1.06
create(1) + merge_with(create(2)) + subscribe 6723.01 ns 176.67 ns 167.79 ns 1.05
create(1) + with_latest_from(create(2)) + subscribe - 145.63 ns - 0.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 95.96 ns 77.64 ns 78.95 ns 0.98

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3783.81 ns 118.83 ns 109.10 ns 1.09

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1078.19 ns 1.12 ns 1.12 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 1092.42 ns 1.12 ns 1.12 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2337.23 ns 0.44 ns 0.44 ns 0.99
from array of 1 - create + subscribe + current_thread 3015.74 ns 102.52 ns 102.55 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 6920.80 ns 0.28 ns 0.28 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 2012.91 ns 4.18 ns 3.95 ns 1.06
create+filter(true)+subscribe 1137.67 ns 2.52 ns 2.51 ns 1.00
create(1,2)+skip(1)+subscribe 1825.33 ns 5.60 ns 5.60 ns 1.00
create(1,1,2)+distinct_until_changed()+subscribe 1168.85 ns 8.41 ns 8.09 ns 1.04
create(1,2)+first()+subscribe 2350.52 ns 5.91 ns 5.84 ns 1.01
create(1,2)+last()+subscribe 1454.84 ns 8.14 ns 8.11 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1006.00 ns 1.12 ns 1.12 ns 1.00
current_thread scheduler create worker + schedule 1342.01 ns 9.53 ns 9.56 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 2413.13 ns 163.85 ns 155.12 ns 1.06

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1147.02 ns 2.52 ns 2.53 ns 0.99
create+scan(10, std::plus)+subscribe 1401.26 ns 5.04 ns 5.05 ns 1.00
create+flat_map(just(v*2))+subscribe 3419.99 ns 300.44 ns 301.01 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1199.21 ns 3.36 ns 3.35 ns 1.00
create+take_while(true)+subscribe 1149.02 ns 2.79 ns 2.79 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 3935.26 ns 86.36 ns 87.99 ns 0.98

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 4254.12 ns 340.82 ns 348.86 ns 0.98
create(1) + merge_with(create(2)) + subscribe 7042.96 ns 360.69 ns 346.21 ns 1.04
create(1) + with_latest_from(create(2)) + subscribe - 399.59 ns - 0.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 103.02 ns 93.55 ns 92.86 ns 1.01

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 4607.36 ns 240.08 ns 249.51 ns 0.96

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 334.81 ns 0.80 ns 2.01 ns 0.40
Subscribe empty callbacks to empty observable via pipe operator 337.62 ns 1.20 ns 1.21 ns 0.99

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 735.14 ns 0.41 ns 0.43 ns 0.95
from array of 1 - create + subscribe + current_thread 1048.85 ns 25.09 ns 35.07 ns 0.72
concat_as_source of just(1 immediate) create + subscribe 2488.68 ns 0.40 ns 0.41 ns 0.98

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 642.37 ns 0.40 ns 0.39 ns 1.02
create+filter(true)+subscribe 365.66 ns 0.40 ns 0.40 ns 1.01
create(1,2)+skip(1)+subscribe 565.02 ns 0.40 ns 0.40 ns 1.00
create(1,1,2)+distinct_until_changed()+subscribe 374.79 ns 0.40 ns 0.80 ns 0.50
create(1,2)+first()+subscribe 782.84 ns 0.40 ns 0.41 ns 0.98
create(1,2)+last()+subscribe 493.78 ns 0.40 ns 0.41 ns 0.98

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 254.03 ns 1.20 ns 1.22 ns 0.98
current_thread scheduler create worker + schedule 387.77 ns 5.77 ns 9.82 ns 0.59
current_thread scheduler create worker + schedule + recursive schedule 831.26 ns 66.09 ns 84.40 ns 0.78

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 365.39 ns 0.40 ns 0.37 ns 1.07
create+scan(10, std::plus)+subscribe 454.22 ns 0.80 ns 0.80 ns 1.00
create+flat_map(just(v*2))+subscribe 1648.44 ns 82.85 ns 96.30 ns 0.86

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 393.14 ns - - 0.00
create+take_while(true)+subscribe 366.46 ns 0.40 ns 0.40 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 1752.22 ns 18.04 ns 22.08 ns 0.82

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 2113.59 ns 105.99 ns 122.52 ns 0.87
create(1) + merge_with(create(2)) + subscribe 3416.15 ns 94.33 ns 110.85 ns 0.85
create(1) + with_latest_from(create(2)) + subscribe - 89.49 ns - 0.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 38.77 ns 32.47 ns 31.81 ns 1.02

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1613.45 ns 92.22 ns 117.91 ns 0.78

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1174.73 ns 2.34 ns 1.15 ns 2.04
Subscribe empty callbacks to empty observable via pipe operator 1826.60 ns 2.34 ns 1.15 ns 2.04

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2612.56 ns 9.70 ns 5.18 ns 1.87
from array of 1 - create + subscribe + current_thread 3422.64 ns 112.83 ns 76.94 ns 1.47
concat_as_source of just(1 immediate) create + subscribe 8863.79 ns 14.61 ns 8.06 ns 1.81

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 2799.36 ns 9.28 ns 6.51 ns 1.42
create+filter(true)+subscribe 1361.80 ns 7.02 ns 4.45 ns 1.58
create(1,2)+skip(1)+subscribe 2055.19 ns 11.03 ns 5.97 ns 1.85
create(1,1,2)+distinct_until_changed()+subscribe 1392.90 ns 15.20 ns 9.98 ns 1.52
create(1,2)+first()+subscribe 2887.27 ns 14.26 ns 8.31 ns 1.72
create(1,2)+last()+subscribe 1735.76 ns 13.37 ns 9.23 ns 1.45

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1140.31 ns 5.01 ns 2.88 ns 1.74
current_thread scheduler create worker + schedule 1416.23 ns 11.03 ns 6.84 ns 1.61
current_thread scheduler create worker + schedule + recursive schedule 2434.92 ns 136.55 ns 91.79 ns 1.49

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1349.94 ns 6.68 ns 4.45 ns 1.50
create+scan(10, std::plus)+subscribe 1601.10 ns 11.03 ns 6.96 ns 1.58
create+flat_map(just(v*2))+subscribe 4031.85 ns 293.50 ns 187.03 ns 1.57

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1494.88 ns 6.35 ns 3.80 ns 1.67
create+take_while(true)+subscribe 1370.64 ns 7.68 ns 4.47 ns 1.72

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 4684.44 ns 94.70 ns 60.26 ns 1.57

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 5045.54 ns 339.98 ns 213.31 ns 1.59
create(1) + merge_with(create(2)) + subscribe 8146.76 ns 342.52 ns 215.34 ns 1.59
create(1) + with_latest_from(create(2)) + subscribe - 341.43 ns - 0.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 114.88 ns 94.36 ns 70.42 ns 1.34

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 5414.52 ns 300.29 ns 205.59 ns 1.46

Please sign in to comment.