Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audio consumer changeProducer API implementation (PoC) #768

Draft
wants to merge 4 commits into
base: v3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/lib/Consumer.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ export declare class Consumer extends EnhancedEventEmitter {
* Enable 'trace' event.
*/
enableTraceEvent(types?: ConsumerTraceEventType[]): Promise<void>;
/**
* Replaces the producer associated with this consumer.
*/
changeProducer(producerId: string): Promise<any>;
private handleWorkerNotifications;
}
//# sourceMappingURL=Consumer.d.ts.map
2 changes: 1 addition & 1 deletion node/lib/Consumer.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions node/lib/Consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,15 @@ class Consumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
const reqData = { types };
await this.#channel.request('consumer.enableTraceEvent', this.#internal, reqData);
}
/**
* Replaces the producer associated with this consumer.
*/
async changeProducer(producerId) {
logger.debug('changeProducer()');
const data = await this.#channel.request('consumer.changeProducer', this.#internal, { producerId });
this.#internal.producerId = producerId;
return data;
}
handleWorkerNotifications() {
this.#channel.on(this.#internal.consumerId, (event, data) => {
switch (event) {
Expand Down
15 changes: 15 additions & 0 deletions node/src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,21 @@ export class Consumer extends EnhancedEventEmitter
'consumer.enableTraceEvent', this.#internal, reqData);
}

/**
* Replaces the producer associated with this consumer.
*/
async changeProducer(producerId: string): Promise<any>
{
logger.debug('changeProducer()');

const data =
await this.#channel.request('consumer.changeProducer', this.#internal, { producerId });

this.#internal.producerId = producerId;

return data;
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.consumerId, (event: string, data?: any) =>
Expand Down
1 change: 1 addition & 0 deletions worker/include/Channel/ChannelRequest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace Channel
CONSUMER_SET_PRIORITY,
CONSUMER_REQUEST_KEY_FRAME,
CONSUMER_ENABLE_TRACE_EVENT,
CONSUMER_CHANGE_PRODUCER,
DATA_PRODUCER_CLOSE,
DATA_PRODUCER_DUMP,
DATA_PRODUCER_GET_STATS,
Expand Down
13 changes: 8 additions & 5 deletions worker/include/RTC/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ namespace RTC
public:
virtual void OnConsumerSendRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet) = 0;
virtual void OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet) = 0;
virtual void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) = 0;
virtual void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) = 0;
virtual void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) = 0;
virtual void OnConsumerProducerClosed(RTC::Consumer* consumer) = 0;
virtual void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) = 0;
virtual void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) = 0;
virtual void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) = 0;
virtual void OnConsumerProducerClosed(RTC::Consumer* consumer) = 0;
virtual void OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId) = 0;
};

public:
Expand Down Expand Up @@ -117,6 +118,8 @@ namespace RTC
}
void TransportConnected();
void TransportDisconnected();
void Pause();
void Resume();
bool IsPaused() const
{
return this->paused;
Expand Down Expand Up @@ -170,7 +173,7 @@ namespace RTC
public:
// Passed by argument.
const std::string id;
const std::string producerId;
std::string producerId;

protected:
// Passed by argument.
Expand Down
2 changes: 2 additions & 0 deletions worker/include/RTC/Router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ namespace RTC
uint8_t& worstRemoteFractionLost) override;
void OnTransportNewConsumer(
RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) override;
void OnTransportConsumerChangeProducer(
RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) override;
void OnTransportConsumerClosed(RTC::Transport* transport, RTC::Consumer* consumer) override;
void OnTransportConsumerProducerClosed(RTC::Transport* transport, RTC::Consumer* consumer) override;
void OnTransportConsumerKeyFrameRequested(
Expand Down
3 changes: 3 additions & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ namespace RTC
RTC::Transport* transport, RTC::DataConsumer* dataConsumer) = 0;
virtual void OnTransportDataConsumerDataProducerClosed(
RTC::Transport* transport, RTC::DataConsumer* dataConsumer) = 0;
virtual void OnTransportConsumerChangeProducer(
RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) = 0;
};

private:
Expand Down Expand Up @@ -199,6 +201,7 @@ namespace RTC
void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) override;
void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) override;
void OnConsumerProducerClosed(RTC::Consumer* consumer) override;
void OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId) override;

/* Pure virtual methods inherited from RTC::DataProducer::Listener. */
public:
Expand Down
1 change: 1 addition & 0 deletions worker/src/Channel/ChannelRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace Channel
{ "consumer.setPriority", ChannelRequest::MethodId::CONSUMER_SET_PRIORITY },
{ "consumer.requestKeyFrame", ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME },
{ "consumer.enableTraceEvent", ChannelRequest::MethodId::CONSUMER_ENABLE_TRACE_EVENT },
{ "consumer.changeProducer", ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER },
{ "dataProducer.close", ChannelRequest::MethodId::DATA_PRODUCER_CLOSE },
{ "dataProducer.dump", ChannelRequest::MethodId::DATA_PRODUCER_DUMP },
{ "dataProducer.getStats", ChannelRequest::MethodId::DATA_PRODUCER_GET_STATS },
Expand Down
84 changes: 70 additions & 14 deletions worker/src/RTC/Consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,7 @@ namespace RTC
return;
}

bool wasActive = IsActive();

this->paused = true;

MS_DEBUG_DEV("Consumer paused [consumerId:%s]", this->id.c_str());

if (wasActive)
UserOnPaused();
this->Pause();

request->Accept();

Expand All @@ -290,12 +283,7 @@ namespace RTC
return;
}

this->paused = false;

MS_DEBUG_DEV("Consumer resumed [consumerId:%s]", this->id.c_str());

if (IsActive())
UserOnResumed();
this->Resume();

request->Accept();

