From 5800764b5cd2698bf9999fa94573b3436b2b3893 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 23 Nov 2023 14:22:50 +0200 Subject: [PATCH] Initial implementation of outbound calls. --- cmd/server/main.go | 11 +- go.mod | 4 +- go.sum | 8 +- pkg/service/service.go | 13 +- pkg/sip/client.go | 130 ++++++++++++++++ pkg/sip/inbound.go | 79 +++++++++- pkg/sip/media.go | 100 ------------ pkg/sip/media_dtmf.go | 22 +++ pkg/sip/media_file.go | 13 ++ pkg/sip/media_sip.go | 15 +- pkg/sip/outbound.go | 345 +++++++++++++++++++++++++++++++++++++++++ pkg/sip/room.go | 8 + pkg/sip/signaling.go | 87 ++++++++--- 13 files changed, 697 insertions(+), 138 deletions(-) create mode 100644 pkg/sip/client.go create mode 100644 pkg/sip/media_dtmf.go create mode 100644 pkg/sip/outbound.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 1ed04bc2..bc462d0a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" + "github.com/livekit/sip/pkg/config" "github.com/livekit/sip/pkg/errors" "github.com/livekit/sip/pkg/service" @@ -83,7 +84,12 @@ func runService(c *cli.Context) error { killChan := make(chan os.Signal, 1) signal.Notify(killChan, syscall.SIGINT) - svc := service.NewService(conf, psrpcClient, bus) + sipCli := sip.NewClient(conf) + if err = sipCli.Start(); err != nil { + return err + } + + svc := service.NewService(conf, sipCli, psrpcClient, bus) sipSrv := sip.NewServer(conf, svc.HandleTrunkAuthentication, svc.HandleDispatchRules) if err = sipSrv.Start(); err != nil { @@ -99,6 +105,9 @@ func runService(c *cli.Context) error { case sig := <-killChan: logger.Infow("exit requested, stopping all SIP and shutting down", "signal", sig) svc.Stop(true) + if err = sipCli.Stop(); err != nil { + log.Println(err) + } if err = sipSrv.Stop(); err != nil { log.Println(err) } diff --git a/go.mod b/go.mod index a54825b6..0e469b80 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/icholy/digest v0.1.22 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 - github.com/livekit/psrpc v0.5.1 + github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd + github.com/livekit/psrpc v0.5.2 github.com/livekit/server-sdk-go v1.1.1 github.com/pion/interceptor v0.1.25 github.com/pion/rtp v1.8.3 diff --git a/go.sum b/go.sum index 8941650a..2f50d8b2 100644 --- a/go.sum +++ b/go.sum @@ -90,10 +90,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4= -github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M= -github.com/livekit/psrpc v0.5.1 h1:ihN5uKIvbU69UsFS4HdYmou5GuK0Dt4hix4eOmRS7o8= -github.com/livekit/psrpc v0.5.1/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= +github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd h1:DlyhqC/Ge176SQFNCP/VgSabh3Gwf0cjEKqooznbO1E= +github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= +github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= +github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY= github.com/livekit/server-sdk-go v1.1.1/go.mod h1:724BcsVjpsQu8zK9VX2TfdEWt+DtsBeT3EnMrDbyT3I= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/service/service.go b/pkg/service/service.go index bbc99a96..094fee5b 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -30,20 +30,21 @@ import ( type Service struct { conf *config.Config + psrpcServer rpc.SIPInternalServerImpl psrpcClient rpc.IOInfoClient bus psrpc.MessageBus shutdown core.Fuse } -func NewService(conf *config.Config, psrpcClient rpc.IOInfoClient, bus psrpc.MessageBus) *Service { +func NewService(conf *config.Config, srv rpc.SIPInternalServerImpl, cli rpc.IOInfoClient, bus psrpc.MessageBus) *Service { s := &Service{ conf: conf, - psrpcClient: psrpcClient, + psrpcServer: srv, + psrpcClient: cli, bus: bus, shutdown: core.NewFuse(), } - return s } @@ -54,6 +55,12 @@ func (s *Service) Stop(kill bool) { func (s *Service) Run() error { logger.Debugw("starting service", "version", version.Version) + srv, err := rpc.NewSIPInternalServer(s.psrpcServer, s.bus) + if err != nil { + return err + } + defer srv.Shutdown() + logger.Debugw("service ready") for { //nolint: gosimple diff --git a/pkg/sip/client.go b/pkg/sip/client.go new file mode 100644 index 00000000..a8389e4c --- /dev/null +++ b/pkg/sip/client.go @@ -0,0 +1,130 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +import ( + "context" + "fmt" + "sync" + + "github.com/emiago/sipgo" + "github.com/livekit/protocol/rpc" + "golang.org/x/exp/maps" + + "github.com/livekit/sip/pkg/config" +) + +type Client struct { + conf *config.Config + + sipCli *sipgo.Client + publicIp string + + cmu sync.RWMutex + activeCalls map[string]*outboundCall +} + +func NewClient(conf *config.Config) *Client { + c := &Client{ + conf: conf, + publicIp: getPublicIP(), + activeCalls: make(map[string]*outboundCall), + } + return c +} + +func (c *Client) Start() error { + ua, err := sipgo.NewUA( + sipgo.WithUserAgent(userAgent), + ) + if err != nil { + return err + } + + c.sipCli, err = sipgo.NewClient(ua, sipgo.WithClientHostname(c.publicIp)) + if err != nil { + return err + } + // FIXME: read existing SIP participants from the store? + return nil +} + +func (c *Client) Stop() error { + c.cmu.Lock() + calls := maps.Values(c.activeCalls) + c.activeCalls = make(map[string]*outboundCall) + c.cmu.Unlock() + for _, call := range calls { + call.Close() + } + if c.sipCli != nil { + c.sipCli.Close() + c.sipCli = nil + } + // FIXME: anything else? + return nil +} + +func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) { + if req.CallTo == "" { + // Disconnect participant + if call := c.getCall(req.ParticipantId); call != nil { + call.Close() + } + return &rpc.InternalUpdateSIPParticipantResponse{}, nil + } + err := c.getOrCreateCall(req.ParticipantId).Update(ctx, sipOutboundConfig{ + address: req.Address, + from: req.Number, + to: req.CallTo, + user: req.Username, + pass: req.Password, + }, lkRoomConfig{ + roomName: req.RoomName, + identity: req.ParticipantIdentity, + }) + if err != nil { + return nil, err + } + return &rpc.InternalUpdateSIPParticipantResponse{}, nil +} + +func (c *Client) UpdateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) float32 { + call := c.getCall(req.ParticipantId) + if call != nil { + return 1 // Existing participant + } + // TODO: scale affinity based on a number or active calls? + return 0.5 +} + +func (c *Client) SendSIPParticipantDTMF(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) (*rpc.InternalSendSIPParticipantDTMFResponse, error) { + call := c.getCall(req.ParticipantId) + if call == nil { + return nil, fmt.Errorf("Cannot send DTMF: participant not connected.") + } + if err := call.SendDTMF(ctx, req.Digits); err != nil { + return nil, err + } + return &rpc.InternalSendSIPParticipantDTMFResponse{}, nil +} + +func (c *Client) SendSIPParticipantDTMFAffinity(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) float32 { + call := c.getCall(req.ParticipantId) + if call != nil { + return 1 // Only existing participants + } + return 0 +} diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 4d043159..931cde58 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -24,7 +24,9 @@ import ( "github.com/icholy/digest" "github.com/pion/sdp/v2" + "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/ulaw" ) func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { @@ -193,7 +195,7 @@ func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error if err := offer.Unmarshal(offerData); err != nil { return nil, err } - return generateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port) + return sdpGenerateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port) } func (c *inboundCall) pinPrompt() { @@ -248,3 +250,78 @@ func (c *inboundCall) Close() error { // FIXME: drop the actual call return nil } + +func (c *inboundCall) closeMedia() { + c.audioHandler.Store(nil) + if p := c.lkRoom.Load(); p != nil { + p.Close() + c.lkRoom.Store(nil) + } + c.rtpConn.Close() + close(c.dtmf) +} + +func (c *inboundCall) HandleRTP(p *rtp.Packet) error { + if p.Marker && p.PayloadType == 101 { + c.handleDTMF(p.Payload) + return nil + } + // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? + if h := c.audioHandler.Load(); h != nil { + return (*h).HandleRTP(p) + } + return nil +} + +func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error { + room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity) + if err != nil { + return err + } + local, err := room.NewParticipant() + if err != nil { + _ = room.Close() + return err + } + c.lkRoom.Store(room) + + // Decoding pipeline (SIP -> LK) + lpcm := media.DecodePCM(local) + law := ulaw.Encode(lpcm) + var h rtp.Handler = rtp.NewMediaStreamIn(law) + c.audioHandler.Store(&h) + + // Encoding pipeline (LK -> SIP) + s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) + room.SetOutput(ulaw.Decode(s)) + + return nil +} + +func (c *inboundCall) joinRoom(roomName, identity string) { + log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity) + c.playAudio(c.s.res.roomJoin) + if err := c.createLiveKitParticipant(roomName, identity); err != nil { + log.Println(err) + } +} + +func (c *inboundCall) playAudio(frames []media.PCM16Sample) { + r := c.lkRoom.Load() + t := r.NewTrack() + defer t.Close() + t.PlayAudio(frames) +} + +func (c *inboundCall) handleDTMF(data []byte) { // RFC2833 + if len(data) < 4 { + return + } + ev := data[0] + b := dtmfEventToChar[ev] + // We should have enough buffer here. + select { + case c.dtmf <- b: + default: + } +} diff --git a/pkg/sip/media.go b/pkg/sip/media.go index a0271733..0de72437 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -15,13 +15,7 @@ package sip import ( - "log" "time" - - "github.com/livekit/sip/pkg/media" - "github.com/livekit/sip/pkg/media/rtp" - "github.com/livekit/sip/pkg/media/ulaw" - "github.com/livekit/sip/res" ) const ( @@ -31,97 +25,3 @@ const ( sampleDurPart = int(time.Second / sampleDur) rtpPacketDur = uint32(sampleRate / sampleDurPart) ) - -type mediaRes struct { - enterPin []media.PCM16Sample - roomJoin []media.PCM16Sample - wrongPin []media.PCM16Sample -} - -func (s *Server) initMediaRes() { - s.res.enterPin = readMkvAudioFile(res.EnterPinMkv) - s.res.roomJoin = readMkvAudioFile(res.RoomJoinMkv) - s.res.wrongPin = readMkvAudioFile(res.WrongPinMkv) -} - -func (c *inboundCall) closeMedia() { - c.audioHandler.Store(nil) - if p := c.lkRoom.Load(); p != nil { - p.Close() - c.lkRoom.Store(nil) - } - c.rtpConn.Close() - close(c.dtmf) -} - -func (c *inboundCall) HandleRTP(p *rtp.Packet) error { - if p.Marker && p.PayloadType == 101 { - c.handleDTMF(p.Payload) - return nil - } - // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? - if h := c.audioHandler.Load(); h != nil { - return (*h).HandleRTP(p) - } - return nil -} - -var dtmfEventToChar = [256]byte{ - 0: '0', 1: '1', 2: '2', 3: '3', 4: '4', - 5: '5', 6: '6', 7: '7', 8: '8', 9: '9', - 10: '*', 11: '#', - 12: 'a', 13: 'b', 14: 'c', 15: 'd', -} - -func (c *inboundCall) handleDTMF(data []byte) { // RFC2833 - if len(data) < 4 { - return - } - ev := data[0] - b := dtmfEventToChar[ev] - // We should have enough buffer here. - select { - case c.dtmf <- b: - default: - } -} - -func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error { - room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity) - if err != nil { - return err - } - local, err := room.NewParticipant() - if err != nil { - _ = room.Close() - return err - } - c.lkRoom.Store(room) - - // Decoding pipeline (SIP -> LK) - lpcm := media.DecodePCM(local) - law := ulaw.Encode(lpcm) - var h rtp.Handler = rtp.NewMediaStreamIn(law) - c.audioHandler.Store(&h) - - // Encoding pipeline (LK -> SIP) - s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) - room.SetOutput(ulaw.Decode(s)) - - return nil -} - -func (c *inboundCall) joinRoom(roomName, identity string) { - log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity) - c.playAudio(c.s.res.roomJoin) - if err := c.createLiveKitParticipant(roomName, identity); err != nil { - log.Println(err) - } -} - -func (c *inboundCall) playAudio(frames []media.PCM16Sample) { - r := c.lkRoom.Load() - t := r.NewTrack() - defer t.Close() - t.PlayAudio(frames) -} diff --git a/pkg/sip/media_dtmf.go b/pkg/sip/media_dtmf.go new file mode 100644 index 00000000..134b617e --- /dev/null +++ b/pkg/sip/media_dtmf.go @@ -0,0 +1,22 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +var dtmfEventToChar = [256]byte{ + 0: '0', 1: '1', 2: '2', 3: '3', 4: '4', + 5: '5', 6: '6', 7: '7', 8: '8', 9: '9', + 10: '*', 11: '#', + 12: 'a', 13: 'b', 14: 'c', 15: 'd', +} diff --git a/pkg/sip/media_file.go b/pkg/sip/media_file.go index 55a2edfc..942bea23 100644 --- a/pkg/sip/media_file.go +++ b/pkg/sip/media_file.go @@ -22,8 +22,21 @@ import ( "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/ulaw" + "github.com/livekit/sip/res" ) +type mediaRes struct { + enterPin []media.PCM16Sample + roomJoin []media.PCM16Sample + wrongPin []media.PCM16Sample +} + +func (s *Server) initMediaRes() { + s.res.enterPin = readMkvAudioFile(res.EnterPinMkv) + s.res.roomJoin = readMkvAudioFile(res.RoomJoinMkv) + s.res.wrongPin = readMkvAudioFile(res.WrongPinMkv) +} + func readMkvAudioFile(data []byte) []media.PCM16Sample { var ret struct { Header webm.EBMLHeader `ebml:"EBML"` diff --git a/pkg/sip/media_sip.go b/pkg/sip/media_sip.go index f5d31197..2ecfffe1 100644 --- a/pkg/sip/media_sip.go +++ b/pkg/sip/media_sip.go @@ -33,7 +33,7 @@ type MediaConn struct { conn *net.UDPConn dest atomic.Pointer[net.UDPAddr] - onRTP rtp.Handler + onRTP atomic.Pointer[rtp.Handler] } func (c *MediaConn) LocalAddr() *net.UDPAddr { @@ -49,7 +49,14 @@ func (c *MediaConn) SetDestAddr(addr *net.UDPAddr) { } func (c *MediaConn) OnRTP(h rtp.Handler) { - c.onRTP = h + if c == nil { + return + } + if h == nil { + c.onRTP.Store(nil) + } else { + c.onRTP.Store(&h) + } } func (c *MediaConn) Close() error { @@ -90,8 +97,8 @@ func (c *MediaConn) readLoop() { if err := p.Unmarshal(buf[:n]); err != nil { continue } - if c.onRTP != nil { - _ = c.onRTP.HandleRTP(&p) + if h := c.onRTP.Load(); h != nil { + _ = (*h).HandleRTP(&p) } } } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go new file mode 100644 index 00000000..6ecc1b7c --- /dev/null +++ b/pkg/sip/outbound.go @@ -0,0 +1,345 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +import ( + "context" + "fmt" + "sync" + + "github.com/emiago/sipgo/sip" + "github.com/icholy/digest" + "github.com/livekit/protocol/logger" + + "github.com/livekit/sip/pkg/media" + "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/ulaw" +) + +type sipOutboundConfig struct { + address string + from string + to string + user string + pass string +} + +type outboundCall struct { + c *Client + participantID string + rtpConn *MediaConn + + mu sync.RWMutex + mediaRunning bool + lkCur lkRoomConfig + lkRoom *Room + lkRoomIn media.Writer[media.PCM16Sample] + sipCur sipOutboundConfig + sipInviteReq *sip.Request + sipInviteResp *sip.Response + sipRunning bool +} + +func (c *Client) getCall(participantId string) *outboundCall { + c.cmu.RLock() + defer c.cmu.RUnlock() + return c.activeCalls[participantId] +} + +func (c *Client) getOrCreateCall(participantId string) *outboundCall { + // Fast path + if call := c.getCall(participantId); call != nil { + return call + } + // Slow path + c.cmu.Lock() + defer c.cmu.Unlock() + if call := c.activeCalls[participantId]; call != nil { + return call + } + call := c.newCall(participantId) + c.activeCalls[participantId] = call + return call +} + +func (c *Client) newCall(participantId string) *outboundCall { + call := &outboundCall{ + c: c, + participantID: participantId, + rtpConn: NewMediaConn(), + } + return call +} + +func (c *outboundCall) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + c.close() + return nil +} + +func (c *outboundCall) close() { + c.rtpConn.OnRTP(nil) + c.lkRoom.SetOutput(nil) + + if c.mediaRunning { + _ = c.rtpConn.Close() + } + c.mediaRunning = false + + if c.lkRoom != nil { + _ = c.lkRoom.Close() + } + c.lkRoom = nil + c.lkRoomIn = nil + c.lkCur = lkRoomConfig{} + + c.stopSIP() + c.sipCur = sipOutboundConfig{} + + // FIXME: remove call from the client map? +} + +func (c *outboundCall) Update(ctx context.Context, sipNew sipOutboundConfig, lkNew lkRoomConfig) error { + c.mu.RLock() + sipCur, lkCur := c.sipCur, c.lkCur + c.mu.RUnlock() + if sipCur == sipNew && lkCur == lkNew { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + if c.sipCur == sipNew && c.lkCur == lkNew { + return nil + } + if c.sipCur.address == "" || c.sipCur.to == "" { + // shutdown the call + c.close() + return nil + } + if err := c.startMedia(); err != nil { + c.close() + return err + } + if err := c.updateRoom(lkNew); err != nil { + c.close() + return err + } + if err := c.updateSIP(sipNew); err != nil { + c.close() + return err + } + c.relinkMedia() + return nil +} + +func (c *outboundCall) startMedia() error { + if c.mediaRunning { + return nil + } + if err := c.rtpConn.Start("0.0.0.0"); err != nil { + return err + } + c.mediaRunning = true + return nil +} + +func (c *outboundCall) updateRoom(lkNew lkRoomConfig) error { + if c.lkRoom != nil && c.lkCur == lkNew { + return nil + } + if c.lkRoom != nil { + _ = c.lkRoom.Close() + c.lkRoom = nil + c.lkRoomIn = nil + } + r, err := ConnectToRoom(c.c.conf, lkNew.roomName, lkNew.identity) + if err != nil { + return err + } + local, err := r.NewParticipant() + if err != nil { + _ = r.Close() + return err + } + c.lkRoom = r + c.lkRoomIn = local + c.lkCur = lkNew + return nil +} + +func (c *outboundCall) updateSIP(sipNew sipOutboundConfig) error { + if c.sipCur == sipNew { + return nil + } + c.stopSIP() + if err := c.sipSignal(sipNew); err != nil { + return err + } + c.sipRunning = true + c.sipCur = sipNew + return nil +} + +func (c *outboundCall) relinkMedia() { + if c.lkRoom == nil || !c.mediaRunning { + c.lkRoom.SetOutput(nil) + c.rtpConn.OnRTP(nil) + return + } + // Encoding pipeline (LK -> SIP) + s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) + c.lkRoom.SetOutput(ulaw.Decode(s)) + + // Decoding pipeline (SIP -> LK) + lpcm := media.DecodePCM(c.lkRoomIn) + law := ulaw.Encode(lpcm) + c.rtpConn.OnRTP(rtp.NewMediaStreamIn(law)) +} + +func (c *outboundCall) SendDTMF(ctx context.Context, digits string) error { + c.mu.RLock() + running := c.mediaRunning + c.mu.RUnlock() + if !running { + return fmt.Errorf("call is not active") + } + // FIXME: c.media.WriteRTP() + return nil +} + +func sipResponse(tx sip.ClientTransaction) (*sip.Response, error) { + select { + case <-tx.Done(): + return nil, fmt.Errorf("transaction failed to complete") + case res := <-tx.Responses(): + if res.StatusCode == 100 || res.StatusCode == 180 || res.StatusCode == 183 { + return sipResponse(tx) + } + return res, nil + } +} + +func (c *outboundCall) stopSIP() { + if c.sipInviteReq != nil { + c.sipBye() + } + c.sipInviteReq = nil + c.sipInviteResp = nil + c.sipCur = sipOutboundConfig{} + c.sipRunning = false +} + +func (c *outboundCall) sipSignal(conf sipOutboundConfig) error { + offer, err := sdpGenerateOffer(c.c.publicIp, c.rtpConn.LocalAddr().Port) + if err != nil { + return err + } + inviteReq, inviteResp, err := c.sipInvite(offer, conf) + if err != nil { + logger.Errorw("SIP invite failed", err) + return err // TODO: should we retry? maybe new offer will work + } + err = c.sipAccept(inviteReq, inviteResp) + if err != nil { + logger.Errorw("SIP accept failed", err) + return err + } + c.sipInviteReq, c.sipInviteResp = inviteReq, inviteResp + return nil +} + +func (c *outboundCall) sipAttemptInvite(offer []byte, conf sipOutboundConfig, authHeader string) (*sip.Request, *sip.Response, error) { + req := sip.NewRequest(sip.INVITE, &sip.Uri{User: conf.to, Host: conf.address}) + req.SetDestination("") // FIXME: what should be here + req.SetBody(offer) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("", c.c.publicIp))) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + if authHeader != "" { + req.AppendHeader(sip.NewHeader("Proxy-Authorization", authHeader)) + } + + tx, err := c.c.sipCli.TransactionRequest(req) + if err != nil { + return nil, nil, err + } + defer tx.Terminate() + + resp, err := sipResponse(tx) + return req, resp, err +} + +func (c *outboundCall) sipInvite(offer []byte, conf sipOutboundConfig) (*sip.Request, *sip.Response, error) { + authHeader := "" + for { + req, resp, err := c.sipAttemptInvite(offer, conf, authHeader) + if err != nil { + return nil, nil, err + } else if resp.StatusCode == 200 { + return req, resp, nil + } else if resp.StatusCode != 407 { + return nil, nil, fmt.Errorf("Unexpected StatusCode from INVITE response %d", resp.StatusCode) + } + if conf.user == "" || conf.pass == "" { + return nil, nil, fmt.Errorf("Server responded with 407, but no username or password was provided") + } + + headerVal := resp.GetHeader("Proxy-Authenticate") + challenge, err := digest.ParseChallenge(headerVal.Value()) + if err != nil { + return nil, nil, err + } + + toHeader, ok := resp.To() + if !ok { + return nil, nil, fmt.Errorf("No To Header on Request") + } + + cred, err := digest.Digest(challenge, digest.Options{ + Method: req.Method.String(), + URI: toHeader.Address.String(), + Username: conf.user, + Password: conf.pass, + }) + if err != nil { + return nil, nil, err + } + authHeader = cred.String() + // Try again with a computed digest + } +} + +func (c *outboundCall) sipAccept(inviteReq *sip.Request, inviteResp *sip.Response) error { + if cont, ok := inviteResp.Contact(); ok { + inviteReq.Recipient = &cont.Address + inviteReq.Recipient.Port = 5060 + } + return c.c.sipCli.WriteRequest(sip.NewAckRequest(inviteReq, inviteResp, nil)) +} + +func (c *outboundCall) sipBye() error { + req := sip.NewByeRequest(c.sipInviteReq, c.sipInviteResp, nil) + c.sipInviteReq.AppendHeader(sip.NewHeader("User-Agent", "LiveKit")) + + tx, err := c.c.sipCli.TransactionRequest(req) + if err != nil { + return err + } + _, err = sipResponse(tx) + return err +} diff --git a/pkg/sip/room.go b/pkg/sip/room.go index a5625045..3cb47d3b 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -35,6 +35,11 @@ type Room struct { identity string } +type lkRoomConfig struct { + roomName string + identity string +} + func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room, error) { r := &Room{ identity: identity, @@ -92,6 +97,9 @@ func (r *Room) Output() media.Writer[media.LPCM16Sample] { } func (r *Room) SetOutput(out media.Writer[media.LPCM16Sample]) { + if r == nil { + return + } if out == nil { r.out.Store(nil) } else { diff --git a/pkg/sip/signaling.go b/pkg/sip/signaling.go index c6ef3483..30fd81d0 100644 --- a/pkg/sip/signaling.go +++ b/pkg/sip/signaling.go @@ -14,16 +14,45 @@ package sip -import "github.com/pion/sdp/v2" +import ( + "math/rand" -func generateAnswer(offer sdp.SessionDescription, publicIp string, rtpListenerPort int) ([]byte, error) { + "github.com/pion/sdp/v2" +) + +func sdpMediaDesc(rtpListenerPort int) []*sdp.MediaDescription { + // Static compiler check for sample rate hardcoded below. + var _ = [1]struct{}{}[8000-sampleRate] + + return []*sdp.MediaDescription{ + { + MediaName: sdp.MediaName{ + Media: "audio", + Port: sdp.RangedPort{Value: rtpListenerPort}, + Protos: []string{"RTP", "AVP"}, + Formats: []string{"0", "101"}, + }, + Attributes: []sdp.Attribute{ + {Key: "rtpmap", Value: "0 PCMU/8000"}, + {Key: "rtpmap", Value: "101 telephone-event/8000"}, + {Key: "fmtp", Value: "101 0-16"}, + {Key: "ptime", Value: "20"}, + {Key: "maxptime", Value: "150"}, + {Key: "sendrecv"}, + }, + }, + } +} + +func sdpGenerateOffer(publicIp string, rtpListenerPort int) ([]byte, error) { + sessId := rand.Uint64() // TODO: do we need to track these? answer := sdp.SessionDescription{ Version: 0, Origin: sdp.Origin{ Username: "-", - SessionID: offer.Origin.SessionID, - SessionVersion: offer.Origin.SessionID + 2, + SessionID: sessId, + SessionVersion: sessId, NetworkType: "IN", AddressType: "IP4", UnicastAddress: publicIp, @@ -35,35 +64,47 @@ func generateAnswer(offer sdp.SessionDescription, publicIp string, rtpListenerPo Address: &sdp.Address{Address: publicIp}, }, TimeDescriptions: []sdp.TimeDescription{ - sdp.TimeDescription{ + { Timing: sdp.Timing{ StartTime: 0, StopTime: 0, }, }, }, - MediaDescriptions: []*sdp.MediaDescription{ - &sdp.MediaDescription{ - MediaName: sdp.MediaName{ - Media: "audio", - Port: sdp.RangedPort{Value: rtpListenerPort}, - Protos: []string{"RTP", "AVP"}, - Formats: []string{"0", "101"}, - }, - Attributes: []sdp.Attribute{ - sdp.Attribute{Key: "rtpmap", Value: "0 PCMU/8000"}, - sdp.Attribute{Key: "rtpmap", Value: "101 telephone-event/8000"}, - sdp.Attribute{Key: "fmtp", Value: "101 0-16"}, - sdp.Attribute{Key: "ptime", Value: "20"}, - sdp.Attribute{Key: "maxptime", Value: "150"}, - sdp.Attribute{Key: "sendrecv"}, + MediaDescriptions: sdpMediaDesc(rtpListenerPort), + } + + return answer.Marshal() +} + +func sdpGenerateAnswer(offer sdp.SessionDescription, publicIp string, rtpListenerPort int) ([]byte, error) { + + answer := sdp.SessionDescription{ + Version: 0, + Origin: sdp.Origin{ + Username: "-", + SessionID: offer.Origin.SessionID, + SessionVersion: offer.Origin.SessionID + 2, + NetworkType: "IN", + AddressType: "IP4", + UnicastAddress: publicIp, + }, + SessionName: "LiveKit", + ConnectionInformation: &sdp.ConnectionInformation{ + NetworkType: "IN", + AddressType: "IP4", + Address: &sdp.Address{Address: publicIp}, + }, + TimeDescriptions: []sdp.TimeDescription{ + { + Timing: sdp.Timing{ + StartTime: 0, + StopTime: 0, }, }, }, + MediaDescriptions: sdpMediaDesc(rtpListenerPort), } - // Static compiler check for sample rate hardcoded above. - var _ = [1]struct{}{}[8000-sampleRate] - return answer.Marshal() }