Skip to content

Commit

Permalink
add unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
fredszaq committed Aug 6, 2018
1 parent cfc0b80 commit 79ca964
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use MqttOptions;
pub enum Command {
Status(#[debug_stub = ""] ::std::sync::mpsc::Sender<::state::MqttConnectionStatus>),
Subscribe(Subscription),
Unsubscribe(TopicPath),
Publish(Publish),
Connect,
Disconnect,
Expand Down Expand Up @@ -156,8 +157,21 @@ impl<'a> SubscriptionBuilder<'a> {
it: Subscription { qos, ..it },
}
}
pub fn send(self) -> Result<()> {
self.client.send_command(Command::Subscribe(self.it))
pub fn send(self) -> Result<SubscriptionToken<'a>> {
let token = SubscriptionToken { client: &self.client,topic_path: self.it.topic_path.clone()};
self.client.send_command(Command::Subscribe(self.it))?;
Ok(token)
}
}

pub struct SubscriptionToken<'a> {
client: &'a MqttClient,
topic_path: TopicPath
}

impl<'a> SubscriptionToken<'a> {
pub fn unsubscribe(self) -> Result<()> {
self.client.send_command(Command::Unsubscribe(self.topic_path))
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ impl ConnectionState {
self.turn_command()?;
}
mqtt3::Packet::Suback(suback) => self.mqtt_state.handle_incoming_suback(suback)?,
mqtt3::Packet::Unsuback(packet_identifier) => self.mqtt_state.handle_incoming_unsuback(packet_identifier)?,
mqtt3::Packet::Publish(publish) => {
let (_, server) = self.mqtt_state.handle_incoming_publish(publish)?;
if let Some(server) = server {
Expand Down Expand Up @@ -437,6 +438,10 @@ impl ConnectionState {
let packet = self.mqtt_state.handle_outgoing_subscribe(vec![sub])?;
self.send_packet(mqtt3::Packet::Subscribe(packet))?
}
Command::Unsubscribe(topic_path) => {
let packet = self.mqtt_state.handle_outgoing_unsubscribe(vec![topic_path])?;
self.send_packet(mqtt3::Packet::Unsubscribe(packet))?
}
Command::Status(tx) => {
let _ = tx.send(self.state().status());
}
Expand Down
24 changes: 24 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,26 @@ impl MqttState {
}
}

pub fn handle_outgoing_unsubscribe(
&mut self,
topics: Vec<::mqtt3::TopicPath>,
) -> Result<mqtt3::Unsubscribe> {
let pkid = self.next_pkid();
let topics: Vec<String> = topics.iter()
.map(|s| s.path.clone())
.collect();
self.subscriptions.retain(|it| !topics.contains(&it.topic_path.path));

if self.connection_status == MqttConnectionStatus::Connected {
Ok(mqtt3::Unsubscribe { pid: pkid, topics })
} else {
error!(
"State = {:?}. Shouldn't unsubscribe in this state",
self.connection_status
);
Err(ErrorKind::InvalidState.into())
}
}

pub fn handle_incoming_suback(&mut self, ack: mqtt3::Suback) -> Result<()> {
if ack.return_codes
Expand All @@ -313,6 +333,10 @@ impl MqttState {
Ok(())
}

pub fn handle_incoming_unsuback(&mut self, ack: mqtt3::PacketIdentifier) -> Result<()> {
Ok(())
}

pub fn handle_socket_disconnect(&mut self) {
self.await_pingresp = false;
self.set_status_after_error();
Expand Down
45 changes: 45 additions & 0 deletions tests/testsuite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,51 @@ fn basic_publishes_and_subscribes() {
assert_eq!(3, final_count.load(Ordering::SeqCst));
}

#[test]
fn publishes_and_subscribes_and_unsubscribes() {
// loggerv::init_with_level(log::LogLevel::Debug);
let client_options = MqttOptions::new("pubsubunsub", MOSQUITTO_ADDR);
let count = Arc::new(AtomicUsize::new(0));
let final_count = count.clone();
let count = count.clone();

let request = MqttClient::start(client_options).expect("Coudn't start");
let token = request
.subscribe(
"test/pubsubunsub",
Box::new(move |_| {
count.fetch_add(1, Ordering::SeqCst);
}),
)
.unwrap()
.send()
.unwrap();

let payload = format!("hello rust");
request
.publish("test/pubsubunsub")
.unwrap()
.payload(payload.clone().into_bytes())
.send()
.unwrap();

thread::sleep(Duration::from_secs(1));
token.unsubscribe().unwrap();
thread::sleep(Duration::from_secs(1));

request
.publish("test/pubsubunsub")
.unwrap()
.payload(payload.clone().into_bytes())
.send()
.unwrap();

thread::sleep(Duration::from_secs(1));

assert_eq!(1, final_count.load(Ordering::SeqCst));
}


#[test]
fn alive() {
// loggerv::init_with_level(log::LogLevel::Debug);
Expand Down

0 comments on commit 79ca964

Please sign in to comment.