Skip to content

Commit

Permalink
Issue #220: file-manager "final" version: merge old data to stack and…
Browse files Browse the repository at this point in the history
… redirect get/set request to stack non functions
  • Loading branch information
weiqiushi committed Apr 19, 2023
1 parent fd2806b commit bc83a41
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 128 deletions.
2 changes: 1 addition & 1 deletion src/component/cyfs-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ simple_logger = '2.1'
intbits = '0.2'
itertools = '0.10'
base-x = '0.2.0'
sqlx = { version = '0.5', default-features = false, optional = true }
sqlx = { version = '*', default-features = false, optional = true, features = ["runtime-async-std-rustls"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
zip = '0.6'
Expand Down
8 changes: 5 additions & 3 deletions src/service/file-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ description = "Rust file-manager package"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lazy_static = "1.4"
tide = "0.16"
async-std = { version = "1.11", features = ["unstable", "attributes"] }
cyfs-base = { path = "../../component/cyfs-base" }
cyfs-base = { path = "../../component/cyfs-base", features = ["sqlx-error"] }
cyfs-core = {path="../../component/cyfs-core"}
cyfs-util = { path = "../../component/cyfs-util" }
rusqlite = { version = "0.27.0", features = ["bundled", "blob"] }
sqlx = {version="0.6", features = ["sqlite", "runtime-async-std-rustls"]}
log = "0.4"
cyfs-debug = { path = "../../component/cyfs-debug" }
cyfs-lib = {path="../../component/cyfs-lib"}
serde = "1"
90 changes: 44 additions & 46 deletions src/service/file-manager/src/file_manager.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,56 @@
use lazy_static::lazy_static;
use async_std::sync::{Mutex};
use rusqlite::{Connection, params};
use std::path::{Path, PathBuf};
use cyfs_base::{BuckyResult, AnyNamedObject, RawConvertTo, ObjectId, RawFrom};
use std::path::{Path};
use std::str::FromStr;
use async_std::prelude::StreamExt;
use log::LevelFilter;
use sqlx::{ConnectOptions, Executor, Pool, Row, Sqlite};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use cyfs_base::{BuckyResult, ObjectId, AccessString};
use cyfs_lib::{NONPutObjectOutputRequest, SharedCyfsStack};

pub struct FileManager {
database: Option<PathBuf>,
database: Pool<Sqlite>,
}

const CREATE_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS "file_desc" (
"id" TEXT NOT NULL UNIQUE,
"desc" BLOB NOT NULL,
PRIMARY KEY("id")
);
"#;

const INSERT: &str = r#"
INSERT OR REPLACE INTO file_desc VALUES (?1, ?2);
"#;

const SELECT: &str = r#"
SELECT desc from file_desc where id=?1;
SELECT id, desc from file_desc;
"#;

impl FileManager {
pub fn new() -> FileManager { FileManager { database:None }}
pub async fn merge(database: &Path, stack: SharedCyfsStack) -> BuckyResult<()> {
let mut options = SqliteConnectOptions::new().filename(database).create_if_missing(false).read_only(true);
options.log_statements(LevelFilter::Off);
let pool = SqlitePoolOptions::new().max_connections(10).connect_with(options).await?;

let mut stream = pool.fetch(SELECT);
while let Some(row) = stream.next().await {
let row = row?;
let id: String = row.try_get("id")?;
let desc: Vec<u8> = row.try_get("desc")?;
match ObjectId::from_str(&id) {
Ok(id) => {
let mut request = NONPutObjectOutputRequest::new_noc(id, desc);
request.access = Some(AccessString::full());
match stack.non_service().put_object(request).await {
Ok(resp) => {
info!("insert obj {} to stack result {}", &id, resp.result.to_string());
}
Err(e) => {
error!("insert obj {} to stack err {}, skip", &id, e);
}
}
}
Err(e) => {
error!("decode object id {} err {}, skip it", &id, e);
}
}
}

info!("insert all object to stack complete, delete database file {}", database.display());

pool.close().await;

std::fs::remove_file(database)?;

pub fn init(&mut self, database: &Path) -> BuckyResult<()> {
self.database = Some(PathBuf::from(database));
let conn = Connection::open(self.database.as_ref().unwrap())?;
conn.execute(CREATE_TABLE, [])?;
Ok(())
}

pub async fn set(&self, id: &ObjectId, desc: &AnyNamedObject) -> BuckyResult<()> {
let data = desc.to_vec()?;
let conn = Connection::open(self.database.as_ref().unwrap())?;
conn.execute(INSERT, params![id.to_string(), data])?;
Ok(())
}

pub async fn get(&self, id: &ObjectId) -> BuckyResult<AnyNamedObject> {
let conn = Connection::open(self.database.as_ref().unwrap())?;
let desc_buf = conn.query_row(SELECT, params![id.to_string()], |row| -> rusqlite::Result<Vec<u8>>{
Ok(row.get(0)?)
})?;
let desc = AnyNamedObject::clone_from_slice(&desc_buf)?;
Ok(desc)
}
}

lazy_static! {
pub static ref FILE_MANAGER: Mutex<FileManager> = {
return Mutex::new(FileManager::new());
};
}
125 changes: 47 additions & 78 deletions src/service/file-manager/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
#![windows_subsystem = "windows"]

use crate::file_manager::FILE_MANAGER;
use std::borrow::Cow;
use crate::file_manager::{FileManager};
use tide::{Request, Response, StatusCode};
use cyfs_base::{BuckyResult, FILE_MANAGER_NAME, FILE_MANAGER_PORT, AnyNamedObject, RawFrom, ObjectId, RawConvertTo};
use cyfs_base::{BuckyResult, FILE_MANAGER_NAME, FILE_MANAGER_PORT, AnyNamedObject, RawFrom, ObjectId, RawConvertTo, AccessString};
use std::str::FromStr;
use tide::prelude::*;
use cyfs_core::get_system_dec_app;
use cyfs_lib::{NONGetObjectRequest, NONPutObjectRequest, SharedCyfsStack};

mod file_manager;
mod gateway_helper;

#[macro_use]
extern crate log;

fn find_param_from_req<'a>(req: &'a Request<()>, param: &str) -> Option<Cow<'a, str>> {
match req.url().query_pairs().find(|(x, _)| x == param) {
None => {
error!(
"can`t find param {} from query {}",
param,
req.url().query().unwrap_or("NULL")
);
None
}
Some((_, v)) => Some(v),
}
#[derive(Deserialize)]
struct GetParam {
fileid: String
}

async fn decode_desc_from_req(req: &mut Request<()>) -> BuckyResult<AnyNamedObject> {
async fn decode_desc_from_req(req: &mut Request<SharedCyfsStack>) -> BuckyResult<AnyNamedObject> {
let desc_buf = req.body_bytes().await?;
Ok(AnyNamedObject::clone_from_slice(&desc_buf)?)
}

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
async fn main() -> BuckyResult<()> {
cyfs_util::process::check_cmd_and_exec(FILE_MANAGER_NAME);

cyfs_debug::CyfsLoggerBuilder::new_service(FILE_MANAGER_NAME)
.level("debug")
.console("debug")
.enable_bdt(Some("debug"), Some("debug"))
.level("info")
.console("info")
.build()
.unwrap()
.start();
Expand All @@ -47,80 +39,57 @@ async fn main() -> Result<(), std::io::Error> {
.build()
.start();


let mut database = cyfs_util::get_cyfs_root_path().join("data").join(FILE_MANAGER_NAME);
let _ = std::fs::create_dir_all(&database).map_err(|e| {
error!("create database dir {} failed.", database.display());
let stack = SharedCyfsStack::open_default(Some(get_system_dec_app().clone())).await.map_err(|e| {
error!("open shared stack err {}", e);
e
})?;
database.push("file.sqlite");

info!("database is:{}", database.display());

let _ = FILE_MANAGER.lock().await.init(&database).map_err(|e| {
error!("init file manager failed, msg:{}", e.to_string());
std::io::Error::from(std::io::ErrorKind::Interrupted)
})?;

let database = cyfs_util::get_cyfs_root_path().join("data").join(FILE_MANAGER_NAME).join("file.sqlite");
if database.exists() {
info!("find old file-manager database {}, merge to cyfs stack", database.display());
if let Err(e) = FileManager::merge(&database, stack.clone()).await {
error!("merge old database to stack err {}, try re-merge at next startup", e);
}
}

gateway_helper::register();

let mut app = tide::new();

app.at("/get_file").get(move |req: Request<()>| async move {
loop {
let fileid_ret = find_param_from_req(&req, "fileid");
if fileid_ret.is_none() {
break;
}
let mut app = tide::with_state(stack.clone());

if let Ok(file_id) = ObjectId::from_str(fileid_ret.as_ref().unwrap()) {
let file_manager = FILE_MANAGER.lock().await;

match file_manager.get(&file_id).await {
Ok(file_desc) => {
let mut resp = Response::new(StatusCode::Ok);
match file_desc.to_vec() {
Ok(buf) => {
resp.set_body(buf);
return Ok(resp);
}
Err(e) => {
error!("encode file_desc err {}", e);
}
}
}
Err(e) => {
error!("get file {} desc error: {}", file_id, e.to_string());
}
app.at("/get_file").get(move |req: Request<SharedCyfsStack>| async move {
let param = req.query::<GetParam>()?;
if let Ok(file_id) = ObjectId::from_str(&param.fileid) {
match req.state().non_service().get_object(NONGetObjectRequest::new_noc(file_id, None)).await {
Ok(file_desc) => {
let mut resp = Response::new(StatusCode::Ok);
resp.set_body(file_desc.object.object_raw);
return Ok(resp);
}
Err(e) => {
error!("get file {} desc error: {}", file_id, e.to_string());
}
} else {
error!("invaild fileid {}", fileid_ret.as_ref().unwrap());
}

break;
} else {
error!("invaild fileid {}", param.fileid);
}

Ok(Response::new(StatusCode::BadRequest))
});

app.at("/set_file")
.post(move |mut req: Request<()>| async move {
match decode_desc_from_req(&mut req).await {
Ok(desc) => {
let id = desc.calculate_id();
let file_manager = FILE_MANAGER.lock().await;
match file_manager.set(&id, &desc).await {
Ok(_) => {
info!("set desc {} success", &id);
return Ok(Response::new(StatusCode::Ok));
}
Err(e) => {
error!("set desc {} failed, err {}", id, e);
}
}
.post(move |mut req: Request<SharedCyfsStack>| async move {
let desc_buf = req.body_bytes().await?;
let desc = AnyNamedObject::clone_from_slice(&desc_buf)?;
let id = desc.calculate_id();
let mut request = NONPutObjectRequest::new_noc(id.clone(), desc.to_vec().unwrap());
request.access = Some(AccessString::full());
match req.state().non_service().put_object(request).await {
Ok(resp) => {
info!("set desc {} success, resp {}", &id, resp.result.to_string());
return Ok(Response::new(StatusCode::Ok));
}
Err(e) => {
error!("decode filedesc error: {}", e);
error!("set desc {} failed, err {}", id, e);
}
}

Expand Down

0 comments on commit bc83a41

Please sign in to comment.