From caa48e2b37840dc76e3c57ca8ec2d57da8117431 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Thu, 2 Nov 2023 15:58:29 +0100 Subject: [PATCH] Fix tests Signed-off-by: Heinz N. Gies --- src/connectors/prelude.rs | 2 +- src/raft/store.rs | 43 ++++++++++++++++++++++++++++ src/raft/store/statemachine.rs | 9 ++++++ src/raft/store/statemachine/apps.rs | 21 +++++++++++++- src/raft/store/statemachine/kv.rs | 17 +++++++++++ src/raft/store/statemachine/nodes.rs | 12 ++++++++ src/raft/test.rs | 14 ++++----- src/raft/test/learner.rs | 16 +++++------ tremor-value/src/known_key.rs | 2 +- 9 files changed, 118 insertions(+), 18 deletions(-) diff --git a/src/connectors/prelude.rs b/src/connectors/prelude.rs index c681d86067..b0cd599899 100644 --- a/src/connectors/prelude.rs +++ b/src/connectors/prelude.rs @@ -46,7 +46,7 @@ pub(crate) use tremor_common::{ }; pub(crate) use tremor_config::Impl; pub use tremor_config::NameWithConfig; -pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID}; +pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID}; pub(crate) use tremor_script::prelude::*; /// default buf size used for reading from files and streams (sockets etc) /// diff --git a/src/raft/store.rs b/src/raft/store.rs index 9b89ffc691..953db48c5c 100644 --- a/src/raft/store.rs +++ b/src/raft/store.rs @@ -477,6 +477,14 @@ impl Store { where T: for<'de> serde::Deserialize<'de>, { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(STORE).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = self.db.begin_read().map_err(r_err)?; let table = read_txn.open_table(STORE).map_err(r_err)?; let r = table.get(key).map_err(r_err)?; @@ -528,6 +536,14 @@ impl RaftLogReader for Store { &mut self, range: RB, ) -> StorageResult>> { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(LOGS).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = self.db.begin_read().map_err(r_err)?; let table = read_txn.open_table(LOGS).map_err(r_err)?; @@ -770,6 +786,15 @@ impl RaftStorage for Store { } async fn get_log_state(&mut self) -> StorageResult> { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(LOGS).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; + let read_txn = self.db.begin_read().map_err(r_err)?; let table = read_txn.open_table(LOGS).map_err(r_err)?; let last = table @@ -894,6 +919,15 @@ impl Store { /// # Errors /// if the store fails to read the RPC address pub fn get_self_addr(db: &Database) -> Result, Error> { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = db.begin_write()?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; + let read_txn = db.begin_read().map_err(r_err)?; let table = read_txn.open_table(SYSTEM).map_err(r_err)?; @@ -904,6 +938,15 @@ impl Store { /// # Errors /// if the store fails to read the node id pub fn get_self_node_id(db: &Database) -> Result, Error> { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = db.begin_write()?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; + let read_txn = db.begin_read().map_err(r_err)?; let table = read_txn.open_table(SYSTEM).map_err(r_err)?; diff --git a/src/raft/store/statemachine.rs b/src/raft/store/statemachine.rs index 6941095319..b258c29051 100644 --- a/src/raft/store/statemachine.rs +++ b/src/raft/store/statemachine.rs @@ -164,6 +164,15 @@ impl TremorStateMachine { where T: for<'de> serde::Deserialize<'de>, { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(STATE_MACHINE).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; + let read_txn = self.db.begin_read().map_err(r_err)?; let table = read_txn.open_table(STATE_MACHINE).map_err(r_err)?; diff --git a/src/raft/store/statemachine/apps.rs b/src/raft/store/statemachine/apps.rs index ede6bddeea..b1d380cab1 100644 --- a/src/raft/store/statemachine/apps.rs +++ b/src/raft/store/statemachine/apps.rs @@ -90,6 +90,16 @@ impl RaftStateMachine for AppsStateMachine { apps: HashMap::new(), world: world.clone(), }; + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = db.begin_write()?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(APPS).map_err(w_err)?; + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(INSTANCES).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = db.begin_read()?; let apps = read_txn.open_table(APPS)?; @@ -99,6 +109,7 @@ impl RaftStateMachine for AppsStateMachine { me.load_archive(archive.value()) .map_err(|e| store::Error::Other(Box::new(e)))?; } + let instances = read_txn.open_table(INSTANCES)?; // load instances @@ -220,6 +231,14 @@ impl RaftStateMachine for AppsStateMachine { } fn as_snapshot(&self) -> StorageResult { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(APPS).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = self.db.begin_read().map_err(r_err)?; let apps = read_txn.open_table(APPS).map_err(r_err)?; let archives = apps @@ -300,7 +319,7 @@ impl AppsStateMachine { } write_txn.commit().map_err(w_err)?; - let app = StateApp { + let app: StateApp = StateApp { app, main, arena_indices, diff --git a/src/raft/store/statemachine/kv.rs b/src/raft/store/statemachine/kv.rs index fbcc052c57..658719a94c 100644 --- a/src/raft/store/statemachine/kv.rs +++ b/src/raft/store/statemachine/kv.rs @@ -52,6 +52,14 @@ impl KvStateMachine { /// try to obtain the value at the given `key`. /// Returns `Ok(None)` if there is no value for that key. pub(crate) fn get(&self, key: &str) -> StorageResult>> { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(DATA).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = self.db.begin_read().map_err(r_err)?; let table = read_txn.open_table(DATA).map_err(r_err)?; table @@ -85,6 +93,15 @@ impl RaftStateMachine for KvStateMachine { } fn as_snapshot(&self) -> StorageResult { + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = self.db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(DATA).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; + let read_tnx = self.db.begin_read().map_err(w_err)?; let table = read_tnx.open_table(DATA).map_err(w_err)?; let data = table diff --git a/src/raft/store/statemachine/nodes.rs b/src/raft/store/statemachine/nodes.rs index a489ad4a18..d5159c53da 100644 --- a/src/raft/store/statemachine/nodes.rs +++ b/src/raft/store/statemachine/nodes.rs @@ -56,6 +56,17 @@ impl RaftStateMachine for NodesStateMachine { Self: std::marker::Sized, { // load known nodes + + // We need to use a write transaction despite just wanting a read transaction due to + // https://github.com/cberner/redb/issues/711 + let bug_fix_txn = db.begin_write().map_err(w_err)?; + { + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?; + // ALLOW: this is just a workaround + let _argh = bug_fix_txn.open_table(NODES).map_err(w_err)?; + } + bug_fix_txn.commit().map_err(w_err)?; let read_txn = db.begin_read().map_err(r_err)?; let table = read_txn.open_table(SYSTEM).map_err(r_err)?; @@ -66,6 +77,7 @@ impl RaftStateMachine for NodesStateMachine { debug!("No next_node_id stored in db, starting from 0"); 0 }; + let table = read_txn.open_table(NODES).map_err(r_err)?; let known_nodes = table diff --git a/src/raft/test.rs b/src/raft/test.rs index 037e31efc6..84373eb119 100644 --- a/src/raft/test.rs +++ b/src/raft/test.rs @@ -115,9 +115,9 @@ async fn cluster_join_test() -> ClusterResult<()> { let dir0 = tempfile::tempdir()?; let dir1 = tempfile::tempdir()?; let dir2 = tempfile::tempdir()?; - let node0 = TestNode::bootstrap(dir0.path()).await?; - let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?; - let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?; + let node0 = TestNode::bootstrap(dir0.path().join("db")).await?; + let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?; + let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?; // all see the same leader let client0 = node0.client(); @@ -162,9 +162,9 @@ async fn kill_and_restart_voter() -> ClusterResult<()> { let dir1 = tempfile::tempdir()?; let dir2 = tempfile::tempdir()?; - let node0 = TestNode::bootstrap(dir0.path()).await?; - let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?; - let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?; + let node0 = TestNode::bootstrap(dir0.path().join("db")).await?; + let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?; + let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?; let client0 = node0.client(); let metrics = client0.metrics().await?; @@ -181,7 +181,7 @@ async fn kill_and_restart_voter() -> ClusterResult<()> { tokio::time::sleep(Duration::from_millis(500)).await; // restart the node - let node1 = TestNode::just_start(dir1.path()).await?; + let node1 = TestNode::just_start(dir1.path().join("db")).await?; // check that the leader is available // TODO: solidify to guard against timing issues diff --git a/src/raft/test/learner.rs b/src/raft/test/learner.rs index 6ed7bc24b1..aa0779e47c 100644 --- a/src/raft/test/learner.rs +++ b/src/raft/test/learner.rs @@ -25,9 +25,9 @@ async fn add_learner_test() -> ClusterResult<()> { let dir1 = tempfile::tempdir()?; let dir2 = tempfile::tempdir()?; let dir3 = tempfile::tempdir()?; - let node0 = TestNode::bootstrap(dir0.path()).await?; - let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?; - let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?; + let node0 = TestNode::bootstrap(dir0.path().join("db")).await?; + let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?; + let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?; let client0 = node0.client(); let metrics = client0.metrics().await?; let members = metrics @@ -38,7 +38,7 @@ async fn add_learner_test() -> ClusterResult<()> { .expect("No nodes in membership config"); assert_eq!(3, members.len()); - let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?; + let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?; let (learner_node_id, learner_addr) = learner_node.running.node_data(); // learner is known to the cluster let nodemap = client0.get_nodes().await?; @@ -82,9 +82,9 @@ async fn learner_runs_app() -> ClusterResult<()> { let dir1 = tempfile::tempdir()?; let dir2 = tempfile::tempdir()?; let dir3 = tempfile::tempdir()?; - let node0 = TestNode::bootstrap(dir0.path()).await?; - let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?; - let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?; + let node0 = TestNode::bootstrap(dir0.path().join("db")).await?; + let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?; + let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?; let client0 = node0.client(); let metrics = client0.metrics().await?; let members = metrics @@ -95,7 +95,7 @@ async fn learner_runs_app() -> ClusterResult<()> { .expect("No nodes in membership config"); assert_eq!(3, members.len()); - let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?; + let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?; let (_learner_node_id, _learner_addr) = learner_node.running.node_data(); let tmpfile = tempfile::NamedTempFile::new()?; let out_path = tmpfile.into_temp_path(); diff --git a/tremor-value/src/known_key.rs b/tremor-value/src/known_key.rs index 0965c60ce5..bbd8a91307 100644 --- a/tremor-value/src/known_key.rs +++ b/tremor-value/src/known_key.rs @@ -17,7 +17,7 @@ use beef::Cow; use halfbrown::RawEntryMut; use simd_json::ObjectHasher; use std::fmt; -use std::hash::{BuildHasher}; +use std::hash::BuildHasher; use value_trait::prelude::*; /// Well known key that can be looked up in a `Value` faster.