From 1cb99ea8c0eac9774e58bf9d4d3f06eb0d004390 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Wed, 5 Jan 2022 23:49:20 +0900 Subject: [PATCH 1/7] Start work on a retained message tree --- mqtt-v5-broker/src/main.rs | 1 + mqtt-v5-broker/src/retained.rs | 152 +++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 mqtt-v5-broker/src/retained.rs diff --git a/mqtt-v5-broker/src/main.rs b/mqtt-v5-broker/src/main.rs index bbce2d4..11a2823 100644 --- a/mqtt-v5-broker/src/main.rs +++ b/mqtt-v5-broker/src/main.rs @@ -22,6 +22,7 @@ use tokio_util::codec::Framed; mod broker; mod client; +mod retained; mod tree; async fn client_handler(stream: TcpStream, broker_tx: Sender) { diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs new file mode 100644 index 0000000..f37223d --- /dev/null +++ b/mqtt-v5-broker/src/retained.rs @@ -0,0 +1,152 @@ +use mqtt_v5::topic::{Topic, TopicFilter, TopicLevel}; +use std::collections::{hash_map::Entry, HashMap}; + +#[derive(Debug)] +pub struct RetainedMessageTreeNode { + retained_data: Option, + concrete_topic_levels: HashMap>, +} + +#[derive(Debug)] +pub struct RetainedMessageTree { + root: RetainedMessageTreeNode, +} + +impl RetainedMessageTree { + pub fn new() -> Self { + Self { root: RetainedMessageTreeNode::new() } + } + + pub fn insert(&mut self, topic: &Topic, retained_data: T) { + self.root.insert(topic, retained_data); + } + + /// Get the retained messages which match a given topic filter. + pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { + self.root.retained_messages(topic_filter) + } + + pub fn remove(&mut self, topic: &Topic) -> Option { + self.root.remove(topic) + } + + #[allow(dead_code)] + fn is_empty(&self) -> bool { + self.root.is_empty() + } +} + +impl RetainedMessageTreeNode { + fn new() -> Self { + Self { retained_data: None, concrete_topic_levels: HashMap::new() } + } + + fn is_empty(&self) -> bool { + self.retained_data.is_none() && self.concrete_topic_levels.is_empty() + } + + fn insert(&mut self, topic: &Topic, retained_data: T) { + let mut current_tree = self; + + for level in topic.levels() { + match level { + TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => { + unreachable!("Publish topics only contain concrete levels"); + }, + TopicLevel::Concrete(concrete_topic_level) => { + if !current_tree.concrete_topic_levels.contains_key(concrete_topic_level) { + current_tree.concrete_topic_levels.insert( + concrete_topic_level.to_string(), + RetainedMessageTreeNode::new(), + ); + } + + // TODO - Do this without another hash lookup + current_tree = + current_tree.concrete_topic_levels.get_mut(concrete_topic_level).unwrap(); + }, + } + } + + current_tree.retained_data = Some(retained_data); + } + + fn remove(&mut self, topic: &Topic) -> Option { + let mut current_tree = self; + let mut stack: Vec<(*mut RetainedMessageTreeNode, usize)> = vec![]; + + let levels: Vec = topic.levels().collect(); + let mut level_index = 0; + + for level in &levels { + match level { + TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => { + unreachable!("Publish topics only contain concrete levels"); + }, + TopicLevel::Concrete(concrete_topic_level) => { + if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { + stack.push((&mut *current_tree, level_index)); + level_index += 1; + + current_tree = current_tree + .concrete_topic_levels + .get_mut(*concrete_topic_level) + .unwrap(); + } else { + return None; + } + }, + } + } + + let return_val = current_tree.retained_data.take(); + + // Go up the stack, cleaning up empty nodes + while let Some((stack_val, level_index)) = stack.pop() { + let tree = unsafe { &mut *stack_val }; + + let level = &levels[level_index]; + + match level { + TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => { + unreachable!("Publish topics only contain concrete levels"); + }, + TopicLevel::Concrete(concrete_topic_level) => { + if let Entry::Occupied(o) = + tree.concrete_topic_levels.entry((*concrete_topic_level).to_string()) + { + if o.get().is_empty() { + o.remove_entry(); + } + } + }, + } + } + + return_val + } + + pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { + // let mut subscriptions = Vec::new(); + let mut tree_stack = vec![(self, 0)]; + let levels: Vec = topic_filter.levels().collect(); + + vec![].into_iter() + } +} + +#[cfg(test)] +mod tests { + use crate::retained::RetainedMessageTree; + + #[test] + fn test_insert() { + let mut sub_tree = RetainedMessageTree::new(); + sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), 1); + sub_tree.insert(&"home/kitchen".parse().unwrap(), 7); + + assert_eq!(sub_tree.remove(&"home/kitchen/temperature".parse().unwrap()), Some(1)); + assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), Some(7)); + assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), None); + } +} From 7e4a59a544bfea0eb4727565e1054740a24c1abe Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 6 Jan 2022 00:33:18 +0900 Subject: [PATCH 2/7] More progress on the retained message tree --- mqtt-v5-broker/src/retained.rs | 76 +++++++++++++++++++++++++++++++++- mqtt-v5-broker/src/tree.rs | 1 - 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs index f37223d..68fa052 100644 --- a/mqtt-v5-broker/src/retained.rs +++ b/mqtt-v5-broker/src/retained.rs @@ -127,11 +127,74 @@ impl RetainedMessageTreeNode { } pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { - // let mut subscriptions = Vec::new(); + let mut retained_messages = Vec::new(); let mut tree_stack = vec![(self, 0)]; + let mut multi_level = false; let levels: Vec = topic_filter.levels().collect(); - vec![].into_iter() + while !tree_stack.is_empty() { + let (current_tree, current_level) = tree_stack.pop().unwrap(); + + if multi_level { + // Add all the retained messages and keep going. + for sub_tree in current_tree.concrete_topic_levels.values() { + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + retained_messages.push(retained_data); + } + + tree_stack.push((sub_tree, current_level + 1)); + } + + continue; + } + + let level = &levels[current_level]; + + match level { + TopicLevel::SingleLevelWildcard => { + for sub_tree in current_tree.concrete_topic_levels.values() { + if current_level + 1 < levels.len() { + tree_stack.push((sub_tree, current_level + 1)); + } else { + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + retained_messages.push(retained_data); + } + } + } + }, + TopicLevel::MultiLevelWildcard => { + multi_level = true; + + for sub_tree in current_tree.concrete_topic_levels.values() { + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + retained_messages.push(retained_data); + } + + tree_stack.push((sub_tree, current_level + 1)); + } + }, + TopicLevel::Concrete(concrete_topic_level) => { + if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { + let sub_tree = + current_tree.concrete_topic_levels.get(*concrete_topic_level).unwrap(); + + if current_level + 1 < levels.len() { + let sub_tree = current_tree + .concrete_topic_levels + .get(*concrete_topic_level) + .unwrap(); + tree_stack.push((sub_tree, current_level + 1)); + } else { + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + retained_messages.push(retained_data); + } + } + } + }, + } + } + + retained_messages.into_iter() } } @@ -143,10 +206,19 @@ mod tests { fn test_insert() { let mut sub_tree = RetainedMessageTree::new(); sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), 1); + sub_tree.insert(&"home/bedroom/temperature".parse().unwrap(), 2); sub_tree.insert(&"home/kitchen".parse().unwrap(), 7); + sub_tree.insert(&"office/cafe".parse().unwrap(), 12); + sub_tree.insert(&"office/cafe/temperature".parse().unwrap(), 27); + + for msg in sub_tree.retained_messages(&"+/+/temperature".parse().unwrap()) { + dbg!(msg); + } + assert_eq!(sub_tree.remove(&"home/kitchen/temperature".parse().unwrap()), Some(1)); assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), Some(7)); assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), None); + dbg!(sub_tree); } } diff --git a/mqtt-v5-broker/src/tree.rs b/mqtt-v5-broker/src/tree.rs index 8698e95..5faa30c 100644 --- a/mqtt-v5-broker/src/tree.rs +++ b/mqtt-v5-broker/src/tree.rs @@ -224,7 +224,6 @@ impl SubscriptionTreeNode { let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap(); if current_level + 1 < levels.len() { - let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap(); tree_stack.push((sub_tree, current_level + 1)); } else { subscriptions From 18ef0cea76217a162f6e98eb05462099a1479e82 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 6 Jan 2022 22:46:49 +0900 Subject: [PATCH 3/7] Recursive version of retained message matching seems to work --- mqtt-v5-broker/src/retained.rs | 261 ++++++++++++++++++++++++++------- 1 file changed, 210 insertions(+), 51 deletions(-) diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs index 68fa052..b441249 100644 --- a/mqtt-v5-broker/src/retained.rs +++ b/mqtt-v5-broker/src/retained.rs @@ -4,6 +4,7 @@ use std::collections::{hash_map::Entry, HashMap}; #[derive(Debug)] pub struct RetainedMessageTreeNode { retained_data: Option, + // TODO(bschwind) - use TopicLevel instead of String concrete_topic_levels: HashMap>, } @@ -21,9 +22,14 @@ impl RetainedMessageTree { self.root.insert(topic, retained_data); } + // /// Get the retained messages which match a given topic filter. + // pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { + // self.root.retained_messages(topic_filter) + // } + /// Get the retained messages which match a given topic filter. pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { - self.root.retained_messages(topic_filter) + self.root.retained_messages_recursive(topic_filter) } pub fn remove(&mut self, topic: &Topic) -> Option { @@ -126,76 +132,197 @@ impl RetainedMessageTreeNode { return_val } - pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { - let mut retained_messages = Vec::new(); - let mut tree_stack = vec![(self, 0)]; - let mut multi_level = false; + pub fn retained_messages_recursive( + &self, + topic_filter: &TopicFilter, + ) -> impl Iterator { + let mut retained_messages = vec![]; + let mut path = vec![]; let levels: Vec = topic_filter.levels().collect(); - while !tree_stack.is_empty() { - let (current_tree, current_level) = tree_stack.pop().unwrap(); + Self::retained_messages_inner(self, &mut path, &levels, 0, false, &mut retained_messages); - if multi_level { - // Add all the retained messages and keep going. - for sub_tree in current_tree.concrete_topic_levels.values() { - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - retained_messages.push(retained_data); - } + retained_messages.into_iter() + } - tree_stack.push((sub_tree, current_level + 1)); + fn retained_messages_inner<'a>( + current_tree: &'a Self, + path: &mut Vec, + levels: &[TopicLevel], + current_level: usize, + multi_level: bool, + retained_messages: &mut Vec<&'a T>, + ) { + if multi_level { + // Add all the retained messages and keep going. + for (level, sub_tree) in ¤t_tree.concrete_topic_levels { + path.push(level.to_string()); + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + println!("Adding {:?} at path: {:?}", retained_data, path); + retained_messages.push(retained_data); } - continue; - } + Self::retained_messages_inner( + sub_tree, + path, + levels, + current_level + 1, + multi_level, + retained_messages, + ); - let level = &levels[current_level]; + path.pop(); + } - match level { - TopicLevel::SingleLevelWildcard => { - for sub_tree in current_tree.concrete_topic_levels.values() { - if current_level + 1 < levels.len() { - tree_stack.push((sub_tree, current_level + 1)); - } else { - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - retained_messages.push(retained_data); - } - } - } - }, - TopicLevel::MultiLevelWildcard => { - multi_level = true; + return; + } - for sub_tree in current_tree.concrete_topic_levels.values() { + let level = &levels[current_level]; + + match level { + TopicLevel::SingleLevelWildcard => { + for (level, sub_tree) in ¤t_tree.concrete_topic_levels { + path.push(level.to_string()); + + if current_level + 1 < levels.len() { + Self::retained_messages_inner( + sub_tree, + path, + levels, + current_level + 1, + multi_level, + retained_messages, + ); + } else { if let Some(retained_data) = sub_tree.retained_data.as_ref() { + println!("Adding {:?} at path: {:?}", retained_data, path); retained_messages.push(retained_data); } + } + path.pop(); + } + }, + TopicLevel::MultiLevelWildcard => { + for (level, sub_tree) in ¤t_tree.concrete_topic_levels { + path.push(level.to_string()); - tree_stack.push((sub_tree, current_level + 1)); + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + println!("Adding {:?} at path: {:?}", retained_data, path); + retained_messages.push(retained_data); } - }, - TopicLevel::Concrete(concrete_topic_level) => { - if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { + + Self::retained_messages_inner( + sub_tree, + path, + levels, + current_level + 1, + true, + retained_messages, + ); + path.pop(); + } + }, + TopicLevel::Concrete(concrete_topic_level) => { + if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { + let sub_tree = + current_tree.concrete_topic_levels.get(*concrete_topic_level).unwrap(); + + path.push(concrete_topic_level.to_string()); + + if current_level + 1 < levels.len() { let sub_tree = current_tree.concrete_topic_levels.get(*concrete_topic_level).unwrap(); - - if current_level + 1 < levels.len() { - let sub_tree = current_tree - .concrete_topic_levels - .get(*concrete_topic_level) - .unwrap(); - tree_stack.push((sub_tree, current_level + 1)); - } else { - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - retained_messages.push(retained_data); - } + Self::retained_messages_inner( + sub_tree, + path, + levels, + current_level + 1, + false, + retained_messages, + ); + } else { + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + println!("Adding {:?} at path: {:?}", retained_data, path); + retained_messages.push(retained_data); } } - }, - } - } - retained_messages.into_iter() + path.pop(); + } + }, + } } + + // pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { + // let mut retained_messages = Vec::new(); + // let mut tree_stack = vec![(self, 0)]; + // let mut multi_level = false; + // let levels: Vec = topic_filter.levels().collect(); + + // while !tree_stack.is_empty() { + // let (current_tree, current_level) = tree_stack.pop().unwrap(); + + // if multi_level { + // // Add all the retained messages and keep going. + // for sub_tree in current_tree.concrete_topic_levels.values() { + // if let Some(retained_data) = sub_tree.retained_data.as_ref() { + // retained_messages.push(retained_data); + // } + + // tree_stack.push((sub_tree, current_level + 1)); + // } + + // continue; + // } + + // let level = &levels[current_level]; + + // match level { + // TopicLevel::SingleLevelWildcard => { + // for sub_tree in current_tree.concrete_topic_levels.values() { + // if current_level + 1 < levels.len() { + // tree_stack.push((sub_tree, current_level + 1)); + // } else { + // if let Some(retained_data) = sub_tree.retained_data.as_ref() { + // retained_messages.push(retained_data); + // } + // } + // } + // }, + // TopicLevel::MultiLevelWildcard => { + // multi_level = true; + + // for sub_tree in current_tree.concrete_topic_levels.values() { + // if let Some(retained_data) = sub_tree.retained_data.as_ref() { + // retained_messages.push(retained_data); + // } + + // tree_stack.push((sub_tree, current_level + 1)); + // } + // }, + // TopicLevel::Concrete(concrete_topic_level) => { + // if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { + // let sub_tree = + // current_tree.concrete_topic_levels.get(*concrete_topic_level).unwrap(); + + // if current_level + 1 < levels.len() { + // let sub_tree = current_tree + // .concrete_topic_levels + // .get(*concrete_topic_level) + // .unwrap(); + // tree_stack.push((sub_tree, current_level + 1)); + // } else { + // if let Some(retained_data) = sub_tree.retained_data.as_ref() { + // retained_messages.push(retained_data); + // } + // } + // } + // }, + // } + // } + + // retained_messages.into_iter() + // } } #[cfg(test)] @@ -221,4 +348,36 @@ mod tests { assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), None); dbg!(sub_tree); } + + #[test] + fn test_wildcards() { + let mut sub_tree = RetainedMessageTree::new(); + sub_tree.insert(&"home/bedroom/humidity/val".parse().unwrap(), 1); + sub_tree.insert(&"home/bedroom/temperature/val".parse().unwrap(), 2); + sub_tree.insert(&"home/kitchen/temperature/val".parse().unwrap(), 3); + sub_tree.insert(&"home/kitchen/humidity/val".parse().unwrap(), 4); + + let filter = "home/+/+/val"; + for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { + dbg!(msg); + } + + let filter = "home/bedroom/#"; + println!("{}", filter); + for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { + dbg!(msg); + } + + let filter = "#"; + println!("{}", filter); + for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { + dbg!(msg); + } + + let filter = "+"; + println!("{}", filter); + for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { + dbg!(msg); + } + } } From 6485c043f51472578e60d3d091f2bce45eaec8b2 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Thu, 6 Jan 2022 22:58:44 +0900 Subject: [PATCH 4/7] Remove multi_level flag and simplify the multi-level-wildcard case --- mqtt-v5-broker/src/retained.rs | 70 +++++++++++++++++----------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs index b441249..63c59d7 100644 --- a/mqtt-v5-broker/src/retained.rs +++ b/mqtt-v5-broker/src/retained.rs @@ -140,7 +140,7 @@ impl RetainedMessageTreeNode { let mut path = vec![]; let levels: Vec = topic_filter.levels().collect(); - Self::retained_messages_inner(self, &mut path, &levels, 0, false, &mut retained_messages); + Self::retained_messages_inner(self, &mut path, &levels, 0, &mut retained_messages); retained_messages.into_iter() } @@ -150,33 +150,8 @@ impl RetainedMessageTreeNode { path: &mut Vec, levels: &[TopicLevel], current_level: usize, - multi_level: bool, retained_messages: &mut Vec<&'a T>, ) { - if multi_level { - // Add all the retained messages and keep going. - for (level, sub_tree) in ¤t_tree.concrete_topic_levels { - path.push(level.to_string()); - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - println!("Adding {:?} at path: {:?}", retained_data, path); - retained_messages.push(retained_data); - } - - Self::retained_messages_inner( - sub_tree, - path, - levels, - current_level + 1, - multi_level, - retained_messages, - ); - - path.pop(); - } - - return; - } - let level = &levels[current_level]; match level { @@ -190,7 +165,6 @@ impl RetainedMessageTreeNode { path, levels, current_level + 1, - multi_level, retained_messages, ); } else { @@ -211,14 +185,7 @@ impl RetainedMessageTreeNode { retained_messages.push(retained_data); } - Self::retained_messages_inner( - sub_tree, - path, - levels, - current_level + 1, - true, - retained_messages, - ); + Self::retained_messages_multilevel(sub_tree, path, retained_messages); path.pop(); } }, @@ -237,7 +204,6 @@ impl RetainedMessageTreeNode { path, levels, current_level + 1, - false, retained_messages, ); } else { @@ -253,6 +219,25 @@ impl RetainedMessageTreeNode { } } + fn retained_messages_multilevel<'a>( + current_tree: &'a Self, + path: &mut Vec, + retained_messages: &mut Vec<&'a T>, + ) { + // Add all the retained messages and keep going. + for (level, sub_tree) in ¤t_tree.concrete_topic_levels { + path.push(level.to_string()); + if let Some(retained_data) = sub_tree.retained_data.as_ref() { + println!("Adding {:?} at path: {:?}", retained_data, path); + retained_messages.push(retained_data); + } + + Self::retained_messages_multilevel(sub_tree, path, retained_messages); + + path.pop(); + } + } + // pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { // let mut retained_messages = Vec::new(); // let mut tree_stack = vec![(self, 0)]; @@ -356,8 +341,15 @@ mod tests { sub_tree.insert(&"home/bedroom/temperature/val".parse().unwrap(), 2); sub_tree.insert(&"home/kitchen/temperature/val".parse().unwrap(), 3); sub_tree.insert(&"home/kitchen/humidity/val".parse().unwrap(), 4); + sub_tree.insert(&"home/kitchen/humidity/val/celsius".parse().unwrap(), 42); + + sub_tree.insert(&"office/cafe/humidity/val".parse().unwrap(), 5); + sub_tree.insert(&"office/cafe/temperature/val".parse().unwrap(), 6); + sub_tree.insert(&"office/meeting_room_1/temperature/val".parse().unwrap(), 7); + sub_tree.insert(&"office/meeting_room_1/humidity/val".parse().unwrap(), 8); let filter = "home/+/+/val"; + println!("{}", filter); for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { dbg!(msg); } @@ -379,5 +371,11 @@ mod tests { for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { dbg!(msg); } + + let filter = "+/+/#"; + println!("{}", filter); + for msg in sub_tree.retained_messages(&filter.parse().unwrap()) { + dbg!(msg); + } } } From 66b77cb26f4c8cdca78adbb1a35c8552b2b1a6fb Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 8 Jan 2022 18:45:50 +0900 Subject: [PATCH 5/7] Cleanup of the retained message tree --- mqtt-v5-broker/src/retained.rs | 103 +++++---------------------------- mqtt-v5/src/topic.rs | 6 ++ 2 files changed, 21 insertions(+), 88 deletions(-) diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs index 63c59d7..790c3f9 100644 --- a/mqtt-v5-broker/src/retained.rs +++ b/mqtt-v5-broker/src/retained.rs @@ -22,13 +22,11 @@ impl RetainedMessageTree { self.root.insert(topic, retained_data); } - // /// Get the retained messages which match a given topic filter. - // pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { - // self.root.retained_messages(topic_filter) - // } - /// Get the retained messages which match a given topic filter. - pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { + pub fn retained_messages( + &self, + topic_filter: &TopicFilter, + ) -> impl Iterator { self.root.retained_messages_recursive(topic_filter) } @@ -135,7 +133,7 @@ impl RetainedMessageTreeNode { pub fn retained_messages_recursive( &self, topic_filter: &TopicFilter, - ) -> impl Iterator { + ) -> impl Iterator { let mut retained_messages = vec![]; let mut path = vec![]; let levels: Vec = topic_filter.levels().collect(); @@ -150,7 +148,7 @@ impl RetainedMessageTreeNode { path: &mut Vec, levels: &[TopicLevel], current_level: usize, - retained_messages: &mut Vec<&'a T>, + retained_messages: &mut Vec<(Topic, &'a T)>, ) { let level = &levels[current_level]; @@ -169,8 +167,8 @@ impl RetainedMessageTreeNode { ); } else { if let Some(retained_data) = sub_tree.retained_data.as_ref() { - println!("Adding {:?} at path: {:?}", retained_data, path); - retained_messages.push(retained_data); + let topic = Topic::from_concrete_levels(&path); + retained_messages.push((topic, retained_data)); } } path.pop(); @@ -181,8 +179,8 @@ impl RetainedMessageTreeNode { path.push(level.to_string()); if let Some(retained_data) = sub_tree.retained_data.as_ref() { - println!("Adding {:?} at path: {:?}", retained_data, path); - retained_messages.push(retained_data); + let topic = Topic::from_concrete_levels(&path); + retained_messages.push((topic, retained_data)); } Self::retained_messages_multilevel(sub_tree, path, retained_messages); @@ -208,8 +206,8 @@ impl RetainedMessageTreeNode { ); } else { if let Some(retained_data) = sub_tree.retained_data.as_ref() { - println!("Adding {:?} at path: {:?}", retained_data, path); - retained_messages.push(retained_data); + let topic = Topic::from_concrete_levels(&path); + retained_messages.push((topic, retained_data)); } } @@ -222,14 +220,14 @@ impl RetainedMessageTreeNode { fn retained_messages_multilevel<'a>( current_tree: &'a Self, path: &mut Vec, - retained_messages: &mut Vec<&'a T>, + retained_messages: &mut Vec<(Topic, &'a T)>, ) { // Add all the retained messages and keep going. for (level, sub_tree) in ¤t_tree.concrete_topic_levels { path.push(level.to_string()); if let Some(retained_data) = sub_tree.retained_data.as_ref() { - println!("Adding {:?} at path: {:?}", retained_data, path); - retained_messages.push(retained_data); + let topic = Topic::from_concrete_levels(&path); + retained_messages.push((topic, retained_data)); } Self::retained_messages_multilevel(sub_tree, path, retained_messages); @@ -237,77 +235,6 @@ impl RetainedMessageTreeNode { path.pop(); } } - - // pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator { - // let mut retained_messages = Vec::new(); - // let mut tree_stack = vec![(self, 0)]; - // let mut multi_level = false; - // let levels: Vec = topic_filter.levels().collect(); - - // while !tree_stack.is_empty() { - // let (current_tree, current_level) = tree_stack.pop().unwrap(); - - // if multi_level { - // // Add all the retained messages and keep going. - // for sub_tree in current_tree.concrete_topic_levels.values() { - // if let Some(retained_data) = sub_tree.retained_data.as_ref() { - // retained_messages.push(retained_data); - // } - - // tree_stack.push((sub_tree, current_level + 1)); - // } - - // continue; - // } - - // let level = &levels[current_level]; - - // match level { - // TopicLevel::SingleLevelWildcard => { - // for sub_tree in current_tree.concrete_topic_levels.values() { - // if current_level + 1 < levels.len() { - // tree_stack.push((sub_tree, current_level + 1)); - // } else { - // if let Some(retained_data) = sub_tree.retained_data.as_ref() { - // retained_messages.push(retained_data); - // } - // } - // } - // }, - // TopicLevel::MultiLevelWildcard => { - // multi_level = true; - - // for sub_tree in current_tree.concrete_topic_levels.values() { - // if let Some(retained_data) = sub_tree.retained_data.as_ref() { - // retained_messages.push(retained_data); - // } - - // tree_stack.push((sub_tree, current_level + 1)); - // } - // }, - // TopicLevel::Concrete(concrete_topic_level) => { - // if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) { - // let sub_tree = - // current_tree.concrete_topic_levels.get(*concrete_topic_level).unwrap(); - - // if current_level + 1 < levels.len() { - // let sub_tree = current_tree - // .concrete_topic_levels - // .get(*concrete_topic_level) - // .unwrap(); - // tree_stack.push((sub_tree, current_level + 1)); - // } else { - // if let Some(retained_data) = sub_tree.retained_data.as_ref() { - // retained_messages.push(retained_data); - // } - // } - // } - // }, - // } - // } - - // retained_messages.into_iter() - // } } #[cfg(test)] diff --git a/mqtt-v5/src/topic.rs b/mqtt-v5/src/topic.rs index afa2804..be352c3 100644 --- a/mqtt-v5/src/topic.rs +++ b/mqtt-v5/src/topic.rs @@ -27,6 +27,12 @@ impl Topic { pub fn topic_name(&self) -> &str { &self.topic_name } + + pub fn from_concrete_levels(levels: &[String]) -> Self { + let topic_name = levels.join(&TOPIC_SEPARATOR.to_string()); + + Self { topic_name, level_count: levels.len() as u32 } + } } #[derive(Debug, PartialEq)] From 1df84d06dc8991d429df59ee4ce13c7fdd38b8c3 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 8 Jan 2022 18:47:33 +0900 Subject: [PATCH 6/7] Fix clippy lints (except for unused lints) --- mqtt-v5-broker/src/retained.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/mqtt-v5-broker/src/retained.rs b/mqtt-v5-broker/src/retained.rs index 790c3f9..056cce8 100644 --- a/mqtt-v5-broker/src/retained.rs +++ b/mqtt-v5-broker/src/retained.rs @@ -165,11 +165,9 @@ impl RetainedMessageTreeNode { current_level + 1, retained_messages, ); - } else { - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - let topic = Topic::from_concrete_levels(&path); - retained_messages.push((topic, retained_data)); - } + } else if let Some(retained_data) = sub_tree.retained_data.as_ref() { + let topic = Topic::from_concrete_levels(path); + retained_messages.push((topic, retained_data)); } path.pop(); } @@ -179,7 +177,7 @@ impl RetainedMessageTreeNode { path.push(level.to_string()); if let Some(retained_data) = sub_tree.retained_data.as_ref() { - let topic = Topic::from_concrete_levels(&path); + let topic = Topic::from_concrete_levels(path); retained_messages.push((topic, retained_data)); } @@ -204,11 +202,9 @@ impl RetainedMessageTreeNode { current_level + 1, retained_messages, ); - } else { - if let Some(retained_data) = sub_tree.retained_data.as_ref() { - let topic = Topic::from_concrete_levels(&path); - retained_messages.push((topic, retained_data)); - } + } else if let Some(retained_data) = sub_tree.retained_data.as_ref() { + let topic = Topic::from_concrete_levels(path); + retained_messages.push((topic, retained_data)); } path.pop(); @@ -226,7 +222,7 @@ impl RetainedMessageTreeNode { for (level, sub_tree) in ¤t_tree.concrete_topic_levels { path.push(level.to_string()); if let Some(retained_data) = sub_tree.retained_data.as_ref() { - let topic = Topic::from_concrete_levels(&path); + let topic = Topic::from_concrete_levels(path); retained_messages.push((topic, retained_data)); } From 0c43b83fee767c9b4048c0bc8712a0f6c9900129 Mon Sep 17 00:00:00 2001 From: Brian Schwind Date: Sat, 5 Feb 2022 18:37:41 +0900 Subject: [PATCH 7/7] Add a very simplistic retained message feature --- mqtt-v5-broker/src/broker.rs | 54 ++++++++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/mqtt-v5-broker/src/broker.rs b/mqtt-v5-broker/src/broker.rs index bbc5131..d561ede 100644 --- a/mqtt-v5-broker/src/broker.rs +++ b/mqtt-v5-broker/src/broker.rs @@ -1,4 +1,5 @@ -use crate::{client::ClientMessage, tree::SubscriptionTree}; +use crate::{client::ClientMessage, retained::RetainedMessageTree, tree::SubscriptionTree}; +use bytes::Bytes; use mqtt_v5::{ topic::TopicFilter, types::{ @@ -193,13 +194,20 @@ pub struct Broker { sender: Sender, receiver: Receiver, subscriptions: SubscriptionTree, + retained_messages: RetainedMessageTree, } impl Broker { pub fn new() -> Self { let (sender, receiver) = mpsc::channel(100); - Self { sessions: HashMap::new(), sender, receiver, subscriptions: SubscriptionTree::new() } + Self { + sessions: HashMap::new(), + sender, + receiver, + subscriptions: SubscriptionTree::new(), + retained_messages: RetainedMessageTree::new(), + } } pub fn sender(&self) -> Sender { @@ -346,6 +354,7 @@ impl Broker { async fn handle_subscribe(&mut self, client_id: String, packet: SubscribePacket) { let subscriptions = &mut self.subscriptions; + let retained_messages = &self.retained_messages; if let Some(session) = self.sessions.get_mut(&client_id) { // If a Server receives a SUBSCRIBE packet containing a Topic Filter that @@ -368,7 +377,7 @@ impl Broker { // and return the QoS that was granted. let granted_qos_values = packet .subscription_topics - .into_iter() + .iter() .map(|topic| { let session_subscription = SessionSubscription { client_id: client_id.clone(), @@ -394,6 +403,35 @@ impl Broker { }; session.send(ClientMessage::Packet(Packet::SubscribeAck(subscribe_ack))).await; + + // Send all retained messages which match the new subscriptions + let mut publish_packets = vec![]; + for topic in packet.subscription_topics { + for (topic, retained_message) in + retained_messages.retained_messages(&topic.topic_filter) + { + let publish = PublishPacket { + is_duplicate: false, + qos: QoS::AtMostOnce, // TODO(bschwind) + retain: true, + topic, + payload: retained_message.clone(), + + packet_id: None, // TODO(bschwind) + payload_format_indicator: None, + message_expiry_interval: None, + topic_alias: None, + response_topic: None, + correlation_data: None, + user_properties: vec![], + subscription_identifier: None, + content_type: None, + }; + publish_packets.push(Packet::Publish(publish)); + } + } + + session.send(ClientMessage::Packets(publish_packets)).await; } } @@ -531,6 +569,16 @@ impl Broker { async fn handle_publish(&mut self, client_id: String, packet: PublishPacket) { let mut is_dup = false; + if packet.retain { + if packet.payload.len() > 0 { + println!("Storing retained message for topic {:?}", packet.topic); + self.retained_messages.insert(&packet.topic, packet.payload.clone()); + } else { + println!("Deleting retained message for topic {:?}", packet.topic); + self.retained_messages.remove(&packet.topic); + } + } + // For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing // publish receive with the same ID, just send the publish receive again but don't forward // the message.