Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Nov 2, 2023
1 parent 67b29ed commit caa48e2
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/connectors/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) use tremor_common::{
};
pub(crate) use tremor_config::Impl;
pub use tremor_config::NameWithConfig;
pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID};
pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID};
pub(crate) use tremor_script::prelude::*;
/// default buf size used for reading from files and streams (sockets etc)
///
Expand Down
43 changes: 43 additions & 0 deletions src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,14 @@ impl Store {
where
T: for<'de> serde::Deserialize<'de>,
{
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(STORE).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(STORE).map_err(r_err)?;
let r = table.get(key).map_err(r_err)?;
Expand Down Expand Up @@ -528,6 +536,14 @@ impl RaftLogReader<TremorRaftConfig> for Store {
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TremorRaftConfig>>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(LOGS).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(LOGS).map_err(r_err)?;
Expand Down Expand Up @@ -770,6 +786,15 @@ impl RaftStorage<TremorRaftConfig> for Store {
}

async fn get_log_state(&mut self) -> StorageResult<LogState<TremorRaftConfig>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(LOGS).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(LOGS).map_err(r_err)?;
let last = table
Expand Down Expand Up @@ -894,6 +919,15 @@ impl Store {
/// # Errors
/// if the store fails to read the RPC address
pub fn get_self_addr(db: &Database) -> Result<Option<Addr>, Error> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand All @@ -904,6 +938,15 @@ impl Store {
/// # Errors
/// if the store fails to read the node id
pub fn get_self_node_id(db: &Database) -> Result<Option<crate::raft::NodeId>, Error> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand Down
9 changes: 9 additions & 0 deletions src/raft/store/statemachine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ impl TremorStateMachine {
where
T: for<'de> serde::Deserialize<'de>,
{
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(STATE_MACHINE).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = self.db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(STATE_MACHINE).map_err(r_err)?;
Expand Down
21 changes: 20 additions & 1 deletion src/raft/store/statemachine/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
apps: HashMap::new(),
world: world.clone(),
};
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(APPS).map_err(w_err)?;
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(INSTANCES).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read()?;
let apps = read_txn.open_table(APPS)?;
Expand All @@ -99,6 +109,7 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
me.load_archive(archive.value())
.map_err(|e| store::Error::Other(Box::new(e)))?;

Check warning on line 110 in src/raft/store/statemachine/apps.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/apps.rs#L108-L110

Added lines #L108 - L110 were not covered by tests
}

let instances = read_txn.open_table(INSTANCES)?;

// load instances
Expand Down Expand Up @@ -220,6 +231,14 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
}

Check warning on line 231 in src/raft/store/statemachine/apps.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/apps.rs#L230-L231

Added lines #L230 - L231 were not covered by tests

fn as_snapshot(&self) -> StorageResult<AppsSnapshot> {

Check warning on line 233 in src/raft/store/statemachine/apps.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/apps.rs#L233

Added line #L233 was not covered by tests
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(APPS).map_err(w_err)?;

Check warning on line 239 in src/raft/store/statemachine/apps.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/apps.rs#L236-L239

Added lines #L236 - L239 were not covered by tests
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let apps = read_txn.open_table(APPS).map_err(r_err)?;
let archives = apps
Expand Down Expand Up @@ -300,7 +319,7 @@ impl AppsStateMachine {
}
write_txn.commit().map_err(w_err)?;

let app = StateApp {
let app: StateApp = StateApp {
app,
main,
arena_indices,
Expand Down
17 changes: 17 additions & 0 deletions src/raft/store/statemachine/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl KvStateMachine {
/// try to obtain the value at the given `key`.
/// Returns `Ok(None)` if there is no value for that key.
pub(crate) fn get(&self, key: &str) -> StorageResult<Option<Vec<u8>>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(DATA).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(DATA).map_err(r_err)?;
table
Expand Down Expand Up @@ -85,6 +93,15 @@ impl RaftStateMachine<KvSnapshot, KvRequest> for KvStateMachine {
}

Check warning on line 93 in src/raft/store/statemachine/kv.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/kv.rs#L91-L93

Added lines #L91 - L93 were not covered by tests

fn as_snapshot(&self) -> StorageResult<KvSnapshot> {

Check warning on line 95 in src/raft/store/statemachine/kv.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/kv.rs#L95

Added line #L95 was not covered by tests
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(DATA).map_err(w_err)?;

Check warning on line 101 in src/raft/store/statemachine/kv.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/kv.rs#L98-L101

Added lines #L98 - L101 were not covered by tests
}
bug_fix_txn.commit().map_err(w_err)?;

Check warning on line 103 in src/raft/store/statemachine/kv.rs

View check run for this annotation

Codecov / codecov/patch

src/raft/store/statemachine/kv.rs#L103

Added line #L103 was not covered by tests

let read_tnx = self.db.begin_read().map_err(w_err)?;
let table = read_tnx.open_table(DATA).map_err(w_err)?;
let data = table
Expand Down
12 changes: 12 additions & 0 deletions src/raft/store/statemachine/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ impl RaftStateMachine<NodesSnapshot, NodesRequest> for NodesStateMachine {
Self: std::marker::Sized,
{
// load known nodes

// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write().map_err(w_err)?;
{
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
// ALLOW: this is just a workaround
let _argh = bug_fix_txn.open_table(NODES).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand All @@ -66,6 +77,7 @@ impl RaftStateMachine<NodesSnapshot, NodesRequest> for NodesStateMachine {
debug!("No next_node_id stored in db, starting from 0");
0
};

let table = read_txn.open_table(NODES).map_err(r_err)?;

let known_nodes = table
Expand Down
14 changes: 7 additions & 7 deletions src/raft/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ async fn cluster_join_test() -> ClusterResult<()> {
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;

// all see the same leader
let client0 = node0.client();
Expand Down Expand Up @@ -162,9 +162,9 @@ async fn kill_and_restart_voter() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;

let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;

let client0 = node0.client();
let metrics = client0.metrics().await?;
Expand All @@ -181,7 +181,7 @@ async fn kill_and_restart_voter() -> ClusterResult<()> {
tokio::time::sleep(Duration::from_millis(500)).await;

// restart the node
let node1 = TestNode::just_start(dir1.path()).await?;
let node1 = TestNode::just_start(dir1.path().join("db")).await?;

// check that the leader is available
// TODO: solidify to guard against timing issues
Expand Down
16 changes: 8 additions & 8 deletions src/raft/test/learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ async fn add_learner_test() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let dir3 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;
let client0 = node0.client();
let metrics = client0.metrics().await?;
let members = metrics
Expand All @@ -38,7 +38,7 @@ async fn add_learner_test() -> ClusterResult<()> {
.expect("No nodes in membership config");
assert_eq!(3, members.len());

let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?;
let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?;
let (learner_node_id, learner_addr) = learner_node.running.node_data();
// learner is known to the cluster
let nodemap = client0.get_nodes().await?;
Expand Down Expand Up @@ -82,9 +82,9 @@ async fn learner_runs_app() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let dir3 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;
let client0 = node0.client();
let metrics = client0.metrics().await?;
let members = metrics
Expand All @@ -95,7 +95,7 @@ async fn learner_runs_app() -> ClusterResult<()> {
.expect("No nodes in membership config");
assert_eq!(3, members.len());

let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?;
let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?;
let (_learner_node_id, _learner_addr) = learner_node.running.node_data();
let tmpfile = tempfile::NamedTempFile::new()?;
let out_path = tmpfile.into_temp_path();
Expand Down
2 changes: 1 addition & 1 deletion tremor-value/src/known_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use beef::Cow;
use halfbrown::RawEntryMut;
use simd_json::ObjectHasher;
use std::fmt;
use std::hash::{BuildHasher};
use std::hash::BuildHasher;
use value_trait::prelude::*;

/// Well known key that can be looked up in a `Value` faster.
Expand Down

0 comments on commit caa48e2

Please sign in to comment.