Skip to content

Commit

Permalink
Merge pull request #3 from ublk-org/next
Browse files Browse the repository at this point in the history
libublk: replace local executor with smol
  • Loading branch information
ming1 authored Feb 20, 2024
2 parents 357359b + 6c3bb04 commit 4451b3a
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 627 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ log = {version = "0.4", features = ["release_max_level_off"]}
thiserror = "1.0.43"
derive_builder = "0.12"
futures = "0.3"
env_logger = "0.9"
smol = "1.3.0"
slab = "0.4.9"

[dev-dependencies]
block-utils = "0.11.0"
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ by ctrl+C.

``` rust
use libublk::dev_flags::*;
use libublk::{ctrl::UblkCtrl, exe::Executor, io::UblkDev, io::UblkQueue};
use libublk::uring_async::ublk_wait_and_handle_ios;
use libublk::{ctrl::UblkCtrl, io::UblkDev, io::UblkQueue};

fn main() {
let depth = 64_u32;
Expand Down Expand Up @@ -69,7 +70,8 @@ fn main() {
let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();
let q_handler = move |qid: u16, dev: &UblkDev| {
let q_rc = std::rc::Rc::new(UblkQueue::new(qid as u16, &dev).unwrap());
let exe = Executor::new(dev.get_nr_ios());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

// handle_io_cmd() can be .await nested, and support join!() over
// multiple Future objects from async function/block
Expand All @@ -81,7 +83,7 @@ fn main() {
let q = q_rc.clone();

// spawn background task(coroutine) for handling each io command
exe.spawn(tag, async move {
f_vec.push(exe.spawn(async move {
let mut cmd_op = libublk::sys::UBLK_IO_FETCH_REQ;
let mut result = 0;
let addr = std::ptr::null_mut();
Expand All @@ -96,10 +98,11 @@ fn main() {
result = handle_io_cmd(&q, tag).await;
cmd_op = libublk::sys::UBLK_IO_COMMIT_AND_FETCH_REQ;
}
});
}));
}

q_rc.wait_and_wake_io_tasks(&exe);
ublk_wait_and_handle_ios(&q_rc, &exe);
smol::block_on(async { futures::future::join_all(f_vec).await });
};

// Now start this ublk target
Expand Down
116 changes: 40 additions & 76 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use ilog::IntLog;
use io_uring::{opcode, squeue, types};
use libublk::dev_flags::*;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{
ctrl::UblkCtrl, exe::Executor, exe::UringOpFuture, sys, UblkError, UblkIORes, UblkSession,
};
use libublk::uring_async::ublk_wait_and_handle_ios;
use libublk::{ctrl::UblkCtrl, sys, UblkError, UblkIORes, UblkSession};
use log::trace;
use serde::Serialize;
use std::os::unix::fs::FileTypeExt;
Expand Down Expand Up @@ -135,59 +134,22 @@ fn __lo_prep_submit_io_cmd(iod: &libublk::sys::ublksrv_io_desc) -> i32 {
}

#[inline]
fn __lo_submit_io_cmd(
q: &UblkQueue<'_>,
op: u32,
off: u64,
bytes: u32,
buf_addr: *mut u8,
data: u64,
) {
fn __lo_submit_io_cmd(op: u32, off: u64, bytes: u32, buf_addr: *mut u8) -> io_uring::squeue::Entry {
match op {
libublk::sys::UBLK_IO_OP_FLUSH => {
let sqe = &opcode::SyncFileRange::new(types::Fixed(1), bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE)
.user_data(data);
unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(sqe)
.expect("submission fail");
}
}
libublk::sys::UBLK_IO_OP_READ => {
let sqe = &opcode::Read::new(types::Fixed(1), buf_addr, bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE)
.user_data(data);
unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(sqe)
.expect("submission fail");
}
}
libublk::sys::UBLK_IO_OP_WRITE => {
let sqe = &opcode::Write::new(types::Fixed(1), buf_addr, bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE)
.user_data(data);
unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(sqe)
.expect("submission fail");
}
}
_ => {}
};
libublk::sys::UBLK_IO_OP_FLUSH => opcode::SyncFileRange::new(types::Fixed(1), bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE),
libublk::sys::UBLK_IO_OP_READ => opcode::Read::new(types::Fixed(1), buf_addr, bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE),
libublk::sys::UBLK_IO_OP_WRITE => opcode::Write::new(types::Fixed(1), buf_addr, bytes)
.offset(off)
.build()
.flags(squeue::Flags::FIXED_FILE),
_ => panic!(),
}
}

async fn lo_handle_io_cmd_async(q: &UblkQueue<'_>, tag: u16) -> i32 {
Expand All @@ -199,14 +161,13 @@ async fn lo_handle_io_cmd_async(q: &UblkQueue<'_>, tag: u16) -> i32 {

for _ in 0..4 {
let op = iod.op_flags & 0xff;
let user_data = UblkIOCtx::build_user_data_async(tag as u16, op, 0);
// either start to handle or retry
let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let buf_addr = q.get_io_buf_addr(tag);

__lo_submit_io_cmd(q, op, off, bytes, buf_addr, user_data);
let res = UringOpFuture { user_data }.await;
let sqe = __lo_submit_io_cmd(op, off, bytes, buf_addr);
let res = q.ublk_submit_sqe(sqe).await;
if res != -(libc::EAGAIN) {
return res;
}
Expand All @@ -223,35 +184,29 @@ async fn lo_handle_io_cmd_async_split(q: &UblkQueue<'_>, tag: u16) -> i32 {
}

let op = iod.op_flags & 0xff;
let user_data = UblkIOCtx::build_user_data_async(tag as u16, op, 0);
let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let buf_addr = q.get_io_buf_addr(tag);

if bytes > 4096 {
__lo_submit_io_cmd(q, op, off, 4096, buf_addr, user_data);
let user_data2 = UblkIOCtx::build_user_data_async(tag as u16, op, 1);
__lo_submit_io_cmd(
q,
let sqe = __lo_submit_io_cmd(op, off, 4096, buf_addr);
let sqe2 = __lo_submit_io_cmd(
op,
off + 4096,
bytes - 4096,
((buf_addr as u64) + 4096) as *mut u8,
user_data2,
);

let f = UringOpFuture { user_data };
let f2 = UringOpFuture {
user_data: user_data2,
};
let f = q.ublk_submit_sqe(sqe);
let f2 = q.ublk_submit_sqe(sqe2);
let (res, res2) = futures::join!(f, f2);

res + res2
} else {
__lo_submit_io_cmd(q, op, off, bytes, buf_addr, user_data);
let res = UringOpFuture { user_data };
let sqe = __lo_submit_io_cmd(op, off, bytes, buf_addr);
let f = q.ublk_submit_sqe(sqe);

res.await
f.await
}
}

Expand Down Expand Up @@ -281,7 +236,14 @@ fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx) {
let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let buf_addr = q.get_io_buf_addr(tag);
__lo_submit_io_cmd(q, op, off, bytes, buf_addr, data);
let sqe = __lo_submit_io_cmd(op, off, bytes, buf_addr).user_data(data);
unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(&sqe)
.expect("submission fail");
}
}
}

Expand Down Expand Up @@ -362,12 +324,13 @@ fn __test_add(
let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();
let q_async_fn = move |qid: u16, dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev).unwrap());
let exe = Executor::new(dev.get_nr_ios());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

for tag in 0..depth as u16 {
let q = q_rc.clone();

exe.spawn(tag as u16, async move {
f_vec.push(exe.spawn(async move {
let buf_addr = q.get_io_buf_addr(tag);
let mut cmd_op = sys::UBLK_IO_FETCH_REQ;
let mut res = 0;
Expand All @@ -384,9 +347,10 @@ fn __test_add(
};
cmd_op = sys::UBLK_IO_COMMIT_AND_FETCH_REQ;
}
});
}));
}
q_rc.wait_and_wake_io_tasks(&exe);
ublk_wait_and_handle_ios(&q_rc, &exe);
smol::block_on(async { futures::future::join_all(f_vec).await });
};

let q_sync_fn = move |qid: u16, _dev: &UblkDev| {
Expand Down
13 changes: 8 additions & 5 deletions examples/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use bitflags::bitflags;
use clap::{Arg, ArgAction, Command};
use libublk::dev_flags::*;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, exe::Executor, UblkIORes, UblkSession};
use libublk::uring_async::ublk_wait_and_handle_ios;
use libublk::{ctrl::UblkCtrl, UblkIORes, UblkSession};
use std::rc::Rc;

bitflags! {
Expand Down Expand Up @@ -90,12 +91,13 @@ fn __test_add(
};
let q_async_handler = move |qid: u16, dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev).unwrap());
let exe = Executor::new(dev.get_nr_ios());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

for tag in 0..depth as u16 {
let q = q_rc.clone();

exe.spawn(tag as u16, async move {
f_vec.push(exe.spawn(async move {
let buf_addr = q.get_io_buf_addr(tag);
let mut cmd_op = libublk::sys::UBLK_IO_FETCH_REQ;
let mut res = 0;
Expand All @@ -108,9 +110,10 @@ fn __test_add(
res = get_io_cmd_result(&q, tag);
cmd_op = libublk::sys::UBLK_IO_COMMIT_AND_FETCH_REQ;
}
});
}));
}
q_rc.wait_and_wake_io_tasks(&exe);
ublk_wait_and_handle_ios(&q_rc, &exe);
smol::block_on(async { futures::future::join_all(f_vec).await });
};

if aio {
Expand Down
28 changes: 18 additions & 10 deletions examples/ramdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use io_uring::cqueue;
/// UblkCtrl::start_dev_in_queue() and low level interface example.
///
use libublk::dev_flags::*;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, exe::Executor, UblkError};
use libublk::io::{UblkDev, UblkQueue};
use libublk::uring_async::ublk_wake_task;
use libublk::{ctrl::UblkCtrl, UblkError};
use std::rc::Rc;

fn handle_io(q: &UblkQueue, tag: u16, buf_addr: *mut u8, start: u64) -> i32 {
Expand Down Expand Up @@ -65,14 +66,15 @@ fn rd_add_dev(dev_id: i32, buf_addr: u64, size: u64, for_add: bool) {
};
let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();

let exe = Executor::new(dev.get_nr_ios());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();
let q_rc = Rc::new(UblkQueue::new(0, &dev).unwrap());
let buf_size = dev.dev_info.max_io_buf_bytes as usize;

for tag in 0..depth as u16 {
let q = q_rc.clone();
assert!(q.get_io_buf_addr(tag) == std::ptr::null_mut());
exe.spawn(tag as u16, async move {
f_vec.push(exe.spawn(async move {
let mut buffer: Vec<u8> = vec![0; buf_size];
let addr = buffer.as_mut_ptr();
let mut cmd_op = libublk::sys::UBLK_IO_FETCH_REQ;
Expand All @@ -87,19 +89,18 @@ fn rd_add_dev(dev_id: i32, buf_addr: u64, size: u64, for_add: bool) {
res = handle_io(&q, tag, addr, buf_addr);
cmd_op = libublk::sys::UBLK_IO_COMMIT_AND_FETCH_REQ;
}
});
}));
exe.try_tick();
}
ctrl.configure_queue(&dev, 0, unsafe { libc::gettid() })
.unwrap();

let (token, buf) = ctrl.submit_start_dev(&dev).unwrap();
let wake_handler = |data: u64, cqe: &cqueue::Entry, _last: bool| {
let tag = UblkIOCtx::user_data_to_tag(data);
exe.wake_with_uring_cqe(tag as u16, &cqe);
};
let wake_handler = |data: u64, cqe: &cqueue::Entry, _last: bool| ublk_wake_task(data, cqe);

let res = loop {
let _ = q_rc.flush_and_wake_io_tasks(wake_handler, 0);
while exe.try_tick() {}
let _res = ctrl.poll_start_dev(token);
match _res {
Ok(res) => break Ok(res),
Expand All @@ -117,7 +118,14 @@ fn rd_add_dev(dev_id: i32, buf_addr: u64, size: u64, for_add: bool) {
match res {
Ok(res) if res >= 0 => {
ctrl.dump();
q_rc.wait_and_wake_io_tasks(&exe);
loop {
while exe.try_tick() {}
match q_rc.flush_and_wake_io_tasks(wake_handler, 1) {
Err(_) => break,
_ => {}
}
}
smol::block_on(async { futures::future::join_all(f_vec).await });
}
_ => {}
};
Expand Down
Loading

0 comments on commit 4451b3a

Please sign in to comment.