Skip to content

Commit

Permalink
Replace string tuple with Operation struct on the log
Browse files Browse the repository at this point in the history
  • Loading branch information
eaneto committed May 12, 2023
1 parent a5135fa commit a9e155d
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ use crate::{
CRLF,
};

struct Operation {
key: String,
value: String,
}

struct Entry {
value: String,
created_at: Instant,
}

type ShardedLog = Vec<RwLock<Vec<(String, String)>>>;
type ShardedLog = Vec<RwLock<Vec<Operation>>>;

pub struct ShardedStorage {
num_shards: usize,
Expand Down Expand Up @@ -242,7 +247,10 @@ impl ShardedStorage {
self.log.get(replica).unwrap()[shard_key]
.write()
.await
.push((key.to_string(), value.to_string()));
.push(Operation {
key: key.to_string(),
value: value.to_string(),
});
}
}

Expand All @@ -260,12 +268,7 @@ impl ShardedStorage {
self.clean_log().await;
}

async fn replicate_shard(
&self,
log: &[(String, String)],
log_offset: &mut u32,
replica: &String,
) {
async fn replicate_shard(&self, log: &[Operation], log_offset: &mut u32, replica: &String) {
trace!(log_offset = log_offset, "Current log offset");

let mut stream = match TcpStream::connect(&replica).await {
Expand Down Expand Up @@ -301,21 +304,25 @@ impl ShardedStorage {

async fn replicate_operation(
&self,
operation: &(String, String),
operation: &Operation,
stream: &mut TcpStream,
replicated_operations_by_shard: &mut u32,
) {
trace!(key = operation.0, value = operation.1, "Sending operation");
trace!(
key = operation.key,
value = operation.value,
"Sending operation"
);

let command = format!("SET {} {}{CRLF}", operation.0, operation.1);
let command = format!("SET {} {}{CRLF}", operation.key, operation.value);

match stream.write_all(command.as_bytes()).await {
Ok(_) => *replicated_operations_by_shard += 1,
Err(e) => {
// Ignore error and proceed with replication
error!(
key = operation.0,
value = operation.1,
key = operation.key,
value = operation.value,
"Error sending operation to replica {e}",
)
}
Expand Down

0 comments on commit a9e155d

Please sign in to comment.