Skip to content

Commit

Permalink
Use actor to avoid re-opening the db for every request
Browse files Browse the repository at this point in the history
  • Loading branch information
voltrevo committed Feb 28, 2024
1 parent ea61799 commit 7368453
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 75 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vstc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ serde_qs = "0.8.0"
serde_json = "1.0.108"
termion = "2.0.1"
actix-web = "4"
actix = "0.13.3"
tokio = "1"
226 changes: 151 additions & 75 deletions vstc/src/db_host.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use crate::exit_command_failed::exit_command_failed;
use actix_web::{dev, web, App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder};
use actix::{Actor, Addr, Context, Handler, Message};
use actix_web::{
dev,
http::{Method, StatusCode},
web::{self, Bytes},
App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder,
};
use storage::{storage_head_ptr, SledBackend, Storage, StorageReader};
use tokio::task::LocalSet;
use valuescript_compiler::inline_valuescript;
use valuescript_vm::{
vs_object::VsObject,
Expand All @@ -18,18 +25,20 @@ pub fn db_host(path: &str, args: &[String]) {
);
}

let path = path.to_owned();

// TODO: Multi-thread?
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

runtime.block_on(async {
let local = LocalSet::new();

local.block_on(&runtime, async {
let db_actor = DbActor::new(Storage::new(SledBackend::open(path).unwrap())).start();

HttpServer::new(move || {
App::new()
.app_data(web::Data::new(path.clone()))
.app_data(web::Data::new(db_actor.clone()))
.default_service(web::route().to(handle_request))
})
.bind("127.0.0.1:8080")
Expand All @@ -43,93 +52,160 @@ pub fn db_host(path: &str, args: &[String]) {
async fn handle_request(
req: HttpRequest,
payload: web::Payload,
data: web::Data<String>,
data: web::Data<Addr<DbActor>>,
) -> impl Responder {
let path = req.path();
let method = req.method();
let mut storage = Storage::new(SledBackend::open(data.as_ref()).unwrap());

let body = match get_body(&req, payload.into_inner()).await {
Ok(body) => body,
Err(e) => return e.into(),
let req_val = DbRequest {
path: req.path().to_owned(),
method: req.method().clone(),
body: match get_body(&req, payload.into_inner()).await {
Ok(body) => body,
Err(_e) => todo!(), // handle error
},
};

let mut instance: Val = storage
.get_head(storage_head_ptr(b"state"))
.unwrap()
.unwrap();
match data.send(req_val).await {
Ok(Ok(res)) => HttpResponse::Ok()
.content_type("application/json")
.body(res),
Ok(Err(err)) => {
println!("{}", err.message);
HttpResponse::build(StatusCode::from_u16(err.code).unwrap()).body(err.message)
}
Err(err) => {
println!("{}", err);
HttpResponse::InternalServerError().body("Mailbox has closed")
}
}
}

let fn_ = inline_valuescript(
r#"
export default function(req) {
if ("handleRequest" in this) {
return this.handleRequest(req);
}
struct DbRequest {
path: String,
method: Method,
body: Bytes,
}

const handlerName = `${req.method} ${req.path}`;
struct HttpError {
code: u16,
message: String,
}

if (!this[handlerName]) {
throw new Error("No handler for request");
}
impl Message for DbRequest {
type Result = Result<String, HttpError>;
}

if (req.method === "GET") {
// Enforce GET as read-only
const state = this;
return state[handlerName](req.body);
}
struct DbActor {
storage: Storage<SledBackend>,
apply_fn: Val,
}

return this[handlerName](req.body);
}
"#,
);

let req_val = VsObject {
string_map: vec![
("path".to_string(), path.to_val()),
("method".to_string(), method.to_string().to_val()),
("body".to_string(), body),
]
.into_iter()
.collect(),
symbol_map: vec![].into_iter().collect(),
prototype: Val::Void,
}
.to_val();
impl Actor for DbActor {
type Context = Context<Self>;
}

let mut vm = VirtualMachine::default();
impl DbActor {
fn new(storage: Storage<SledBackend>) -> Self {
Self {
storage,
apply_fn: inline_valuescript(
// TODO: store in actor
r#"
export default function(req) {
if ("handleRequest" in this) {
return this.handleRequest(req);
}
const handlerName = `${req.method} ${req.path}`;
if (!this[handlerName]) {
throw new Error("No handler for request");
}
if (req.method === "GET") {
// Enforce GET as read-only
const state = this;
return state[handlerName](req.body);
}
return this[handlerName](req.body);
}
"#,
),
}
}
}

let res = match vm.run(None, &mut instance, fn_, vec![req_val]) {
Ok(res) => match res.to_json() {
Some(json) => HttpResponse::Ok().json(json),
None => HttpResponse::InternalServerError().body("Failed to serialize response"),
},
Err(err) => {
println!("Uncaught exception: {}", err.pretty());
HttpResponse::InternalServerError().body("Uncaught exception")
impl Handler<DbRequest> for DbActor {
type Result = Result<String, HttpError>;

fn handle(&mut self, msg: DbRequest, _ctx: &mut Self::Context) -> Self::Result {
let mut instance: Val = self
.storage
.get_head(storage_head_ptr(b"state"))
.unwrap()
.unwrap();

let DbRequest { path, method, body } = msg;

let body = if body.is_empty() {
Val::Undefined
} else {
match serde_json::from_slice::<serde_json::Value>(&body) {
Ok(json_value) => Val::from_json(&json_value),
Err(_err) => {
return Err(HttpError {
code: 400,
message: "Bad request".to_owned(),
})
}
}
};

let req_val = VsObject {
string_map: vec![
("path".to_string(), path.to_val()),
("method".to_string(), method.to_string().to_val()),
("body".to_string(), body),
]
.into_iter()
.collect(),
symbol_map: vec![].into_iter().collect(),
prototype: Val::Void,
}
};
.to_val();

let mut vm = VirtualMachine::default();

let res = match vm.run(None, &mut instance, self.apply_fn.clone(), vec![req_val]) {
Ok(res) => match res.to_json() {
Some(json) => Ok(json.to_string()),
None => Err(HttpError {
code: 500,
message: "Failed to serialize response".to_owned(),
}),
},
Err(err) => {
println!("Uncaught exception: {}", err.pretty());
Err(HttpError {
code: 500,
message: "Uncaught exception".to_owned(),
})
}
};

storage
.set_head(storage_head_ptr(b"state"), &instance)
.unwrap();
self
.storage
.set_head(storage_head_ptr(b"state"), &instance)
.unwrap();

res
res
}
}

async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result<Val, actix_web::Error> {
async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result<Bytes, actix_web::Error> {
let payload = web::Payload::from_request(req, &mut payload).await?;

let body = payload
payload
.to_bytes_limited(1_024 * 1_024)
.await
.map_err(|_| actix_web::error::PayloadError::Overflow)??;

if body.is_empty() {
Ok(Val::Undefined)
} else {
match serde_json::from_slice::<serde_json::Value>(&body) {
Ok(json_value) => Ok(Val::from_json(&json_value)),
Err(err) => Err(actix_web::error::ErrorBadRequest(err)),
}
}
.map_err(|_| actix_web::error::PayloadError::Overflow)?
}

0 comments on commit 7368453

Please sign in to comment.