Skip to content

Commit

Permalink
Merge pull request #724 from Stremio/feat/streming-server-urls-bucket
Browse files Browse the repository at this point in the history
feat: streaming-server urls bucket
  • Loading branch information
elpiel authored Nov 28, 2024
2 parents d61d2f7 + 44c23a7 commit 4a6cfe8
Show file tree
Hide file tree
Showing 33 changed files with 378 additions and 10 deletions.
3 changes: 2 additions & 1 deletion src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub const LIBRARY_STORAGE_KEY: &str = "library";
pub const LIBRARY_RECENT_STORAGE_KEY: &str = "library_recent";
pub const STREAMS_STORAGE_KEY: &str = "streams";
pub const SEARCH_HISTORY_STORAGE_KEY: &str = "search_history";
pub const STREAMING_SERVER_URLS_STORAGE_KEY: &str = "streaming_server_urls";
pub const NOTIFICATIONS_STORAGE_KEY: &str = "notifications";
pub const CALENDAR_STORAGE_KEY: &str = "calendar";
pub const DISMISSED_EVENTS_STORAGE_KEY: &str = "dismissed_events";
Expand All @@ -38,7 +39,7 @@ pub const CALENDAR_ITEMS_COUNT: usize = 100;
pub const WATCHED_THRESHOLD_COEF: f64 = 0.7;
pub const CREDITS_THRESHOLD_COEF: f64 = 0.9;
/// The latest migration scheme version
pub const SCHEMA_VERSION: u32 = 14;
pub const SCHEMA_VERSION: u32 = 15;
pub const IMDB_LINK_CATEGORY: &str = "imdb";
pub const GENRES_LINK_CATEGORY: &str = "Genres";
pub const CINEMETA_TOP_CATALOG_ID: &str = "top";
Expand Down
26 changes: 25 additions & 1 deletion src/models/ctx/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{
common::{DescriptorLoadable, Loadable, ResourceLoadable},
ctx::{
update_events, update_library, update_notifications, update_profile,
update_search_history, update_streams, update_trakt_addon, CtxError, OtherError,
update_search_history, update_streaming_server_urls, update_streams,
update_trakt_addon, CtxError, OtherError,
},
},
runtime::{
Expand All @@ -22,6 +23,7 @@ use crate::{
profile::{Auth, AuthKey, Profile},
resource::MetaItem,
search_history::SearchHistoryBucket,
server_urls::ServerUrlsBucket,
streams::StreamsBucket,
},
};
Expand Down Expand Up @@ -54,6 +56,8 @@ pub struct Ctx {
#[serde(skip)]
pub streams: StreamsBucket,
#[serde(skip)]
pub streaming_server_urls: ServerUrlsBucket,
#[serde(skip)]
pub search_history: SearchHistoryBucket,
#[serde(skip)]
pub dismissed_events: DismissedEventsBucket,
Expand All @@ -78,6 +82,7 @@ impl Ctx {
profile: Profile,
library: LibraryBucket,
streams: StreamsBucket,
streaming_server_urls: ServerUrlsBucket,
notifications: NotificationsBucket,

search_history: SearchHistoryBucket,
Expand All @@ -87,6 +92,7 @@ impl Ctx {
profile,
library,
streams,
streaming_server_urls,
search_history,
dismissed_events,
notifications,
Expand Down Expand Up @@ -119,6 +125,11 @@ impl<E: Env + 'static> Update<E> for Ctx {
let library_effects =
update_library::<E>(&mut self.library, &self.profile, &self.status, msg);
let streams_effects = update_streams::<E>(&mut self.streams, &self.status, msg);
let server_urls_effects = update_streaming_server_urls::<E>(
&mut self.streaming_server_urls,
&self.status,
msg,
);
let search_history_effects =
update_search_history::<E>(&mut self.search_history, &self.status, msg);
let events_effects =
Expand All @@ -144,6 +155,7 @@ impl<E: Env + 'static> Update<E> for Ctx {
.join(profile_effects)
.join(library_effects)
.join(streams_effects)
.join(server_urls_effects)
.join(search_history_effects)
.join(events_effects)
.join(trakt_addon_effects)
Expand All @@ -169,6 +181,11 @@ impl<E: Env + 'static> Update<E> for Ctx {
msg,
);
let streams_effects = update_streams::<E>(&mut self.streams, &self.status, msg);
let server_urls_effects = update_streaming_server_urls::<E>(
&mut self.streaming_server_urls,
&self.status,
msg,
);
let search_history_effects =
update_search_history::<E>(&mut self.search_history, &self.status, msg);
let events_effects =
Expand Down Expand Up @@ -237,6 +254,7 @@ impl<E: Env + 'static> Update<E> for Ctx {
profile_effects
.join(library_effects)
.join(streams_effects)
.join(server_urls_effects)
.join(trakt_addon_effects)
.join(notifications_effects)
.join(search_history_effects)
Expand All @@ -249,6 +267,11 @@ impl<E: Env + 'static> Update<E> for Ctx {
let library_effects =
update_library::<E>(&mut self.library, &self.profile, &self.status, msg);
let streams_effects = update_streams::<E>(&mut self.streams, &self.status, msg);
let server_urls_effects = update_streaming_server_urls::<E>(
&mut self.streaming_server_urls,
&self.status,
msg,
);
let trakt_addon_effects = update_trakt_addon::<E>(
&mut self.trakt_addon,
&self.profile,
Expand All @@ -270,6 +293,7 @@ impl<E: Env + 'static> Update<E> for Ctx {
profile_effects
.join(library_effects)
.join(streams_effects)
.join(server_urls_effects)
.join(trakt_addon_effects)
.join(notifications_effects)
.join(search_history_effects)
Expand Down
3 changes: 3 additions & 0 deletions src/models/ctx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use update_search_history::*;
mod update_trakt_addon;
use update_trakt_addon::*;

mod update_streaming_server_urls;
use update_streaming_server_urls::*;

mod error;
pub use error::*;

Expand Down
64 changes: 64 additions & 0 deletions src/models/ctx/update_streaming_server_urls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use futures::FutureExt;

use crate::constants::STREAMING_SERVER_URLS_STORAGE_KEY;
use crate::runtime::msg::{Action, ActionCtx, CtxAuthResponse};
use crate::runtime::{
msg::{Event, Internal, Msg},
Effect, EffectFuture, Effects, Env, EnvFutureExt,
};
use crate::types::server_urls::ServerUrlsBucket;

use super::{CtxError, CtxStatus};

pub fn update_streaming_server_urls<E: Env + 'static>(
streaming_server_urls: &mut ServerUrlsBucket,
status: &CtxStatus,
msg: &Msg,
) -> Effects {
match msg {
Msg::Action(Action::Ctx(ActionCtx::AddServerUrl(url))) => {
streaming_server_urls.add_url::<E>(url.clone());
Effects::msg(Msg::Internal(Internal::StreamingServerUrlsBucketChanged))
}
Msg::Action(Action::Ctx(ActionCtx::DeleteServerUrl(url))) => {
streaming_server_urls.delete_url(url);
Effects::msg(Msg::Internal(Internal::StreamingServerUrlsBucketChanged))
}
Msg::Internal(Internal::CtxAuthResult(auth_request, result)) => match (status, result) {
(CtxStatus::Loading(loading_auth_request), Ok(CtxAuthResponse { auth, .. }))
if loading_auth_request == auth_request =>
{
let next_server_urls = ServerUrlsBucket::new::<E>(Some(auth.user.id.to_owned()));
*streaming_server_urls = next_server_urls;
Effects::msg(Msg::Internal(Internal::StreamingServerUrlsBucketChanged))
}
_ => Effects::none().unchanged(),
},
Msg::Internal(Internal::StreamingServerUrlsBucketChanged) => {
Effects::one(push_server_urls_to_storage::<E>(streaming_server_urls)).unchanged()
}
_ => Effects::none().unchanged(),
}
}

fn push_server_urls_to_storage<E: Env + 'static>(
streaming_server_urls: &ServerUrlsBucket,
) -> Effect {
let uid: Option<String> = streaming_server_urls.uid.clone();

EffectFuture::Sequential(
E::set_storage(
STREAMING_SERVER_URLS_STORAGE_KEY,
Some(streaming_server_urls),
)
.map(move |result| match result {
Ok(_) => Msg::Event(Event::StreamingServerUrlsPushedToStorage { uid: uid.clone() }),
Err(error) => Msg::Event(Event::Error {
error: CtxError::from(error),
source: Box::new(Event::StreamingServerUrlsPushedToStorage { uid: uid.clone() }),
}),
})
.boxed_env(),
)
.into()
}
33 changes: 29 additions & 4 deletions src/runtime/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::addon_transport::{AddonHTTPTransport, AddonTransport, UnsupportedTran
use crate::constants::{
DISMISSED_EVENTS_STORAGE_KEY, LIBRARY_RECENT_STORAGE_KEY, LIBRARY_STORAGE_KEY,
PROFILE_STORAGE_KEY, SCHEMA_VERSION, SCHEMA_VERSION_STORAGE_KEY, SEARCH_HISTORY_STORAGE_KEY,
STREAMS_STORAGE_KEY,
STREAMING_SERVER_URLS_STORAGE_KEY, STREAMS_STORAGE_KEY,
};
use crate::models::ctx::Ctx;
use crate::models::streaming_server::StreamingServer;
Expand Down Expand Up @@ -265,6 +265,12 @@ pub trait Env {
.await?;
schema_version = 14;
}
if schema_version == 14 {
migrate_storage_schema_to_v15::<Self>()
.map_err(|error| EnvError::StorageSchemaVersionUpgrade(Box::new(error)))
.await?;
schema_version = 15;
}
if schema_version != SCHEMA_VERSION {
panic!(
"Storage schema version must be upgraded from {} to {}",
Expand Down Expand Up @@ -594,6 +600,12 @@ fn migrate_storage_schema_to_v14<E: Env>() -> TryEnvFuture<()> {
.boxed_env()
}

fn migrate_storage_schema_to_v15<E: Env>() -> TryEnvFuture<()> {
E::set_storage::<()>(STREAMING_SERVER_URLS_STORAGE_KEY, None)
.and_then(|_| E::set_storage(SCHEMA_VERSION_STORAGE_KEY, Some(&15)))
.boxed_env()
}

#[cfg(test)]
mod test {
use serde_json::{json, Value};
Expand All @@ -606,9 +618,9 @@ mod test {
env::{
migrate_storage_schema_to_v10, migrate_storage_schema_to_v11,
migrate_storage_schema_to_v12, migrate_storage_schema_to_v13,
migrate_storage_schema_to_v14, migrate_storage_schema_to_v6,
migrate_storage_schema_to_v7, migrate_storage_schema_to_v8,
migrate_storage_schema_to_v9,
migrate_storage_schema_to_v14, migrate_storage_schema_to_v15,
migrate_storage_schema_to_v6, migrate_storage_schema_to_v7,
migrate_storage_schema_to_v8, migrate_storage_schema_to_v9,
},
Env,
},
Expand Down Expand Up @@ -1125,4 +1137,17 @@ mod test {
"Profile should match"
);
}

#[tokio::test]
async fn test_migration_from_14_to_15() {
let _test_env_guard = TestEnv::reset().expect("Should lock TestEnv");

migrate_storage_schema_to_v15::<TestEnv>()
.await
.expect("Should migrate");

{
assert_storage_schema_version(15);
}
}
}
4 changes: 4 additions & 0 deletions src/runtime/msg/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ pub enum ActionCtx {
GetEvents,
/// Dismiss an event by id, either a Modal or Notification
DismissEvent(String),
/// Add a server URL to the list of available streaming servers
AddServerUrl(Url),
/// Delete a server URL from the list of available streaming servers
DeleteServerUrl(Url),
}

#[derive(Clone, Deserialize, Debug)]
Expand Down
6 changes: 6 additions & 0 deletions src/runtime/msg/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ pub enum Event {
PlayingOnDevice {
device: String,
},
StreamingServerUrlsBucketChanged {
uid: UID,
},
StreamingServerUrlsPushedToStorage {
uid: UID,
},
Error {
error: CtxError,
source: Box<Event>,
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/msg/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub enum Internal {
StreamsChanged(bool),
/// Search history has changed.
SearchHistoryChanged,
/// Server URLs bucket has changed.
StreamingServerUrlsBucketChanged,
/// User notifications have changed
NotificationsChanged,
/// Pulling of notifications triggered either by the user (with an action) or
Expand Down
1 change: 1 addition & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod player;
pub mod profile;
pub mod resource;
pub mod search_history;
pub mod server_urls;
pub mod streaming_server;
pub mod streams;
pub mod torrent;
Expand Down
2 changes: 2 additions & 0 deletions src/types/server_urls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod server_urls_bucket;
pub use server_urls_bucket::*;
39 changes: 39 additions & 0 deletions src/types/server_urls/server_urls_bucket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::{constants::STREAMING_SERVER_URL, runtime::Env, types::profile::UID};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use url::Url;

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct ServerUrlsBucket {
/// User ID
pub uid: UID,
/// A map of the the server Url as a key and the modified/added datetime of that particular url.
pub items: HashMap<Url, DateTime<Utc>>,
}

impl ServerUrlsBucket {
/// Create a new `ServerUrlsBucket` with the base URL inserted.
pub fn new<E: Env + 'static>(uid: UID) -> Self {
let mut items = HashMap::new();
let base_url: &Url = &STREAMING_SERVER_URL;
let mtime = E::now();
items.insert(base_url.clone(), mtime);

ServerUrlsBucket { uid, items }
}

/// Add a URL to the bucket.
pub fn add_url<E: Env + 'static>(&mut self, url: Url) {
let mtime = E::now();
self.items.insert(url, mtime);
}

/// Delete a URL from the bucket.
pub fn delete_url(&mut self, url: &Url) {
let default_url: &Url = &STREAMING_SERVER_URL;
if url != default_url {
self.items.remove(url);
}
}
}
3 changes: 3 additions & 0 deletions src/unit_tests/catalog_with_filters/load_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::types::notifications::NotificationsBucket;
use crate::types::profile::Profile;
use crate::types::resource::MetaItemPreview;
use crate::types::search_history::SearchHistoryBucket;
use crate::types::server_urls::ServerUrlsBucket;
use crate::types::streams::StreamsBucket;
use crate::unit_tests::{
default_fetch_handler, Request, TestEnv, EVENTS, FETCH_HANDLER, REQUESTS, STATES,
Expand Down Expand Up @@ -50,6 +51,7 @@ fn default_catalog() {
Profile::default(),
LibraryBucket::default(),
StreamsBucket::default(),
ServerUrlsBucket::new::<TestEnv>(None),
NotificationsBucket::new::<TestEnv>(None, vec![]),
SearchHistoryBucket::default(),
DismissedEventsBucket::default(),
Expand Down Expand Up @@ -151,6 +153,7 @@ fn search_catalog() {
Profile::default(),
LibraryBucket::default(),
StreamsBucket::default(),
ServerUrlsBucket::new::<TestEnv>(None),
NotificationsBucket::new::<TestEnv>(None, vec![]),
SearchHistoryBucket::default(),
DismissedEventsBucket::default(),
Expand Down
3 changes: 3 additions & 0 deletions src/unit_tests/ctx/add_to_library.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::notifications::NotificationsBucket;
use crate::types::profile::{Auth, AuthKey, GDPRConsent, Profile, User};
use crate::types::resource::{MetaItemBehaviorHints, MetaItemPreview, PosterShape};
use crate::types::search_history::SearchHistoryBucket;
use crate::types::server_urls::ServerUrlsBucket;
use crate::types::streams::StreamsBucket;
use crate::types::True;
use crate::unit_tests::{
Expand Down Expand Up @@ -106,6 +107,7 @@ fn actionctx_addtolibrary() {
..Default::default()
},
StreamsBucket::default(),
ServerUrlsBucket::new::<TestEnv>(None),
NotificationsBucket::new::<TestEnv>(None, vec![]),
SearchHistoryBucket::default(),
DismissedEventsBucket::default(),
Expand Down Expand Up @@ -243,6 +245,7 @@ fn actionctx_addtolibrary_already_added() {
.collect(),
},
StreamsBucket::default(),
ServerUrlsBucket::new::<TestEnv>(None),
NotificationsBucket::new::<TestEnv>(None, vec![]),
SearchHistoryBucket::default(),
DismissedEventsBucket::default(),
Expand Down
Loading

0 comments on commit 4a6cfe8

Please sign in to comment.