From c5f3cdbd00d1c9c58f437eaa0018fe154a6aa324 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Thu, 9 Nov 2023 11:53:41 +0100 Subject: [PATCH] Add tests Signed-off-by: Heinz N. Gies --- Cargo.lock | 232 +++++---- Cargo.toml | 1 + k8s/README.md | 2 +- packaging/cross_build.sh | 2 +- src/connectors.rs | 10 +- src/connectors/impls/cluster_kv.rs | 653 +++++++++++++++++++++----- src/connectors/sink.rs | 7 +- src/connectors/source.rs | 4 +- src/pipeline.rs | 15 +- src/raft.rs | 10 +- src/raft/api/apps.rs | 12 +- src/raft/api/client.rs | 431 +++++++++++++++-- src/raft/archive.rs | 8 + src/raft/manager.rs | 698 +++++++++++++++++++++------- src/raft/store.rs | 422 +++++++++-------- src/raft/store/statemachine/apps.rs | 17 +- src/system.rs | 8 +- src/system/flow.rs | 69 +-- src/system/flow_supervisor.rs | 4 +- tremor-script/src/ast/deploy.rs | 13 + tremor-script/src/ast/module.rs | 4 +- tremor-script/src/deploy.rs | 19 +- 22 files changed, 1995 insertions(+), 646 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c820b539a9..08696730c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,7 +95,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", - "getrandom 0.2.10", + "getrandom 0.2.11", "once_cell", "version_check", "zerocopy", @@ -249,6 +249,16 @@ dependencies = [ "term", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -357,7 +367,7 @@ dependencies = [ "log", "parking", "polling", - "rustix 0.37.26", + "rustix 0.37.27", "slab", "socket2 0.4.10", "waker-fn", @@ -380,7 +390,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -428,7 +438,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -458,7 +468,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -1123,9 +1133,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c79ad7fb2dd38f3dabd76b09c6a5a20c038fc0213ef1e9afd30eb777f120f019" +checksum = "542f33a8835a0884b006a0c3df3dadd99c0c3f296ed26c2fdc8028e01ad6230c" dependencies = [ "memchr", "serde", @@ -1370,7 +1380,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -1448,6 +1458,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "colored" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6" +dependencies = [ + "is-terminal", + "lazy_static", + "windows-sys 0.48.0", +] + [[package]] name = "combine" version = "4.6.6" @@ -1573,9 +1594,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" +checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" dependencies = [ "libc", ] @@ -1748,9 +1769,9 @@ dependencies = [ [[package]] name = "crypto-mac" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" dependencies = [ "generic-array", "subtle", @@ -2025,9 +2046,9 @@ checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" [[package]] name = "dyn-clone" -version = "1.0.14" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d2f3407d9a573d666de4b5bdf10569d73ca9478087346697dcbae6244bfbcd" +checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d" [[package]] name = "ecdsa" @@ -2135,7 +2156,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2148,7 +2169,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2172,9 +2193,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e" dependencies = [ "libc", "windows-sys 0.48.0", @@ -2399,7 +2420,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2475,9 +2496,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", "js-sys", @@ -3099,9 +3120,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" dependencies = [ "wasm-bindgen", ] @@ -3265,9 +3286,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.149" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "libflate" @@ -3299,6 +3320,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" +dependencies = [ + "bitflags 2.4.1", + "libc", + "redox_syscall 0.4.1", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -3325,9 +3357,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" +checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" [[package]] name = "lock_api" @@ -3536,6 +3568,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockito" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b" +dependencies = [ + "assert-json-diff", + "colored", + "futures", + "hyper", + "log", + "rand 0.8.5", + "regex", + "serde_json", + "serde_urlencoded 0.7.1", + "similar", + "tokio", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3761,9 +3812,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.57" +version = "0.10.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +checksum = "7a257ad03cd8fb16ad4172fedf8094451e1af1c4b70097636ef2eac9a5f0cc33" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -3782,7 +3833,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -3793,9 +3844,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.93" +version = "0.9.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" +checksum = "40a4130519a360279579c2053038317e40eff64d13fd3f004f9e1b72b8a6aaf9" dependencies = [ "cc", "libc", @@ -4070,7 +4121,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -4428,7 +4479,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.10", + "getrandom 0.2.11", ] [[package]] @@ -4545,12 +4596,12 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" dependencies = [ - "getrandom 0.2.10", - "redox_syscall 0.2.16", + "getrandom 0.2.11", + "libredox", "thiserror", ] @@ -4571,7 +4622,7 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -4706,7 +4757,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" dependencies = [ "cc", - "getrandom 0.2.10", + "getrandom 0.2.11", "libc", "spin 0.9.8", "untrusted 0.9.0", @@ -4807,9 +4858,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.26" +version = "0.37.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84f3f8f960ed3b5a59055428714943298bf3fa2d4a1d53135084e0544829d995" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" dependencies = [ "bitflags 1.3.2", "errno", @@ -4828,7 +4879,7 @@ dependencies = [ "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys 0.4.10", + "linux-raw-sys 0.4.11", "windows-sys 0.48.0", ] @@ -5091,7 +5142,7 @@ checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -5249,7 +5300,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -5376,7 +5427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5a3720326b20bf5b95b72dbbd133caae7e0dcf71eae8f6e6656e71a7e5c9aaa" dependencies = [ "ahash", - "getrandom 0.2.10", + "getrandom 0.2.11", "halfbrown", "lexical-core 0.8.5", "once_cell", @@ -5410,7 +5461,7 @@ dependencies = [ "proc-macro2", "quote", "simd-json", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -5419,6 +5470,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "similar" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597" + [[package]] name = "simple-mutex" version = "1.1.5" @@ -5478,9 +5535,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" [[package]] name = "snap" @@ -5656,14 +5713,14 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] name = "subtle" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "surf" @@ -5676,7 +5733,7 @@ dependencies = [ "cfg-if", "encoding_rs", "futures-util", - "getrandom 0.2.10", + "getrandom 0.2.11", "http-client", "http-types", "log", @@ -5702,9 +5759,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ "proc-macro2", "quote", @@ -5875,7 +5932,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -5887,7 +5944,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "test-case-core", ] @@ -5927,7 +5984,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -6088,7 +6145,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -6326,7 +6383,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -6612,6 +6669,7 @@ dependencies = [ "log", "matches", "mime", + "mockito", "openraft", "pin-project-lite", "pretty_assertions", @@ -6842,7 +6900,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -6931,9 +6989,9 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" [[package]] name = "universal-hash" -version = "0.4.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" dependencies = [ "generic-array", "subtle", @@ -7008,7 +7066,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ - "getrandom 0.2.10", + "getrandom 0.2.11", "serde", ] @@ -7128,9 +7186,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -7138,24 +7196,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02" dependencies = [ "cfg-if", "js-sys", @@ -7165,9 +7223,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -7175,22 +7233,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" [[package]] name = "wasm-streams" @@ -7207,9 +7265,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" dependencies = [ "js-sys", "wasm-bindgen", @@ -7474,9 +7532,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.17" +version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" +checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b" dependencies = [ "memchr", ] @@ -7532,22 +7590,22 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "zerocopy" -version = "0.7.15" +version = "0.7.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81ba595b9f2772fbee2312de30eeb80ec773b4cb2f1e8098db024afadda6c06f" +checksum = "8cd369a67c0edfef15010f980c3cbe45d7f651deac2cd67ce097cd801de16557" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.15" +version = "0.7.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "772666c41fb6dceaf520b564b962d738a8e1a83b41bd48945f50837aed78bb1d" +checksum = "c2f140bda219a26ccc0cdb03dba58af72590c53b22642577d88a927bc5c87d6b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9470d63204..7529897851 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -211,6 +211,7 @@ tempfile = { version = "3.8" } test-case = "3.1" testcontainers = { version = "0.14", features = ["watchdog"] } bytes = "1" +mockito = "1" [features] default = ["integration"] diff --git a/k8s/README.md b/k8s/README.md index 07e55d6e4f..6c4997f4e4 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -3,7 +3,7 @@ Experimental, for testing, don't use in production or if you do fix all the thin Based on https://kubernetes.io/docs/tutorials/stateful-application/cassandra/ -this are purely notes +These are purely notes ``` minikube start --cpus 16 --memory 8192 --disk-size 50000mb diff --git a/packaging/cross_build.sh b/packaging/cross_build.sh index f3adc5a146..f0e7e044f6 100755 --- a/packaging/cross_build.sh +++ b/packaging/cross_build.sh @@ -113,7 +113,7 @@ echo "Successfully built the binary: ${TARGET_BIN}" # linking check echo "Printing linking information for the binary..." file "$TARGET_BIN" -ldd "$TARGET_BIN" || true +ldd "$TARGET_BIN" # back to the origin dir, just in case popd > /dev/null diff --git a/src/connectors.rs b/src/connectors.rs index 8967dc4750..4e8b7da6c5 100644 --- a/src/connectors.rs +++ b/src/connectors.rs @@ -294,7 +294,7 @@ pub(crate) trait Context: Display + Clone { fn connector_type(&self) -> &ConnectorType; /// gets the API sender - fn raft(&self) -> &raft::ClusterInterface; + fn raft(&self) -> Option<&raft::Cluster>; /// the application context fn app_ctx(&self) -> &AppContext; @@ -389,8 +389,8 @@ impl Context for ConnectorContext { &self.notifier } - fn raft(&self) -> &raft::ClusterInterface { - &self.app_ctx.raft + fn raft(&self) -> Option<&raft::Cluster> { + self.app_ctx.raft.as_ref() } fn app_ctx(&self) -> &AppContext { @@ -1342,8 +1342,8 @@ pub(crate) mod unit_tests { fn connector_type(&self) -> &ConnectorType { &self.t } - fn raft(&self) -> &raft::ClusterInterface { - &self.app_ctx.raft + fn raft(&self) -> Option<&raft::Cluster> { + self.app_ctx.raft.as_ref() } fn app_ctx(&self) -> &AppContext { &self.app_ctx diff --git a/src/connectors/impls/cluster_kv.rs b/src/connectors/impls/cluster_kv.rs index 50b24682a2..9da3dd8cb8 100644 --- a/src/connectors/impls/cluster_kv.rs +++ b/src/connectors/impls/cluster_kv.rs @@ -15,7 +15,7 @@ // #![cfg_attr(coverage, no_coverage)] use crate::{ channel::{bounded, Receiver, Sender}, - raft, + raft::{self, ClusterError}, }; use crate::{connectors::prelude::*, system::flow::AppContext}; use serde::Deserialize; @@ -51,49 +51,6 @@ enum Command { /// Event Payload: data to put here /// Response: the putted value if successful Put { key: String }, - // /// Format: - // /// ```json - // /// {"swap": "the-key"} - // /// ``` - // /// Event Payload: data to put here - // /// - // /// Response: the old value or `null` is there was no previous value for this key - // Swap { key: Vec }, - - // /// Format: - // /// ```json - // /// {"delete": "the-key"} - // /// ``` - // /// - // /// Response: the old value - // Delete { key: Vec }, - // /// Format: - // /// ```json - // /// { - // /// "start": "key1", - // /// "end": "key2", - // /// } - // /// ``` - // /// - // /// Response: 1 event for each value in the scanned range - // Scan { - // start: Vec, - // end: Option>, - // }, - // /// Format: - // /// ```json - // /// { - // /// "cas": "key", - // /// "old": "", - // /// } - // /// ``` - // /// EventPayload: event payload - // /// - // /// Response: `null` if the operation succeeded, an event on `err` if it failed - // Cas { - // key: Vec, - // old: Option<&'v Value<'v>>, - // }, } impl<'v> TryFrom<&'v Value<'v>> for Command { @@ -108,20 +65,6 @@ impl<'v> TryFrom<&'v Value<'v>> for Command { }) } else if let Some(key) = v.get_str("put").map(ToString::to_string) { Ok(Command::Put { key }) - // } else if let Some(key) = v.get_bytes("swap").map(<[u8]>::to_vec) { - // Ok(Command::Swap { key }) - // } else if let Some(key) = v.get_bytes("cas").map(<[u8]>::to_vec) { - // Ok(Command::Cas { - // key, - // old: v.get("old"), - // }) - // } else if let Some(key) = v.get_bytes("delete").map(<[u8]>::to_vec) { - // Ok(Command::Delete { key }) - // } else if let Some(start) = v.get_bytes("scan").map(<[u8]>::to_vec) { - // Ok(Command::Scan { - // start, - // end: v.get_bytes("end").map(<[u8]>::to_vec), - // }) } else { Err(Error::InvalidCommand(v.to_string()).into()) } @@ -133,20 +76,12 @@ impl Command { match self { Command::Get { .. } => "get", Command::Put { .. } => "put", - // Command::Swap { .. } => "swap", - // Command::Delete { .. } => "delete", - // Command::Scan { .. } => "scan", - // Command::Cas { .. } => "cas", } } fn key(&self) -> &str { match self { Command::Get { key, .. } | Command::Put { key, .. } => key, - // | Command::Swap { key, .. } - // | Command::Delete { key } - // | Command::Cas { key, .. } => Some(key.clone()), - // Command::Scan { .. } => None, } } } @@ -240,7 +175,9 @@ impl Connector for Kv { let sink = KvSink { alias: ctx.alias().clone(), app_ctx: ctx.app_ctx().clone(), - raft: ctx.raft().clone(), + raft: ctx + .raft() + .map_or_else(|| Err(ClusterError::RaftNotRunning), |v| Ok(v.clone()))?, tx: self.tx.clone(), codec, origin_uri, @@ -257,7 +194,7 @@ impl Connector for Kv { struct KvSink { alias: alias::Connector, app_ctx: AppContext, - raft: raft::ClusterInterface, + raft: raft::Cluster, tx: Sender, codec: Json, origin_uri: EventOriginUri, @@ -305,46 +242,7 @@ impl KvSink { let combined_key = key_parts.join("."); self.raft.kv_set(combined_key, value_vec).await?; Ok(oks(op_name, key, value.clone_static())) - } // Command::Swap { key } => { - // // return the old value - // let value = self.encode(value)?; - // let v = self.db.insert(&key, value)?; - // self.decode(v, ingest_ns) - // .map(|old_value| oks(op_name, key, old_value)) - // } - // Command::Delete { key } => self - // .decode(self.db.remove(&key)?, ingest_ns) - // .map(|v| oks(op_name, key, v)), - // Command::Cas { key, old } => { - // let vec = self.encode(value)?; - // let old = old.map(|v| self.encode(v)).transpose()?; - // if let Err(CompareAndSwapError { current, proposed }) = - // self.db.compare_and_swap(&key, old, Some(vec))? - // { - // Err(format!( - // "CAS error: expected {} but found {}.", - // self.decode(proposed, ingest_ns)?, - // self.decode(current, ingest_ns)?, - // ) - // .into()) - // } else { - // Ok(oks(op_name, key, Value::null())) - // } - // } - // Command::Scan { start, end } => { - // let i = match end { - // None => self.db.range(start..), - // Some(end) => self.db.range(start..end), - // }; - // let mut res = Vec::with_capacity(i.size_hint().0); - // for e in i { - // let (key, e) = e?; - // let key: &[u8] = &key; - - // res.push(ok(op_name, key.to_vec(), self.decode(Some(e), ingest_ns)?)); - // } - // Ok(res) - // } + } } } } @@ -428,3 +326,542 @@ impl Sink for KvSink { false } } + +#[cfg(test)] +mod test { + + use matches::assert_matches; + use openraft::error::{CheckIsLeaderError, ForwardToLeader, RaftError}; + + use crate::raft::{api::APIStoreReq, manager::IFRequest, node::Addr}; + + use super::*; + + #[test] + fn try_from_command() { + let cmd = literal!({ + "kv": { + "get": "the-key", + "strict": true + } + }); + let cmd = Command::try_from(&cmd).expect("it's OK"); + assert_eq!(cmd.op_name(), "get"); + assert_eq!(cmd.key(), "the-key"); + + let cmd = literal!({ + "kv": { + "put": "the-key" + } + }); + let cmd = Command::try_from(&cmd).expect("it's OK"); + assert_eq!(cmd.op_name(), "put"); + assert_eq!(cmd.key(), "the-key"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn on_event_get() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, mut store_rx) = bounded(8); + let (cluster_tx, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::KVGet(_key, result_tx) => { + let _ = result_tx.send(Some(b"42".to_vec())); + } + _ => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: ((), literal!({"kv": { "get": "the-key" }})).into(), + ..Event::default() + }; + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::ACK); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::from(42), + literal!({ "kv": { "op": "get", "ok": "the-key" } }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn on_event_get_strict_local() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, mut store_rx) = bounded(8); + let (cluster_tx, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + match store_rx.recv().await.expect("rcv") { + APIStoreReq::KVGet(_key, result_tx) => { + let _ = result_tx.send(Some(b"42".to_vec())); + } + _ => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: ((), literal!({"kv": { "get": "the-key", "strict": true }})).into(), + ..Event::default() + }; + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::ACK); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::from(42), + literal!({ "kv": { "op": "get", "ok": "the-key" } }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn on_event_get_strict_remote() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, _) = bounded(8); + let (cluster_tx, mut cluster_rx) = bounded(8); + + let mut api_server = mockito::Server::new(); + + let api = api_server.host_with_port(); + let rpc = api.clone(); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: Some(Addr::new(api, rpc)), + leader_id: Some(1), + }), + ))); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: ((), literal!({"kv": { "get": "the-key", "strict": true }})).into(), + ..Event::default() + }; + + let api_mock = api_server + .mock("POST", "/v1/api/kv/consistent_read") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::ACK); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::from(42), + literal!({ "kv": { "op": "get", "ok": "the-key" } }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + api_mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn on_event_put_local() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, _) = bounded(8); + let (cluster_tx, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + match cluster_rx.recv().await.expect("rcv") { + IFRequest::SetKeyLocal(_key, result_tx) => { + let _ = result_tx.send(Ok(b"42".to_vec())); + } + IFRequest::IsLeader(_) => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: (literal!(42), literal!({"kv": { "put": "the-key", }})).into(), + ..Event::default() + }; + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::ACK); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::from(42), + literal!({ "kv": { "op": "put", "ok": "the-key" } }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn on_event_put_remote() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, _) = bounded(8); + let (cluster_tx, mut cluster_rx) = bounded(8); + + let mut api_server = mockito::Server::new(); + let api = api_server.host_with_port(); + let rpc = api.clone(); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: Some(Addr::new(api, rpc)), + leader_id: Some(1), + }), + ))); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: (literal!(42), literal!({"kv": { "put": "the-key", }})).into(), + ..Event::default() + }; + + let api_mock = api_server + .mock("POST", "/v1/api/kv/write") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::ACK); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::from(42), + literal!({ "kv": { "op": "put", "ok": "the-key" } }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + api_mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn bad_command() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, _) = bounded(8); + let (cluster_tx, _) = bounded(8); + + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: ((), literal!({"kv": { "snot": "the-badger" }})).into(), + ..Event::default() + }; + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::FAIL); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::null(), + literal!({ "error": "Invalid command: {\"snot\": String(\"the-badger\")}", "kv": () }), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn on_event_error() -> Result<()> { + let (tx, mut rx) = bounded(1); + + let (store_tx, _) = bounded(8); + let (cluster_tx, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + match cluster_rx.recv().await.expect("rcv") { + IFRequest::SetKeyLocal(_key, result_tx) => { + let _ = result_tx.send(Err(ClusterError::RaftNotRunning.into())); + } + IFRequest::IsLeader(_) => panic!("wrong request"), + }; + }); + let alias = alias::Connector::new("cluster_kv"); + let raft = raft::Cluster::dummy(store_tx, cluster_tx); + + let app_ctx = AppContext { + raft: Some(raft.clone()), + ..AppContext::default() + }; + let mut sink = KvSink { + alias, + app_ctx, + raft, + tx, + codec: Json::default(), + origin_uri: EventOriginUri { + scheme: "tremor-cluster-kv".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + source_is_connected: Arc::new(true.into()), + }; + let ctx = SinkContext::dummy("cluster_kv"); + let mut serializer = EventSerializer::dummy(None)?; + + let event = Event { + data: (literal!(42), literal!({"kv": { "put": "the-key", }})).into(), + ..Event::default() + }; + + let reply = sink + .on_event("in", event, &ctx, &mut serializer, 0) + .await + .expect("it's OK"); + + assert_eq!(reply, SinkReply::FAIL); + let reply = rx.recv().await.expect("it's OK"); + let expected: EventPayload = ( + Value::null(), + literal!({ "error": "The raft node isn't running", "kv": {"op": "put", "key": "the-key"}}), + ) + .into(); + assert_matches!( + reply, + SourceReply::Structured { + payload, + .. + } if payload == expected + ); + Ok(()) + } +} diff --git a/src/connectors/sink.rs b/src/connectors/sink.rs index d78ceb255b..2f8a890a89 100644 --- a/src/connectors/sink.rs +++ b/src/connectors/sink.rs @@ -340,8 +340,8 @@ impl Context for SinkContext { fn connector_type(&self) -> &ConnectorType { &self.0.connector_type } - fn raft(&self) -> &raft::ClusterInterface { - &self.0.app_ctx.raft + fn raft(&self) -> Option<&raft::Cluster> { + self.0.app_ctx.raft.as_ref() } fn app_ctx(&self) -> &AppContext { &self.0.app_ctx @@ -449,7 +449,6 @@ pub(crate) fn builder( config.codec.clone(), connector_codec_requirement, postprocessor_configs, - &config.connector_type, alias, )?; // the incoming channels for events are all bounded, so we can safely be unbounded here @@ -492,7 +491,6 @@ impl EventSerializer { codec_config, default, vec![], - &ConnectorType::from("dummy"), &alias::Connector::from("dummy"), ) } @@ -500,7 +498,6 @@ impl EventSerializer { codec_config: Option, default_codec: CodecReq, postprocessor_configs: Vec, - _connector_type: &ConnectorType, alias: &alias::Connector, ) -> Result { let codec_config = match default_codec { diff --git a/src/connectors/source.rs b/src/connectors/source.rs index 7eb2b15fb3..90c4b82527 100644 --- a/src/connectors/source.rs +++ b/src/connectors/source.rs @@ -359,8 +359,8 @@ impl Context for SourceContext { fn connector_type(&self) -> &ConnectorType { &self.connector_type } - fn raft(&self) -> &raft::ClusterInterface { - &self.app_ctx.raft + fn raft(&self) -> Option<&raft::Cluster> { + self.app_ctx.raft.as_ref() } fn app_ctx(&self) -> &AppContext { &self.app_ctx diff --git a/src/pipeline.rs b/src/pipeline.rs index f48de2d180..afd961682b 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -17,6 +17,7 @@ use crate::{ instance::State, primerge::PriorityMerge, qsize, + raft::Cluster, system::flow::AppContext, Result, }; @@ -501,12 +502,12 @@ impl PipelineContext { impl std::fmt::Display for PipelineContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "[Node:{}][Pipeline::{}]", - self.app_context.raft.id(), - &self.alias - ) + let node = if let Some(raft) = self.app_context.raft.as_ref() { + format!("[Node:{}]", raft.id()) + } else { + String::new() + }; + write!(f, "{node}[Pipeline::{}]", &self.alias) } } @@ -521,7 +522,7 @@ pub(crate) async fn pipeline_task( tick_handler: JoinHandle<()>, ) -> Result<()> { pipeline.id = id.to_string(); - let node_id = app_ctx.raft.id(); + let node_id = app_ctx.raft.as_ref().map_or(0, Cluster::id); let ctx = PipelineContext::new(app_ctx.clone(), id.clone()); let mut dests: Dests = halfbrown::HashMap::new(); diff --git a/src/raft.rs b/src/raft.rs index 73dc060f68..55c642d3e8 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -28,9 +28,9 @@ pub mod store; #[cfg(test)] mod test; -pub(crate) use self::manager::{Cluster, ClusterInterface}; +pub(crate) use self::manager::Cluster; use network::raft_network_impl::Network; -use openraft::{storage::Adaptor, Config, Raft, TokioRuntime}; +use openraft::{storage::Adaptor, AnyError, Config, Raft, TokioRuntime}; use std::io::Cursor; use store::{TremorRequest, TremorResponse}; @@ -84,6 +84,12 @@ type ClusterResult = crate::Result; /// so instead of forking we're doing the silly dance #[derive(Debug)] pub(crate) struct SillyError(anyhow::Error); +impl SillyError { + /// Create a new silly error + pub fn err(e: impl Into) -> AnyError { + AnyError::new(&Self(e.into())) + } +} impl std::fmt::Display for SillyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/src/raft/api/apps.rs b/src/raft/api/apps.rs index 5da50709ef..93cc676d4c 100644 --- a/src/raft/api/apps.rs +++ b/src/raft/api/apps.rs @@ -104,12 +104,22 @@ async fn uninstall_app( .map(|d| d.data) } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct AppState { pub def: TremorAppDef, pub instances: Instances, } +#[cfg(test)] +impl AppState { + pub(crate) fn dummy() -> Self { + AppState { + def: TremorAppDef::dummy(), + instances: HashMap::new(), + } + } +} + impl From<&StateApp> for AppState { fn from(state: &StateApp) -> Self { AppState { diff --git a/src/raft/api/client.rs b/src/raft/api/client.rs index a3d4827222..4aa7b0191f 100644 --- a/src/raft/api/client.rs +++ b/src/raft/api/client.rs @@ -101,7 +101,7 @@ impl Tremor { ); Err(Error::HTTP(err)) } else { - Err("Heisenerror, not error nor success".into()) + Err(Error::Heisenerror) } } } @@ -413,49 +413,420 @@ Snapshot: } /// Errors that can happen when calling the raft api -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { /// HTTP error - HTTP(reqwest::Error), - /// Other error - Other(String), + #[error(transparent)] + HTTP(#[from] reqwest::Error), + /// Heisenerror, not error nor success + #[error("Heisenerror, not error nor success")] + Heisenerror, + /// RuntimeError + #[error(transparent)] + Other(#[from] crate::Error), } -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Error { + /// Checks if the error is a not found error + #[must_use] + pub fn is_not_found(&self) -> bool { match self { - Self::HTTP(e) => e.fmt(f), - Self::Other(e) => e.fmt(f), + Self::HTTP(e) => e.status() == Some(reqwest::StatusCode::NOT_FOUND), + _ => false, } } } -impl std::error::Error for Error {} +#[cfg(test)] +mod test { + use std::{collections::BTreeMap, sync::Arc}; + + use openraft::{LeaderId, Membership, ServerState, StoredMembership, Vote}; -impl From for Error { - fn from(e: reqwest::Error) -> Self { - Self::HTTP(e) + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn write() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let request = KVSet { + key: "foo".to_string(), + value: "bar".into(), + }; + let mock = server + .mock("POST", "/v1/api/kv/write") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let res = tremor.write(&request).await?; + assert_eq!(res, 42); + mock.assert(); + + Ok(()) } -} + #[tokio::test(flavor = "multi_thread")] + async fn read() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let request = "foo"; + let mock = server + .mock("POST", "/v1/api/kv/read") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let res = tremor.read(request).await?; + assert_eq!(res, 42); + mock.assert(); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn consistent_read() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let request = "foo"; + let mock = server + .mock("POST", "/v1/api/kv/consistent_read") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let res = tremor.consistent_read(request).await?; + assert_eq!(res, 42); + mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn install() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let request = vec![1, 2, 3]; + let mock = server + .mock("POST", "/v1/api/apps") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#""app""#) + .create(); -impl From for Error { - fn from(e: crate::Error) -> Self { - Self::Other(e.to_string()) + let res = tremor.install(&request).await?; + assert_eq!(res, "app".into()); + mock.assert(); + + Ok(()) } -} -impl<'s> From<&'s str> for Error { - fn from(e: &'s str) -> Self { - Self::Other(e.into()) + + #[tokio::test(flavor = "multi_thread")] + async fn uninstall_app() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let app = alias::App::new("app"); + let mock = server + .mock("DELETE", "/v1/api/apps/app") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#""app""#) + .create(); + + let res = tremor.uninstall_app(&app).await?; + assert_eq!(res, "app".into()); + mock.assert(); + + Ok(()) } -} -impl Error { - /// Checks if the error is a not found error - #[must_use] - pub fn is_not_found(&self) -> bool { - match self { - Self::HTTP(e) => e.status() == Some(reqwest::StatusCode::NOT_FOUND), - Self::Other(_) => false, - } + #[tokio::test(flavor = "multi_thread")] + async fn start() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let flow = "flow".into(); + let instance = alias::Flow::new("app", "instance"); + let mut config = std::collections::HashMap::new(); + config.insert("foo".to_string(), "bar".into()); + let mock = server + .mock("POST", "/v1/api/apps/app/flows/flow") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&instance)?) + .create(); + + let res = tremor.start(&flow, &instance, config, true, false).await?; + assert_eq!(res, instance); + mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn change_instance_state() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let instance = alias::Flow::new("app", "instance"); + let mock = server + .mock("POST", "/v1/api/apps/app/instances/instance") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&instance)?) + .create(); + + let res = tremor + .change_instance_state(&instance, TremorInstanceState::Pause) + .await?; + assert_eq!(res, instance); + mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn stop_instance() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let instance = alias::Flow::new("app", "instance"); + let mock = server + .mock("DELETE", "/v1/api/apps/app/instances/instance") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&instance)?) + .create(); + + let res = tremor.stop_instance(&instance).await?; + assert_eq!(res, instance); + mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn list() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + let mut apps = HashMap::new(); + apps.insert("app".into(), AppState::dummy()); + let mock = server + .mock("GET", "/v1/api/apps") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&apps)?) + .create(); + + let res = tremor.list().await?; + assert_eq!(res, apps); + mock.assert(); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn add_node() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let addr = Addr::default(); + + let mock = server + .mock("POST", "/v1/cluster/nodes") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"42"#) + .create(); + + let res = tremor.add_node(&addr).await?; + assert_eq!(res, 42); + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn remove_node() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let node_id = 42; + + let mock = server + .mock("DELETE", "/v1/cluster/nodes/42") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("null") + .create(); + + tremor.remove_node(&node_id).await?; + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn get_nodes() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let mut nodes = HashMap::new(); + nodes.insert(42, Addr::default()); + + let mock = server + .mock("GET", "/v1/cluster/nodes") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&nodes)?) + .create(); + + let res = tremor.get_nodes().await?; + assert_eq!(res, nodes); + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn add_learner() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let node_id = 42; + let log_id = LogId { + leader_id: LeaderId { + term: 23, + node_id: 42, + }, + index: 19, + }; + + let mock = server + .mock("PUT", "/v1/cluster/learners/42") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&log_id)?) + .create(); + + let res = tremor.add_learner(&node_id).await?; + assert_eq!(res, Some(log_id)); + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn remove_learner() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let node_id = 42; + + let mock = server + .mock("DELETE", "/v1/cluster/learners/42") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("null") + .create(); + + tremor.remove_learner(&node_id).await?; + + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn promote_voter() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let node_id = 42; + + let mock = server + .mock("PUT", "/v1/cluster/voters/42") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"42"#) + .create(); + + let res = tremor.promote_voter(&node_id).await?; + assert_eq!(res, Some(42)); + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn demote_voter() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let node_id = 42; + + let mock = server + .mock("DELETE", "/v1/cluster/voters/42") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"42"#) + .create(); + + let res = tremor.demote_voter(&node_id).await?; + assert_eq!(res, Some(42)); + mock.assert(); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn metrics() -> Result<()> { + let mut server = mockito::Server::new(); + + let tremor = Tremor::new(&server.host_with_port())?; + + let mut metrics = RaftMetrics { + running_state: Ok(()), + id: 42, + current_term: 23, + vote: Vote::new(42, 23), + last_log_index: Some(19), + last_applied: None, + snapshot: None, + purged: None, + state: ServerState::Leader, + current_leader: Some(42), + membership_config: Arc::new(StoredMembership::new( + Some(LogId { + leader_id: LeaderId { + term: 23, + node_id: 42, + }, + index: 19, + }), + Membership::new(Vec::new(), BTreeMap::new()), + )), + replication: None, + }; + metrics.id = 42; + + let mock = server + .mock("GET", "/v1/cluster/metrics") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&metrics)?) + .create(); + + let res = tremor.metrics().await?; + assert_eq!(res, metrics); + mock.assert(); + Ok(()) } } diff --git a/src/raft/archive.rs b/src/raft/archive.rs index 63fb21e949..f112754d85 100644 --- a/src/raft/archive.rs +++ b/src/raft/archive.rs @@ -78,6 +78,14 @@ impl TremorAppDef { pub fn name(&self) -> &alias::App { &self.name } + #[cfg(test)] + pub(crate) fn dummy() -> Self { + Self { + name: alias::App("dummy".to_string()), + sha256: "dummy".to_string(), + flows: HashMap::new(), + } + } } /// Packages a tremor application into a tarball, entry point is the `main.troy` file, target the tar.gz file diff --git a/src/raft/manager.rs b/src/raft/manager.rs index c1ce933857..4dc1307f23 100644 --- a/src/raft/manager.rs +++ b/src/raft/manager.rs @@ -18,9 +18,10 @@ use std::{ }; use super::{ + api::apps::AppState, node::Addr, - store::{StateApp, TremorRequest, TremorSet}, - ClusterError, TremorRaftConfig, TremorRaftImpl, + store::{StateApp, TremorSet}, + TremorRaftImpl, }; use crate::raft::api::APIStoreReq; use crate::raft::NodeId; @@ -29,28 +30,19 @@ use crate::{ channel::{bounded, oneshot, OneShotSender, Sender}, connectors::prelude::Receiver, }; -use openraft::{ - error::{CheckIsLeaderError, Fatal, ForwardToLeader, RaftError}, - raft::ClientWriteResponse, -}; +use openraft::error::{CheckIsLeaderError, Fatal, ForwardToLeader, RaftError}; use simd_json::OwnedValue; use tremor_common::alias; -#[derive(Clone, Default)] +#[derive(Clone, Debug)] pub(crate) struct Cluster { node_id: NodeId, - raft: Option, - sender: Option>, -} -#[derive(Clone, Default, Debug)] -pub(crate) struct ClusterInterface { - node_id: NodeId, - store_sender: Option>, - cluster_sender: Option>, + store: Sender, + cluster: Sender, } type IsLeaderResult = std::result::Result<(), RaftError>>; -enum IFRequest { +pub(crate) enum IFRequest { IsLeader(OneShotSender), SetKeyLocal(TremorSet, OneShotSender>>), } @@ -76,36 +68,75 @@ async fn cluster_interface(raft: TremorRaftImpl, mut rx: Receiver) { } } -impl ClusterInterface { +impl Cluster { + #[cfg(test)] + pub(crate) fn dummy(store: Sender, cluster: Sender) -> Self { + Cluster { + node_id: 42, + store, + cluster, + } + } + pub(crate) fn new(node_id: NodeId, store: Sender, raft: TremorRaftImpl) -> Self { + let (cluster, rx) = bounded(1042); + tokio::spawn(cluster_interface(raft, rx)); + Cluster { + node_id, + store, + cluster, + } + } pub(crate) fn id(&self) -> NodeId { self.node_id } - async fn send_store(&self, command: APIStoreReq) -> Result<()> { - self.store_sender - .as_ref() - .ok_or(ClusterError::RaftNotRunning)? - .send(command) - .await?; - Ok(()) + + // cluster + pub(crate) async fn get_node(&self, node_id: u64) -> Result> { + let (tx, rx) = oneshot(); + let command = APIStoreReq::GetNode(node_id, tx); + self.store.send(command).await?; + Ok(rx.await?) } - async fn send_cluster(&self, command: IFRequest) -> Result<()> { - self.cluster_sender - .as_ref() - .ok_or(ClusterError::RaftNotRunning)? - .send(command) - .await?; - Ok(()) + pub(crate) async fn get_nodes(&self) -> Result> { + let (tx, rx) = oneshot(); + let command = APIStoreReq::GetNodes(tx); + self.store.send(command).await?; + Ok(rx.await?) } + pub(crate) async fn get_node_id(&self, addr: Addr) -> Result> { + let (tx, rx) = oneshot(); + let command = APIStoreReq::GetNodeId(addr, tx); + self.store.send(command).await?; + Ok(rx.await?) + } + pub(crate) async fn get_last_membership(&self) -> Result> { let (tx, rx) = oneshot(); let command = APIStoreReq::GetLastMembership(tx); - self.send_store(command).await?; + self.store.send(command).await?; + Ok(rx.await?) + } + + // apps + pub(crate) async fn get_app_local(&self, app_id: alias::App) -> Result> { + let (tx, rx) = oneshot(); + let command = APIStoreReq::GetApp(app_id, tx); + self.store.send(command).await?; + Ok(rx.await?) + } + + pub(crate) async fn get_apps_local(&self) -> Result> { + let (tx, rx) = oneshot(); + let command = APIStoreReq::GetApps(tx); + self.store.send(command).await?; Ok(rx.await?) } + pub(crate) async fn is_leader(&self) -> IsLeaderResult { let (tx, rx) = oneshot(); let command = IFRequest::IsLeader(tx); - self.send_cluster(command) + self.cluster + .send(command) .await .map_err(|_| RaftError::Fatal(Fatal::Stopped))?; rx.await.map_err(|_| RaftError::Fatal(Fatal::Stopped))? @@ -137,7 +168,7 @@ impl ClusterInterface { pub(crate) async fn kv_set_local(&self, key: String, value: Vec) -> Result> { let (tx, rx) = oneshot(); let command = IFRequest::SetKeyLocal(TremorSet { key, value }, tx); - self.send_cluster(command).await?; + self.cluster.send(command).await?; rx.await? } @@ -149,7 +180,7 @@ impl ClusterInterface { .. }))) => { let client = crate::raft::api::client::Tremor::new(n.api())?; - let res = client.read(&key).await; + let res = client.consistent_read(&key).await; match res { Ok(v) => Ok(Some(v)), Err(e) if e.is_not_found() => Ok(None), @@ -159,10 +190,11 @@ impl ClusterInterface { Err(e) => Err(e.into()), } } + pub(crate) async fn kv_get_local(&self, key: String) -> Result> { let (tx, rx) = oneshot(); let command = APIStoreReq::KVGet(key, tx); - self.send_store(command).await?; + self.store.send(command).await?; Ok(rx .await? .map(|mut v| simd_json::from_slice(&mut v)) @@ -170,162 +202,502 @@ impl ClusterInterface { } } -impl From for ClusterInterface { - fn from(c: Cluster) -> Self { - let cluster_sender = c.raft.map(|r| { - let (tx, rx) = bounded(1042); - tokio::spawn(cluster_interface(r, rx)); - tx +#[cfg(test)] +mod test { + use super::*; + use crate::channel::{bounded, oneshot}; + + #[tokio::test(flavor = "multi_thread")] + async fn send_store() -> Result<()> { + let (result_tx, result_rx) = oneshot(); + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetNodeId(_, result_tx) => { + let _ = result_tx.send(Some(42)); + } + _ => panic!("wrong request"), + }; }); - Self { - cluster_sender, - node_id: c.node_id, - store_sender: c.sender, - } + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + iface + .store + .send(APIStoreReq::GetNodeId(Addr::default(), result_tx)) + .await?; + assert_eq!(result_rx.await?, Some(42)); + Ok(()) } -} -impl std::fmt::Debug for Cluster { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Manager") - .field("node_id", &self.node_id) - .field("raft", &self.raft.is_some()) - .field("sender", &self.sender.is_some()) - .finish() + #[tokio::test(flavor = "multi_thread")] + async fn send_cluster() -> Result<()> { + let (result_tx, result_rx) = oneshot(); + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + iface.cluster.send(IFRequest::IsLeader(result_tx)).await?; + assert!(result_rx.await?.is_ok()); + Ok(()) } -} -impl Cluster { - async fn send(&self, command: APIStoreReq) -> Result<()> { - self.sender - .as_ref() - .ok_or(ClusterError::RaftNotRunning)? - .send(command) - .await?; + #[tokio::test(flavor = "multi_thread")] + async fn get_node() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetNode(node_id, result_tx) => { + assert_eq!(node_id, 42); + let _ = result_tx.send(Some(Addr::default())); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!(iface.get_node(42).await?, Some(Addr::default())); Ok(()) } - fn raft(&self) -> Result<&TremorRaftImpl> { - Ok(self.raft.as_ref().ok_or(ClusterError::RaftNotRunning)?) + #[tokio::test(flavor = "multi_thread")] + async fn get_nodes() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetNodes(result_tx) => { + let mut nodes = HashMap::new(); + nodes.insert(42, Addr::default()); + let _ = result_tx.send(nodes); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + let mut nodes = HashMap::new(); + nodes.insert(42, Addr::default()); + assert_eq!(iface.get_nodes().await?, nodes); + Ok(()) } - async fn client_write(&self, command: T) -> Result> - where - T: Into + Send + 'static, - { - Ok(self.raft()?.client_write(command.into()).await?) + #[tokio::test(flavor = "multi_thread")] + async fn get_node_id() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetNodeId(addr, result_tx) => { + assert_eq!(addr, Addr::default()); + let _ = result_tx.send(Some(42)); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!(iface.get_node_id(Addr::default()).await?, Some(42)); + Ok(()) } - pub(crate) async fn is_leader(&self) -> IsLeaderResult { - self.raft() - .map_err(|_| RaftError::Fatal(Fatal::Stopped))? - .is_leader() - .await + #[tokio::test(flavor = "multi_thread")] + async fn get_last_membership() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetLastMembership(result_tx) => { + let _ = result_tx.send(BTreeSet::new()); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!(iface.get_last_membership().await?, BTreeSet::new()); + Ok(()) } - pub(crate) fn new(node_id: NodeId, sender: Sender, raft: TremorRaftImpl) -> Self { - Self { - node_id, - raft: Some(raft), - sender: Some(sender), - } + #[tokio::test(flavor = "multi_thread")] + async fn get_app_local() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetApp(_, result_tx) => { + let _ = result_tx.send(Some(StateApp::dummy())); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + + assert_eq!( + iface.get_app_local("app".into()).await?, + Some(StateApp::dummy()) + ); + Ok(()) } - // cluster - pub(crate) async fn get_node(&self, node_id: u64) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetNode(node_id, tx); - self.send(command).await?; - Ok(rx.await?) + #[tokio::test(flavor = "multi_thread")] + async fn get_apps_local() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::GetApps(result_tx) => { + let mut apps: HashMap = HashMap::new(); + apps.insert("app".into(), AppState::dummy()); + let _ = result_tx.send(apps); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + + let mut apps: HashMap = HashMap::new(); + apps.insert("app".into(), AppState::dummy()); + assert_eq!(iface.get_apps_local().await?, apps); + Ok(()) } - pub(crate) async fn get_nodes(&self) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetNodes(tx); - self.send(command).await?; - Ok(rx.await?) + + #[tokio::test(flavor = "multi_thread")] + async fn is_leader() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert!(iface.is_leader().await.is_ok()); + Ok(()) } - pub(crate) async fn get_node_id(&self, addr: Addr) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetNodeId(addr, tx); - self.send(command).await?; - Ok(rx.await?) + + #[tokio::test(flavor = "multi_thread")] + async fn kv_set_local() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::SetKeyLocal(set, result_tx) => { + let _ = result_tx.send(Ok(set.value)); + } + IFRequest::IsLeader(_) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_set_local("key".to_string(), vec![1, 2, 3]).await?, + vec![1, 2, 3] + ); + Ok(()) } - pub(crate) async fn get_last_membership(&self) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetLastMembership(tx); - self.send(command).await?; - Ok(rx.await?) + + #[tokio::test(flavor = "multi_thread")] + async fn kv_get_local() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, _) = bounded(8); + + tokio::spawn(async move { + match store_rx.recv().await.expect("rcv") { + APIStoreReq::KVGet(_key, result_tx) => { + let _ = result_tx.send(Some(b"42".to_vec())); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_get_local("key".to_string()).await?, + Some(simd_json::json!(42)) + ); + Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn kv_set_on_leader() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); - // apps - pub(crate) async fn get_app_local(&self, app_id: alias::App) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetApp(app_id, tx); - self.send(command).await?; - Ok(rx.await?) + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + match cluster_rx.recv().await.expect("rcv") { + IFRequest::SetKeyLocal(set, result_tx) => { + let _ = result_tx.send(Ok(set.value)); + } + IFRequest::IsLeader(_) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_set("key".to_string(), vec![1, 2, 3]).await?, + vec![1, 2, 3] + ); + Ok(()) } - pub(crate) async fn get_apps_local( - &self, - ) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::GetApps(tx); - self.send(command).await?; - Ok(rx.await?) + #[tokio::test(flavor = "multi_thread")] + async fn kv_get_on_leader() -> Result<()> { + let (store, mut store_rx) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Ok(())); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + match store_rx.recv().await.expect("rcv") { + APIStoreReq::KVGet(_key, result_tx) => { + let _ = result_tx.send(Some(b"42".to_vec())); + } + _ => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_get("key".to_string()).await?, + Some(simd_json::json!(42)) + ); + Ok(()) } - // kv - pub(crate) async fn kv_set(&self, key: String, mut value: Vec) -> Result> { - match self.is_leader().await { - Ok(()) => self.kv_set_local(key, value).await, - Err(RaftError::APIError(CheckIsLeaderError::ForwardToLeader(ForwardToLeader { - leader_node: Some(n), - .. - }))) => { - let client = crate::raft::api::client::Tremor::new(n.api())?; - // TODO: there should be a better way to forward then the client - Ok(simd_json::to_vec( - &client - .write(&crate::raft::api::kv::KVSet { - key, - value: simd_json::from_slice(&mut value)?, - }) - .await?, - )?) - } - Err(e) => Err(e.into()), - } + #[tokio::test(flavor = "multi_thread")] + async fn kv_set_no_leader() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: None, + leader_id: None, + }), + ))); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert!(iface + .kv_set("key".to_string(), vec![1, 2, 3]) + .await + .is_err(),); + Ok(()) } - pub(crate) async fn kv_set_local(&self, key: String, value: Vec) -> Result> { - let tremor_res = self.client_write(TremorSet { key, value }).await?; - Ok(tremor_res.data.into_kv_value()?) + + #[tokio::test(flavor = "multi_thread")] + async fn kv_get_no_leader() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: None, + leader_id: None, + }), + ))); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert!(iface.kv_get("key".to_string()).await.is_err()); + Ok(()) } - pub(crate) async fn kv_get(&self, key: String) -> Result> { - match self.is_leader().await { - Ok(()) => self.kv_get_local(key).await, - Err(RaftError::APIError(CheckIsLeaderError::ForwardToLeader(ForwardToLeader { - leader_node: Some(n), - .. - }))) => { - let client = crate::raft::api::client::Tremor::new(n.api())?; - let res = client.read(&key).await; - match res { - Ok(v) => Ok(Some(v)), - Err(e) if e.is_not_found() => Ok(None), - Err(e) => Err(e.into()), + #[tokio::test(flavor = "multi_thread")] + async fn kv_set_follower() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + + let mut api_server = mockito::Server::new(); + + let api = api_server.host_with_port(); + let rpc = api.clone(); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: Some(Addr::new(api, rpc)), + leader_id: Some(1), + }), + ))); } - } - Err(e) => Err(e.into()), - } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + // Create a mock + let api_mock = api_server + .mock("POST", "/v1/api/kv/write") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_set("key".to_string(), b"42".to_vec()).await?, + b"42".to_vec() + ); + + api_mock.assert(); + Ok(()) } - pub(crate) async fn kv_get_local(&self, key: String) -> Result> { - let (tx, rx) = oneshot(); - let command = APIStoreReq::KVGet(key, tx); - self.send(command).await?; - Ok(rx - .await? - .map(|mut v| simd_json::from_slice(&mut v)) - .transpose()?) + + #[tokio::test(flavor = "multi_thread")] + async fn kv_get_follower() -> Result<()> { + let (store, _) = bounded(8); + let (cluster, mut cluster_rx) = bounded(8); + let mut api_server = mockito::Server::new(); + + let api = api_server.host_with_port(); + let rpc = api.clone(); + + tokio::spawn(async move { + match cluster_rx.recv().await.expect("rcv") { + IFRequest::IsLeader(result_tx) => { + let _ = result_tx.send(Err(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: Some(Addr::new(api, rpc)), + leader_id: Some(1), + }), + ))); + } + IFRequest::SetKeyLocal(_, _) => panic!("wrong request"), + }; + }); + + // Create a mock + let api_mock = api_server + .mock("POST", "/v1/api/kv/consistent_read") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("42") + .create(); + + let iface = Cluster { + node_id: 0, + store, + cluster, + }; + assert_eq!( + iface.kv_get("key".to_string()).await?, + Some(simd_json::json!(42)) + ); + + api_mock.assert(); + Ok(()) } } diff --git a/src/raft/store.rs b/src/raft/store.rs index 5c44bac6fb..71250a36e1 100644 --- a/src/raft/store.rs +++ b/src/raft/store.rs @@ -39,14 +39,7 @@ use redb::{ }; use serde::{Deserialize, Serialize}; use simd_json::OwnedValue; -use std::{ - fmt::{Debug, Display, Formatter}, - io::Cursor, - ops::RangeBounds, - path::Path, - string::FromUtf8Error, - sync::{Arc, Mutex}, -}; +use std::{fmt::Debug, io::Cursor, ops::RangeBounds, path::Path, string::FromUtf8Error, sync::Arc}; use tokio::sync::RwLock; use tremor_common::alias; @@ -263,11 +256,10 @@ impl TryFrom for alias::Flow { } /// A snapshot -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub struct TremorSnapshot { /// The meta data of the snapshot. pub meta: SnapshotMeta, - /// The data of the state machine at the time of this snapshot. pub data: Vec, } @@ -294,7 +286,7 @@ fn id_to_bin(id: u64) -> Result, Error> { fn bin_to_id(buf: &[u8]) -> Result { Ok(buf .get(0..8) - .ok_or_else(|| Error::Other(format!("Invalid buffer length: {}", buf.len()).into()))? + .ok_or_else(|| Error::InvalidBinIdBufferLen(buf.len()))? .read_u64::()?) } @@ -302,198 +294,84 @@ fn bin_to_id(buf: &[u8]) -> Result { #[derive(Debug, thiserror::Error)] pub enum Error { /// invalid cluster store, node_id missing - // #[error("invalid cluster store, node_id missing")] + #[error("invalid cluster store, node_id missing")] MissingNodeId, - // #[error("invalid cluster store, node_addr missing")] /// invalid cluster store, node_addr missing + #[error("invalid cluster store, node_addr missing")] MissingNodeAddr, + /// invalid buffer lenght for binay i + #[error("Invalid buffer length: {0}")] + InvalidBinIdBufferLen(usize), /// Invalid utf8 - Utf8(FromUtf8Error), + #[error(transparent)] + Utf8(#[from] FromUtf8Error), /// Invalid utf8 - StrUtf8(std::str::Utf8Error), + #[error(transparent)] + StrUtf8(#[from] std::str::Utf8Error), /// MsgPack encode error - MsgPackEncode(rmp_serde::encode::Error), + #[error(transparent)] + MsgPackEncode(#[from] rmp_serde::encode::Error), /// MsgPack decode error - MsgPackDecode(rmp_serde::decode::Error), + #[error(transparent)] + MsgPackDecode(#[from] rmp_serde::decode::Error), /// Database error - Database(DatabaseError), + #[error(transparent)] + Database(#[from] DatabaseError), /// Transaction error - Transaction(TransactionError), + #[error(transparent)] + Transaction(#[from] TransactionError), /// Transaction error - Table(TableError), + #[error(transparent)] + Table(#[from] TableError), /// StorageError - DbStorage(DbStorageError), + #[error(transparent)] + DbStorage(#[from] DbStorageError), /// Commit Error - Commit(CommitError), + #[error(transparent)] + Commit(#[from] CommitError), /// IO error - Io(std::io::Error), + #[error(transparent)] + Io(#[from] std::io::Error), /// Storage error - Storage(openraft::StorageError), + #[error(transparent)] + Storage(#[from] openraft::StorageError), /// Tremor error - Tremor(Mutex), + #[error(transparent)] + Tremor(#[from] RuntimeError), /// Tremor script error - TremorScript(Mutex), + #[error(transparent)] + TremorScript(#[from] tremor_script::errors::Error), /// Missing app + #[error("missing app {0}")] MissingApp(alias::App), /// Missing flow + #[error("missing flow {0} in app {1}")] MissingFlow(alias::App, alias::FlowDefinition), /// Missing instance + #[error("missing instance {0}")] MissingInstance(alias::Flow), /// App still has running instances + #[error("app {0} still has running instances")] RunningInstances(alias::App), /// Node already added + #[error("node {0} already added")] NodeAlreadyAdded(crate::raft::NodeId), /// Other error - Other(Box), -} - -impl From> for Error { - fn from(e: std::sync::PoisonError) -> Self { - Self::Other(Box::new(e)) - } -} - -impl From for Error { - fn from(e: FromUtf8Error) -> Self { - Error::Utf8(e) - } -} - -impl From for Error { - fn from(e: std::str::Utf8Error) -> Self { - Error::StrUtf8(e) - } -} - -impl From for Error { - fn from(e: rmp_serde::encode::Error) -> Self { - Error::MsgPackEncode(e) - } -} - -impl From for Error { - fn from(e: rmp_serde::decode::Error) -> Self { - Error::MsgPackDecode(e) - } -} -impl From for Error { - fn from(e: redb::DatabaseError) -> Self { - Error::Database(e) - } -} -impl From for Error { - fn from(e: redb::TransactionError) -> Self { - Error::Transaction(e) - } -} - -impl From for Error { - fn from(e: redb::TableError) -> Self { - Error::Table(e) - } -} - -impl From for Error { - fn from(e: redb::StorageError) -> Self { - Error::DbStorage(e) - } -} - -impl From for Error { - fn from(e: redb::CommitError) -> Self { - Error::Commit(e) - } -} - -impl From for Error { - fn from(e: std::io::Error) -> Self { - Error::Io(e) - } -} - -impl From for Error { - fn from(e: crate::errors::Error) -> Self { - Error::Tremor(Mutex::new(e)) - } -} -impl From for Error { - fn from(e: tremor_script::errors::Error) -> Self { - Error::TremorScript(Mutex::new(e)) - } -} - -impl From> for Error { - fn from(e: StorageError) -> Self { - Error::Storage(e) - } -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Error::MissingNodeId => write!(f, "missing node id"), - Error::MissingNodeAddr => write!(f, "missing node addr"), - Error::Utf8(e) => write!(f, "invalid utf8: {e}"), - Error::StrUtf8(e) => write!(f, "invalid utf8: {e}"), - - Error::MsgPackEncode(e) => write!(f, "invalid msgpack: {e}"), - Error::MsgPackDecode(e) => write!(f, "invalid msgpack: {e}"), - - Error::Database(e) => write!(f, "database error: {e}"), - Error::Transaction(e) => write!(f, "transaction error: {e}"), - Error::Table(e) => write!(f, "table error: {e}"), - Error::DbStorage(e) => write!(f, "storage error: {e}"), - Error::Commit(e) => write!(f, "commit error: {e}"), - - Error::Io(e) => write!(f, "io error: {e}"), - Error::Tremor(e) => write!(f, "tremor error: {:?}", e.lock()), - Error::TremorScript(e) => write!(f, "tremor script error: {:?}", e.lock()), - Error::Other(e) => write!(f, "other error: {e}"), - Error::MissingApp(app) => write!(f, "missing app: {app}"), - Error::MissingFlow(app, flow) => write!(f, "missing flow: {app}::{flow}"), - Error::MissingInstance(instance) => { - write!(f, "missing instance: {instance}") - } - Error::Storage(e) => write!(f, "Storage: {e}"), - Error::NodeAlreadyAdded(node_id) => write!(f, "Node {node_id} already added"), - Error::RunningInstances(app_id) => { - write!(f, "App {app_id} still has running instances") - } - } - } + #[error(transparent)] + Other(anyhow::Error), } fn w_err(e: impl Into) -> StorageError { - StorageIOError::new( - ErrorSubject::Store, - ErrorVerb::Write, - AnyError::new(&SillyError(e.into())), - ) - .into() + StorageIOError::new(ErrorSubject::Store, ErrorVerb::Write, SillyError::err(e)).into() } fn r_err(e: impl Into) -> StorageError { - StorageIOError::new( - ErrorSubject::Store, - ErrorVerb::Read, - AnyError::new(&SillyError(e.into())), - ) - .into() + StorageIOError::new(ErrorSubject::Store, ErrorVerb::Read, SillyError::err(e)).into() } fn logs_r_err(e: impl Into) -> StorageError { - StorageIOError::new( - ErrorSubject::Logs, - ErrorVerb::Read, - AnyError::new(&SillyError(e.into())), - ) - .into() + StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, SillyError::err(e)).into() } fn logs_w_err(e: impl Into) -> StorageError { - StorageIOError::new( - ErrorSubject::Logs, - ErrorVerb::Read, - AnyError::new(&SillyError(e.into())), - ) - .into() + StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, SillyError::err(e)).into() } impl From for StorageError { @@ -602,8 +480,8 @@ impl RaftLogReader for Store { .collect::>() } } -#[async_trait] +#[async_trait] impl RaftSnapshotBuilder for Store { async fn build_snapshot(&mut self) -> StorageResult> { let data; @@ -998,23 +876,193 @@ impl Store { #[cfg(test)] mod tests { - // use crate::raft::ClusterResult; - - // use super::*; - - // #[test] - // fn init_db_is_idempotent() -> ClusterResult<()> { - // let dir = tempfile::tempdir()?; - // let db = Store::init_db(dir.path())?; - // let handle = db.cf_handle(Store::STORE).expect("no data"); - // let data = vec![1_u8, 2_u8, 3_u8]; - // db.put_cf(handle, "node_id", data.clone())?; - // drop(db); - - // let db2 = Store::init_db(dir.path())?; - // let handle2 = db2.cf_handle(Store::STORE).expect("no data"); - // let res2 = db2.get_cf(handle2, "node_id")?; - // assert_eq!(Some(data), res2); - // Ok(()) - // } + + use openraft::{LeaderId, Membership}; + + use crate::system::WorldConfig; + + use super::*; + + #[test] + fn tremor_response() { + use alias::{App, Flow}; + let kv = TremorResponse::KvValue(vec![1, 2, 3]); + let app = TremorResponse::AppId(App::default()); + let node = TremorResponse::NodeId(1); + let instance = TremorResponse::AppFlowInstanceId(Flow::default()); + + assert_eq!(kv.into_kv_value().expect("ok"), vec![1, 2, 3]); + assert_eq!(App::try_from(app).expect("ok"), App::default()); + assert_eq!(NodeId::try_from(node).expect("ok"), 1); + assert_eq!(Flow::try_from(instance).expect("ok"), Flow::default()); + assert!(matches!( + TremorResponse::None.into_kv_value(), + Err(ResponseError::NotKv) + )); + assert!(matches!( + App::try_from(TremorResponse::None), + Err(ResponseError::NotAppId) + )); + assert!(matches!( + NodeId::try_from(TremorResponse::None), + Err(ResponseError::NotNodeId) + )); + assert!(matches!( + Flow::try_from(TremorResponse::None), + Err(ResponseError::NotAppFlowInstanceId) + )); + } + + #[test] + fn ids() { + let id = 1; + let bin = id_to_bin(id).expect("ok"); + assert_eq!(bin.len(), 8); + assert_eq!(bin_to_id(&bin).expect("ok"), id); + } + #[test] + fn test_err_functions() { + let e = Error::Other(anyhow::anyhow!("test")); + assert!(matches!(w_err(e), StorageError::IO { .. })); + let e = Error::Other(anyhow::anyhow!("test")); + assert!(matches!(r_err(e), StorageError::IO { .. })); + let e = Error::Other(anyhow::anyhow!("test")); + assert!(matches!(logs_r_err(e), StorageError::IO { .. })); + let e = Error::Other(anyhow::anyhow!("test")); + assert!(matches!(logs_w_err(e), StorageError::IO { .. })); + } + #[tokio::test(flavor = "multi_thread")] + async fn test_vote() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let mut store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let vote = Vote::new(1, 1); + store.save_vote(&vote).await?; + let vote2 = store.read_vote().await?; + assert_eq!(Some(vote), vote2); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn test_last_purged() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + let log_id = LogId::new(LeaderId::new(1, 1), 1); + store.set_last_purged_(&log_id)?; + let log_id2 = store.get_last_purged_()?; + assert_eq!(Some(log_id), log_id2); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_snapshot_index() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let snapshot_index = 1; + store.set_snapshot_index_(snapshot_index)?; + let snapshot_index2 = store.get_snapshot_index_()?; + assert_eq!(snapshot_index, snapshot_index2); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] + async fn test_snapshot() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let membership = Membership::default(); + let snapshot = TremorSnapshot { + meta: SnapshotMeta { + last_log_id: Some(LogId::new(LeaderId::new(1, 1), 1)), + last_membership: StoredMembership::new( + Some(LogId::new(LeaderId::new(1, 1), 1)), + membership, + ), + snapshot_id: "test".to_string(), + }, + data: vec![1, 2, 3], + }; + store.set_current_snapshot_(&snapshot)?; + let snapshot2 = store.get_current_snapshot_()?; + assert_eq!(Some(snapshot), snapshot2); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn try_get_log_entries() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let mut store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let entry = Entry { + log_id: LogId::new(LeaderId::new(1, 1), 1), + payload: EntryPayload::Blank, + }; + store.append_to_log(vec![entry.clone()]).await?; + let entries = store.try_get_log_entries(0..2).await?; + assert_eq!(entries.len(), 1); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn build_snapshot() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let mut store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let entry = Entry { + log_id: LogId::new(LeaderId::new(1, 1), 1), + payload: EntryPayload::Blank, + }; + store.append_to_log(vec![entry.clone()]).await?; + + let snapshot = store.build_snapshot().await?; + // A single log entry will not create a snapshot so we will get None back + assert_eq!(snapshot.meta.last_log_id, None); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn save_vote() -> Result<()> { + let node_id = 1; + let addr = Addr::default(); + let (runtime, _) = Runtime::start(WorldConfig::default()) + .await + .expect("runtime"); + let dir = tempfile::tempdir()?; + let mut store = Store::bootstrap(node_id, &addr, dir.path().join("db"), runtime).await?; + + let vote = Vote::new(1, 1); + store.save_vote(&vote).await?; + let vote2 = store.read_vote().await?; + assert_eq!(Some(vote), vote2); + Ok(()) + } } diff --git a/src/raft/store/statemachine/apps.rs b/src/raft/store/statemachine/apps.rs index 403af32703..c5b94d3dc7 100644 --- a/src/raft/store/statemachine/apps.rs +++ b/src/raft/store/statemachine/apps.rs @@ -55,7 +55,7 @@ pub struct FlowInstance { } pub type Instances = HashMap; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct StateApp { pub app: TremorAppDef, pub instances: Instances, @@ -64,6 +64,19 @@ pub struct StateApp { main: Deploy, } +#[cfg(test)] +impl StateApp { + #[must_use] + pub(crate) fn dummy() -> Self { + Self { + app: TremorAppDef::dummy(), + instances: HashMap::new(), + arena_indices: Vec::new(), + main: Deploy::dummy(), + } + } +} + #[derive(Clone, Debug)] pub(crate) struct AppsStateMachine { db: Arc, @@ -108,7 +121,7 @@ impl RaftStateMachine for AppsStateMachine { for kv in apps.iter()? { let (_, archive) = kv?; me.load_archive(archive.value()) - .map_err(|e| store::Error::Other(Box::new(e)))?; + .map_err(|e| store::Error::Other(e.into()))?; } let instances = read_txn.open_table(INSTANCES)?; diff --git a/src/system.rs b/src/system.rs index ea491156a5..e1ab3a5428 100644 --- a/src/system.rs +++ b/src/system.rs @@ -28,8 +28,7 @@ use crate::{ connectors, errors::{ErrorKind, Result}, instance::IntendedState as IntendedInstanceState, - log_error, - raft::{self, ClusterInterface}, + log_error, raft, }; use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; use tremor_common::alias; @@ -217,10 +216,7 @@ impl Runtime { app: app_id, flow: Box::new(flow.clone()), sender: tx, - raft: self - .maybe_get_manager()? - .map(ClusterInterface::from) - .unwrap_or_default(), + raft: self.maybe_get_manager()?, deployment_type, }) .await?; diff --git a/src/system/flow.rs b/src/system/flow.rs index e777f9ee14..a979a7aafa 100644 --- a/src/system/flow.rs +++ b/src/system/flow.rs @@ -16,7 +16,7 @@ use crate::{ channel::empty_e, channel::{bounded, oneshot, OneShotSender, Sender}, connectors::AliasableConnectorResult, - raft::{self, NodeId}, + raft::{self, manager::Cluster, NodeId}, }; use crate::{ connectors::{self, ConnectorResult, Known}, @@ -113,7 +113,7 @@ pub struct StatusReport { #[derive(Debug, Clone, Default)] pub(crate) struct AppContext { pub(crate) id: alias::Flow, - pub(crate) raft: raft::ClusterInterface, + pub(crate) raft: Option, pub(crate) metrics: MetricsChannel, } @@ -125,13 +125,13 @@ impl AppContext { self.id.instance_id() } pub fn node_id(&self) -> NodeId { - self.raft.id() + self.raft.as_ref().map_or(0, Cluster::id) } } impl std::fmt::Display for AppContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[Node::{}][{}]", self.raft.id(), self.id) + write!(f, "[Node::{}][{}]", self.node_id(), self.id) } } @@ -607,39 +607,42 @@ impl RunningFlow { while let Some(wrapped) = self.input_channel.next().await { match wrapped { MsgWrapper::Msg(Msg::Tick) => { - if let Ok(Ok(members)) = timeout( - Duration::from_millis(100), - self.app_ctx.raft.get_last_membership(), - ) - .await + let Some(raft) = self.app_ctx.raft.as_ref() else { + continue; + }; + + let Ok(Ok(members)) = + timeout(Duration::from_millis(100), raft.get_last_membership()).await + else { + continue; + }; + + current_nodes = members.into_iter().collect(); + slot = jh.slot(&hash_key, current_nodes.len() as u32) as usize; + + if is_active_node(¤t_nodes, slot, node_id) + && intended_active_state == IntendedState::Running { - current_nodes = members.into_iter().collect(); - slot = jh.slot(&hash_key, current_nodes.len() as u32) as usize; - - if is_active_node(¤t_nodes, slot, node_id) - && intended_active_state == IntendedState::Running - { - match self.state { - State::Paused => { - if let Err(e) = self.handle_resume(&prefix).await { - error!("{prefix} Error during resuming: {e}"); - self.change_state(State::Failed); - } - } - State::Initializing => { - if let Err(e) = self.handle_start(&prefix).await { - error!("{prefix} Error starting: {e}"); - self.change_state(State::Failed); - }; - } - state => { - debug!("not changing from state: {state}"); + match self.state { + State::Paused => { + if let Err(e) = self.handle_resume(&prefix).await { + error!("{prefix} Error during resuming: {e}"); + self.change_state(State::Failed); } } - } else if self.state == State::Running { - self.handle_pause(&prefix).await?; - intended_active_state = IntendedState::Running; + State::Initializing => { + if let Err(e) = self.handle_start(&prefix).await { + error!("{prefix} Error starting: {e}"); + self.change_state(State::Failed); + }; + } + state => { + debug!("not changing from state: {state}"); + } } + } else if self.state == State::Running { + self.handle_pause(&prefix).await?; + intended_active_state = IntendedState::Running; } } MsgWrapper::Msg(Msg::ChangeState { diff --git a/src/system/flow_supervisor.rs b/src/system/flow_supervisor.rs index 2f712cbaf1..20c91760f9 100644 --- a/src/system/flow_supervisor.rs +++ b/src/system/flow_supervisor.rs @@ -51,7 +51,7 @@ pub(crate) enum Msg { /// result sender sender: OneShotSender>, /// API request sender - raft: raft::ClusterInterface, + raft: Option, /// Type of the deployment deployment_type: DeploymentType, }, @@ -112,7 +112,7 @@ impl FlowSupervisor { flow: DeployFlow<'static>, sender: oneshot::Sender>, kill_switch: &KillSwitch, - raft: raft::ClusterInterface, + raft: Option, deployment_type: DeploymentType, ) { let id = alias::Flow::new(app_id, &flow.instance_alias); diff --git a/tremor-script/src/ast/deploy.rs b/tremor-script/src/ast/deploy.rs index 8d4e5e3ea1..1a908539e5 100644 --- a/tremor-script/src/ast/deploy.rs +++ b/tremor-script/src/ast/deploy.rs @@ -45,6 +45,19 @@ pub struct Deploy<'script> { mid: Box, } +impl Deploy<'static> { + #[must_use] + pub(crate) fn dummy() -> Self { + Self { + docs: Docs::default(), + stmts: vec![], + config: HashMap::new(), + scope: Scope::default(), + mid: NodeMeta::dummy(), + } + } +} + impl<'script> BaseExpr for Deploy<'script> { fn meta(&self) -> &crate::NodeMeta { self.mid.meta() diff --git a/tremor-script/src/ast/module.rs b/tremor-script/src/ast/module.rs index 1b305c1775..2083e4bab6 100644 --- a/tremor-script/src/ast/module.rs +++ b/tremor-script/src/ast/module.rs @@ -485,12 +485,12 @@ impl Manager { MODULES.write()?.path.add(path); Ok(()) } - + #[cfg(feature = "arena-delete")] pub(crate) fn delete_arena_index(idx: arena::Index) -> Result<()> { MODULES.write()?.delete_arena_index_(idx); Ok(()) } - + #[cfg(feature = "arena-delete")] fn delete_arena_index_(&mut self, idx: arena::Index) { self.modules.retain(|m| { if m.arena_idx == idx { diff --git a/tremor-script/src/deploy.rs b/tremor-script/src/deploy.rs index faca2edfc4..4818a3e635 100644 --- a/tremor-script/src/deploy.rs +++ b/tremor-script/src/deploy.rs @@ -20,11 +20,12 @@ use crate::{ lexer::{self, Lexer}, module::PreCachedNodes, prelude::*, + NodeMeta, }; use std::collections::BTreeSet; /// A tremor deployment ( troy) -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Deploy { /// The deployment pub deploy: ast::Deploy<'static>, @@ -35,8 +36,22 @@ pub struct Deploy { /// Number of local variables (should be 0) pub locals: usize, } + +impl Deploy { + /// Creates a dummy deployment for testing + #[must_use] + pub fn dummy() -> Self { + Self { + deploy: ast::Deploy::dummy(), + aid: arena::Index::INVALID, + warnings: BTreeSet::new(), + locals: 0, + } + } +} + impl BaseExpr for Deploy { - fn meta(&self) -> &crate::NodeMeta { + fn meta(&self) -> &NodeMeta { self.deploy.meta() } }