Skip to content

Commit

Permalink
Use Topic instead of String for matching_subscribers function
Browse files Browse the repository at this point in the history
  • Loading branch information
bschwind committed Jan 24, 2020
1 parent 61ed4d7 commit 3bc9acc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
6 changes: 6 additions & 0 deletions src/topic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ impl<'a> TopicFilter {
}
}

impl<'a> Topic {
pub fn levels(&'a self) -> TopicLevels<'a> {
TopicLevels { levels_iter: self.topic_name.split(TOPIC_SEPARATOR) }
}
}

impl<'a> Iterator for TopicLevels<'a> {
type Item = TopicLevel<'a>;

Expand Down
49 changes: 24 additions & 25 deletions src/topic/tree.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::topic::{
filter::{TopicFilter, TopicLevel},
TOPIC_SEPARATOR,
};
use crate::topic::{Topic, TopicFilter, TopicLevel};
use std::collections::{hash_map::Entry, HashMap};

// TODO(bschwind) - Support shared subscriptions
Expand Down Expand Up @@ -33,8 +30,8 @@ impl<T: std::fmt::Debug> SubscriptionTree<T> {
counter
}

pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, sub_fn: F) {
self.root.matching_subscribers(topic_name, sub_fn)
pub fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic: &Topic, sub_fn: F) {
self.root.matching_subscribers(topic, sub_fn)
}

pub fn remove(&mut self, topic_filter: &TopicFilter, counter: u64) -> Option<T> {
Expand Down Expand Up @@ -202,15 +199,15 @@ impl<T: std::fmt::Debug> SubscriptionTreeNode<T> {
return_val.map(|(_, val)| val)
}

fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic_name: &str, mut sub_fn: F) {
fn matching_subscribers<'a, F: FnMut(&T)>(&'a self, topic: &Topic, mut sub_fn: F) {
let mut tree_stack = vec![];
let levels: Vec<&str> = topic_name.split(TOPIC_SEPARATOR).collect();
let levels: Vec<TopicLevel> = topic.levels().collect();

tree_stack.push((self, 0));

while !tree_stack.is_empty() {
let (current_tree, current_level) = tree_stack.pop().unwrap();
let level = levels[current_level];
let level = &levels[current_level];

for (_, subscriber) in &current_tree.multi_level_wildcards {
sub_fn(subscriber);
Expand All @@ -226,15 +223,17 @@ impl<T: std::fmt::Debug> SubscriptionTreeNode<T> {
}
}

if current_tree.concrete_topic_levels.contains_key(level) {
let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap();
if let TopicLevel::Concrete(level) = level {
if current_tree.concrete_topic_levels.contains_key(*level) {
let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap();

if current_level + 1 < levels.len() {
let sub_tree = current_tree.concrete_topic_levels.get(level).unwrap();
tree_stack.push((sub_tree, current_level + 1));
} else {
for (_, subscriber) in &sub_tree.subscribers {
sub_fn(subscriber);
if current_level + 1 < levels.len() {
let sub_tree = current_tree.concrete_topic_levels.get(*level).unwrap();
tree_stack.push((sub_tree, current_level + 1));
} else {
for (_, subscriber) in &sub_tree.subscribers {
sub_fn(subscriber);
}
}
}
}
Expand Down Expand Up @@ -265,52 +264,52 @@ mod tests {

println!("{:#?}", sub_tree);

sub_tree.matching_subscribers("home/kitchen", |s| {
sub_tree.matching_subscribers(&"home/kitchen".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers("home/kitchen/humidity", |s| {
sub_tree.matching_subscribers(&"home/kitchen/humidity".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers("office/stairwell/temperature", |s| {
sub_tree.matching_subscribers(&"office/stairwell/temperature".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers(
"office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature",
&"office/tokyo/shibuya/some_desk/cpu_1/fan_speed/blade_4/temperature".parse().unwrap(),
|s| {
println!("{}", s);
},
);

println!();

sub_tree.matching_subscribers("home", |s| {
sub_tree.matching_subscribers(&"home".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers("sport/tennis/player1", |s| {
sub_tree.matching_subscribers(&"sport/tennis/player1".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers("sport/tennis/player2", |s| {
sub_tree.matching_subscribers(&"sport/tennis/player2".parse().unwrap(), |s| {
println!("{}", s);
});

println!();

sub_tree.matching_subscribers("sport/tennis/player1/ranking", |s| {
sub_tree.matching_subscribers(&"sport/tennis/player1/ranking".parse().unwrap(), |s| {
println!("{}", s);
});
}
Expand Down

0 comments on commit 3bc9acc

Please sign in to comment.