Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of outbound calls. #11

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/errors"
"github.com/livekit/sip/pkg/service"
Expand Down Expand Up @@ -83,7 +84,12 @@ func runService(c *cli.Context) error {
killChan := make(chan os.Signal, 1)
signal.Notify(killChan, syscall.SIGINT)

svc := service.NewService(conf, psrpcClient, bus)
sipCli := sip.NewClient(conf)
if err = sipCli.Start(); err != nil {
return err
}

svc := service.NewService(conf, sipCli, psrpcClient, bus)

sipSrv := sip.NewServer(conf, svc.HandleTrunkAuthentication, svc.HandleDispatchRules)
if err = sipSrv.Start(); err != nil {
Expand All @@ -99,6 +105,9 @@ func runService(c *cli.Context) error {
case sig := <-killChan:
logger.Infow("exit requested, stopping all SIP and shutting down", "signal", sig)
svc.Stop(true)
if err = sipCli.Stop(); err != nil {
log.Println(err)
}
if err = sipSrv.Stop(); err != nil {
log.Println(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/icholy/digest v0.1.22
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7
github.com/livekit/psrpc v0.5.1
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd
github.com/livekit/psrpc v0.5.2
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/interceptor v0.1.25
github.com/pion/rtp v1.8.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4=
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M=
github.com/livekit/psrpc v0.5.1 h1:ihN5uKIvbU69UsFS4HdYmou5GuK0Dt4hix4eOmRS7o8=
github.com/livekit/psrpc v0.5.1/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd h1:DlyhqC/Ge176SQFNCP/VgSabh3Gwf0cjEKqooznbO1E=
github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY=
github.com/livekit/server-sdk-go v1.1.1/go.mod h1:724BcsVjpsQu8zK9VX2TfdEWt+DtsBeT3EnMrDbyT3I=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down
13 changes: 10 additions & 3 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ import (
type Service struct {
conf *config.Config

psrpcServer rpc.SIPInternalServerImpl
psrpcClient rpc.IOInfoClient
bus psrpc.MessageBus

shutdown core.Fuse
}

func NewService(conf *config.Config, psrpcClient rpc.IOInfoClient, bus psrpc.MessageBus) *Service {
func NewService(conf *config.Config, srv rpc.SIPInternalServerImpl, cli rpc.IOInfoClient, bus psrpc.MessageBus) *Service {
s := &Service{
conf: conf,
psrpcClient: psrpcClient,
psrpcServer: srv,
psrpcClient: cli,
bus: bus,
shutdown: core.NewFuse(),
}

return s
}

Expand All @@ -54,6 +55,12 @@ func (s *Service) Stop(kill bool) {
func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

srv, err := rpc.NewSIPInternalServer(s.psrpcServer, s.bus)
if err != nil {
return err
}
defer srv.Shutdown()

logger.Debugw("service ready")

for { //nolint: gosimple
Expand Down
130 changes: 130 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"context"
"fmt"
"sync"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/rpc"
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
)

type Client struct {
conf *config.Config

sipCli *sipgo.Client
publicIp string

cmu sync.RWMutex
activeCalls map[string]*outboundCall
}

func NewClient(conf *config.Config) *Client {
c := &Client{
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*outboundCall),
}
return c
}

func (c *Client) Start() error {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(userAgent),
)
if err != nil {
return err
}

c.sipCli, err = sipgo.NewClient(ua, sipgo.WithClientHostname(c.publicIp))
if err != nil {
return err
}
// FIXME: read existing SIP participants from the store?
return nil
}

func (c *Client) Stop() error {
c.cmu.Lock()
calls := maps.Values(c.activeCalls)
c.activeCalls = make(map[string]*outboundCall)
c.cmu.Unlock()
for _, call := range calls {
call.Close()
}
if c.sipCli != nil {
c.sipCli.Close()
c.sipCli = nil
}
// FIXME: anything else?
return nil
}

func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) {
if req.CallTo == "" {
// Disconnect participant
if call := c.getCall(req.ParticipantId); call != nil {
call.Close()
}
return &rpc.InternalUpdateSIPParticipantResponse{}, nil
}
err := c.getOrCreateCall(req.ParticipantId).Update(ctx, sipOutboundConfig{
address: req.Address,
from: req.Number,
to: req.CallTo,
user: req.Username,
pass: req.Password,
}, lkRoomConfig{
roomName: req.RoomName,
identity: req.ParticipantIdentity,
})
if err != nil {
return nil, err
}
return &rpc.InternalUpdateSIPParticipantResponse{}, nil
}

func (c *Client) UpdateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) float32 {
call := c.getCall(req.ParticipantId)
if call != nil {
return 1 // Existing participant
}
// TODO: scale affinity based on a number or active calls?
return 0.5
}

func (c *Client) SendSIPParticipantDTMF(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) (*rpc.InternalSendSIPParticipantDTMFResponse, error) {
call := c.getCall(req.ParticipantId)
if call == nil {
return nil, fmt.Errorf("Cannot send DTMF: participant not connected.")
}
if err := call.SendDTMF(ctx, req.Digits); err != nil {
return nil, err
}
return &rpc.InternalSendSIPParticipantDTMFResponse{}, nil
}

func (c *Client) SendSIPParticipantDTMFAffinity(ctx context.Context, req *rpc.InternalSendSIPParticipantDTMFRequest) float32 {
call := c.getCall(req.ParticipantId)
if call != nil {
return 1 // Only existing participants
}
return 0
}
79 changes: 78 additions & 1 deletion pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/icholy/digest"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/ulaw"
)

func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
Expand Down Expand Up @@ -193,7 +195,7 @@ func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error
if err := offer.Unmarshal(offerData); err != nil {
return nil, err
}
return generateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
return sdpGenerateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
}

func (c *inboundCall) pinPrompt() {
Expand Down Expand Up @@ -248,3 +250,78 @@ func (c *inboundCall) Close() error {
// FIXME: drop the actual call
return nil
}

func (c *inboundCall) closeMedia() {
c.audioHandler.Store(nil)
if p := c.lkRoom.Load(); p != nil {
p.Close()
c.lkRoom.Store(nil)
}
c.rtpConn.Close()
close(c.dtmf)
}

func (c *inboundCall) HandleRTP(p *rtp.Packet) error {
if p.Marker && p.PayloadType == 101 {
c.handleDTMF(p.Payload)
return nil
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
}
return nil
}

func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error {
room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity)
if err != nil {
return err
}
local, err := room.NewParticipant()
if err != nil {
_ = room.Close()
return err
}
c.lkRoom.Store(room)

// Decoding pipeline (SIP -> LK)
lpcm := media.DecodePCM(local)
law := ulaw.Encode(lpcm)
var h rtp.Handler = rtp.NewMediaStreamIn(law)
c.audioHandler.Store(&h)

// Encoding pipeline (LK -> SIP)
s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur)
room.SetOutput(ulaw.Decode(s))

return nil
}

func (c *inboundCall) joinRoom(roomName, identity string) {
log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity)
c.playAudio(c.s.res.roomJoin)
if err := c.createLiveKitParticipant(roomName, identity); err != nil {
log.Println(err)
}
}

func (c *inboundCall) playAudio(frames []media.PCM16Sample) {
r := c.lkRoom.Load()
t := r.NewTrack()
defer t.Close()
t.PlayAudio(frames)
}

func (c *inboundCall) handleDTMF(data []byte) { // RFC2833
if len(data) < 4 {
return
}
ev := data[0]
b := dtmfEventToChar[ev]
// We should have enough buffer here.
select {
case c.dtmf <- b:
default:
}
}
Loading
Loading