Skip to content

Commit

Permalink
Igni/affine steal sort (#261)
Browse files Browse the repository at this point in the history
Executor rework
  • Loading branch information
o0Ignition0o committed Sep 24, 2020
1 parent 75904cf commit aa066f2
Show file tree
Hide file tree
Showing 28 changed files with 1,011 additions and 704 deletions.
12 changes: 6 additions & 6 deletions src/bastion-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ travis-ci = { repository = "bastion-rs/bastion", branch = "master" }
maintenance = { status = "actively-developed" }

[features]
unstable = ["numanji", "allocator-suite", "jemallocator"]
unstable = []

[dependencies]
lightproc = "0.3.5"
Expand All @@ -42,17 +42,17 @@ num_cpus = "1.13"
pin-utils = "0.1.0"

# Allocator
numanji = { version = "^0.1", optional = true, default-features = false }
allocator-suite = { version = "^0.1", optional = true, default-features = false }
arrayvec = { version = "0.5.1", features = ["array-sizes-129-255"]}
futures-timer = "3.0.2"

[target.'cfg(not(any(target_os = "android", target_os = "linux")))'.dependencies]
jemallocator = { version = "^0.3", optional = true, default-features = false }
once_cell = "1.4.0"
lever = "0.1.1-alpha.11"
tracing = "0.1.19"
crossbeam-queue = "0.2.3"

[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "^0.3.8", features = ["basetsd"] }

[dev-dependencies]
proptest = "^0.10"
futures = "0.3.5"
tracing-subscriber = "0.2.11"
9 changes: 2 additions & 7 deletions src/bastion-executor/benches/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
extern crate test;

use bastion_executor::blocking;
use bastion_executor::run::run;
use futures::future::join_all;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use std::thread;
use std::time::Duration;
use test::Bencher;
Expand All @@ -15,7 +12,7 @@ use test::Bencher;
#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
(0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
Expand All @@ -25,9 +22,7 @@ fn blocking(b: &mut Bencher) {
ProcStack::default(),
)
})
.collect::<Vec<RecoverableHandle<()>>>();

run(join_all(handles), ProcStack::default());
.collect::<Vec<_>>()
});
}

Expand Down
48 changes: 48 additions & 0 deletions src/bastion-executor/benches/run_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#![feature(test)]

extern crate test;

use bastion_executor::blocking;
use bastion_executor::run::run;
use futures::future::join_all;
use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn run_blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
.map(|_| {
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
)
})
.collect::<Vec<_>>();

run(join_all(handles), ProcStack::default())
});
}

// Benchmark for a single blocking task spawn
#[bench]
fn run_blocking_single(b: &mut Bencher) {
b.iter(|| {
run(
blocking::spawn_blocking(
async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
},
ProcStack::default(),
),
ProcStack::default(),
)
});
}
29 changes: 9 additions & 20 deletions src/bastion-executor/benches/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,43 @@

extern crate test;

use bastion_executor::load_balancer;
use bastion_executor::prelude::spawn;
use bastion_executor::run::run;
use futures::future::join_all;
use futures_timer::Delay;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn spawn_lot(b: &mut Bencher) {
let proc_stack = ProcStack::default();
b.iter(|| {
let proc_stack = ProcStack::default();
let handles = (0..10_000)
let _ = (0..10_000)
.map(|_| {
spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
)
})
.collect::<Vec<RecoverableHandle<()>>>();

run(join_all(handles), proc_stack);
.collect::<Vec<_>>();
});
}

// Benchmark for a single blocking task spawn
// Benchmark for a single task spawn
#[bench]
fn spawn_single(b: &mut Bencher) {
let proc_stack = ProcStack::default();
b.iter(|| {
let proc_stack = ProcStack::default();

let handle = spawn(
spawn(
async {
let duration = Duration::from_millis(0);
let duration = Duration::from_millis(1);
Delay::new(duration).await;
},
proc_stack.clone(),
);
run(
async {
handle.await;
},
proc_stack,
)
});
}
45 changes: 39 additions & 6 deletions src/bastion-executor/benches/stats.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#![feature(test)]

extern crate test;
use bastion_executor::load_balancer::{stats, SmpStats};
use bastion_executor::load_balancer::{core_count, get_cores, stats, SmpStats};
use bastion_executor::placement;
use std::thread;
use test::Bencher;

fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
let cores = placement::get_core_ids().expect("Core mapping couldn't be fetched");
let mut handles = Vec::new();
for core in cores {
let mut handles = Vec::with_capacity(*core_count());
for core in get_cores() {
let handle = thread::spawn(move || {
placement::set_for_current(core);
placement::set_for_current(*core);
for i in 0..100 {
stats.store_load(core.id, 10);
if i % 3 == 0 {
Expand All @@ -25,7 +25,6 @@ fn stress_stats<S: SmpStats + Sync + Send>(stats: &'static S) {
handle.join().unwrap();
}
}
use test::Bencher;

// previous lock based stats benchmark 1,352,791 ns/iter (+/- 2,682,013)

Expand All @@ -36,3 +35,37 @@ fn lockless_stats_bench(b: &mut Bencher) {
stress_stats(stats());
});
}

#[bench]
fn lockless_stats_bad_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the worst possible mergesort scenario
// [0,2,4,6,8,10,1,3,5,7,9]...
if i <= MAX_CORE / 2 {
stats.store_load(i, i * 2);
} else {
stats.store_load(i, i - 1 - MAX_CORE / 2);
}
}

b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}

#[bench]
fn lockless_stats_good_load(b: &mut Bencher) {
let stats = stats();
const MAX_CORE: usize = 256;
for i in 0..MAX_CORE {
// Generating the best possible mergesort scenario
// [0,1,2,3,4,5,6,7,8,9]...
stats.store_load(i, i);
}

b.iter(|| {
let _sorted_load = stats.get_sorted_load();
});
}
21 changes: 0 additions & 21 deletions src/bastion-executor/src/allocator.rs

This file was deleted.

Loading

0 comments on commit aa066f2

Please sign in to comment.