From 051491d164b2300bf6901d807343657d4ef3cfff Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Wed, 15 Jan 2020 01:32:17 +0900 Subject: [PATCH 01/14] First pass at topic filter tries --- src/main.rs | 1 + src/topic.rs | 164 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 src/topic.rs diff --git a/src/main.rs b/src/main.rs index 7470400..38c1779 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ mod broker; mod client; mod decoder; mod encoder; +mod topic; mod types; pub struct MqttCodec { diff --git a/src/topic.rs b/src/topic.rs new file mode 100644 index 0000000..10fe63a --- /dev/null +++ b/src/topic.rs @@ -0,0 +1,164 @@ +use std::collections::HashMap; + +#[derive(Debug)] +pub struct SubscriptionTrie { + subscribers: Vec, + single_level_wildcards: Option>>, + multi_level_wildcards: Vec, + concrete_topic_levels: HashMap>, +} + +// TODO(bschwind) - All these topic strings need validation before +// operating on them. + +impl SubscriptionTrie { + pub fn new() -> Self { + Self { + subscribers: Vec::new(), + single_level_wildcards: None, + multi_level_wildcards: Vec::new(), + concrete_topic_levels: HashMap::new(), + } + } + + pub fn insert(&mut self, topic_filter: String, value: T) { + let mut current_trie = self; + let mut multi_level = false; + + for level in topic_filter.split("/") { + match level { + "+" => { + if current_trie.single_level_wildcards.is_some() { + current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + } else { + current_trie.single_level_wildcards = + Some(Box::new(SubscriptionTrie::new())); + current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + } + }, + "#" => { + multi_level = true; + break; + }, + concrete_topic_level => { + if current_trie.concrete_topic_levels.contains_key(concrete_topic_level) { + current_trie = current_trie + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } else { + current_trie + .concrete_topic_levels + .insert(concrete_topic_level.to_string(), SubscriptionTrie::new()); + + // TODO - Do this without another hash lookup + current_trie = current_trie + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } + }, + } + } + + if multi_level { + current_trie.multi_level_wildcards.push(value); + } else { + current_trie.subscribers.push(value); + } + } + + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: String, mut sub_fn: F) { + let mut trie_stack = vec![]; + let levels: Vec<&str> = topic_name.split("/").collect(); + + trie_stack.push((self, 0)); + + while !trie_stack.is_empty() { + let (current_trie, current_level) = trie_stack.pop().unwrap(); + let level = levels[current_level]; + + for subscriber in ¤t_trie.multi_level_wildcards { + sub_fn(subscriber); + } + + if let Some(sub_trie) = ¤t_trie.single_level_wildcards { + if current_level + 1 < levels.len() { + trie_stack.push((sub_trie, current_level + 1)); + } else { + for subscriber in &sub_trie.subscribers { + sub_fn(subscriber); + } + } + } + + if current_trie.concrete_topic_levels.contains_key(level) { + let sub_trie = current_trie.concrete_topic_levels.get(level).unwrap(); + + if current_level + 1 < levels.len() { + let sub_trie = current_trie.concrete_topic_levels.get(level).unwrap(); + trie_stack.push((sub_trie, current_level + 1)); + } else { + for subscriber in &sub_trie.subscribers { + sub_fn(subscriber); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::topic::SubscriptionTrie; + + #[test] + fn test_insert() { + let mut sub_trie = SubscriptionTrie::new(); + sub_trie.insert("home/kitchen/temperature".to_string(), 1); + sub_trie.insert("home/kitchen/humidity".to_string(), 2); + sub_trie.insert("home/kitchen".to_string(), 3); + sub_trie.insert("home/+/humidity".to_string(), 4); + sub_trie.insert("home/+".to_string(), 5); + sub_trie.insert("home/#".to_string(), 6); + sub_trie.insert("home/+/temperature".to_string(), 7); + sub_trie.insert("office/stairwell/temperature".to_string(), 8); + sub_trie.insert("office/+/+".to_string(), 9); + sub_trie.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); + sub_trie.insert("office/+/+/some_desk/+/#".to_string(), 11); + sub_trie.insert("#".to_string(), 12); + + println!("{:#?}", sub_trie); + + sub_trie.matching_subscribers("home/kitchen".to_string(), |s| { + println!("{}", s); + }); + + println!(); + + sub_trie.matching_subscribers("home/kitchen/humidity".to_string(), |s| { + println!("{}", s); + }); + + println!(); + + sub_trie.matching_subscribers("office/stairwell/temperature".to_string(), |s| { + println!("{}", s); + }); + + println!(); + + sub_trie.matching_subscribers( + "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature".to_string(), + |s| { + println!("{}", s); + }, + ); + + println!(); + + sub_trie.matching_subscribers("home".to_string(), |s| { + println!("{}", s); + }); + } +} From 415fc3d6268791c68d13d559ab0a3aaef1192274 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 17 Jan 2020 17:09:21 +0900 Subject: [PATCH 02/14] Add gross version of remove --- src/topic.rs | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 2 deletions(-) diff --git a/src/topic.rs b/src/topic.rs index 10fe63a..aa38dc1 100644 --- a/src/topic.rs +++ b/src/topic.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{hash_map::Entry, HashMap}; #[derive(Debug)] pub struct SubscriptionTrie { @@ -11,7 +11,7 @@ pub struct SubscriptionTrie { // TODO(bschwind) - All these topic strings need validation before // operating on them. -impl SubscriptionTrie { +impl SubscriptionTrie { pub fn new() -> Self { Self { subscribers: Vec::new(), @@ -21,6 +21,13 @@ impl SubscriptionTrie { } } + pub fn is_empty(&self) -> bool { + return self.subscribers.is_empty() + && self.single_level_wildcards.is_none() + && self.multi_level_wildcards.is_empty() + && self.concrete_topic_levels.is_empty(); + } + pub fn insert(&mut self, topic_filter: String, value: T) { let mut current_trie = self; let mut multi_level = false; @@ -68,6 +75,95 @@ impl SubscriptionTrie { } } + pub fn remove(&mut self, topic_filter: String, value: T) -> Option { + let mut current_trie = self; + let mut stack: Vec<(*mut SubscriptionTrie, usize)> = vec![]; + + let levels: Vec<&str> = topic_filter.split("/").collect(); + let mut level_index = 0; + + for level in &levels { + match *level { + "+" => { + if current_trie.single_level_wildcards.is_some() { + stack.push((&mut *current_trie, level_index)); + level_index += 1; + + current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + } else { + return None; + } + }, + "#" => { + break; + }, + concrete_topic_level => { + if current_trie.concrete_topic_levels.contains_key(concrete_topic_level) { + stack.push((&mut *current_trie, level_index)); + level_index += 1; + + current_trie = current_trie + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } else { + return None; + } + }, + } + } + + // Get the return value + let return_val = { + let level = &levels[levels.len() - 1]; + + if *level == "#" { + if let Some(pos) = + current_trie.multi_level_wildcards.iter().position(|x| *x == value) + { + Some(current_trie.multi_level_wildcards.remove(pos)) + } else { + None + } + } else { + if let Some(pos) = current_trie.subscribers.iter().position(|x| *x == value) { + Some(current_trie.subscribers.remove(pos)) + } else { + None + } + } + }; + + // Go up the stack, cleaning up empty nodes + while let Some((stack_val, level_index)) = stack.pop() { + let mut trie = unsafe { &mut *stack_val }; + + let level = levels[level_index]; + + match level { + "+" => { + if trie.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { + trie.single_level_wildcards = None; + } + }, + "#" => { + // TODO - Ignore this case? + }, + concrete_topic_level => { + if let Entry::Occupied(o) = + trie.concrete_topic_levels.entry(concrete_topic_level.to_string()) + { + if o.get().is_empty() { + o.remove_entry(); + } + } + }, + } + } + + return_val + } + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: String, mut sub_fn: F) { let mut trie_stack = vec![]; let levels: Vec<&str> = topic_name.split("/").collect(); @@ -126,6 +222,7 @@ mod tests { sub_trie.insert("office/+/+".to_string(), 9); sub_trie.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); sub_trie.insert("office/+/+/some_desk/+/#".to_string(), 11); + sub_trie.insert("sport/tennis/+".to_string(), 21); sub_trie.insert("#".to_string(), 12); println!("{:#?}", sub_trie); @@ -160,5 +257,49 @@ mod tests { sub_trie.matching_subscribers("home".to_string(), |s| { println!("{}", s); }); + + println!(); + + sub_trie.matching_subscribers("sport/tennis/player1".to_string(), |s| { + println!("{}", s); + }); + + println!(); + + sub_trie.matching_subscribers("sport/tennis/player2".to_string(), |s| { + println!("{}", s); + }); + + println!(); + + sub_trie.matching_subscribers("sport/tennis/player1/ranking".to_string(), |s| { + println!("{}", s); + }); + } + + #[test] + fn test_remove() { + let mut sub_trie = SubscriptionTrie::new(); + sub_trie.insert("home/kitchen/temperature".to_string(), 1); + sub_trie.insert("home/kitchen/temperature".to_string(), 2); + sub_trie.insert("home/kitchen/humidity".to_string(), 1); + sub_trie.insert("home/kitchen/#".to_string(), 1); + sub_trie.insert("home/kitchen/+".to_string(), 3); + sub_trie.insert("home/kitchen/+".to_string(), 2); + sub_trie.insert("#".to_string(), 6); + + println!("{:#?}", sub_trie); + + sub_trie.remove("home/kitchen/temperature".to_string(), 1); + println!("{:#?}", sub_trie); + + sub_trie.remove("home/kitchen/temperature".to_string(), 2); + println!("{:#?}", sub_trie); + + sub_trie.remove("home/kitchen/#".to_string(), 1); + println!("{:#?}", sub_trie); + + sub_trie.remove("home/kitchen/+".to_string(), 3); + println!("{:#?}", sub_trie); } } From 7957104aad984bbbe772c1fb2d98319d8a3c1445 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 18 Jan 2020 21:01:14 +0900 Subject: [PATCH 03/14] Rename trie to tree, add TopicFilter struct and parsing logic --- src/topic.rs | 603 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 501 insertions(+), 102 deletions(-) diff --git a/src/topic.rs b/src/topic.rs index aa38dc1..0cca149 100644 --- a/src/topic.rs +++ b/src/topic.rs @@ -1,17 +1,171 @@ -use std::collections::{hash_map::Entry, HashMap}; +use std::{ + collections::{hash_map::Entry, HashMap}, + str::FromStr, +}; + +const TOPIC_SEPARATOR: char = '/'; + +const MULTI_LEVEL_WILDCARD: char = '#'; +const MULTI_LEVEL_WILDCARD_STR: &'static str = "#"; + +const SINGLE_LEVEL_WILDCARD: char = '+'; +const SINGLE_LEVEL_WILDCARD_STR: &'static str = "+"; + +const SHARED_SUBSCRIPTION_PREFIX: &'static str = "$share/"; + +pub const MAX_TOPIC_LEN_BYTES: usize = 65_535; + +// TODO(bschwind) - Support shared subscriptions + +/// A filter for subscribers to indicate which topics they want +/// to receive messages from. Can contain wildcards. +#[derive(Debug, PartialEq)] +pub enum TopicFilter { + Concrete { filter: String, level_count: u32 }, + Wildcard { filter: String, level_count: u32 }, + SharedConcrete { group_name: String, filter: String, level_count: u32 }, + SharedWildcard { group_name: String, filter: String, level_count: u32 }, +} + +/// A topic name publishers use when sending MQTT messages. +/// Cannot contain wildcards. +#[derive(Debug)] +pub struct Topic { + topic_name: String, + level_count: u32, +} + +#[derive(Debug, PartialEq)] +pub enum TopicParseError { + EmptyTopic, + TopicTooLong, + MultilevelWildcardNotAtEnd, + InvalidWildcardLevel, + InvalidSharedGroupName, + EmptySharedGroupName, +} + +/// If Ok, returns (level_count, contains_wildcards). +fn process_filter(filter: &str) -> Result<(u32, bool), TopicParseError> { + let mut level_count = 0; + let mut contains_wildcards = false; + for level in filter.split(TOPIC_SEPARATOR) { + let level_contains_wildcard = + level.contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD); + if level_contains_wildcard { + // Any wildcards on a particular level must be specified on their own + if level.len() > 1 { + return Err(TopicParseError::InvalidWildcardLevel); + } + + contains_wildcards = true; + } + + level_count += 1; + } + + Ok((level_count, contains_wildcards)) +} + +impl FromStr for TopicFilter { + type Err = TopicParseError; + + fn from_str(filter: &str) -> Result { + // Filters and topics cannot be empty + if filter.is_empty() { + return Err(TopicParseError::EmptyTopic); + } + + // Filters cannot exceed the byte length in the MQTT spec + if filter.len() > MAX_TOPIC_LEN_BYTES { + return Err(TopicParseError::TopicTooLong); + } + + // Multi-level wildcards can only be at the end of the topic + if let Some(pos) = filter.rfind(MULTI_LEVEL_WILDCARD) { + if pos != filter.len() - 1 { + return Err(TopicParseError::MultilevelWildcardNotAtEnd); + } + } + + let mut shared_group = None; + + if filter.starts_with(SHARED_SUBSCRIPTION_PREFIX) { + let filter_rest = &filter[SHARED_SUBSCRIPTION_PREFIX.len()..]; + + if filter_rest.is_empty() { + return Err(TopicParseError::EmptySharedGroupName); + } + + if let Some(slash_pos) = filter_rest.find(TOPIC_SEPARATOR) { + let shared_name = &filter_rest[0..slash_pos]; + + // slash_pos+1 is safe here, we've already validated the string + // has a nonzero length. + let shared_filter = &filter_rest[(slash_pos + 1)..]; + + if shared_name.is_empty() { + return Err(TopicParseError::EmptySharedGroupName); + } + + if shared_name + .contains(|x: char| x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD) + { + return Err(TopicParseError::InvalidSharedGroupName); + } + + if shared_filter.is_empty() { + return Err(TopicParseError::EmptyTopic); + } + + shared_group = Some((shared_name, shared_filter)) + } else { + return Err(TopicParseError::EmptyTopic); + } + } + + let topic_filter = if let Some((group_name, shared_filter)) = shared_group { + let (level_count, contains_wildcards) = process_filter(shared_filter)?; + + if contains_wildcards { + TopicFilter::SharedWildcard { + group_name: group_name.to_string(), + filter: shared_filter.to_string(), + level_count, + } + } else { + TopicFilter::SharedConcrete { + group_name: group_name.to_string(), + filter: shared_filter.to_string(), + level_count, + } + } + } else { + let (level_count, contains_wildcards) = process_filter(filter)?; + + if contains_wildcards { + TopicFilter::Wildcard { filter: filter.to_string(), level_count } + } else { + TopicFilter::Concrete { filter: filter.to_string(), level_count } + } + }; + + Ok(topic_filter) + } +} #[derive(Debug)] -pub struct SubscriptionTrie { +pub struct SubscriptionTree { subscribers: Vec, - single_level_wildcards: Option>>, + single_level_wildcards: Option>>, multi_level_wildcards: Vec, - concrete_topic_levels: HashMap>, + concrete_topic_levels: HashMap>, } // TODO(bschwind) - All these topic strings need validation before // operating on them. -impl SubscriptionTrie { +impl SubscriptionTree { pub fn new() -> Self { Self { subscribers: Vec::new(), @@ -29,37 +183,37 @@ impl SubscriptionTrie { } pub fn insert(&mut self, topic_filter: String, value: T) { - let mut current_trie = self; + let mut current_tree = self; let mut multi_level = false; - for level in topic_filter.split("/") { + for level in topic_filter.split(TOPIC_SEPARATOR) { match level { - "+" => { - if current_trie.single_level_wildcards.is_some() { - current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + SINGLE_LEVEL_WILDCARD_STR => { + if current_tree.single_level_wildcards.is_some() { + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } else { - current_trie.single_level_wildcards = - Some(Box::new(SubscriptionTrie::new())); - current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + current_tree.single_level_wildcards = + Some(Box::new(SubscriptionTree::new())); + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } }, - "#" => { + MULTI_LEVEL_WILDCARD_STR => { multi_level = true; break; }, concrete_topic_level => { - if current_trie.concrete_topic_levels.contains_key(concrete_topic_level) { - current_trie = current_trie + if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + current_tree = current_tree .concrete_topic_levels .get_mut(concrete_topic_level) .unwrap(); } else { - current_trie + current_tree .concrete_topic_levels - .insert(concrete_topic_level.to_string(), SubscriptionTrie::new()); + .insert(concrete_topic_level.to_string(), SubscriptionTree::new()); // TODO - Do this without another hash lookup - current_trie = current_trie + current_tree = current_tree .concrete_topic_levels .get_mut(concrete_topic_level) .unwrap(); @@ -69,40 +223,40 @@ impl SubscriptionTrie { } if multi_level { - current_trie.multi_level_wildcards.push(value); + current_tree.multi_level_wildcards.push(value); } else { - current_trie.subscribers.push(value); + current_tree.subscribers.push(value); } } pub fn remove(&mut self, topic_filter: String, value: T) -> Option { - let mut current_trie = self; - let mut stack: Vec<(*mut SubscriptionTrie, usize)> = vec![]; + let mut current_tree = self; + let mut stack: Vec<(*mut SubscriptionTree, usize)> = vec![]; - let levels: Vec<&str> = topic_filter.split("/").collect(); + let levels: Vec<&str> = topic_filter.split(TOPIC_SEPARATOR).collect(); let mut level_index = 0; for level in &levels { match *level { - "+" => { - if current_trie.single_level_wildcards.is_some() { - stack.push((&mut *current_trie, level_index)); + SINGLE_LEVEL_WILDCARD_STR => { + if current_tree.single_level_wildcards.is_some() { + stack.push((&mut *current_tree, level_index)); level_index += 1; - current_trie = current_trie.single_level_wildcards.as_mut().unwrap(); + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } else { return None; } }, - "#" => { + MULTI_LEVEL_WILDCARD_STR => { break; }, concrete_topic_level => { - if current_trie.concrete_topic_levels.contains_key(concrete_topic_level) { - stack.push((&mut *current_trie, level_index)); + if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + stack.push((&mut *current_tree, level_index)); level_index += 1; - current_trie = current_trie + current_tree = current_tree .concrete_topic_levels .get_mut(concrete_topic_level) .unwrap(); @@ -117,17 +271,17 @@ impl SubscriptionTrie { let return_val = { let level = &levels[levels.len() - 1]; - if *level == "#" { + if *level == MULTI_LEVEL_WILDCARD_STR { if let Some(pos) = - current_trie.multi_level_wildcards.iter().position(|x| *x == value) + current_tree.multi_level_wildcards.iter().position(|x| *x == value) { - Some(current_trie.multi_level_wildcards.remove(pos)) + Some(current_tree.multi_level_wildcards.remove(pos)) } else { None } } else { - if let Some(pos) = current_trie.subscribers.iter().position(|x| *x == value) { - Some(current_trie.subscribers.remove(pos)) + if let Some(pos) = current_tree.subscribers.iter().position(|x| *x == value) { + Some(current_tree.subscribers.remove(pos)) } else { None } @@ -136,22 +290,22 @@ impl SubscriptionTrie { // Go up the stack, cleaning up empty nodes while let Some((stack_val, level_index)) = stack.pop() { - let mut trie = unsafe { &mut *stack_val }; + let mut tree = unsafe { &mut *stack_val }; let level = levels[level_index]; match level { - "+" => { - if trie.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { - trie.single_level_wildcards = None; + SINGLE_LEVEL_WILDCARD_STR => { + if tree.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { + tree.single_level_wildcards = None; } }, - "#" => { + MULTI_LEVEL_WILDCARD_STR => { // TODO - Ignore this case? }, concrete_topic_level => { if let Entry::Occupied(o) = - trie.concrete_topic_levels.entry(concrete_topic_level.to_string()) + tree.concrete_topic_levels.entry(concrete_topic_level.to_string()) { if o.get().is_empty() { o.remove_entry(); @@ -164,38 +318,38 @@ impl SubscriptionTrie { return_val } - pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: String, mut sub_fn: F) { - let mut trie_stack = vec![]; - let levels: Vec<&str> = topic_name.split("/").collect(); + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { + let mut tree_stack = vec![]; + let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect(); - trie_stack.push((self, 0)); + tree_stack.push((self, 0)); - while !trie_stack.is_empty() { - let (current_trie, current_level) = trie_stack.pop().unwrap(); + while !tree_stack.is_empty() { + let (current_tree, current_level) = tree_stack.pop().unwrap(); let level = levels[current_level]; - for subscriber in ¤t_trie.multi_level_wildcards { + for subscriber in ¤t_tree.multi_level_wildcards { sub_fn(subscriber); } - if let Some(sub_trie) = ¤t_trie.single_level_wildcards { + if let Some(sub_tree) = ¤t_tree.single_level_wildcards { if current_level + 1 < levels.len() { - trie_stack.push((sub_trie, current_level + 1)); + tree_stack.push((sub_tree, current_level + 1)); } else { - for subscriber in &sub_trie.subscribers { + for subscriber in &sub_tree.subscribers { sub_fn(subscriber); } } } - if current_trie.concrete_topic_levels.contains_key(level) { - let sub_trie = current_trie.concrete_topic_levels.get(level).unwrap(); + if current_tree.concrete_topic_levels.contains_key(level) { + let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); if current_level + 1 < levels.len() { - let sub_trie = current_trie.concrete_topic_levels.get(level).unwrap(); - trie_stack.push((sub_trie, current_level + 1)); + let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); + tree_stack.push((sub_tree, current_level + 1)); } else { - for subscriber in &sub_trie.subscribers { + for subscriber in &sub_tree.subscribers { sub_fn(subscriber); } } @@ -206,47 +360,292 @@ impl SubscriptionTrie { #[cfg(test)] mod tests { - use crate::topic::SubscriptionTrie; + use crate::topic::{SubscriptionTree, TopicFilter, TopicParseError, MAX_TOPIC_LEN_BYTES}; + + #[test] + fn test_topic_filter_parse_empty_topic() { + assert_eq!("".parse::().unwrap_err(), TopicParseError::EmptyTopic); + } + + #[test] + fn test_topic_filter_parse_length() { + let just_right_topic = "a".repeat(MAX_TOPIC_LEN_BYTES); + assert!(just_right_topic.parse::().is_ok()); + + let too_long_topic = "a".repeat(MAX_TOPIC_LEN_BYTES + 1); + assert_eq!( + too_long_topic.parse::().unwrap_err(), + TopicParseError::TopicTooLong + ); + } + + #[test] + fn test_topic_filter_parse_concrete() { + assert_eq!( + "/".parse::().unwrap(), + TopicFilter::Concrete { filter: "/".to_string(), level_count: 2 } + ); + + assert_eq!( + "a".parse::().unwrap(), + TopicFilter::Concrete { filter: "a".to_string(), level_count: 1 } + ); + + // $SYS topics can be subscribed to, but can't be published + assert_eq!( + "home/kitchen".parse::().unwrap(), + TopicFilter::Concrete { filter: "home/kitchen".to_string(), level_count: 2 } + ); + + assert_eq!( + "home/kitchen/temperature".parse::().unwrap(), + TopicFilter::Concrete { + filter: "home/kitchen/temperature".to_string(), + level_count: 3, + } + ); + + assert_eq!( + "home/kitchen/temperature/celsius".parse::().unwrap(), + TopicFilter::Concrete { + filter: "home/kitchen/temperature/celsius".to_string(), + level_count: 4, + } + ); + } + + #[test] + fn test_topic_filter_parse_single_level_wildcard() { + assert_eq!( + "+".parse::().unwrap(), + TopicFilter::Wildcard { filter: "+".to_string(), level_count: 1 } + ); + + assert_eq!( + "+/".parse::().unwrap(), + TopicFilter::Wildcard { filter: "+/".to_string(), level_count: 2 } + ); + + assert_eq!( + "sport/+".parse::().unwrap(), + TopicFilter::Wildcard { filter: "sport/+".to_string(), level_count: 2 } + ); + + assert_eq!( + "/+".parse::().unwrap(), + TopicFilter::Wildcard { filter: "/+".to_string(), level_count: 2 } + ); + } + + #[test] + fn test_topic_filter_parse_multi_level_wildcard() { + assert_eq!( + "#".parse::().unwrap(), + TopicFilter::Wildcard { filter: "#".to_string(), level_count: 1 } + ); + + assert_eq!( + "#/".parse::().unwrap_err(), + TopicParseError::MultilevelWildcardNotAtEnd + ); + + assert_eq!( + "/#".parse::().unwrap(), + TopicFilter::Wildcard { filter: "/#".to_string(), level_count: 2 } + ); + + assert_eq!( + "sport/#".parse::().unwrap(), + TopicFilter::Wildcard { filter: "sport/#".to_string(), level_count: 2 } + ); + + assert_eq!( + "home/kitchen/temperature/#".parse::().unwrap(), + TopicFilter::Wildcard { + filter: "home/kitchen/temperature/#".to_string(), + level_count: 4, + } + ); + } + + #[test] + fn test_topic_filter_parse_shared_subscription_concrete() { + assert_eq!( + "$share/group_a/home".parse::().unwrap(), + TopicFilter::SharedConcrete { + group_name: "group_a".to_string(), + filter: "home".to_string(), + level_count: 1, + } + ); + + assert_eq!( + "$share/group_a/home/kitchen/temperature".parse::().unwrap(), + TopicFilter::SharedConcrete { + group_name: "group_a".to_string(), + filter: "home/kitchen/temperature".to_string(), + level_count: 3, + } + ); + + assert_eq!( + "$share/group_a//".parse::().unwrap(), + TopicFilter::SharedConcrete { + group_name: "group_a".to_string(), + filter: "/".to_string(), + level_count: 2, + } + ); + } + + #[test] + fn test_topic_filter_parse_shared_subscription_wildcard() { + assert_eq!( + "$share/group_b/#".parse::().unwrap(), + TopicFilter::SharedWildcard { + group_name: "group_b".to_string(), + filter: "#".to_string(), + level_count: 1, + } + ); + + assert_eq!( + "$share/group_b/+".parse::().unwrap(), + TopicFilter::SharedWildcard { + group_name: "group_b".to_string(), + filter: "+".to_string(), + level_count: 1, + } + ); + + assert_eq!( + "$share/group_b/+/temperature".parse::().unwrap(), + TopicFilter::SharedWildcard { + group_name: "group_b".to_string(), + filter: "+/temperature".to_string(), + level_count: 2, + } + ); + + assert_eq!( + "$share/group_c/+/temperature/+/meta".parse::().unwrap(), + TopicFilter::SharedWildcard { + group_name: "group_c".to_string(), + filter: "+/temperature/+/meta".to_string(), + level_count: 4, + } + ); + } + + #[test] + fn test_topic_filter_parse_invalid_shared_subscription() { + assert_eq!( + "$share/".parse::().unwrap_err(), + TopicParseError::EmptySharedGroupName + ); + assert_eq!("$share/a".parse::().unwrap_err(), TopicParseError::EmptyTopic); + assert_eq!("$share/a/".parse::().unwrap_err(), TopicParseError::EmptyTopic); + assert_eq!( + "$share//".parse::().unwrap_err(), + TopicParseError::EmptySharedGroupName + ); + assert_eq!( + "$share///".parse::().unwrap_err(), + TopicParseError::EmptySharedGroupName + ); + + assert_eq!( + "$share/invalid_group#/#".parse::().unwrap_err(), + TopicParseError::InvalidSharedGroupName + ); + } + + #[test] + fn test_topic_filter_parse_sys_prefix() { + assert_eq!( + "$SYS/stats".parse::().unwrap(), + TopicFilter::Concrete { filter: "$SYS/stats".to_string(), level_count: 2 } + ); + + assert_eq!( + "/$SYS/stats".parse::().unwrap(), + TopicFilter::Concrete { filter: "/$SYS/stats".to_string(), level_count: 3 } + ); + + assert_eq!( + "$SYS/+".parse::().unwrap(), + TopicFilter::Wildcard { filter: "$SYS/+".to_string(), level_count: 2 } + ); + + assert_eq!( + "$SYS/#".parse::().unwrap(), + TopicFilter::Wildcard { filter: "$SYS/#".to_string(), level_count: 2 } + ); + } + + #[test] + fn test_topic_filter_parse_invalid_filters() { + assert_eq!( + "sport/#/stats".parse::().unwrap_err(), + TopicParseError::MultilevelWildcardNotAtEnd + ); + assert_eq!( + "sport/#/stats#".parse::().unwrap_err(), + TopicParseError::InvalidWildcardLevel + ); + assert_eq!( + "sport#/stats#".parse::().unwrap_err(), + TopicParseError::InvalidWildcardLevel + ); + assert_eq!( + "sport/tennis#".parse::().unwrap_err(), + TopicParseError::InvalidWildcardLevel + ); + assert_eq!( + "sport/++".parse::().unwrap_err(), + TopicParseError::InvalidWildcardLevel + ); + } #[test] fn test_insert() { - let mut sub_trie = SubscriptionTrie::new(); - sub_trie.insert("home/kitchen/temperature".to_string(), 1); - sub_trie.insert("home/kitchen/humidity".to_string(), 2); - sub_trie.insert("home/kitchen".to_string(), 3); - sub_trie.insert("home/+/humidity".to_string(), 4); - sub_trie.insert("home/+".to_string(), 5); - sub_trie.insert("home/#".to_string(), 6); - sub_trie.insert("home/+/temperature".to_string(), 7); - sub_trie.insert("office/stairwell/temperature".to_string(), 8); - sub_trie.insert("office/+/+".to_string(), 9); - sub_trie.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); - sub_trie.insert("office/+/+/some_desk/+/#".to_string(), 11); - sub_trie.insert("sport/tennis/+".to_string(), 21); - sub_trie.insert("#".to_string(), 12); - - println!("{:#?}", sub_trie); - - sub_trie.matching_subscribers("home/kitchen".to_string(), |s| { + let mut sub_tree = SubscriptionTree::new(); + sub_tree.insert("home/kitchen/temperature".to_string(), 1); + sub_tree.insert("home/kitchen/humidity".to_string(), 2); + sub_tree.insert("home/kitchen".to_string(), 3); + sub_tree.insert("home/+/humidity".to_string(), 4); + sub_tree.insert("home/+".to_string(), 5); + sub_tree.insert("home/#".to_string(), 6); + sub_tree.insert("home/+/temperature".to_string(), 7); + sub_tree.insert("office/stairwell/temperature".to_string(), 8); + sub_tree.insert("office/+/+".to_string(), 9); + sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); + sub_tree.insert("office/+/+/some_desk/+/#".to_string(), 11); + sub_tree.insert("sport/tennis/+".to_string(), 21); + sub_tree.insert("#".to_string(), 12); + + println!("{:#?}", sub_tree); + + sub_tree.matching_subscribers("home/kitchen", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers("home/kitchen/humidity".to_string(), |s| { + sub_tree.matching_subscribers("home/kitchen/humidity", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers("office/stairwell/temperature".to_string(), |s| { + sub_tree.matching_subscribers("office/stairwell/temperature", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers( - "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature".to_string(), + sub_tree.matching_subscribers( + "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature", |s| { println!("{}", s); }, @@ -254,52 +653,52 @@ mod tests { println!(); - sub_trie.matching_subscribers("home".to_string(), |s| { + sub_tree.matching_subscribers("home", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers("sport/tennis/player1".to_string(), |s| { + sub_tree.matching_subscribers("sport/tennis/player1", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers("sport/tennis/player2".to_string(), |s| { + sub_tree.matching_subscribers("sport/tennis/player2", |s| { println!("{}", s); }); println!(); - sub_trie.matching_subscribers("sport/tennis/player1/ranking".to_string(), |s| { + sub_tree.matching_subscribers("sport/tennis/player1/ranking", |s| { println!("{}", s); }); } #[test] fn test_remove() { - let mut sub_trie = SubscriptionTrie::new(); - sub_trie.insert("home/kitchen/temperature".to_string(), 1); - sub_trie.insert("home/kitchen/temperature".to_string(), 2); - sub_trie.insert("home/kitchen/humidity".to_string(), 1); - sub_trie.insert("home/kitchen/#".to_string(), 1); - sub_trie.insert("home/kitchen/+".to_string(), 3); - sub_trie.insert("home/kitchen/+".to_string(), 2); - sub_trie.insert("#".to_string(), 6); + let mut sub_tree = SubscriptionTree::new(); + sub_tree.insert("home/kitchen/temperature".to_string(), 1); + sub_tree.insert("home/kitchen/temperature".to_string(), 2); + sub_tree.insert("home/kitchen/humidity".to_string(), 1); + sub_tree.insert("home/kitchen/#".to_string(), 1); + sub_tree.insert("home/kitchen/+".to_string(), 3); + sub_tree.insert("home/kitchen/+".to_string(), 2); + sub_tree.insert("#".to_string(), 6); - println!("{:#?}", sub_trie); + println!("{:#?}", sub_tree); - sub_trie.remove("home/kitchen/temperature".to_string(), 1); - println!("{:#?}", sub_trie); + sub_tree.remove("home/kitchen/temperature".to_string(), 1); + println!("{:#?}", sub_tree); - sub_trie.remove("home/kitchen/temperature".to_string(), 2); - println!("{:#?}", sub_trie); + sub_tree.remove("home/kitchen/temperature".to_string(), 2); + println!("{:#?}", sub_tree); - sub_trie.remove("home/kitchen/#".to_string(), 1); - println!("{:#?}", sub_trie); + sub_tree.remove("home/kitchen/#".to_string(), 1); + println!("{:#?}", sub_tree); - sub_trie.remove("home/kitchen/+".to_string(), 3); - println!("{:#?}", sub_trie); + sub_tree.remove("home/kitchen/+".to_string(), 3); + println!("{:#?}", sub_tree); } } From bb78f02c2b76c10e1c7a5a6c63c41812de3437bd Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 18 Jan 2020 21:02:49 +0900 Subject: [PATCH 04/14] Update README --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 338b2bf..2856f1d 100644 --- a/README.md +++ b/README.md @@ -20,3 +20,9 @@ Testing ------- $ cargo test + +Code Format +----------- +The formatting options currently use nightly-only options. + + $ cargo +nightly fmt From 23edd50fbde1fab3ad1a06395cbbc7d6781be2bb Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 18 Jan 2020 21:16:41 +0900 Subject: [PATCH 05/14] Reorganize topic structure --- src/{topic.rs => topic/filter.rs} | 322 +----------------------------- src/topic/mod.rs | 17 ++ src/topic/tree.rs | 308 ++++++++++++++++++++++++++++ 3 files changed, 330 insertions(+), 317 deletions(-) rename src/{topic.rs => topic/filter.rs} (53%) create mode 100644 src/topic/mod.rs create mode 100644 src/topic/tree.rs diff --git a/src/topic.rs b/src/topic/filter.rs similarity index 53% rename from src/topic.rs rename to src/topic/filter.rs index 0cca149..6e827a4 100644 --- a/src/topic.rs +++ b/src/topic/filter.rs @@ -1,21 +1,8 @@ -use std::{ - collections::{hash_map::Entry, HashMap}, - str::FromStr, +use crate::topic::{ + MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, + TOPIC_SEPARATOR, }; - -const TOPIC_SEPARATOR: char = '/'; - -const MULTI_LEVEL_WILDCARD: char = '#'; -const MULTI_LEVEL_WILDCARD_STR: &'static str = "#"; - -const SINGLE_LEVEL_WILDCARD: char = '+'; -const SINGLE_LEVEL_WILDCARD_STR: &'static str = "+"; - -const SHARED_SUBSCRIPTION_PREFIX: &'static str = "$share/"; - -pub const MAX_TOPIC_LEN_BYTES: usize = 65_535; - -// TODO(bschwind) - Support shared subscriptions +use std::str::FromStr; /// A filter for subscribers to indicate which topics they want /// to receive messages from. Can contain wildcards. @@ -154,213 +141,9 @@ impl FromStr for TopicFilter { } } -#[derive(Debug)] -pub struct SubscriptionTree { - subscribers: Vec, - single_level_wildcards: Option>>, - multi_level_wildcards: Vec, - concrete_topic_levels: HashMap>, -} - -// TODO(bschwind) - All these topic strings need validation before -// operating on them. - -impl SubscriptionTree { - pub fn new() -> Self { - Self { - subscribers: Vec::new(), - single_level_wildcards: None, - multi_level_wildcards: Vec::new(), - concrete_topic_levels: HashMap::new(), - } - } - - pub fn is_empty(&self) -> bool { - return self.subscribers.is_empty() - && self.single_level_wildcards.is_none() - && self.multi_level_wildcards.is_empty() - && self.concrete_topic_levels.is_empty(); - } - - pub fn insert(&mut self, topic_filter: String, value: T) { - let mut current_tree = self; - let mut multi_level = false; - - for level in topic_filter.split(TOPIC_SEPARATOR) { - match level { - SINGLE_LEVEL_WILDCARD_STR => { - if current_tree.single_level_wildcards.is_some() { - current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); - } else { - current_tree.single_level_wildcards = - Some(Box::new(SubscriptionTree::new())); - current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); - } - }, - MULTI_LEVEL_WILDCARD_STR => { - multi_level = true; - break; - }, - concrete_topic_level => { - if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { - current_tree = current_tree - .concrete_topic_levels - .get_mut(concrete_topic_level) - .unwrap(); - } else { - current_tree - .concrete_topic_levels - .insert(concrete_topic_level.to_string(), SubscriptionTree::new()); - - // TODO - Do this without another hash lookup - current_tree = current_tree - .concrete_topic_levels - .get_mut(concrete_topic_level) - .unwrap(); - } - }, - } - } - - if multi_level { - current_tree.multi_level_wildcards.push(value); - } else { - current_tree.subscribers.push(value); - } - } - - pub fn remove(&mut self, topic_filter: String, value: T) -> Option { - let mut current_tree = self; - let mut stack: Vec<(*mut SubscriptionTree, usize)> = vec![]; - - let levels: Vec<&str> = topic_filter.split(TOPIC_SEPARATOR).collect(); - let mut level_index = 0; - - for level in &levels { - match *level { - SINGLE_LEVEL_WILDCARD_STR => { - if current_tree.single_level_wildcards.is_some() { - stack.push((&mut *current_tree, level_index)); - level_index += 1; - - current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); - } else { - return None; - } - }, - MULTI_LEVEL_WILDCARD_STR => { - break; - }, - concrete_topic_level => { - if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { - stack.push((&mut *current_tree, level_index)); - level_index += 1; - - current_tree = current_tree - .concrete_topic_levels - .get_mut(concrete_topic_level) - .unwrap(); - } else { - return None; - } - }, - } - } - - // Get the return value - let return_val = { - let level = &levels[levels.len() - 1]; - - if *level == MULTI_LEVEL_WILDCARD_STR { - if let Some(pos) = - current_tree.multi_level_wildcards.iter().position(|x| *x == value) - { - Some(current_tree.multi_level_wildcards.remove(pos)) - } else { - None - } - } else { - if let Some(pos) = current_tree.subscribers.iter().position(|x| *x == value) { - Some(current_tree.subscribers.remove(pos)) - } else { - None - } - } - }; - - // Go up the stack, cleaning up empty nodes - while let Some((stack_val, level_index)) = stack.pop() { - let mut tree = unsafe { &mut *stack_val }; - - let level = levels[level_index]; - - match level { - SINGLE_LEVEL_WILDCARD_STR => { - if tree.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { - tree.single_level_wildcards = None; - } - }, - MULTI_LEVEL_WILDCARD_STR => { - // TODO - Ignore this case? - }, - concrete_topic_level => { - if let Entry::Occupied(o) = - tree.concrete_topic_levels.entry(concrete_topic_level.to_string()) - { - if o.get().is_empty() { - o.remove_entry(); - } - } - }, - } - } - - return_val - } - - pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { - let mut tree_stack = vec![]; - let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect(); - - tree_stack.push((self, 0)); - - while !tree_stack.is_empty() { - let (current_tree, current_level) = tree_stack.pop().unwrap(); - let level = levels[current_level]; - - for subscriber in ¤t_tree.multi_level_wildcards { - sub_fn(subscriber); - } - - if let Some(sub_tree) = ¤t_tree.single_level_wildcards { - if current_level + 1 < levels.len() { - tree_stack.push((sub_tree, current_level + 1)); - } else { - for subscriber in &sub_tree.subscribers { - sub_fn(subscriber); - } - } - } - - if current_tree.concrete_topic_levels.contains_key(level) { - let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); - - if current_level + 1 < levels.len() { - let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); - tree_stack.push((sub_tree, current_level + 1)); - } else { - for subscriber in &sub_tree.subscribers { - sub_fn(subscriber); - } - } - } - } - } -} - #[cfg(test)] mod tests { - use crate::topic::{SubscriptionTree, TopicFilter, TopicParseError, MAX_TOPIC_LEN_BYTES}; + use crate::topic::{TopicFilter, TopicParseError, MAX_TOPIC_LEN_BYTES}; #[test] fn test_topic_filter_parse_empty_topic() { @@ -606,99 +389,4 @@ mod tests { TopicParseError::InvalidWildcardLevel ); } - - #[test] - fn test_insert() { - let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".to_string(), 1); - sub_tree.insert("home/kitchen/humidity".to_string(), 2); - sub_tree.insert("home/kitchen".to_string(), 3); - sub_tree.insert("home/+/humidity".to_string(), 4); - sub_tree.insert("home/+".to_string(), 5); - sub_tree.insert("home/#".to_string(), 6); - sub_tree.insert("home/+/temperature".to_string(), 7); - sub_tree.insert("office/stairwell/temperature".to_string(), 8); - sub_tree.insert("office/+/+".to_string(), 9); - sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); - sub_tree.insert("office/+/+/some_desk/+/#".to_string(), 11); - sub_tree.insert("sport/tennis/+".to_string(), 21); - sub_tree.insert("#".to_string(), 12); - - println!("{:#?}", sub_tree); - - sub_tree.matching_subscribers("home/kitchen", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers("home/kitchen/humidity", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers("office/stairwell/temperature", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers( - "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature", - |s| { - println!("{}", s); - }, - ); - - println!(); - - sub_tree.matching_subscribers("home", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers("sport/tennis/player1", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers("sport/tennis/player2", |s| { - println!("{}", s); - }); - - println!(); - - sub_tree.matching_subscribers("sport/tennis/player1/ranking", |s| { - println!("{}", s); - }); - } - - #[test] - fn test_remove() { - let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".to_string(), 1); - sub_tree.insert("home/kitchen/temperature".to_string(), 2); - sub_tree.insert("home/kitchen/humidity".to_string(), 1); - sub_tree.insert("home/kitchen/#".to_string(), 1); - sub_tree.insert("home/kitchen/+".to_string(), 3); - sub_tree.insert("home/kitchen/+".to_string(), 2); - sub_tree.insert("#".to_string(), 6); - - println!("{:#?}", sub_tree); - - sub_tree.remove("home/kitchen/temperature".to_string(), 1); - println!("{:#?}", sub_tree); - - sub_tree.remove("home/kitchen/temperature".to_string(), 2); - println!("{:#?}", sub_tree); - - sub_tree.remove("home/kitchen/#".to_string(), 1); - println!("{:#?}", sub_tree); - - sub_tree.remove("home/kitchen/+".to_string(), 3); - println!("{:#?}", sub_tree); - } } diff --git a/src/topic/mod.rs b/src/topic/mod.rs new file mode 100644 index 0000000..dd1858d --- /dev/null +++ b/src/topic/mod.rs @@ -0,0 +1,17 @@ +const TOPIC_SEPARATOR: char = '/'; + +const MULTI_LEVEL_WILDCARD: char = '#'; +const MULTI_LEVEL_WILDCARD_STR: &'static str = "#"; + +const SINGLE_LEVEL_WILDCARD: char = '+'; +const SINGLE_LEVEL_WILDCARD_STR: &'static str = "+"; + +const SHARED_SUBSCRIPTION_PREFIX: &'static str = "$share/"; + +pub const MAX_TOPIC_LEN_BYTES: usize = 65_535; + +mod filter; +mod tree; + +pub use filter::*; +pub use tree::*; diff --git a/src/topic/tree.rs b/src/topic/tree.rs new file mode 100644 index 0000000..5f8c4ba --- /dev/null +++ b/src/topic/tree.rs @@ -0,0 +1,308 @@ +use crate::topic::{MULTI_LEVEL_WILDCARD_STR, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR}; +use std::collections::{hash_map::Entry, HashMap}; + +// TODO(bschwind) - Support shared subscriptions + +#[derive(Debug)] +pub struct SubscriptionTree { + subscribers: Vec, + single_level_wildcards: Option>>, + multi_level_wildcards: Vec, + concrete_topic_levels: HashMap>, +} + +// TODO(bschwind) - All these topic strings need validation before +// operating on them. + +impl SubscriptionTree { + pub fn new() -> Self { + Self { + subscribers: Vec::new(), + single_level_wildcards: None, + multi_level_wildcards: Vec::new(), + concrete_topic_levels: HashMap::new(), + } + } + + pub fn is_empty(&self) -> bool { + return self.subscribers.is_empty() + && self.single_level_wildcards.is_none() + && self.multi_level_wildcards.is_empty() + && self.concrete_topic_levels.is_empty(); + } + + pub fn insert(&mut self, topic_filter: String, value: T) { + let mut current_tree = self; + let mut multi_level = false; + + for level in topic_filter.split(TOPIC_SEPARATOR) { + match level { + SINGLE_LEVEL_WILDCARD_STR => { + if current_tree.single_level_wildcards.is_some() { + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); + } else { + current_tree.single_level_wildcards = + Some(Box::new(SubscriptionTree::new())); + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); + } + }, + MULTI_LEVEL_WILDCARD_STR => { + multi_level = true; + break; + }, + concrete_topic_level => { + if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + current_tree = current_tree + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } else { + current_tree + .concrete_topic_levels + .insert(concrete_topic_level.to_string(), SubscriptionTree::new()); + + // TODO - Do this without another hash lookup + current_tree = current_tree + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } + }, + } + } + + if multi_level { + current_tree.multi_level_wildcards.push(value); + } else { + current_tree.subscribers.push(value); + } + } + + pub fn remove(&mut self, topic_filter: String, value: T) -> Option { + let mut current_tree = self; + let mut stack: Vec<(*mut SubscriptionTree, usize)> = vec![]; + + let levels: Vec<&str> = topic_filter.split(TOPIC_SEPARATOR).collect(); + let mut level_index = 0; + + for level in &levels { + match *level { + SINGLE_LEVEL_WILDCARD_STR => { + if current_tree.single_level_wildcards.is_some() { + stack.push((&mut *current_tree, level_index)); + level_index += 1; + + current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); + } else { + return None; + } + }, + MULTI_LEVEL_WILDCARD_STR => { + break; + }, + concrete_topic_level => { + if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + stack.push((&mut *current_tree, level_index)); + level_index += 1; + + current_tree = current_tree + .concrete_topic_levels + .get_mut(concrete_topic_level) + .unwrap(); + } else { + return None; + } + }, + } + } + + // Get the return value + let return_val = { + let level = &levels[levels.len() - 1]; + + if *level == MULTI_LEVEL_WILDCARD_STR { + if let Some(pos) = + current_tree.multi_level_wildcards.iter().position(|x| *x == value) + { + Some(current_tree.multi_level_wildcards.remove(pos)) + } else { + None + } + } else { + if let Some(pos) = current_tree.subscribers.iter().position(|x| *x == value) { + Some(current_tree.subscribers.remove(pos)) + } else { + None + } + } + }; + + // Go up the stack, cleaning up empty nodes + while let Some((stack_val, level_index)) = stack.pop() { + let mut tree = unsafe { &mut *stack_val }; + + let level = levels[level_index]; + + match level { + SINGLE_LEVEL_WILDCARD_STR => { + if tree.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { + tree.single_level_wildcards = None; + } + }, + MULTI_LEVEL_WILDCARD_STR => { + // TODO - Ignore this case? + }, + concrete_topic_level => { + if let Entry::Occupied(o) = + tree.concrete_topic_levels.entry(concrete_topic_level.to_string()) + { + if o.get().is_empty() { + o.remove_entry(); + } + } + }, + } + } + + return_val + } + + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { + let mut tree_stack = vec![]; + let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect(); + + tree_stack.push((self, 0)); + + while !tree_stack.is_empty() { + let (current_tree, current_level) = tree_stack.pop().unwrap(); + let level = levels[current_level]; + + for subscriber in ¤t_tree.multi_level_wildcards { + sub_fn(subscriber); + } + + if let Some(sub_tree) = ¤t_tree.single_level_wildcards { + if current_level + 1 < levels.len() { + tree_stack.push((sub_tree, current_level + 1)); + } else { + for subscriber in &sub_tree.subscribers { + sub_fn(subscriber); + } + } + } + + if current_tree.concrete_topic_levels.contains_key(level) { + let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); + + if current_level + 1 < levels.len() { + let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); + tree_stack.push((sub_tree, current_level + 1)); + } else { + for subscriber in &sub_tree.subscribers { + sub_fn(subscriber); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::topic::SubscriptionTree; + + #[test] + fn test_insert() { + let mut sub_tree = SubscriptionTree::new(); + sub_tree.insert("home/kitchen/temperature".to_string(), 1); + sub_tree.insert("home/kitchen/humidity".to_string(), 2); + sub_tree.insert("home/kitchen".to_string(), 3); + sub_tree.insert("home/+/humidity".to_string(), 4); + sub_tree.insert("home/+".to_string(), 5); + sub_tree.insert("home/#".to_string(), 6); + sub_tree.insert("home/+/temperature".to_string(), 7); + sub_tree.insert("office/stairwell/temperature".to_string(), 8); + sub_tree.insert("office/+/+".to_string(), 9); + sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); + sub_tree.insert("office/+/+/some_desk/+/#".to_string(), 11); + sub_tree.insert("sport/tennis/+".to_string(), 21); + sub_tree.insert("#".to_string(), 12); + + println!("{:#?}", sub_tree); + + sub_tree.matching_subscribers("home/kitchen", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers("home/kitchen/humidity", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers("office/stairwell/temperature", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers( + "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature", + |s| { + println!("{}", s); + }, + ); + + println!(); + + sub_tree.matching_subscribers("home", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers("sport/tennis/player1", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers("sport/tennis/player2", |s| { + println!("{}", s); + }); + + println!(); + + sub_tree.matching_subscribers("sport/tennis/player1/ranking", |s| { + println!("{}", s); + }); + } + + #[test] + fn test_remove() { + let mut sub_tree = SubscriptionTree::new(); + sub_tree.insert("home/kitchen/temperature".to_string(), 1); + sub_tree.insert("home/kitchen/temperature".to_string(), 2); + sub_tree.insert("home/kitchen/humidity".to_string(), 1); + sub_tree.insert("home/kitchen/#".to_string(), 1); + sub_tree.insert("home/kitchen/+".to_string(), 3); + sub_tree.insert("home/kitchen/+".to_string(), 2); + sub_tree.insert("#".to_string(), 6); + + println!("{:#?}", sub_tree); + + sub_tree.remove("home/kitchen/temperature".to_string(), 1); + println!("{:#?}", sub_tree); + + sub_tree.remove("home/kitchen/temperature".to_string(), 2); + println!("{:#?}", sub_tree); + + sub_tree.remove("home/kitchen/#".to_string(), 1); + println!("{:#?}", sub_tree); + + sub_tree.remove("home/kitchen/+".to_string(), 3); + println!("{:#?}", sub_tree); + } +} From 32fbed7fdfd5248ecfc3547aed4db9deb86424c9 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Wed, 22 Jan 2020 22:30:30 +0900 Subject: [PATCH 06/14] Add a TopicLevels iterator --- src/topic/filter.rs | 110 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/src/topic/filter.rs b/src/topic/filter.rs index 6e827a4..eb619e1 100644 --- a/src/topic/filter.rs +++ b/src/topic/filter.rs @@ -22,6 +22,13 @@ pub struct Topic { level_count: u32, } +#[derive(Debug, PartialEq)] +pub enum TopicLevel<'a> { + Concrete(&'a str), + SingleLevelWildcard, + MultiLevelWildcard, +} + #[derive(Debug, PartialEq)] pub enum TopicParseError { EmptyTopic, @@ -141,9 +148,41 @@ impl FromStr for TopicFilter { } } +pub struct TopicLevels<'a> { + levels_iter: std::str::Split<'a, char>, +} + +impl<'a> TopicFilter { + fn filter(&'a self) -> &'a str { + match self { + TopicFilter::Concrete { filter, .. } => filter, + TopicFilter::Wildcard { filter, .. } => filter, + TopicFilter::SharedConcrete { filter, .. } => filter, + TopicFilter::SharedWildcard { filter, .. } => filter, + } + } + + pub fn levels(&'a self) -> TopicLevels<'a> { + TopicLevels { levels_iter: self.filter().split(TOPIC_SEPARATOR) } + } +} + +impl<'a> Iterator for TopicLevels<'a> { + type Item = TopicLevel<'a>; + + fn next(&mut self) -> Option { + match self.levels_iter.next() { + Some("#") => Some(TopicLevel::MultiLevelWildcard), + Some("+") => Some(TopicLevel::SingleLevelWildcard), + Some(level) => Some(TopicLevel::Concrete(level)), + None => None, + } + } +} + #[cfg(test)] mod tests { - use crate::topic::{TopicFilter, TopicParseError, MAX_TOPIC_LEN_BYTES}; + use crate::topic::{TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES}; #[test] fn test_topic_filter_parse_empty_topic() { @@ -389,4 +428,73 @@ mod tests { TopicParseError::InvalidWildcardLevel ); } + + #[test] + fn test_topic_filter_level_iterator_simple() { + let filter: TopicFilter = "/".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::Concrete(""))); + assert_eq!(levels.next(), Some(TopicLevel::Concrete(""))); + assert_eq!(levels.next(), None); + } + + #[test] + fn test_topic_filter_level_iterator_concrete() { + let filter: TopicFilter = "home/kitchen/temperature".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::Concrete("home"))); + assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen"))); + assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature"))); + assert_eq!(levels.next(), None); + } + + #[test] + fn test_topic_filter_level_iterator_single_level_wildcard_1() { + let filter: TopicFilter = "home/+/+/temperature/+".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::Concrete("home"))); + assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard)); + assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard)); + assert_eq!(levels.next(), Some(TopicLevel::Concrete("temperature"))); + assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard)); + assert_eq!(levels.next(), None); + } + + #[test] + fn test_topic_filter_level_iterator_single_level_wildcard_2() { + let filter: TopicFilter = "+".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::SingleLevelWildcard)); + assert_eq!(levels.next(), None); + } + + #[test] + fn test_topic_filter_level_iterator_mutli_level_wildcard_1() { + let filter: TopicFilter = "home/kitchen/#".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::Concrete("home"))); + assert_eq!(levels.next(), Some(TopicLevel::Concrete("kitchen"))); + assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard)); + assert_eq!(levels.next(), None); + } + + #[test] + fn test_topic_filter_level_iterator_mutli_level_wildcard_2() { + let filter: TopicFilter = "#".parse().unwrap(); + + let mut levels = filter.levels(); + + assert_eq!(levels.next(), Some(TopicLevel::MultiLevelWildcard)); + assert_eq!(levels.next(), None); + } } From 236699c31d651e369dfd77add066cc0b67798483 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 23 Jan 2020 00:05:45 +0900 Subject: [PATCH 07/14] Use TopicFilter type instead of String in SubscriptionTree --- src/topic/tree.rs | 55 +++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 5f8c4ba..1df0d35 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -1,4 +1,7 @@ -use crate::topic::{MULTI_LEVEL_WILDCARD_STR, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR}; +use crate::topic::{ + filter::{TopicFilter, TopicLevel}, + MULTI_LEVEL_WILDCARD_STR, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR, +}; use std::collections::{hash_map::Entry, HashMap}; // TODO(bschwind) - Support shared subscriptions @@ -31,13 +34,13 @@ impl SubscriptionTree { && self.concrete_topic_levels.is_empty(); } - pub fn insert(&mut self, topic_filter: String, value: T) { + pub fn insert(&mut self, topic_filter: TopicFilter, value: T) { let mut current_tree = self; let mut multi_level = false; - for level in topic_filter.split(TOPIC_SEPARATOR) { + for level in topic_filter.levels() { match level { - SINGLE_LEVEL_WILDCARD_STR => { + TopicLevel::SingleLevelWildcard => { if current_tree.single_level_wildcards.is_some() { current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } else { @@ -46,11 +49,11 @@ impl SubscriptionTree { current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } }, - MULTI_LEVEL_WILDCARD_STR => { + TopicLevel::MultiLevelWildcard => { multi_level = true; break; }, - concrete_topic_level => { + TopicLevel::Concrete(concrete_topic_level) => { if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { current_tree = current_tree .concrete_topic_levels @@ -214,19 +217,19 @@ mod tests { #[test] fn test_insert() { let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".to_string(), 1); - sub_tree.insert("home/kitchen/humidity".to_string(), 2); - sub_tree.insert("home/kitchen".to_string(), 3); - sub_tree.insert("home/+/humidity".to_string(), 4); - sub_tree.insert("home/+".to_string(), 5); - sub_tree.insert("home/#".to_string(), 6); - sub_tree.insert("home/+/temperature".to_string(), 7); - sub_tree.insert("office/stairwell/temperature".to_string(), 8); - sub_tree.insert("office/+/+".to_string(), 9); - sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".to_string(), 10); - sub_tree.insert("office/+/+/some_desk/+/#".to_string(), 11); - sub_tree.insert("sport/tennis/+".to_string(), 21); - sub_tree.insert("#".to_string(), 12); + sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 1); + sub_tree.insert("home/kitchen/humidity".parse().unwrap(), 2); + sub_tree.insert("home/kitchen".parse().unwrap(), 3); + sub_tree.insert("home/+/humidity".parse().unwrap(), 4); + sub_tree.insert("home/+".parse().unwrap(), 5); + sub_tree.insert("home/#".parse().unwrap(), 6); + sub_tree.insert("home/+/temperature".parse().unwrap(), 7); + sub_tree.insert("office/stairwell/temperature".parse().unwrap(), 8); + sub_tree.insert("office/+/+".parse().unwrap(), 9); + sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".parse().unwrap(), 10); + sub_tree.insert("office/+/+/some_desk/+/#".parse().unwrap(), 11); + sub_tree.insert("sport/tennis/+".parse().unwrap(), 21); + sub_tree.insert("#".parse().unwrap(), 12); println!("{:#?}", sub_tree); @@ -283,13 +286,13 @@ mod tests { #[test] fn test_remove() { let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".to_string(), 1); - sub_tree.insert("home/kitchen/temperature".to_string(), 2); - sub_tree.insert("home/kitchen/humidity".to_string(), 1); - sub_tree.insert("home/kitchen/#".to_string(), 1); - sub_tree.insert("home/kitchen/+".to_string(), 3); - sub_tree.insert("home/kitchen/+".to_string(), 2); - sub_tree.insert("#".to_string(), 6); + sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 1); + sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 2); + sub_tree.insert("home/kitchen/humidity".parse().unwrap(), 1); + sub_tree.insert("home/kitchen/#".parse().unwrap(), 1); + sub_tree.insert("home/kitchen/+".parse().unwrap(), 3); + sub_tree.insert("home/kitchen/+".parse().unwrap(), 2); + sub_tree.insert("#".parse().unwrap(), 6); println!("{:#?}", sub_tree); From 70a46e002cd92ba8bd42a05bcd171b363096e4d7 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 23 Jan 2020 00:08:37 +0900 Subject: [PATCH 08/14] Use defined const strings instead of hardcoded strings --- src/topic/filter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/topic/filter.rs b/src/topic/filter.rs index eb619e1..a336081 100644 --- a/src/topic/filter.rs +++ b/src/topic/filter.rs @@ -1,6 +1,6 @@ use crate::topic::{ - MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, - TOPIC_SEPARATOR, + MAX_TOPIC_LEN_BYTES, MULTI_LEVEL_WILDCARD, MULTI_LEVEL_WILDCARD_STR, + SHARED_SUBSCRIPTION_PREFIX, SINGLE_LEVEL_WILDCARD, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR, }; use std::str::FromStr; @@ -172,8 +172,8 @@ impl<'a> Iterator for TopicLevels<'a> { fn next(&mut self) -> Option { match self.levels_iter.next() { - Some("#") => Some(TopicLevel::MultiLevelWildcard), - Some("+") => Some(TopicLevel::SingleLevelWildcard), + Some(MULTI_LEVEL_WILDCARD_STR) => Some(TopicLevel::MultiLevelWildcard), + Some(SINGLE_LEVEL_WILDCARD_STR) => Some(TopicLevel::SingleLevelWildcard), Some(level) => Some(TopicLevel::Concrete(level)), None => None, } From 06739f1a211d0a54fffc01c672d77143dd24957c Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 23 Jan 2020 00:23:53 +0900 Subject: [PATCH 09/14] Use a u64 for subscriber identification --- src/topic/tree.rs | 93 +++++++++++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 32 deletions(-) diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 1df0d35..5ac5c1b 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -6,19 +6,47 @@ use std::collections::{hash_map::Entry, HashMap}; // TODO(bschwind) - Support shared subscriptions +#[derive(Debug)] +pub struct SubscriptionTreeNode { + subscribers: Vec<(u64, T)>, + single_level_wildcards: Option>>, + multi_level_wildcards: Vec<(u64, T)>, + concrete_topic_levels: HashMap>, +} + #[derive(Debug)] pub struct SubscriptionTree { - subscribers: Vec, - single_level_wildcards: Option>>, - multi_level_wildcards: Vec, - concrete_topic_levels: HashMap>, + root: SubscriptionTreeNode, + counter: u64, +} + +impl SubscriptionTree { + pub fn new() -> Self { + Self { root: SubscriptionTreeNode::new(), counter: 0 } + } + + pub fn insert(&mut self, topic_filter: TopicFilter, value: T) -> u64 { + let counter = self.counter; + self.root.insert(topic_filter, value, counter); + self.counter += 1; + + counter + } + + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, sub_fn: F) { + self.root.matching_subscribers(topic_name, sub_fn) + } + + pub fn remove(&mut self, topic_filter: String, counter: u64) -> Option { + self.root.remove(topic_filter, counter) + } } // TODO(bschwind) - All these topic strings need validation before // operating on them. -impl SubscriptionTree { - pub fn new() -> Self { +impl SubscriptionTreeNode { + fn new() -> Self { Self { subscribers: Vec::new(), single_level_wildcards: None, @@ -27,14 +55,14 @@ impl SubscriptionTree { } } - pub fn is_empty(&self) -> bool { + fn is_empty(&self) -> bool { return self.subscribers.is_empty() && self.single_level_wildcards.is_none() && self.multi_level_wildcards.is_empty() && self.concrete_topic_levels.is_empty(); } - pub fn insert(&mut self, topic_filter: TopicFilter, value: T) { + fn insert(&mut self, topic_filter: TopicFilter, value: T, counter: u64) { let mut current_tree = self; let mut multi_level = false; @@ -45,7 +73,7 @@ impl SubscriptionTree { current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } else { current_tree.single_level_wildcards = - Some(Box::new(SubscriptionTree::new())); + Some(Box::new(SubscriptionTreeNode::new())); current_tree = current_tree.single_level_wildcards.as_mut().unwrap(); } }, @@ -62,7 +90,7 @@ impl SubscriptionTree { } else { current_tree .concrete_topic_levels - .insert(concrete_topic_level.to_string(), SubscriptionTree::new()); + .insert(concrete_topic_level.to_string(), SubscriptionTreeNode::new()); // TODO - Do this without another hash lookup current_tree = current_tree @@ -75,15 +103,15 @@ impl SubscriptionTree { } if multi_level { - current_tree.multi_level_wildcards.push(value); + current_tree.multi_level_wildcards.push((counter, value)); } else { - current_tree.subscribers.push(value); + current_tree.subscribers.push((counter, value)); } } - pub fn remove(&mut self, topic_filter: String, value: T) -> Option { + fn remove(&mut self, topic_filter: String, counter: u64) -> Option { let mut current_tree = self; - let mut stack: Vec<(*mut SubscriptionTree, usize)> = vec![]; + let mut stack: Vec<(*mut SubscriptionTreeNode, usize)> = vec![]; let levels: Vec<&str> = topic_filter.split(TOPIC_SEPARATOR).collect(); let mut level_index = 0; @@ -125,14 +153,15 @@ impl SubscriptionTree { if *level == MULTI_LEVEL_WILDCARD_STR { if let Some(pos) = - current_tree.multi_level_wildcards.iter().position(|x| *x == value) + current_tree.multi_level_wildcards.iter().position(|(c, _)| *c == counter) { Some(current_tree.multi_level_wildcards.remove(pos)) } else { None } } else { - if let Some(pos) = current_tree.subscribers.iter().position(|x| *x == value) { + if let Some(pos) = current_tree.subscribers.iter().position(|(c, _)| *c == counter) + { Some(current_tree.subscribers.remove(pos)) } else { None @@ -167,10 +196,10 @@ impl SubscriptionTree { } } - return_val + return_val.map(|(_, val)| val) } - pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { + fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { let mut tree_stack = vec![]; let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect(); @@ -180,7 +209,7 @@ impl SubscriptionTree { let (current_tree, current_level) = tree_stack.pop().unwrap(); let level = levels[current_level]; - for subscriber in ¤t_tree.multi_level_wildcards { + for (_, subscriber) in ¤t_tree.multi_level_wildcards { sub_fn(subscriber); } @@ -188,7 +217,7 @@ impl SubscriptionTree { if current_level + 1 < levels.len() { tree_stack.push((sub_tree, current_level + 1)); } else { - for subscriber in &sub_tree.subscribers { + for (_, subscriber) in &sub_tree.subscribers { sub_fn(subscriber); } } @@ -201,7 +230,7 @@ impl SubscriptionTree { let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); tree_stack.push((sub_tree, current_level + 1)); } else { - for subscriber in &sub_tree.subscribers { + for (_, subscriber) in &sub_tree.subscribers { sub_fn(subscriber); } } @@ -286,26 +315,26 @@ mod tests { #[test] fn test_remove() { let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 1); - sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 2); - sub_tree.insert("home/kitchen/humidity".parse().unwrap(), 1); - sub_tree.insert("home/kitchen/#".parse().unwrap(), 1); - sub_tree.insert("home/kitchen/+".parse().unwrap(), 3); - sub_tree.insert("home/kitchen/+".parse().unwrap(), 2); - sub_tree.insert("#".parse().unwrap(), 6); + let sub_1 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_1"); + let sub_2 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_2"); + let _sub_3 = sub_tree.insert("home/kitchen/humidity".parse().unwrap(), "sub_3"); + let sub_4 = sub_tree.insert("home/kitchen/#".parse().unwrap(), "sub_4"); + let sub_5 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_5"); + let _sub_6 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_6"); + let _sub_7 = sub_tree.insert("#".parse().unwrap(), "sub_7"); println!("{:#?}", sub_tree); - sub_tree.remove("home/kitchen/temperature".to_string(), 1); + sub_tree.remove("home/kitchen/temperature".to_string(), sub_1); println!("{:#?}", sub_tree); - sub_tree.remove("home/kitchen/temperature".to_string(), 2); + sub_tree.remove("home/kitchen/temperature".to_string(), sub_2); println!("{:#?}", sub_tree); - sub_tree.remove("home/kitchen/#".to_string(), 1); + sub_tree.remove("home/kitchen/#".to_string(), sub_4); println!("{:#?}", sub_tree); - sub_tree.remove("home/kitchen/+".to_string(), 3); + sub_tree.remove("home/kitchen/+".to_string(), sub_5); println!("{:#?}", sub_tree); } } From cb022feefa88f43d7f83ca4aa424c8656548e9ce Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 24 Jan 2020 22:14:06 +0900 Subject: [PATCH 10/14] Improve tree remove tests --- src/topic/tree.rs | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 5ac5c1b..505a8b4 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -40,6 +40,10 @@ impl SubscriptionTree { pub fn remove(&mut self, topic_filter: String, counter: u64) -> Option { self.root.remove(topic_filter, counter) } + + fn is_empty(&self) -> bool { + self.root.is_empty() + } } // TODO(bschwind) - All these topic strings need validation before @@ -317,24 +321,26 @@ mod tests { let mut sub_tree = SubscriptionTree::new(); let sub_1 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_1"); let sub_2 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_2"); - let _sub_3 = sub_tree.insert("home/kitchen/humidity".parse().unwrap(), "sub_3"); + let sub_3 = sub_tree.insert("home/kitchen/humidity".parse().unwrap(), "sub_3"); let sub_4 = sub_tree.insert("home/kitchen/#".parse().unwrap(), "sub_4"); let sub_5 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_5"); - let _sub_6 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_6"); - let _sub_7 = sub_tree.insert("#".parse().unwrap(), "sub_7"); + let sub_6 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_6"); + let sub_7 = sub_tree.insert("#".parse().unwrap(), "sub_7"); - println!("{:#?}", sub_tree); + assert!(!sub_tree.is_empty()); - sub_tree.remove("home/kitchen/temperature".to_string(), sub_1); - println!("{:#?}", sub_tree); + assert!(sub_tree.remove("#".to_string(), sub_1).is_none()); - sub_tree.remove("home/kitchen/temperature".to_string(), sub_2); - println!("{:#?}", sub_tree); + assert_eq!(sub_tree.remove("home/kitchen/temperature".to_string(), sub_1).unwrap(), "sub_1"); + assert_eq!(sub_tree.remove("home/kitchen/temperature".to_string(), sub_2).unwrap(), "sub_2"); + assert_eq!(sub_tree.remove("home/kitchen/#".to_string(), sub_4).unwrap(), "sub_4"); + assert_eq!(sub_tree.remove("home/kitchen/+".to_string(), sub_5).unwrap(), "sub_5"); + assert_eq!(sub_tree.remove("home/kitchen/humidity".to_string(), sub_3).unwrap(), "sub_3"); + assert_eq!(sub_tree.remove("#".to_string(), sub_7).unwrap(), "sub_7"); + assert_eq!(sub_tree.remove("home/kitchen/+".to_string(), sub_6).unwrap(), "sub_6"); - sub_tree.remove("home/kitchen/#".to_string(), sub_4); - println!("{:#?}", sub_tree); + assert!(sub_tree.is_empty()); - sub_tree.remove("home/kitchen/+".to_string(), sub_5); - println!("{:#?}", sub_tree); + assert!(sub_tree.remove("home/kitchen/+".to_string(), sub_6).is_none()); } } From a1318fba5f7db59eda3edbddb9152f3759537bd4 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 24 Jan 2020 22:24:56 +0900 Subject: [PATCH 11/14] Fix SubscriptionTree::remove() to use TopicFilter type --- src/topic/tree.rs | 101 +++++++++++++++++++++++++--------------------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 505a8b4..01d8bea 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -1,6 +1,6 @@ use crate::topic::{ filter::{TopicFilter, TopicLevel}, - MULTI_LEVEL_WILDCARD_STR, SINGLE_LEVEL_WILDCARD_STR, TOPIC_SEPARATOR, + TOPIC_SEPARATOR, }; use std::collections::{hash_map::Entry, HashMap}; @@ -25,7 +25,7 @@ impl SubscriptionTree { Self { root: SubscriptionTreeNode::new(), counter: 0 } } - pub fn insert(&mut self, topic_filter: TopicFilter, value: T) -> u64 { + pub fn insert(&mut self, topic_filter: &TopicFilter, value: T) -> u64 { let counter = self.counter; self.root.insert(topic_filter, value, counter); self.counter += 1; @@ -37,7 +37,7 @@ impl SubscriptionTree { self.root.matching_subscribers(topic_name, sub_fn) } - pub fn remove(&mut self, topic_filter: String, counter: u64) -> Option { + pub fn remove(&mut self, topic_filter: &TopicFilter, counter: u64) -> Option { self.root.remove(topic_filter, counter) } @@ -66,7 +66,7 @@ impl SubscriptionTreeNode { && self.concrete_topic_levels.is_empty(); } - fn insert(&mut self, topic_filter: TopicFilter, value: T, counter: u64) { + fn insert(&mut self, topic_filter: &TopicFilter, value: T, counter: u64) { let mut current_tree = self; let mut multi_level = false; @@ -113,16 +113,16 @@ impl SubscriptionTreeNode { } } - fn remove(&mut self, topic_filter: String, counter: u64) -> Option { + fn remove(&mut self, topic_filter: &TopicFilter, counter: u64) -> Option { let mut current_tree = self; let mut stack: Vec<(*mut SubscriptionTreeNode, usize)> = vec![]; - let levels: Vec<&str> = topic_filter.split(TOPIC_SEPARATOR).collect(); + let levels: Vec = topic_filter.levels().collect(); let mut level_index = 0; for level in &levels { - match *level { - SINGLE_LEVEL_WILDCARD_STR => { + match level { + TopicLevel::SingleLevelWildcard => { if current_tree.single_level_wildcards.is_some() { stack.push((&mut *current_tree, level_index)); level_index += 1; @@ -132,17 +132,17 @@ impl SubscriptionTreeNode { return None; } }, - MULTI_LEVEL_WILDCARD_STR => { + TopicLevel::MultiLevelWildcard => { break; }, - concrete_topic_level => { - if current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + TopicLevel::Concrete(concrete_topic_level) => { + if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { stack.push((&mut *current_tree, level_index)); level_index += 1; current_tree = current_tree .concrete_topic_levels - .get_mut(concrete_topic_level) + .get_mut(*concrete_topic_level) .unwrap(); } else { return None; @@ -155,7 +155,7 @@ impl SubscriptionTreeNode { let return_val = { let level = &levels[levels.len() - 1]; - if *level == MULTI_LEVEL_WILDCARD_STR { + if *level == TopicLevel::MultiLevelWildcard { if let Some(pos) = current_tree.multi_level_wildcards.iter().position(|(c, _)| *c == counter) { @@ -177,18 +177,18 @@ impl SubscriptionTreeNode { while let Some((stack_val, level_index)) = stack.pop() { let mut tree = unsafe { &mut *stack_val }; - let level = levels[level_index]; + let level = &levels[level_index]; match level { - SINGLE_LEVEL_WILDCARD_STR => { + TopicLevel::SingleLevelWildcard => { if tree.single_level_wildcards.as_ref().map(|t| t.is_empty()).unwrap_or(false) { tree.single_level_wildcards = None; } }, - MULTI_LEVEL_WILDCARD_STR => { + TopicLevel::MultiLevelWildcard => { // TODO - Ignore this case? }, - concrete_topic_level => { + TopicLevel::Concrete(concrete_topic_level) => { if let Entry::Occupied(o) = tree.concrete_topic_levels.entry(concrete_topic_level.to_string()) { @@ -250,19 +250,19 @@ mod tests { #[test] fn test_insert() { let mut sub_tree = SubscriptionTree::new(); - sub_tree.insert("home/kitchen/temperature".parse().unwrap(), 1); - sub_tree.insert("home/kitchen/humidity".parse().unwrap(), 2); - sub_tree.insert("home/kitchen".parse().unwrap(), 3); - sub_tree.insert("home/+/humidity".parse().unwrap(), 4); - sub_tree.insert("home/+".parse().unwrap(), 5); - sub_tree.insert("home/#".parse().unwrap(), 6); - sub_tree.insert("home/+/temperature".parse().unwrap(), 7); - sub_tree.insert("office/stairwell/temperature".parse().unwrap(), 8); - sub_tree.insert("office/+/+".parse().unwrap(), 9); - sub_tree.insert("office/+/+/some_desk/+/fan_speed/+/temperature".parse().unwrap(), 10); - sub_tree.insert("office/+/+/some_desk/+/#".parse().unwrap(), 11); - sub_tree.insert("sport/tennis/+".parse().unwrap(), 21); - sub_tree.insert("#".parse().unwrap(), 12); + sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), 1); + sub_tree.insert(&"home/kitchen/humidity".parse().unwrap(), 2); + sub_tree.insert(&"home/kitchen".parse().unwrap(), 3); + sub_tree.insert(&"home/+/humidity".parse().unwrap(), 4); + sub_tree.insert(&"home/+".parse().unwrap(), 5); + sub_tree.insert(&"home/#".parse().unwrap(), 6); + sub_tree.insert(&"home/+/temperature".parse().unwrap(), 7); + sub_tree.insert(&"office/stairwell/temperature".parse().unwrap(), 8); + sub_tree.insert(&"office/+/+".parse().unwrap(), 9); + sub_tree.insert(&"office/+/+/some_desk/+/fan_speed/+/temperature".parse().unwrap(), 10); + sub_tree.insert(&"office/+/+/some_desk/+/#".parse().unwrap(), 11); + sub_tree.insert(&"sport/tennis/+".parse().unwrap(), 21); + sub_tree.insert(&"#".parse().unwrap(), 12); println!("{:#?}", sub_tree); @@ -319,28 +319,37 @@ mod tests { #[test] fn test_remove() { let mut sub_tree = SubscriptionTree::new(); - let sub_1 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_1"); - let sub_2 = sub_tree.insert("home/kitchen/temperature".parse().unwrap(), "sub_2"); - let sub_3 = sub_tree.insert("home/kitchen/humidity".parse().unwrap(), "sub_3"); - let sub_4 = sub_tree.insert("home/kitchen/#".parse().unwrap(), "sub_4"); - let sub_5 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_5"); - let sub_6 = sub_tree.insert("home/kitchen/+".parse().unwrap(), "sub_6"); - let sub_7 = sub_tree.insert("#".parse().unwrap(), "sub_7"); + let sub_1 = sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), "sub_1"); + let sub_2 = sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), "sub_2"); + let sub_3 = sub_tree.insert(&"home/kitchen/humidity".parse().unwrap(), "sub_3"); + let sub_4 = sub_tree.insert(&"home/kitchen/#".parse().unwrap(), "sub_4"); + let sub_5 = sub_tree.insert(&"home/kitchen/+".parse().unwrap(), "sub_5"); + let sub_6 = sub_tree.insert(&"home/kitchen/+".parse().unwrap(), "sub_6"); + let sub_7 = sub_tree.insert(&"#".parse().unwrap(), "sub_7"); assert!(!sub_tree.is_empty()); - assert!(sub_tree.remove("#".to_string(), sub_1).is_none()); + assert!(sub_tree.remove(&"#".parse().unwrap(), sub_1).is_none()); - assert_eq!(sub_tree.remove("home/kitchen/temperature".to_string(), sub_1).unwrap(), "sub_1"); - assert_eq!(sub_tree.remove("home/kitchen/temperature".to_string(), sub_2).unwrap(), "sub_2"); - assert_eq!(sub_tree.remove("home/kitchen/#".to_string(), sub_4).unwrap(), "sub_4"); - assert_eq!(sub_tree.remove("home/kitchen/+".to_string(), sub_5).unwrap(), "sub_5"); - assert_eq!(sub_tree.remove("home/kitchen/humidity".to_string(), sub_3).unwrap(), "sub_3"); - assert_eq!(sub_tree.remove("#".to_string(), sub_7).unwrap(), "sub_7"); - assert_eq!(sub_tree.remove("home/kitchen/+".to_string(), sub_6).unwrap(), "sub_6"); + assert_eq!( + sub_tree.remove(&"home/kitchen/temperature".parse().unwrap(), sub_1).unwrap(), + "sub_1" + ); + assert_eq!( + sub_tree.remove(&"home/kitchen/temperature".parse().unwrap(), sub_2).unwrap(), + "sub_2" + ); + assert_eq!(sub_tree.remove(&"home/kitchen/#".parse().unwrap(), sub_4).unwrap(), "sub_4"); + assert_eq!(sub_tree.remove(&"home/kitchen/+".parse().unwrap(), sub_5).unwrap(), "sub_5"); + assert_eq!( + sub_tree.remove(&"home/kitchen/humidity".parse().unwrap(), sub_3).unwrap(), + "sub_3" + ); + assert_eq!(sub_tree.remove(&"#".parse().unwrap(), sub_7).unwrap(), "sub_7"); + assert_eq!(sub_tree.remove(&"home/kitchen/+".parse().unwrap(), sub_6).unwrap(), "sub_6"); assert!(sub_tree.is_empty()); - assert!(sub_tree.remove("home/kitchen/+".to_string(), sub_6).is_none()); + assert!(sub_tree.remove(&"home/kitchen/+".parse().unwrap(), sub_6).is_none()); } } From 1b4b67e3f1093d43a897af621b266d9be31a2457 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 24 Jan 2020 22:31:24 +0900 Subject: [PATCH 12/14] Cargo clippy --- src/decoder.rs | 8 ++++---- src/encoder.rs | 30 +++++++++++++++--------------- src/main.rs | 2 +- src/topic/mod.rs | 6 +++--- src/topic/tree.rs | 17 ++++++++--------- 5 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/decoder.rs b/src/decoder.rs index 45d2d40..dc0ad1d 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -743,7 +743,7 @@ fn decode_publish_complete( fn decode_subscribe( bytes: &mut Cursor<&mut BytesMut>, remaining_packet_length: u32, - protocol_version: &ProtocolVersion, + protocol_version: ProtocolVersion, ) -> Result, DecodeError> { let start_cursor_pos = bytes.position(); @@ -752,7 +752,7 @@ fn decode_subscribe( let mut subscription_identifier = None; let mut user_properties = vec![]; - if *protocol_version == ProtocolVersion::V500 { + if protocol_version == ProtocolVersion::V500 { return_if_none!(decode_properties(bytes, |property| { match property { Property::SubscriptionIdentifier(p) => subscription_identifier = Some(p), @@ -1016,7 +1016,7 @@ fn decode_authenticate( } fn decode_packet( - protocol_version: &ProtocolVersion, + protocol_version: ProtocolVersion, packet_type: &PacketType, bytes: &mut Cursor<&mut BytesMut>, remaining_packet_length: u32, @@ -1043,7 +1043,7 @@ fn decode_packet( pub fn decode_mqtt( bytes: &mut BytesMut, - protocol_version: &ProtocolVersion, + protocol_version: ProtocolVersion, ) -> Result, DecodeError> { let mut bytes = Cursor::new(bytes); let first_byte = read_u8!(bytes); diff --git a/src/encoder.rs b/src/encoder.rs index 739db37..5215780 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -530,7 +530,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -562,7 +562,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -591,7 +591,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -608,7 +608,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -625,7 +625,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -642,7 +642,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -659,7 +659,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -683,7 +683,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -701,7 +701,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -718,7 +718,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -736,7 +736,7 @@ mod tests { let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -746,7 +746,7 @@ mod tests { let packet = Packet::PingRequest; let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -756,7 +756,7 @@ mod tests { let packet = Packet::PingResponse; let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -773,7 +773,7 @@ mod tests { }); let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } @@ -790,7 +790,7 @@ mod tests { }); let mut bytes = BytesMut::new(); encode_mqtt(&packet, &mut bytes); - let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap(); + let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap(); assert_eq!(packet, decoded); } diff --git a/src/main.rs b/src/main.rs index 38c1779..23518ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,7 +29,7 @@ impl MqttCodec { pub fn decode(&mut self, buf: &mut BytesMut) -> Result, DecodeError> { // TODO - Ideally we should keep a state machine to store the data we've read so far. - let packet = decoder::decode_mqtt(buf, &self.version); + let packet = decoder::decode_mqtt(buf, self.version); if let Ok(Some(Packet::Connect(packet))) = &packet { self.version = packet.protocol_version; diff --git a/src/topic/mod.rs b/src/topic/mod.rs index dd1858d..b342f0d 100644 --- a/src/topic/mod.rs +++ b/src/topic/mod.rs @@ -1,12 +1,12 @@ const TOPIC_SEPARATOR: char = '/'; const MULTI_LEVEL_WILDCARD: char = '#'; -const MULTI_LEVEL_WILDCARD_STR: &'static str = "#"; +const MULTI_LEVEL_WILDCARD_STR: &str = "#"; const SINGLE_LEVEL_WILDCARD: char = '+'; -const SINGLE_LEVEL_WILDCARD_STR: &'static str = "+"; +const SINGLE_LEVEL_WILDCARD_STR: &str = "+"; -const SHARED_SUBSCRIPTION_PREFIX: &'static str = "$share/"; +const SHARED_SUBSCRIPTION_PREFIX: &str = "$share/"; pub const MAX_TOPIC_LEN_BYTES: usize = 65_535; diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 01d8bea..67a71a8 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -60,10 +60,10 @@ impl SubscriptionTreeNode { } fn is_empty(&self) -> bool { - return self.subscribers.is_empty() + self.subscribers.is_empty() && self.single_level_wildcards.is_none() && self.multi_level_wildcards.is_empty() - && self.concrete_topic_levels.is_empty(); + && self.concrete_topic_levels.is_empty() } fn insert(&mut self, topic_filter: &TopicFilter, value: T, counter: u64) { @@ -163,13 +163,12 @@ impl SubscriptionTreeNode { } else { None } + } else if let Some(pos) = + current_tree.subscribers.iter().position(|(c, _)| *c == counter) + { + Some(current_tree.subscribers.remove(pos)) } else { - if let Some(pos) = current_tree.subscribers.iter().position(|(c, _)| *c == counter) - { - Some(current_tree.subscribers.remove(pos)) - } else { - None - } + None } }; @@ -190,7 +189,7 @@ impl SubscriptionTreeNode { }, TopicLevel::Concrete(concrete_topic_level) => { if let Entry::Occupied(o) = - tree.concrete_topic_levels.entry(concrete_topic_level.to_string()) + tree.concrete_topic_levels.entry((*concrete_topic_level).to_string()) { if o.get().is_empty() { o.remove_entry(); From 61ed4d7caeb90a4f2331e853afe856cecc074d31 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 24 Jan 2020 23:11:08 +0900 Subject: [PATCH 13/14] Add parsing code for the Topic type --- src/topic/filter.rs | 88 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/src/topic/filter.rs b/src/topic/filter.rs index a336081..7d4f49e 100644 --- a/src/topic/filter.rs +++ b/src/topic/filter.rs @@ -16,7 +16,7 @@ pub enum TopicFilter { /// A topic name publishers use when sending MQTT messages. /// Cannot contain wildcards. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct Topic { topic_name: String, level_count: u32, @@ -37,6 +37,7 @@ pub enum TopicParseError { InvalidWildcardLevel, InvalidSharedGroupName, EmptySharedGroupName, + WildcardOrNullInTopic, } /// If Ok, returns (level_count, contains_wildcards). @@ -70,6 +71,11 @@ impl FromStr for TopicFilter { return Err(TopicParseError::EmptyTopic); } + // TODO - assert no null character U+0000 + if filter.contains('\0') { + return Err(TopicParseError::WildcardOrNullInTopic); + } + // Filters cannot exceed the byte length in the MQTT spec if filter.len() > MAX_TOPIC_LEN_BYTES { return Err(TopicParseError::TopicTooLong); @@ -148,6 +154,37 @@ impl FromStr for TopicFilter { } } +impl FromStr for Topic { + type Err = TopicParseError; + + fn from_str(topic: &str) -> Result { + // TODO - Consider disallowing leading $ characters + + // Topics cannot be empty + if topic.is_empty() { + return Err(TopicParseError::EmptyTopic); + } + + // Topics cannot exceed the byte length in the MQTT spec + if topic.len() > MAX_TOPIC_LEN_BYTES { + return Err(TopicParseError::TopicTooLong); + } + + // Topics cannot contain wildcards or null characters + if topic.contains(|x: char| { + x == SINGLE_LEVEL_WILDCARD || x == MULTI_LEVEL_WILDCARD || x == '\0' + }) { + return Err(TopicParseError::WildcardOrNullInTopic); + } + + let level_count = topic.split(TOPIC_SEPARATOR).count() as u32; + + let topic = Topic { topic_name: topic.to_string(), level_count }; + + Ok(topic) + } +} + pub struct TopicLevels<'a> { levels_iter: std::str::Split<'a, char>, } @@ -182,7 +219,7 @@ impl<'a> Iterator for TopicLevels<'a> { #[cfg(test)] mod tests { - use crate::topic::{TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES}; + use crate::topic::{Topic, TopicFilter, TopicLevel, TopicParseError, MAX_TOPIC_LEN_BYTES}; #[test] fn test_topic_filter_parse_empty_topic() { @@ -429,6 +466,53 @@ mod tests { ); } + #[test] + fn test_topic_name_success() { + assert_eq!( + "/".parse::().unwrap(), + Topic { topic_name: "/".to_string(), level_count: 2 } + ); + + assert_eq!( + "Accounts payable".parse::().unwrap(), + Topic { topic_name: "Accounts payable".to_string(), level_count: 1 } + ); + + assert_eq!( + "home/kitchen".parse::().unwrap(), + Topic { topic_name: "home/kitchen".to_string(), level_count: 2 } + ); + + assert_eq!( + "home/kitchen/temperature".parse::().unwrap(), + Topic { topic_name: "home/kitchen/temperature".to_string(), level_count: 3 } + ); + } + + #[test] + fn test_topic_name_failure() { + assert_eq!("#".parse::().unwrap_err(), TopicParseError::WildcardOrNullInTopic,); + + assert_eq!("+".parse::().unwrap_err(), TopicParseError::WildcardOrNullInTopic,); + + assert_eq!("\0".parse::().unwrap_err(), TopicParseError::WildcardOrNullInTopic,); + + assert_eq!( + "/multi/level/#".parse::().unwrap_err(), + TopicParseError::WildcardOrNullInTopic, + ); + + assert_eq!( + "/single/level/+".parse::().unwrap_err(), + TopicParseError::WildcardOrNullInTopic, + ); + + assert_eq!( + "/null/byte/\0".parse::().unwrap_err(), + TopicParseError::WildcardOrNullInTopic, + ); + } + #[test] fn test_topic_filter_level_iterator_simple() { let filter: TopicFilter = "/".parse().unwrap(); From 3bc9acc2183dc34fca1fb636bc7da734cf00d968 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Fri, 24 Jan 2020 23:25:31 +0900 Subject: [PATCH 14/14] Use Topic instead of String for matching_subscribers function --- src/topic/filter.rs | 6 ++++++ src/topic/tree.rs | 49 ++++++++++++++++++++++----------------------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/topic/filter.rs b/src/topic/filter.rs index 7d4f49e..185769c 100644 --- a/src/topic/filter.rs +++ b/src/topic/filter.rs @@ -204,6 +204,12 @@ impl<'a> TopicFilter { } } +impl<'a> Topic { + pub fn levels(&'a self) -> TopicLevels<'a> { + TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) } + } +} + impl<'a> Iterator for TopicLevels<'a> { type Item = TopicLevel<'a>; diff --git a/src/topic/tree.rs b/src/topic/tree.rs index 67a71a8..0187425 100644 --- a/src/topic/tree.rs +++ b/src/topic/tree.rs @@ -1,7 +1,4 @@ -use crate::topic::{ - filter::{TopicFilter, TopicLevel}, - TOPIC_SEPARATOR, -}; +use crate::topic::{Topic, TopicFilter, TopicLevel}; use std::collections::{hash_map::Entry, HashMap}; // TODO(bschwind) - Support shared subscriptions @@ -33,8 +30,8 @@ impl SubscriptionTree { counter } - pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, sub_fn: F) { - self.root.matching_subscribers(topic_name, sub_fn) + pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic: &Topic, sub_fn: F) { + self.root.matching_subscribers(topic, sub_fn) } pub fn remove(&mut self, topic_filter: &TopicFilter, counter: u64) -> Option { @@ -202,15 +199,15 @@ impl SubscriptionTreeNode { return_val.map(|(_, val)| val) } - fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) { + fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic: &Topic, mut sub_fn: F) { let mut tree_stack = vec![]; - let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect(); + let levels: Vec = topic.levels().collect(); tree_stack.push((self, 0)); while !tree_stack.is_empty() { let (current_tree, current_level) = tree_stack.pop().unwrap(); - let level = levels[current_level]; + let level = &levels[current_level]; for (_, subscriber) in ¤t_tree.multi_level_wildcards { sub_fn(subscriber); @@ -226,15 +223,17 @@ impl SubscriptionTreeNode { } } - if current_tree.concrete_topic_levels.contains_key(level) { - let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); + if let TopicLevel::Concrete(level) = level { + if current_tree.concrete_topic_levels.contains_key(*level) { + let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap(); - if current_level + 1 < levels.len() { - let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap(); - tree_stack.push((sub_tree, current_level + 1)); - } else { - for (_, subscriber) in &sub_tree.subscribers { - sub_fn(subscriber); + if current_level + 1 < levels.len() { + let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap(); + tree_stack.push((sub_tree, current_level + 1)); + } else { + for (_, subscriber) in &sub_tree.subscribers { + sub_fn(subscriber); + } } } } @@ -265,26 +264,26 @@ mod tests { println!("{:#?}", sub_tree); - sub_tree.matching_subscribers("home/kitchen", |s| { + sub_tree.matching_subscribers(&"home/kitchen".parse().unwrap(), |s| { println!("{}", s); }); println!(); - sub_tree.matching_subscribers("home/kitchen/humidity", |s| { + sub_tree.matching_subscribers(&"home/kitchen/humidity".parse().unwrap(), |s| { println!("{}", s); }); println!(); - sub_tree.matching_subscribers("office/stairwell/temperature", |s| { + sub_tree.matching_subscribers(&"office/stairwell/temperature".parse().unwrap(), |s| { println!("{}", s); }); println!(); sub_tree.matching_subscribers( - "office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature", + &"office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature".parse().unwrap(), |s| { println!("{}", s); }, @@ -292,25 +291,25 @@ mod tests { println!(); - sub_tree.matching_subscribers("home", |s| { + sub_tree.matching_subscribers(&"home".parse().unwrap(), |s| { println!("{}", s); }); println!(); - sub_tree.matching_subscribers("sport/tennis/player1", |s| { + sub_tree.matching_subscribers(&"sport/tennis/player1".parse().unwrap(), |s| { println!("{}", s); }); println!(); - sub_tree.matching_subscribers("sport/tennis/player2", |s| { + sub_tree.matching_subscribers(&"sport/tennis/player2".parse().unwrap(), |s| { println!("{}", s); }); println!(); - sub_tree.matching_subscribers("sport/tennis/player1/ranking", |s| { + sub_tree.matching_subscribers(&"sport/tennis/player1/ranking".parse().unwrap(), |s| { println!("{}", s); }); }