diff --git a/Cargo.lock b/Cargo.lock index d58eec4..12a9dff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,31 @@ dependencies = [ "regex", ] +[[package]] +name = "actix" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb72882332b6d6282f428b77ba0358cb2687e61a6f6df6a6d3871e8a177c2d4f" +dependencies = [ + "actix-macros", + "actix-rt", + "actix_derive", + "bitflags 2.4.1", + "bytes", + "crossbeam-channel", + "futures-core", + "futures-sink", + "futures-task", + "futures-util", + "log", + "once_cell", + "parking_lot 0.12.0", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util", +] + [[package]] name = "actix-codec" version = "0.5.1" @@ -191,6 +216,17 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "actix_derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "addr2line" version = "0.17.0" @@ -3084,6 +3120,7 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" name = "vstc" version = "0.1.0" dependencies = [ + "actix", "actix-web", "serde", "serde_json", diff --git a/vstc/Cargo.toml b/vstc/Cargo.toml index 271cadb..efef367 100644 --- a/vstc/Cargo.toml +++ b/vstc/Cargo.toml @@ -15,4 +15,5 @@ serde_qs = "0.8.0" serde_json = "1.0.108" termion = "2.0.1" actix-web = "4" +actix = "0.13.3" tokio = "1" diff --git a/vstc/src/db_host.rs b/vstc/src/db_host.rs index c9e052d..7932db5 100644 --- a/vstc/src/db_host.rs +++ b/vstc/src/db_host.rs @@ -1,6 +1,13 @@ use crate::exit_command_failed::exit_command_failed; -use actix_web::{dev, web, App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder}; +use actix::{Actor, Addr, Context, Handler, Message}; +use actix_web::{ + dev, + http::{Method, StatusCode}, + web::{self, Bytes}, + App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder, +}; use storage::{storage_head_ptr, SledBackend, Storage, StorageReader}; +use tokio::task::LocalSet; use valuescript_compiler::inline_valuescript; use valuescript_vm::{ vs_object::VsObject, @@ -18,18 +25,20 @@ pub fn db_host(path: &str, args: &[String]) { ); } - let path = path.to_owned(); - // TODO: Multi-thread? let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - runtime.block_on(async { + let local = LocalSet::new(); + + local.block_on(&runtime, async { + let db_actor = DbActor::new(Storage::new(SledBackend::open(path).unwrap())).start(); + HttpServer::new(move || { App::new() - .app_data(web::Data::new(path.clone())) + .app_data(web::Data::new(db_actor.clone())) .default_service(web::route().to(handle_request)) }) .bind("127.0.0.1:8080") @@ -43,93 +52,160 @@ pub fn db_host(path: &str, args: &[String]) { async fn handle_request( req: HttpRequest, payload: web::Payload, - data: web::Data, + data: web::Data>, ) -> impl Responder { - let path = req.path(); - let method = req.method(); - let mut storage = Storage::new(SledBackend::open(data.as_ref()).unwrap()); - - let body = match get_body(&req, payload.into_inner()).await { - Ok(body) => body, - Err(e) => return e.into(), + let req_val = DbRequest { + path: req.path().to_owned(), + method: req.method().clone(), + body: match get_body(&req, payload.into_inner()).await { + Ok(body) => body, + Err(_e) => todo!(), // handle error + }, }; - let mut instance: Val = storage - .get_head(storage_head_ptr(b"state")) - .unwrap() - .unwrap(); + match data.send(req_val).await { + Ok(Ok(res)) => HttpResponse::Ok() + .content_type("application/json") + .body(res), + Ok(Err(err)) => { + println!("{}", err.message); + HttpResponse::build(StatusCode::from_u16(err.code).unwrap()).body(err.message) + } + Err(err) => { + println!("{}", err); + HttpResponse::InternalServerError().body("Mailbox has closed") + } + } +} - let fn_ = inline_valuescript( - r#" - export default function(req) { - if ("handleRequest" in this) { - return this.handleRequest(req); - } +struct DbRequest { + path: String, + method: Method, + body: Bytes, +} - const handlerName = `${req.method} ${req.path}`; +struct HttpError { + code: u16, + message: String, +} - if (!this[handlerName]) { - throw new Error("No handler for request"); - } +impl Message for DbRequest { + type Result = Result; +} - if (req.method === "GET") { - // Enforce GET as read-only - const state = this; - return state[handlerName](req.body); - } +struct DbActor { + storage: Storage, + apply_fn: Val, +} - return this[handlerName](req.body); - } - "#, - ); - - let req_val = VsObject { - string_map: vec![ - ("path".to_string(), path.to_val()), - ("method".to_string(), method.to_string().to_val()), - ("body".to_string(), body), - ] - .into_iter() - .collect(), - symbol_map: vec![].into_iter().collect(), - prototype: Val::Void, - } - .to_val(); +impl Actor for DbActor { + type Context = Context; +} - let mut vm = VirtualMachine::default(); +impl DbActor { + fn new(storage: Storage) -> Self { + Self { + storage, + apply_fn: inline_valuescript( + // TODO: store in actor + r#" + export default function(req) { + if ("handleRequest" in this) { + return this.handleRequest(req); + } + + const handlerName = `${req.method} ${req.path}`; + + if (!this[handlerName]) { + throw new Error("No handler for request"); + } + + if (req.method === "GET") { + // Enforce GET as read-only + const state = this; + return state[handlerName](req.body); + } + + return this[handlerName](req.body); + } + "#, + ), + } + } +} - let res = match vm.run(None, &mut instance, fn_, vec![req_val]) { - Ok(res) => match res.to_json() { - Some(json) => HttpResponse::Ok().json(json), - None => HttpResponse::InternalServerError().body("Failed to serialize response"), - }, - Err(err) => { - println!("Uncaught exception: {}", err.pretty()); - HttpResponse::InternalServerError().body("Uncaught exception") +impl Handler for DbActor { + type Result = Result; + + fn handle(&mut self, msg: DbRequest, _ctx: &mut Self::Context) -> Self::Result { + let mut instance: Val = self + .storage + .get_head(storage_head_ptr(b"state")) + .unwrap() + .unwrap(); + + let DbRequest { path, method, body } = msg; + + let body = if body.is_empty() { + Val::Undefined + } else { + match serde_json::from_slice::(&body) { + Ok(json_value) => Val::from_json(&json_value), + Err(_err) => { + return Err(HttpError { + code: 400, + message: "Bad request".to_owned(), + }) + } + } + }; + + let req_val = VsObject { + string_map: vec![ + ("path".to_string(), path.to_val()), + ("method".to_string(), method.to_string().to_val()), + ("body".to_string(), body), + ] + .into_iter() + .collect(), + symbol_map: vec![].into_iter().collect(), + prototype: Val::Void, } - }; + .to_val(); + + let mut vm = VirtualMachine::default(); + + let res = match vm.run(None, &mut instance, self.apply_fn.clone(), vec![req_val]) { + Ok(res) => match res.to_json() { + Some(json) => Ok(json.to_string()), + None => Err(HttpError { + code: 500, + message: "Failed to serialize response".to_owned(), + }), + }, + Err(err) => { + println!("Uncaught exception: {}", err.pretty()); + Err(HttpError { + code: 500, + message: "Uncaught exception".to_owned(), + }) + } + }; - storage - .set_head(storage_head_ptr(b"state"), &instance) - .unwrap(); + self + .storage + .set_head(storage_head_ptr(b"state"), &instance) + .unwrap(); - res + res + } } -async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result { +async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result { let payload = web::Payload::from_request(req, &mut payload).await?; - let body = payload + payload .to_bytes_limited(1_024 * 1_024) .await - .map_err(|_| actix_web::error::PayloadError::Overflow)??; - - if body.is_empty() { - Ok(Val::Undefined) - } else { - match serde_json::from_slice::(&body) { - Ok(json_value) => Ok(Val::from_json(&json_value)), - Err(err) => Err(actix_web::error::ErrorBadRequest(err)), - } - } + .map_err(|_| actix_web::error::PayloadError::Overflow)? }