Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread_pool schedulers #563

Merged
merged 6 commits into from
Apr 21, 2024
Merged

Add thread_pool schedulers #563

merged 6 commits into from
Apr 21, 2024

Conversation

victimsnino
Copy link
Owner

@victimsnino victimsnino commented Apr 18, 2024

Copy link
Contributor

github-actions bot commented Apr 19, 2024

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 308.65 ns 2.16 ns 2.16 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 302.64 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 698.91 ns 0.31 ns 0.62 ns 0.50
from array of 1 - create + subscribe + current_thread 1066.60 ns 3.42 ns 3.43 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2212.58 ns 117.86 ns 130.79 ns 0.90
defer from array of 1 - defer + create + subscribe + immediate 737.82 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2167.15 ns 59.23 ns 59.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3098.52 ns 32.42 ns 32.42 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 26642.54 ns 28103.00 ns 27881.55 ns 1.01
from array of 1000 - create + as_blocking + subscribe + new_thread 40212.24 ns 50851.94 ns 51949.64 ns 0.98

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1080.16 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 829.52 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 987.17 ns 0.31 ns 0.31 ns 0.99
immediate_just(1,1,2)+distinct_until_changed()+subscribe 863.75 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1265.58 ns 0.31 ns 0.62 ns 0.50
immediate_just(1,2)+last()+subscribe 943.89 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1088.56 ns 17.28 ns 17.90 ns 0.97

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 265.61 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 369.78 ns 5.87 ns 6.18 ns 0.95
current_thread scheduler create worker + schedule + recursive schedule 839.98 ns 55.36 ns 55.96 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 871.00 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 890.67 ns 0.37 ns 0.31 ns 1.21
immediate_just+flat_map(immediate_just(v*2))+subscribe 2308.63 ns 166.45 ns 162.21 ns 1.03
immediate_just+buffer(2)+subscribe 1576.10 ns 13.58 ns 13.59 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2372.36 ns 1085.38 ns 1047.77 ns 1.04

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 817.67 ns - - 0.00
immediate_just+take_while(true)+subscribe 829.19 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1965.94 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3448.08 ns 182.27 ns 186.10 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3700.90 ns 171.45 ns 169.90 ns 1.01
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 156.95 ns 157.07 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3504.07 ns 1253.32 ns 1031.79 ns 1.21
immediate_just(1) + zip(immediate_just(2)) + subscribe 2085.67 ns 213.46 ns 210.27 ns 1.02

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.53 ns 16.80 ns 16.74 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1368.47 ns 13.89 ns 14.43 ns 0.96
basic sample with immediate scheduler 1551.47 ns 6.01 ns 5.55 ns 1.08

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 932.22 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1058.18 ns 131.19 ns 125.98 ns 1.04

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1084.29 ns 5.40 ns 3.80 ns 1.42
Subscribe empty callbacks to empty observable via pipe operator 1092.35 ns 5.37 ns 3.84 ns 1.40

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2315.65 ns 0.44 ns 0.23 ns 1.89
from array of 1 - create + subscribe + current_thread 2970.94 ns 10.89 ns 32.45 ns 0.34
concat_as_source of just(1 immediate) create + subscribe 6751.32 ns 407.37 ns 334.85 ns 1.22
defer from array of 1 - defer + create + subscribe + immediate 2344.02 ns 0.28 ns 0.23 ns 1.20
interval - interval + take(3) + subscribe + immediate 6457.21 ns 95.48 ns 116.25 ns 0.82
interval - interval + take(3) + subscribe + current_thread 7545.28 ns 76.02 ns 95.62 ns 0.79
from array of 1 - create + as_blocking + subscribe + new_thread 74100.25 ns 68970.60 ns 78873.46 ns 0.87
from array of 1000 - create + as_blocking + subscribe + new_thread 76990.23 ns 74434.62 ns 85145.33 ns 0.87

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3428.75 ns 0.28 ns 0.22 ns 1.30
immediate_just+filter(true)+subscribe 2955.45 ns 0.28 ns 0.24 ns 1.16
immediate_just(1,2)+skip(1)+subscribe 3240.14 ns 0.29 ns 0.24 ns 1.23
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2444.45 ns 0.56 ns 0.47 ns 1.20
immediate_just(1,2)+first()+subscribe 3850.64 ns 0.28 ns 0.24 ns 1.16
immediate_just(1,2)+last()+subscribe 2830.48 ns 0.28 ns 0.24 ns 1.15
immediate_just+take_last(1)+subscribe 3585.47 ns 74.16 ns 74.86 ns 0.99

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1021.25 ns 5.76 ns 4.02 ns 1.43
current_thread scheduler create worker + schedule 1330.12 ns 18.17 ns 39.69 ns 0.46
current_thread scheduler create worker + schedule + recursive schedule 2979.94 ns 159.72 ns 202.51 ns 0.79

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2559.01 ns 0.28 ns 0.22 ns 1.25
immediate_just+scan(10, std::plus)+subscribe 2885.78 ns 0.56 ns 0.45 ns 1.25
immediate_just+flat_map(immediate_just(v*2))+subscribe 6598.31 ns 459.31 ns 375.13 ns 1.22
immediate_just+buffer(2)+subscribe 3075.12 ns 74.09 ns 65.05 ns 1.14
immediate_just+window(2)+subscribe + subscsribe inner 8063.86 ns 2742.73 ns 2148.16 ns 1.28

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2437.56 ns - - 0.00
immediate_just+take_while(true)+subscribe 2602.21 ns 0.28 ns 0.22 ns 1.25

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5892.37 ns 0.28 ns 0.23 ns 1.20

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 9046.42 ns 514.56 ns 438.77 ns 1.17
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 11426.93 ns 523.58 ns 439.97 ns 1.19
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 537.61 ns 461.76 ns 1.16
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 10089.69 ns 2393.98 ns 1913.39 ns 1.25
immediate_just(1) + zip(immediate_just(2)) + subscribe 6287.73 ns 965.03 ns 804.52 ns 1.20

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 99.10 ns 62.27 ns 49.19 ns 1.27

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3719.74 ns 38.59 ns 71.35 ns 0.54
basic sample with immediate scheduler 3461.08 ns 7.38 ns 15.65 ns 0.47

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2816.63 ns 0.28 ns 0.23 ns 1.19

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 8449.19 ns 5000.53 ns 4081.84 ns 1.23

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 272.57 ns 0.88 ns 1.56 ns 0.56
Subscribe empty callbacks to empty observable via pipe operator 271.88 ns 0.88 ns 1.57 ns 0.56

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 569.45 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 780.51 ns 4.01 ns 4.01 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2406.84 ns 136.32 ns 135.13 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 780.49 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2310.38 ns 58.26 ns 58.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3252.42 ns 31.17 ns 31.19 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29275.10 ns 26930.22 ns 27817.21 ns 0.97
from array of 1000 - create + as_blocking + subscribe + new_thread 35033.72 ns 32250.44 ns 36525.45 ns 0.88

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1165.69 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 844.93 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1100.89 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 880.65 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1366.88 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1000.68 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1193.93 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 277.49 ns 0.88 ns 1.57 ns 0.56
current_thread scheduler create worker + schedule 396.95 ns 4.01 ns 4.01 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 848.67 ns 55.30 ns 56.88 ns 0.97

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 840.10 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 958.97 ns 0.62 ns 0.62 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 4182.62 ns 140.05 ns 138.58 ns 1.01
immediate_just+buffer(2)+subscribe 1526.86 ns 13.90 ns 13.58 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2485.89 ns 910.67 ns 894.88 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 834.29 ns - - 0.00
immediate_just+take_while(true)+subscribe 838.70 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2081.72 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3419.09 ns 161.35 ns 158.58 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3722.78 ns 148.92 ns 148.88 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 141.23 ns 141.25 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3534.80 ns 841.82 ns 840.77 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2195.42 ns 203.02 ns 201.37 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 54.28 ns 19.05 ns 18.55 ns 1.03

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1325.48 ns 11.12 ns 11.11 ns 1.00
basic sample with immediate scheduler 1340.56 ns 6.17 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 989.99 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1043.55 ns 138.96 ns 134.53 ns 1.03

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 605.31 ns 4.94 ns 4.01 ns 1.23
Subscribe empty callbacks to empty observable via pipe operator 605.79 ns 4.94 ns 4.01 ns 1.23

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1224.33 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1451.72 ns 15.44 ns 15.44 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 3998.50 ns 184.63 ns 183.74 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 1248.12 ns 5.24 ns 5.24 ns 1.00
interval - interval + take(3) + subscribe + immediate 3023.33 ns 133.63 ns 134.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3874.90 ns 53.16 ns 52.98 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 128511.11 ns 122988.89 ns 115490.00 ns 1.06
from array of 1000 - create + as_blocking + subscribe + new_thread 136333.33 ns 137388.89 ns 128162.50 ns 1.07

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1921.31 ns 12.89 ns 12.86 ns 1.00
immediate_just+filter(true)+subscribe 1461.25 ns 11.74 ns 12.39 ns 0.95
immediate_just(1,2)+skip(1)+subscribe 2112.88 ns 13.47 ns 13.02 ns 1.03
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1393.45 ns 15.80 ns 15.95 ns 0.99
immediate_just(1,2)+first()+subscribe 2131.03 ns 12.95 ns 12.64 ns 1.02
immediate_just(1,2)+last()+subscribe 1821.34 ns 14.10 ns 14.11 ns 1.00
immediate_just+take_last(1)+subscribe 2119.29 ns 59.30 ns 58.75 ns 1.01

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 515.44 ns 6.17 ns 6.17 ns 1.00
current_thread scheduler create worker + schedule 704.45 ns 13.38 ns 13.33 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1178.70 ns 108.07 ns 104.57 ns 1.03

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1364.54 ns 11.25 ns 12.33 ns 0.91
immediate_just+scan(10, std::plus)+subscribe 1468.64 ns 21.58 ns 21.27 ns 1.01
immediate_just+flat_map(immediate_just(v*2))+subscribe 3592.19 ns 228.33 ns 226.25 ns 1.01
immediate_just+buffer(2)+subscribe 2366.53 ns 58.79 ns 58.17 ns 1.01
immediate_just+window(2)+subscribe + subscsribe inner 4595.22 ns 1227.51 ns 1227.66 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1375.76 ns 11.46 ns 11.46 ns 1.00
immediate_just+take_while(true)+subscribe 1675.99 ns 11.75 ns 12.40 ns 0.95

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3293.66 ns 7.71 ns 7.71 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 6010.11 ns 231.23 ns 236.78 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5691.46 ns 232.16 ns 232.74 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 217.09 ns 221.66 ns 0.98
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5624.59 ns 976.07 ns 932.27 ns 1.05
immediate_just(1) + zip(immediate_just(2)) + subscribe 3555.59 ns 532.53 ns 524.58 ns 1.02

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.74 ns 25.59 ns 25.59 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1913.78 ns 57.07 ns 56.62 ns 1.01
basic sample with immediate scheduler 2248.25 ns 35.79 ns 38.29 ns 0.93

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1521.22 ns 20.00 ns 17.81 ns 1.12

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1987.64 ns 348.42 ns 340.74 ns 1.02

Copy link

sonarcloud bot commented Apr 21, 2024

@victimsnino victimsnino marked this pull request as ready for review April 21, 2024 20:50
@victimsnino victimsnino merged commit 67a3735 into v2 Apr 21, 2024
27 checks passed
@victimsnino victimsnino deleted the thread_pools branch April 21, 2024 20:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant