Skip to content

Commit

Permalink
fix: added automatic session refresh on SessionTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
WhySoBad committed Apr 2, 2024
1 parent a102ed3 commit 1cdd703
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 85 deletions.
149 changes: 83 additions & 66 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,82 +87,89 @@ impl Device {
}
}

/// Attempt to refresh the auth session for the device
/// Forcefully refresh the session for the device
///
/// Should the session be expired or the previous refresh attempt failed a new attempt is started.
/// After 10 failed refresh attempts the session state changes to [`SessionStatus::RepeatedFailure`] which only allows
/// the next refresh attempt after 10 minutes
pub async fn try_refresh_session(&mut self) -> Result<(), Status> {
/// This method should only be called directly when a [`tapo::TapoResponseError::SessionTimeout`]
/// was returned from an api call
pub async fn refresh_session(&mut self) -> Result<(), Status> {
info!("Attempting to refresh session for device '{}'", self.name);
let now = SystemTime::now();

debug!("Try session refresh: {:?} {:?}", now, self.next_session_action);

if now.ge(&self.next_session_action) {
info!("Attempting to refresh session for device '{}'", self.name);

let current = self.session_status.clone();

let result = if let Some(handler) = &mut self.handler {
let result = match handler {
DeviceHandler::ColorLight(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
DeviceHandler::Light(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
DeviceHandler::Generic(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
};
if let Some(error) = result {
debug!("Session refresh failed for device '{}' with reason: {}", self.name, error);
self.session_status = SessionStatus::Failure;
self.refresh_retires = 1;
Err(error)
} else {
debug!("Successfully refreshed session for device '{}'", self.name);
let current = self.session_status.clone();

let result = if let Some(handler) = &mut self.handler {
let result = match handler {
DeviceHandler::ColorLight(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
DeviceHandler::Light(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
DeviceHandler::Generic(handler) => {
handler.refresh_session().await.map_err(|err| Status::internal(err.to_string())).err()
},
};
if let Some(error) = result {
debug!("Session refresh failed for device '{}' with reason: {}", self.name, error);
self.session_status = SessionStatus::Failure;
self.refresh_retires = 1;
Err(error)
} else {
debug!("Successfully refreshed session for device '{}'", self.name);
self.next_session_action = now + Duration::from_millis(SESSION_VALIDITY_MILLIS);
Ok(())
}
} else {
debug!("Attempting initial session acquisition for device '{}'", self.name);
match Self::acquire_handler(&self.r#type, &self.address, self.client.clone()).await {
Ok(handler) => {
self.session_status = SessionStatus::Authenticated;
self.next_session_action = now + Duration::from_millis(SESSION_VALIDITY_MILLIS);
self.refresh_retires = 0;
self.handler = Some(handler);
debug!("Initial session acquisition succeeded for device '{}'. Next action is required at {:?}", self.name, self.next_session_action);
Ok(())
}
} else {
debug!("Attempting initial session acquisition for device '{}'", self.name);
match Self::acquire_handler(&self.r#type, &self.address, self.client.clone()).await {
Ok(handler) => {
self.session_status = SessionStatus::Authenticated;
self.next_session_action = now + Duration::from_millis(SESSION_VALIDITY_MILLIS);
self.refresh_retires = 0;
self.handler = Some(handler);
debug!("Initial session acquisition succeeded for device '{}'. Next action is required at {:?}", self.name, self.next_session_action);
Ok(())
},
Err(status) => {
self.refresh_retires = min(self.refresh_retires + 1, SESSION_REFRESH_RETRIES);
if self.refresh_retires == SESSION_REFRESH_RETRIES {
self.session_status = SessionStatus::RepeatedFailure;
self.next_session_action = now + Duration::from_millis(REPEATED_FAILURE_RETRY_MILLIS)
}
debug!("Initial session acquisition failed for device '{}'. Next action is required at {:?}. Failures in row: {}", self.name, self.next_session_action, self.refresh_retires);
Err(status)
},
Err(status) => {
self.refresh_retires = min(self.refresh_retires + 1, SESSION_REFRESH_RETRIES);
if self.refresh_retires == SESSION_REFRESH_RETRIES {
self.session_status = SessionStatus::RepeatedFailure;
self.next_session_action = now + Duration::from_millis(REPEATED_FAILURE_RETRY_MILLIS)
}
debug!("Initial session acquisition failed for device '{}'. Next action is required at {:?}. Failures in row: {}", self.name, self.next_session_action, self.refresh_retires);
Err(status)
}
}
};

if current.ne(&self.session_status) {
debug!("Session status changed: {:?}", self.session_status);
let device = rpc::Device {
name: self.name.clone(),
status: i32::from(transform_session_status(&self.session_status)),
address: self.address.clone(),
r#type: format!("{:?}", self.r#type)
};

if current.ne(&self.session_status) {
debug!("Session status changed: {:?}", self.session_status);
let device = rpc::Device {
name: self.name.clone(),
status: i32::from(transform_session_status(&self.session_status)),
address: self.address.clone(),
r#type: format!("{:?}", self.r#type)
};

if let Err(err) = self.sender.send(create_event(EventType::DeviceAuthChange, device)) {
error!("Error whilst sending new device auth state: {err}")
}
if let Err(err) = self.sender.send(create_event(EventType::DeviceAuthChange, device)) {
error!("Error whilst sending new device auth state: {err}")
}
}

result
result
}

/// Attempt to refresh the auth session for the device
///
/// Should the session be expired or the previous refresh attempt failed a new attempt is started.
/// After 10 failed refresh attempts the session state changes to [`SessionStatus::RepeatedFailure`] which only allows
/// the next refresh attempt after 10 minutes
pub async fn try_refresh_session(&mut self) -> Result<(), Status> {
let now = SystemTime::now();

debug!("Try session refresh: {:?} {:?}", now, self.next_session_action);

if now.ge(&self.next_session_action) {
self.refresh_session().await
} else {
Ok(())
}
Expand All @@ -174,7 +181,17 @@ impl Device {
pub fn get_handler(&self) -> Result<&DeviceHandler, Status> {
match &self.handler {
Some(handler) => Ok(handler),
None => Err(Status::unavailable(format!("The device '{}' is currently unauthenticated. Try again later or verify the configuration should the issue persist.", self.name)))
None => Err(Status::unauthenticated(format!("The device '{}' is currently unauthenticated. Try again later or verify the configuration should the issue persist.", self.name)))
}
}

/// Mutable access to current device handler
///
/// Returns tonic status code should the handler be unavailable
pub fn get_handler_mut(&mut self) -> Result<&mut DeviceHandler, Status> {
match &mut self.handler {
Some(handler) => Ok(handler),
None => Err(Status::unauthenticated(format!("The device '{}' is currently unauthenticated. Try again later or verify the configuration should the issue persist.", self.name)))
}
}
}
Expand Down
28 changes: 27 additions & 1 deletion src/tapo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use log::{error, info};
use serde::Serialize;
use serde_json::json;
use spinoff::Spinner;
use tapo::ApiClient;
use tapo::{ApiClient, TapoResponseError};
use tokio::sync::Mutex;
use tonic::transport::Server;
use crate::cli::SpinnerOpt;
Expand Down Expand Up @@ -153,6 +153,32 @@ impl<R> TonicErrMap<R> for Result<R, tonic::Status> {
}
}

pub trait TapoErrMap<R> {
async fn map_tapo_err(self, handler: &mut Device) -> Result<R, tonic::Status>;
}

impl<R> TapoErrMap<R> for Result<R, tapo::Error> {
async fn map_tapo_err(self, device: &mut Device) -> Result<R, tonic::Status> {
match self {
Ok(data) => Ok(data),
Err(err) => {
match err {
// there is a session timeout which didn't get caught by the optimistic session
// validator. Forcefully refresh the session
tapo::Error::Tapo(TapoResponseError::SessionTimeout) => {
if let Some(error) = device.refresh_session().await.err() {
Err(error)
} else {
Err(tonic::Status::unauthenticated(err.to_string()))
}
},
_ => Err(tonic::Status::internal(err.to_string()))
}
}
}
}
}


impl Display for InfoResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down
36 changes: 18 additions & 18 deletions src/tapo/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rpc::tapo_server::Tapo;
use crate::tapo::server::rpc::{DeviceRequest, DevicesResponse, Empty, EventRequest, EventResponse, InfoJsonResponse, InfoResponse, IntegerValueChange, PowerResponse, SetRequest, UsagePerPeriod, UsageResponse};
use crate::device;
use crate::device::Device;
use crate::tapo::{transform_color, transform_session_status};
use crate::tapo::{TapoErrMap, transform_color, transform_session_status};
use crate::tapo::color::{any_to_rgb, color_to_hst};
use crate::tapo::state::State;

Expand Down Expand Up @@ -111,10 +111,10 @@ impl Tapo for TapoService {

match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
handler.device_reset().await.map_err(|err| Status::internal(err.to_string()))?;
handler.device_reset().await.map_tapo_err(&mut device).await?;
}
device::DeviceHandler::ColorLight(handler) => {
handler.device_reset().await.map_err(|err| Status::internal(err.to_string()))?;
handler.device_reset().await.map_tapo_err(&mut device).await?;
},
_ => {
return Err(Status::unimplemented("Reset API is not supported by this device type"))
Expand All @@ -133,7 +133,7 @@ impl Tapo for TapoService {

let response = match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
let info = handler.get_device_info().await.map_err(|err| Status::internal(err.to_string()))?;
let info = handler.get_device_info().await.map_tapo_err(&mut device).await?;
InfoResponse {
brightness: Some(info.brightness as u32),
device_on: Some(info.device_on),
Expand All @@ -144,7 +144,7 @@ impl Tapo for TapoService {
}
}
device::DeviceHandler::Generic(handler) => {
let info = handler.get_device_info().await.map_err(|err| Status::internal(err.to_string()))?;
let info = handler.get_device_info().await.map_tapo_err(&mut device).await?;
InfoResponse {
device_on: info.device_on,
on_time: info.on_time,
Expand All @@ -154,7 +154,7 @@ impl Tapo for TapoService {
}
}
device::DeviceHandler::ColorLight(handler) => {
let info = handler.get_device_info().await.map_err(|err| Status::internal(err.to_string()))?;
let info = handler.get_device_info().await.map_tapo_err(&mut device).await?;
let brightness = Some(info.brightness as u32);
let hue = info.hue.map(|v| v as u32);
let saturation = info.saturation.map(|v| v as u32);
Expand Down Expand Up @@ -185,13 +185,13 @@ impl Tapo for TapoService {

let info = match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
handler.get_device_info_json().await.map_err(|err| Status::internal(err.to_string()))?
handler.get_device_info_json().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::Generic(handler) => {
handler.get_device_info_json().await.map_err(|err| Status::internal(err.to_string()))?
handler.get_device_info_json().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::ColorLight(handler) => {
handler.get_device_info_json().await.map_err(|err| Status::internal(err.to_string()))?
handler.get_device_info_json().await.map_tapo_err(&mut device).await?
}
};

Expand All @@ -211,10 +211,10 @@ impl Tapo for TapoService {

let usage = match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
handler.get_device_usage().await.map_err(|err| Status::internal(err.to_string()))?
handler.get_device_usage().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::ColorLight(handler) => {
handler.get_device_usage().await.map_err(|err| Status::internal(err.to_string()))?
handler.get_device_usage().await.map_tapo_err(&mut device).await?
},
device::DeviceHandler::Generic(_) => {
return Err(Status::unimplemented("Device usage API is not supported by this device type"))
Expand Down Expand Up @@ -256,13 +256,13 @@ impl Tapo for TapoService {

match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
handler.on().await.map_err(|err| Status::internal(err.to_string()))?
handler.on().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::Generic(handler) => {
handler.on().await.map_err(|err| Status::internal(err.to_string()))?
handler.on().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::ColorLight(handler) => {
handler.on().await.map_err(|err| Status::internal(err.to_string()))?
handler.on().await.map_tapo_err(&mut device).await?
}
}

Expand All @@ -282,13 +282,13 @@ impl Tapo for TapoService {

match device.get_handler()? {
device::DeviceHandler::Light(handler) => {
handler.off().await.map_err(|err| Status::internal(err.to_string()))?
handler.off().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::Generic(handler) => {
handler.off().await.map_err(|err| Status::internal(err.to_string()))?
handler.off().await.map_tapo_err(&mut device).await?
}
device::DeviceHandler::ColorLight(handler) => {
handler.off().await.map_err(|err| Status::internal(err.to_string()))?
handler.off().await.map_tapo_err(&mut device).await?
}
}

Expand Down Expand Up @@ -446,7 +446,7 @@ impl Tapo for TapoService {
}
}
info.color = any_to_rgb(info.temperature, info.hue, info.saturation, info.brightness);
set.send(handler).await.map_err(|err| Status::internal(err.to_string()))?;
set.send(handler).await.map_tapo_err(&mut device).await?;
self.get_state().await.update_info_optimistically(device.name.clone(), info.clone());
Ok(Response::new(info))
}
Expand Down

0 comments on commit 1cdd703

Please sign in to comment.