Skip to content

Commit

Permalink
Initial implementation of outbound calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Nov 30, 2023
1 parent 06cd534 commit 5800764
Show file tree
Hide file tree
Showing 13 changed files with 697 additions and 138 deletions.
11 changes: 10 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/errors"
"github.com/livekit/sip/pkg/service"
Expand Down Expand Up @@ -83,7 +84,12 @@ func runService(c *cli.Context) error {
killChan := make(chan os.Signal, 1)
signal.Notify(killChan, syscall.SIGINT)

svc := service.NewService(conf, psrpcClient, bus)
sipCli := sip.NewClient(conf)
if err = sipCli.Start(); err != nil {
return err
}

svc := service.NewService(conf, sipCli, psrpcClient, bus)

sipSrv := sip.NewServer(conf, svc.HandleTrunkAuthentication, svc.HandleDispatchRules)
if err = sipSrv.Start(); err != nil {
Expand All @@ -99,6 +105,9 @@ func runService(c *cli.Context) error {
case sig := <-killChan:
logger.Infow("exit requested, stopping all SIP and shutting down", "signal", sig)
svc.Stop(true)
if err = sipCli.Stop(); err != nil {
log.Println(err)
}
if err = sipSrv.Stop(); err != nil {
log.Println(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/icholy/digest v0.1.22
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7
github.com/livekit/psrpc v0.5.1
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd
github.com/livekit/psrpc v0.5.2
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/interceptor v0.1.25
github.com/pion/rtp v1.8.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M=
github.com/livekit/psrpc v0.5.1 h1:ihN5uKIvbU69UsFS4HdYmou5GuK0Dt4hix4eOmRS7o8=
github.com/livekit/psrpc v0.5.1/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd h1:DlyhqC/Ge176SQFNCP/VgSabh3Gwf0cjEKqooznbO1E=
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY=
github.com/livekit/server-sdk-go v1.1.1/go.mod h1:724BcsVjpsQu8zK9VX2TfdEWt+DtsBeT3EnMrDbyT3I=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down
13 changes: 10 additions & 3 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ import (
type Service struct {
conf *config.Config

psrpcServer rpc.SIPInternalServerImpl
psrpcClient rpc.IOInfoClient
bus psrpc.MessageBus

shutdown core.Fuse
}

func NewService(conf *config.Config, psrpcClient rpc.IOInfoClient, bus psrpc.MessageBus) *Service {
func NewService(conf *config.Config, srv rpc.SIPInternalServerImpl, cli rpc.IOInfoClient, bus psrpc.MessageBus) *Service {
s := &Service{
conf: conf,
psrpcClient: psrpcClient,
psrpcServer: srv,
psrpcClient: cli,
bus: bus,
shutdown: core.NewFuse(),
}

return s
}

Expand All @@ -54,6 +55,12 @@ func (s *Service) Stop(kill bool) {
func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

srv, err := rpc.NewSIPInternalServer(s.psrpcServer, s.bus)
if err != nil {
return err
}
defer srv.Shutdown()

logger.Debugw("service ready")

for { //nolint: gosimple
Expand Down
130 changes: 130 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"context"
"fmt"
"sync"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/rpc"
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
)

type Client struct {
conf *config.Config

sipCli *sipgo.Client
publicIp string

cmu sync.RWMutex
activeCalls map[string]*outboundCall
}

func NewClient(conf *config.Config) *Client {
c := &Client{
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*outboundCall),
}
return c
}

func (c *Client) Start() error {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(userAgent),
)
if err != nil {
return err
}

c.sipCli, err = sipgo.NewClient(ua, sipgo.WithClientHostname(c.publicIp))
if err != nil {
return err
}
// FIXME: read existing SIP participants from the store?
return nil
}

func (c *Client) Stop() error {
c.cmu.Lock()
calls := maps.Values(c.activeCalls)
c.activeCalls = make(map[string]*outboundCall)
c.cmu.Unlock()
for _, call := range calls {
call.Close()
}
if c.sipCli != nil {
c.sipCli.Close()
c.sipCli = nil
}
// FIXME: anything else?
return nil
}

func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) {
if req.CallTo == "" {
// Disconnect participant
if call := c.getCall(req.ParticipantId); call != nil {
call.Close()
}
return &rpc.InternalUpdateSIPParticipantResponse{}, nil
}
err := c.getOrCreateCall(req.ParticipantId).Update(ctx, sipOutboundConfig{
address: req.Address,
from: req.Number,
to: req.CallTo,
user: req.Username,
pass: req.Password,
}, lkRoomConfig{
roomName: req.RoomName,
identity: req.ParticipantIdentity,
})
if err != nil {
return nil, err
}
return &rpc.InternalUpdateSIPParticipantResponse{}, nil
}

func (c *Client) UpdateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) float32 {
call := c.getCall(req.ParticipantId)
if call != nil {
return 1 // Existing participant
}
// TODO: scale affinity based on a number or active calls?
return 0.5
}

func (c *Client) SendSIPParticipantDTMF(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) (*rpc.InternalSendSIPParticipantDTMFResponse, error) {
call := c.getCall(req.ParticipantId)
if call == nil {
return nil, fmt.Errorf("Cannot send DTMF: participant not connected.")
}
if err := call.SendDTMF(ctx, req.Digits); err != nil {
return nil, err
}
return &rpc.InternalSendSIPParticipantDTMFResponse{}, nil
}

func (c *Client) SendSIPParticipantDTMFAffinity(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) float32 {
call := c.getCall(req.ParticipantId)
if call != nil {
return 1 // Only existing participants
}
return 0
}
79 changes: 78 additions & 1 deletion pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/icholy/digest"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/ulaw"
)

func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
Expand Down Expand Up @@ -193,7 +195,7 @@ func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error
if err := offer.Unmarshal(offerData); err != nil {
return nil, err
}
return generateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
return sdpGenerateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
}

func (c *inboundCall) pinPrompt() {
Expand Down Expand Up @@ -248,3 +250,78 @@ func (c *inboundCall) Close() error {
// FIXME: drop the actual call
return nil
}

func (c *inboundCall) closeMedia() {
c.audioHandler.Store(nil)
if p := c.lkRoom.Load(); p != nil {
p.Close()
c.lkRoom.Store(nil)
}
c.rtpConn.Close()
close(c.dtmf)
}

func (c *inboundCall) HandleRTP(p *rtp.Packet) error {
if p.Marker && p.PayloadType == 101 {
c.handleDTMF(p.Payload)
return nil
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
}
return nil
}

func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error {
room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity)
if err != nil {
return err
}
local, err := room.NewParticipant()
if err != nil {
_ = room.Close()
return err
}
c.lkRoom.Store(room)

// Decoding pipeline (SIP -> LK)
lpcm := media.DecodePCM(local)
law := ulaw.Encode(lpcm)
var h rtp.Handler = rtp.NewMediaStreamIn(law)
c.audioHandler.Store(&h)

// Encoding pipeline (LK -> SIP)
s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur)
room.SetOutput(ulaw.Decode(s))

return nil
}

func (c *inboundCall) joinRoom(roomName, identity string) {
log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity)
c.playAudio(c.s.res.roomJoin)
if err := c.createLiveKitParticipant(roomName, identity); err != nil {
log.Println(err)
}
}

func (c *inboundCall) playAudio(frames []media.PCM16Sample) {
r := c.lkRoom.Load()
t := r.NewTrack()
defer t.Close()
t.PlayAudio(frames)
}

func (c *inboundCall) handleDTMF(data []byte) { // RFC2833
if len(data) < 4 {
return
}
ev := data[0]
b := dtmfEventToChar[ev]
// We should have enough buffer here.
select {
case c.dtmf <- b:
default:
}
}
Loading

0 comments on commit 5800764

Please sign in to comment.