Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher in multiple rooms #91

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use janus_plugin::{
JanssonValue, JanusError, JanusResult, LibraryMetadata, Plugin, PluginCallbacks, PluginResult, PluginSession,
PluginRtpPacket, PluginRtcpPacket, PluginDataPacket, RawJanssonValue, RawPluginResult,
};
use messages::{JsepKind, MessageKind, OptionalField, Subscription};
use messages::{JsepKind, MessageKind, OptionalField, Subscription, parse_all_rooms};
use messages::{RoomId, UserId};
use once_cell::sync::{Lazy, OnceCell};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -323,9 +323,11 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
// if this user is entirely disconnected, notify their roommates.
// todo: is it better if this is instead when their publisher disconnects?
if !switchboard.is_connected(&joined.user_id) {
let response = json!({ "event": "leave", "user_id": &joined.user_id, "room_id": &joined.room_id });
let occupants = switchboard.publishers_occupying(&joined.room_id);
notify_except(&response, &joined.user_id, occupants);
for room_id in &joined.room_ids {
let response = json!({ "event": "leave", "user_id": &joined.user_id, "room_id": &room_id });
let occupants = switchboard.publishers_occupying(&room_id);
notify_except(&response, &joined.user_id, occupants);
}
}
}
sess.destroyed.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -408,8 +410,10 @@ extern "C" fn hangup_media(handle: *mut PluginSession) {
}

fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe: Option<Subscription>, token: Option<String>) -> MessageResult {
let (room_id, all_rooms) = parse_all_rooms(room_id.clone());
// todo: holy shit clean this function up somehow
let config = CONFIG.get().unwrap();
// TODO: check security for all_rooms and not only room_id (main room)
match (&config.auth_key, token) {
(None, _) => {
janus_verb!(
Expand Down Expand Up @@ -455,6 +459,7 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
let join_kind = if gets_data_channel { JoinKind::Publisher } else { JoinKind::Subscriber };

if join_kind == JoinKind::Publisher {
// TODO: check for all rooms?
if config.max_room_size > 0 && room_users.len() >= config.max_room_size {
return Err(From::from("Room is full."));
}
Expand All @@ -463,14 +468,16 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
}
}

if let Err(_existing) = from.join_state.set(JoinState::new(join_kind, room_id.clone(), user_id.clone())) {
if let Err(_existing) = from.join_state.set(JoinState::new(join_kind, all_rooms.clone(), user_id.clone())) {
return Err(From::from("Handles may only join once!"));
}

if join_kind == JoinKind::Publisher {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
switchboard.join_publisher(Arc::clone(from), user_id.clone(), room_id.clone());
notify_except(&notification, &user_id, switchboard.publishers_occupying(&room_id));
switchboard.join_publisher(Arc::clone(from), user_id.clone(), all_rooms.clone());
for room in all_rooms {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room });
notify_except(&notification, &user_id, switchboard.publishers_occupying(&room));
}
} else {
switchboard.join_subscriber(Arc::clone(from), user_id.clone(), room_id.clone());
}
Expand Down Expand Up @@ -534,8 +541,10 @@ fn process_block(from: &Arc<Session>, whom: UserId) -> MessageResult {
janus_info!("Processing block from {:p} to {}", from.handle, whom);
if let Some(joined) = from.join_state.get() {
let mut switchboard = SWITCHBOARD.write()?;
let event = json!({ "event": "blocked", "by": &joined.user_id });
notify_user(&event, &whom, switchboard.publishers_occupying(&joined.room_id));
if let Some(publisher) = switchboard.get_publisher(&whom) {
let event = json!({ "event": "blocked", "by": &joined.user_id });
notify_user(&event, &whom, &[publisher]);
}
switchboard.establish_block(joined.user_id.clone(), whom);
Ok(MessageResponse::msg(json!({})))
} else {
Expand All @@ -550,9 +559,9 @@ fn process_unblock(from: &Arc<Session>, whom: UserId) -> MessageResult {
switchboard.lift_block(&joined.user_id, &whom);
if let Some(publisher) = switchboard.get_publisher(&whom) {
send_fir(&[publisher]);
let event = json!({ "event": "unblocked", "by": &joined.user_id });
notify_user(&event, &whom, &[publisher]);
}
let event = json!({ "event": "unblocked", "by": &joined.user_id });
notify_user(&event, &whom, switchboard.publishers_occupying(&joined.room_id));
Ok(MessageResponse::msg(json!({})))
} else {
Err(From::from("Cannot unblock when not in a room."))
Expand Down Expand Up @@ -586,11 +595,13 @@ fn process_data(from: &Arc<Session>, whom: Option<UserId>, body: &str) -> Messag
let payload = json!({ "event": "data", "body": body });
let switchboard = SWITCHBOARD.read().expect("Switchboard lock poisoned; can't continue.");
if let Some(joined) = from.join_state.get() {
let occupants = switchboard.publishers_occupying(&joined.room_id);
if let Some(user_id) = whom {
send_data_user(&payload, &user_id, occupants);
} else {
send_data_except(&payload, &joined.user_id, occupants);
for room_id in &joined.room_ids {
let occupants = switchboard.publishers_occupying(&room_id);
if let Some(ref user_id) = whom {
send_data_user(&payload, &user_id, occupants);
} else {
send_data_except(&payload, &joined.user_id, occupants);
}
}
Ok(MessageResponse::msg(json!({})))
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ use std::error::Error;
/// A room ID representing a Janus multicast room.
pub type RoomId = String;

pub fn parse_all_rooms(room_id: RoomId) -> (RoomId, Vec<RoomId>) {
let rooms: Vec<String> = room_id.split("-").map(String::from).collect();
(rooms.first().unwrap().clone(), rooms)
}

/// A user ID representing a single Janus client. Used to correlate multiple Janus connections back to the same
/// conceptual user for managing subscriptions.
pub type UserId = String;
Expand Down
8 changes: 4 additions & 4 deletions src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ pub struct JoinState {
/// Whether this session is a subscriber or a publisher.
pub kind: JoinKind,

/// The room ID that this session is in.
pub room_id: RoomId,
/// The room Ids that this session is in.
pub room_ids: Vec<RoomId>,

/// An opaque ID uniquely identifying this user.
pub user_id: UserId,
}

impl JoinState {
pub fn new(kind: JoinKind, room_id: RoomId, user_id: UserId) -> Self {
Self { kind, room_id, user_id }
pub fn new(kind: JoinKind, room_ids: Vec<RoomId>, user_id: UserId) -> Self {
Self { kind, room_ids, user_id }
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/switchboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ impl Switchboard {
self.blockers_to_miscreants.disassociate(from, target);
}

pub fn join_publisher(&mut self, session: Arc<Session>, user: UserId, room: RoomId) {
pub fn join_publisher(&mut self, session: Arc<Session>, user: UserId, room_ids: Vec<RoomId>) {
self.publishers_by_user.entry(user).or_insert(session.clone());
self.publishers_by_room.entry(room).or_insert_with(Vec::new).push(session);
for room_id in room_ids {
self.publishers_by_room.entry(room_id.clone()).or_insert_with(Vec::new).push(session.clone());
}
}

pub fn join_subscriber(&mut self, session: Arc<Session>, user: UserId, _room: RoomId) {
Expand All @@ -164,10 +166,12 @@ impl Switchboard {
self.publisher_to_subscribers.remove_key(session);
if let Some(joined) = session.join_state.get() {
self.publishers_by_user.remove(&joined.user_id);
if let Entry::Occupied(mut others) = self.publishers_by_room.entry(joined.room_id.clone()) {
others.get_mut().retain(|x| x.as_ref() != session);
if others.get().is_empty() {
others.remove_entry();
for room_id in &joined.room_ids {
if let Entry::Occupied(mut others) = self.publishers_by_room.entry(room_id.clone()) {
others.get_mut().retain(|x| x.as_ref() != session);
if others.get().is_empty() {
others.remove_entry();
}
}
}
}
Expand Down Expand Up @@ -243,7 +247,7 @@ impl Switchboard {
Some(joined) => (
self.blockers_to_miscreants.get_keys(&joined.user_id),
self.blockers_to_miscreants.get_values(&joined.user_id),
self.publishers_occupying(&joined.room_id),
self.publishers_occupying(&joined.room_ids[0]),
),
};
cohabitators.iter().filter(move |cohabitator| {
Expand Down