Expand Down Expand Up @@ -362,13 +350,81 @@ namespace RTC
break;
}

case Channel::ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER:
{
auto jsonProducerIdIt = request->data.find("producerId");

if (jsonProducerIdIt == request->data.end() || !jsonProducerIdIt->is_string())
MS_THROW_ERROR("missing data.producerId");

std::string producerId = jsonProducerIdIt->get<std::string>();

// Notify the listener.
// This may throw if no Producer is found.
try
{
this->listener->OnConsumerChangeProducer(this, producerId);
}
catch (const MediaSoupError& error)
{
throw;
}

MS_DEBUG_DEV(
"Consumer Producer changed [consumerId:%s producerId:%s]",
this->id.c_str(),
producerId.c_str());

json data = json::object();

request->Accept();

break;
}

default:
{
MS_THROW_ERROR("unknown method '%s'", request->method.c_str());
}
}
}

void Consumer::Pause()
{
MS_TRACE();

if (this->paused)
{
return;
}

bool wasActive = IsActive();

this->paused = true;

MS_DEBUG_DEV("Consumer paused [consumerId:%s]", this->id.c_str());

if (wasActive)
UserOnPaused();
}

void Consumer::Resume()
{
MS_TRACE();

if (!this->paused)
{
return;
}

this->paused = false;

MS_DEBUG_DEV("Consumer resumed [consumerId:%s]", this->id.c_str());

if (IsActive())
UserOnResumed();
}

void Consumer::TransportConnected()
{
MS_TRACE();
Expand Down
90 changes: 90 additions & 0 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,96 @@ namespace RTC
consumer->ProducerRtpStreamScores(producer->GetRtpStreamScores());
}

inline void Router::OnTransportConsumerChangeProducer(
RTC::Transport* /*transport*/, RTC::Consumer* consumer, std::string& producerId)
{
MS_TRACE();

MS_ASSERT(
consumer->GetType() == RTC::RtpParameters::Type::SIMPLE,
"Invalid consumer type [consumerId:%s type:%s]",
consumer->id.c_str(),
RTC::RtpParameters::GetTypeString(consumer->GetType()).c_str())

consumer->Pause();

// Remove current producer.
auto mapCurrentProducersIt = this->mapProducers.find(consumer->producerId);
if (mapCurrentProducersIt == this->mapProducers.end())
MS_THROW_ERROR("Current Producer not found [producerId:%s]", consumer->producerId.c_str());

auto* currentProducer = mapCurrentProducersIt->second;
auto mapCurrentProducerConsumersIt = this->mapProducerConsumers.find(currentProducer);

MS_ASSERT(
mapCurrentProducerConsumersIt != this->mapProducerConsumers.end(),
"Current Producer not present in mapProducerConsumers [producerId: %s]",
consumer->producerId.c_str());

// Remove the Consumer from the current consumers map.
auto& currentConsumers = mapCurrentProducerConsumersIt->second;
auto currentConsumersConsumerIt = currentConsumers.find(consumer);

if (currentConsumersConsumerIt != currentConsumers.end())
{
currentConsumers.erase(currentConsumersConsumerIt);
}
else
{
MS_WARN_TAG(
rtp, "Consumer not present in current consumers list [consumerId:%s]", consumer->id.c_str());
}

// Add new producer
auto mapProducersIt = this->mapProducers.find(producerId);
if (mapProducersIt == this->mapProducers.end())
MS_THROW_ERROR("Producer not found [producerId:%s]", producerId.c_str());

auto* producer = mapProducersIt->second;
auto mapProducerConsumersIt = this->mapProducerConsumers.find(producer);

MS_ASSERT(
mapProducerConsumersIt != this->mapProducerConsumers.end(),
"Producer not present in mapProducerConsumers [producerId:%s]",
producerId.c_str());

consumer->producerId = producerId;

// Update the Consumer status based on the Producer status.
if (producer->IsPaused())
consumer->ProducerPaused();

// Insert the Consumer in the maps.
auto& consumers = mapProducerConsumersIt->second;

auto consumersConsumerIt = consumers.find(consumer);
if (consumersConsumerIt == consumers.end())
{
consumers.insert(consumer);
}
else
{
MS_WARN_TAG(
rtp, "consumer already present in consumers list [consumerId:%s]", consumer->id.c_str());
}

this->mapConsumerProducer[consumer] = producer;

// Get all streams in the Producer and provide the Consumer with them.
for (const auto& kv : producer->GetRtpStreams())
{
auto* rtpStream = kv.first;
uint32_t mappedSsrc = kv.second;

consumer->ProducerRtpStream(rtpStream, mappedSsrc);
}

// Provide the Consumer with the scores of all streams in the Producer.
consumer->ProducerRtpStreamScores(producer->GetRtpStreamScores());

consumer->Resume();
}

inline void Router::OnTransportConsumerClosed(RTC::Transport* /*transport*/, RTC::Consumer* consumer)
{
MS_TRACE();
Expand Down
8 changes: 8 additions & 0 deletions worker/src/RTC/Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,7 @@ namespace RTC
case Channel::ChannelRequest::MethodId::CONSUMER_SET_PRIORITY:
case Channel::ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME:
case Channel::ChannelRequest::MethodId::CONSUMER_ENABLE_TRACE_EVENT:
case Channel::ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER:
{
// This may throw.
RTC::Consumer* consumer = GetConsumerFromInternal(request->internal);
Expand Down Expand Up @@ -2739,6 +2740,13 @@ namespace RTC
ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
}

inline void Transport::OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId)
{
MS_TRACE();

this->listener->OnTransportConsumerChangeProducer(this, consumer, producerId);
}

inline void Transport::OnConsumerProducerClosed(RTC::Consumer* consumer)
{
MS_TRACE();
Expand Down