Skip to content

Commit

Permalink
Cleanup kv
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed May 5, 2023
1 parent e5b0241 commit aea52d4
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 74 deletions.
31 changes: 8 additions & 23 deletions src/connectors/impls/cluster_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,6 @@ struct KvSink {
}

impl KvSink {
fn decode(&mut self, mut v: Option<Vec<u8>>, ingest_ns: u64) -> Result<Value<'static>> {
if let Some(v) = v.as_mut() {
let data: &mut [u8] = v.as_mut_slice();
// TODO: We could optimize this
Ok(self
.codec
.decode(data, ingest_ns)?
.unwrap_or_default()
.into_static())
} else {
Ok(Value::null())
}
}
fn encode(&mut self, v: &Value) -> Result<Vec<u8>> {
self.codec.encode(v)
}
Expand All @@ -275,7 +262,7 @@ impl KvSink {
cmd: Command,
op_name: &'static str,
value: &Value<'a>,
ingest_ns: u64,
_ingest_ns: u64,
) -> Result<Vec<(Value<'static>, Value<'static>)>> {
match cmd {
Command::Get { key, strict } => {
Expand All @@ -287,15 +274,13 @@ impl KvSink {
];
let combined_key = key_parts.join(".");

self.decode(
if strict {
self.raft.kv_get(combined_key).await?
} else {
self.raft.kv_get_local(combined_key).await?
},
ingest_ns,
)
.map(|v| oks(op_name, key, v))
let value = if strict {
self.raft.kv_get(combined_key).await?
} else {
self.raft.kv_get_local(combined_key).await?
}
.map_or(Value::const_null(), Value::from);
Ok(oks(op_name, key, value))
}
Command::Put { key } => {
// return the new value
Expand Down
4 changes: 2 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ pub type NodeId = u64;
/// When the config isn't valid
pub fn config() -> ClusterResult<Config> {
let config = Config {
// heartbeat_interval: 250,
// election_timeout_min: 299,
heartbeat_interval: 150,
election_timeout_min: 299,
..Default::default()
};
Ok(config.validate()?)
Expand Down
21 changes: 6 additions & 15 deletions src/raft/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@

//! Tremor Rest API Client
use crate::ids::{AppFlowInstanceId, AppId, FlowDefinitionId};
use crate::raft::{
api::apps::AppState,
node::Addr,
store::{TremorInstanceState, TremorResponse},
NodeId,
};
use crate::raft::{api::apps::AppState, node::Addr, store::TremorInstanceState, NodeId};
use crate::{errors::Result, raft::store::TremorStart};
use halfbrown::HashMap;
use openraft::{LogId, RaftMetrics};
Expand Down Expand Up @@ -126,10 +121,8 @@ impl Tremor {
///
/// # Errors
/// if the api call fails
pub async fn read(&self, req: &str) -> ClientResult<Vec<u8>> {
let tremor_res: TremorResponse =
self.api_req("api/kv/read", Method::POST, Some(req)).await?;
Ok(tremor_res.into_kv_value()?)
pub async fn read(&self, req: &str) -> ClientResult<OwnedValue> {
self.api_req("api/kv/read", Method::POST, Some(req)).await
}

/// Consistent Read value by key.
Expand All @@ -138,11 +131,9 @@ impl Tremor {
///
/// # Errors
/// if the api call fails
pub async fn consistent_read(&self, req: &str) -> ClientResult<Vec<u8>> {
let tremor_res: TremorResponse = self
.api_req("api/kv/consistent_read", Method::POST, Some(req))
.await?;
Ok(tremor_res.into_kv_value()?)
pub async fn consistent_read(&self, req: &str) -> ClientResult<OwnedValue> {
self.api_req("api/kv/consistent_read", Method::POST, Some(req))
.await
}
}

Expand Down
36 changes: 9 additions & 27 deletions src/raft/api/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::raft::{
api::{APIError, APIRequest, APIResult, ToAPIResult},
store::TremorSet,
};
use crate::raft::api::{APIError, APIRequest, APIResult};
use axum::{extract, routing::post, Json, Router};
use http::StatusCode;
use simd_json::OwnedValue;
Expand All @@ -41,21 +38,12 @@ pub struct KVSet {

async fn write(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(body): extract::Json<KVSet>,
) -> APIResult<Vec<u8>> {
let tremor_set = TremorSet {
key: body.key,
value: simd_json::to_vec(&body.value)?,
};
let res = state
.raft
.client_write(tremor_set.into())
.await
.to_api_result(&uri, &state)
.await?;

Ok(res.data.into_kv_value()?)
Ok(state
.raft_manager
.kv_set(body.key, simd_json::to_vec(&body.value)?)
.await?)
}

/// read a value from the current node, not necessarily the leader, thus this value can be stale
Expand All @@ -64,8 +52,8 @@ async fn read(
extract::Json(key): extract::Json<String>,
) -> APIResult<Json<OwnedValue>> {
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;
if let Some(mut value) = value {
Ok(Json(simd_json::from_slice(&mut value)?))
if let Some(value) = value {
Ok(Json(value))
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
Expand All @@ -77,17 +65,11 @@ async fn read(
/// read a value from the leader. If this request is received by another node, it will return a redirect
async fn consistent_read(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(key): extract::Json<String>,
) -> APIResult<Json<OwnedValue>> {
// this will fail if we are not a leader
state.ensure_leader(Some(uri.clone())).await?;
// here we are safe to read
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;
// Ensure that we are still the leader at the end of the read so we can guarantee freshness
state.ensure_leader(Some(uri)).await?;
if let Some(mut value) = value {
Ok(Json(simd_json::from_slice(&mut value)?))
if let Some(value) = value {
Ok(Json(value))
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
Expand Down
11 changes: 8 additions & 3 deletions src/raft/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use openraft::{
error::{CheckIsLeaderError, ForwardToLeader, RaftError},
raft::ClientWriteResponse,
};
use simd_json::OwnedValue;

use crate::raft::api::APIStoreReq;
use crate::Result;
Expand Down Expand Up @@ -124,6 +125,7 @@ impl Cluster {
self.send(command).await?;
Ok(rx.await?)
}

pub(crate) async fn get_apps_local(
&self,
) -> Result<HashMap<AppId, crate::raft::api::apps::AppState>> {
Expand Down Expand Up @@ -164,7 +166,7 @@ impl Cluster {
let tremor_res = self.client_write(TremorSet { key, value }).await?;
tremor_res.data.into_kv_value()
}
pub(crate) async fn kv_get(&self, key: String) -> Result<Option<Vec<u8>>> {
pub(crate) async fn kv_get(&self, key: String) -> Result<Option<OwnedValue>> {
match self.is_leader().await {
Ok(_) => self.kv_get_local(key).await,
Err(Error(
Expand All @@ -187,10 +189,13 @@ impl Cluster {
Err(e) => Err(e),
}
}
pub(crate) async fn kv_get_local(&self, key: String) -> Result<Option<Vec<u8>>> {
pub(crate) async fn kv_get_local(&self, key: String) -> Result<Option<OwnedValue>> {
let (tx, rx) = oneshot();
let command = APIStoreReq::KVGet(key, tx);
self.send(command).await?;
Ok(rx.await?)
Ok(rx
.await?
.map(|mut v| simd_json::from_slice(&mut v))
.transpose()?)
}
}
6 changes: 2 additions & 4 deletions tremor-cli/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,14 @@ impl KvCommands {
key,
consistant: true,
} => {
let mut r = client.read(&key).await.map_err(|e| format!("error: {e}"))?;
let value: OwnedValue = simd_json::from_slice(&mut r)?;
let value = client.read(&key).await.map_err(|e| format!("error: {e}"))?;
println!("{}", simd_json::to_string_pretty(&value)?);
}
KvCommands::Get { key, .. } => {
let mut r = client
let value = client
.consistent_read(&key)
.await
.map_err(|e| format!("error: {e}"))?;
let value: OwnedValue = simd_json::from_slice(&mut r)?;
println!("{}", simd_json::to_string_pretty(&value)?);
}
KvCommands::Set { key, mut value } => {
Expand Down

0 comments on commit aea52d4

Please sign in to comment.