Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inbound media processors #185

Merged
merged 10 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49 h1:mk33tsjwZM8czksJbAj+xQfPfjnPo/RcGUYJLLfltOY=
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264 h1:jj0lLMRFhk1Y7X1Ugi8wd47wNtgIoju36qic6mSjGPE=
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639 h1:5+iT4OaIukZ4TJwbOXAN+uh/wuAGArUoJyk5vmfQMY0=
Expand Down
19 changes: 19 additions & 0 deletions pkg/media/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package media

type Processor[T any] func(w WriteCloser[T]) WriteCloser[T]

type PCM16Processor = Processor[PCM16Sample]
2 changes: 2 additions & 0 deletions pkg/service/psrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
DispatchRuleID: resp.SipDispatchRuleId,
Headers: resp.Headers,
HeadersToAttributes: resp.HeadersToAttributes,
EnabledFeatures: resp.EnabledFeatures,
}
case rpc.SIPDispatchResult_ACCEPT:
return sip.CallDispatch{
Expand All @@ -121,6 +122,7 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
DispatchRuleID: resp.SipDispatchRuleId,
Headers: resp.Headers,
HeadersToAttributes: resp.HeadersToAttributes,
EnabledFeatures: resp.EnabledFeatures,
}
case rpc.SIPDispatchResult_REQUEST_PIN:
return sip.CallDispatch{
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/stats"

Expand Down Expand Up @@ -176,6 +177,10 @@ func (s *Service) DispatchCall(ctx context.Context, info *sip.CallInfo) sip.Call
return DispatchCall(ctx, s.psrpcClient, s.log, info)
}

func (s *Service) GetMediaProcessor(_ []rpc.SIPFeature) media.PCM16Processor {
return nil
}

func (s *Service) CanAccept() bool {
return s.mon.CanAccept()
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (

"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
lksip "github.com/livekit/protocol/sip"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"
Expand Down Expand Up @@ -211,7 +213,7 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) {
if c != nil {
c.log.Infow("BYE")
c.cc.AcceptBye(req, tx)
c.Close()
_ = c.Close()
return
}
ok := false
Expand Down Expand Up @@ -272,7 +274,14 @@ type inboundCall struct {
done atomic.Bool
}

func (s *Server) newInboundCall(log logger.Logger, mon *stats.CallMonitor, cc *sipInbound, src string, extra map[string]string) *inboundCall {
func (s *Server) newInboundCall(
log logger.Logger,
mon *stats.CallMonitor,
cc *sipInbound,
src string,
extra map[string]string,
) *inboundCall {

extra = HeadersToAttrs(extra, nil, cc)
c := &inboundCall{
s: s,
Expand Down Expand Up @@ -344,7 +353,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}

// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
answerData, err := c.runMediaConn(req.Body(), conf)
answerData, err := c.runMediaConn(req.Body(), conf, disp.EnabledFeatures)
if err != nil {
c.log.Errorw("Cannot start media", err)
c.cc.RespondAndDrop(sip.StatusInternalServerError, "")
Expand Down Expand Up @@ -419,7 +428,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
}

func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) {
func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config, features []rpc.SIPFeature) (answerData []byte, _ error) {
c.mon.SDPSize(len(offerData), true)
c.log.Debugw("SDP offer", "sdp", string(offerData))

Expand All @@ -443,6 +452,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe
c.mon.SDPSize(len(answerData), false)
c.log.Debugw("SDP answer", "sdp", string(answerData))

mconf.Processor = c.s.handler.GetMediaProcessor(features)
if err = c.media.SetConfig(mconf); err != nil {
return nil, err
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"sync/atomic"
"time"

"github.com/pion/sdp/v3"

"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
"github.com/pion/sdp/v3"

"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/dtmf"
Expand Down Expand Up @@ -82,9 +83,9 @@ type MediaPort struct {
dtmfOutAudio media.PCM16Writer

audioOutRTP *rtp.Stream
audioOut *media.SwitchWriter
audioIn *media.SwitchWriter
audioInHandler rtp.Handler // for debug only
audioOut *media.SwitchWriter // SIP PCM -> LK RTP
audioIn *media.SwitchWriter // LK RTP -> SIP PCM
audioInHandler rtp.Handler // for debug only
dtmfIn atomic.Pointer[func(ev dtmf.Event)]
}

Expand Down Expand Up @@ -208,8 +209,12 @@ func (p *MediaPort) setupOutput() {
// TODO: this says "audio", but actually includes DTMF too
s := rtp.NewSeqWriter(newRTPStatsWriter(p.mon, "audio", p.conn))
p.audioOutRTP = s.NewStream(p.conf.AudioType, p.conf.Audio.Info().RTPClockRate)

// Encoding pipeline (LK -> SIP)
audioOut := p.conf.Audio.EncodeRTP(p.audioOutRTP)
if processor := p.conf.Processor; processor != nil {
audioOut = processor(audioOut)
}

if p.conf.DTMFType != 0 {
p.dtmfOutRTP = s.NewStream(p.conf.DTMFType, dtmf.SampleRate)
Expand All @@ -221,8 +226,9 @@ func (p *MediaPort) setupOutput() {
p.dtmfOutAudio = mix.NewInput()
}
}

if w := p.audioOut.Swap(audioOut); w != nil {
w.Close()
_ = w.Close()
}
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (
"github.com/emiago/sipgo/sip"
"github.com/frostbyte73/core"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"golang.org/x/exp/maps"

"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/stats"
)
Expand Down Expand Up @@ -88,11 +91,13 @@ type CallDispatch struct {
DispatchRuleID string
Headers map[string]string
HeadersToAttributes map[string]string
EnabledFeatures []rpc.SIPFeature
}

type Handler interface {
GetAuthCredentials(ctx context.Context, callID, fromUser, toUser, toHost, srcAddress string) (AuthInfo, error)
DispatchCall(ctx context.Context, info *CallInfo) CallDispatch
GetMediaProcessor(features []rpc.SIPFeature) media.PCM16Processor

RegisterTransferSIPParticipantTopic(sipCallId string) error
DeregisterTransferSIPParticipantTopic(sipCallId string)
Expand Down Expand Up @@ -263,16 +268,16 @@ func (s *Server) Stop() {
s.activeCalls = make(map[RemoteTag]*inboundCall)
s.cmu.Unlock()
for _, c := range calls {
c.Close()
_ = c.Close()
}
if s.sipSrv != nil {
s.sipSrv.Close()
_ = s.sipSrv.Close()
}
if s.sipConnUDP != nil {
s.sipConnUDP.Close()
_ = s.sipConnUDP.Close()
}
if s.sipConnTCP != nil {
s.sipConnTCP.Close()
_ = s.sipConnTCP.Close()
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/sip/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/stats"

Expand Down Expand Up @@ -63,6 +65,10 @@ func (h TestHandler) DispatchCall(ctx context.Context, info *CallInfo) CallDispa
return h.DispatchCallFunc(ctx, info)
}

func (h TestHandler) GetMediaProcessor(_ []rpc.SIPFeature) media.PCM16Processor {
return nil
}

func (h TestHandler) RegisterTransferSIPParticipantTopic(sipCallId string) error {
// no-op
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/sip/signaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type MediaConf struct {
Audio rtp.AudioCodec
AudioType byte
DTMFType byte
Processor media.PCM16Processor
}

func sdpGetAudioCodec(offer *sdp.SessionDescription) (*MediaConf, error) {
Expand Down
Loading