Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RTP session termination for inbound. Improve logging. #13

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service

import (
"context"
"log"

"github.com/frostbyte73/core"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, s
})

if err != nil {
log.Println(err)
logger.Warnw("SIP handle dispatch rule error", err)
return "", "", false, true
}

Expand Down
54 changes: 39 additions & 15 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package sip

import (
"fmt"
"log"
"sync/atomic"
"time"

"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -92,6 +92,8 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr
}

func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
_ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil))

tag, err := getTagValue(req)
if err != nil {
sipErrorResponse(tx, req)
Expand All @@ -111,9 +113,11 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

logger.Infow("INVITE", "tag", tag, "from", from, "to", to)

username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src)
if err != nil {
log.Printf("Rejecting inbound call, doesn't match any Trunks %q %q %q %q\n", from.Address.User, to.Address.User, to.Address.Host, src)
logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
sipErrorResponse(tx, req)
return
}
Expand All @@ -128,6 +132,8 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
type inboundCall struct {
s *Server
tag string
inviteReq *sip.Request
inviteResp *sip.Response
from *sip.FromHeader
to *sip.ToHeader
src string
Expand All @@ -148,6 +154,9 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
dtmf: make(chan byte, 10),
lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts
}
s.cmu.Lock()
s.activeCalls[tag] = c
s.cmu.Unlock()
return c
}

Expand All @@ -156,30 +165,30 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false)
if rejectInvite {
log.Printf("Rejecting inbound call, doesn't match any Dispatch Rules %q %q %q %q\n", c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src)
logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src)
sipErrorResponse(tx, req)
c.Close()
return
}

// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
answerData, err := c.runMediaConn(req.Body(), conf)

if err != nil {
sipErrorResponse(tx, req)
c.Close()
return
}
c.s.cmu.Lock()
c.s.activeCalls[c.tag] = c
c.s.cmu.Unlock()

res := sip.NewResponseFromRequest(req, 200, "OK", answerData)
res.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp, Port: c.s.conf.SIPPort}})
res.AppendHeader(&contentTypeHeaderSDP)
if err = tx.Respond(res); err != nil {
log.Println(err)
logger.Errorw("Cannot respond to INVITE", err)
// TODO: should we close the call in this case?
return
}
c.inviteReq = req
c.inviteResp = res
// We own this goroutine, so can freely block.
if requirePin {
c.pinPrompt()
Expand All @@ -188,6 +197,16 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c
}
}

func (c *inboundCall) sendBye() {
if c.inviteReq == nil {
return
}
res := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil)
c.s.sipSrv.TransportLayer().WriteMsg(res)
c.inviteReq = nil
c.inviteResp = nil
}

func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) {
conn := NewMediaConn()
conn.OnRTP(c)
Expand All @@ -209,7 +228,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe
}

func (c *inboundCall) pinPrompt() {
log.Printf("Requesting Pin for SIP call %q -> %q\n", c.from.Address.User, c.to.Address.User)
logger.Infow("Requesting Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User)
const pinLimit = 16
c.playAudio(c.s.res.enterPin)
pin := ""
Expand All @@ -228,9 +247,10 @@ func (c *inboundCall) pinPrompt() {
// End of the pin
noPin = pin == ""

log.Printf("Checking Pin for SIP call %q -> %q = %q (noPin = %v)\n", c.from.Address.User, c.to.Address.User, pin, noPin)
logger.Infow("Checking Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin)
roomName, identity, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, pin, noPin)
if reject || requirePin || roomName == "" {
logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin)
c.playAudio(c.s.res.wrongPin)
c.Close()
return
Expand All @@ -250,14 +270,15 @@ func (c *inboundCall) pinPrompt() {
}

func (c *inboundCall) Close() error {
if c.done.CompareAndSwap(false, true) {
if !c.done.CompareAndSwap(false, true) {
return nil
}
logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User)
c.s.cmu.Lock()
delete(c.s.activeCalls, c.tag)
c.s.cmu.Unlock()
c.closeMedia()
// FIXME: drop the actual call
c.sendBye()
return nil
}

Expand All @@ -267,7 +288,10 @@ func (c *inboundCall) closeMedia() {
p.Close()
c.lkRoom = nil
}
c.rtpConn.Close()
if c.rtpConn != nil {
c.rtpConn.Close()
c.rtpConn = nil
}
close(c.dtmf)
}

Expand Down Expand Up @@ -304,10 +328,10 @@ func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity str
}

func (c *inboundCall) joinRoom(roomName, identity string) {
log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity)
logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity)
c.playAudio(c.s.res.roomJoin)
if err := c.createLiveKitParticipant(roomName, identity); err != nil {
log.Println(err)
logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag)
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
package sip

import (
"log"
"sync/atomic"

"github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go"
"github.com/pion/webrtc/v3"

Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *Room) Connect(conf *config.Config, roomName string, identity string) er
OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if track.Kind() != webrtc.RTPCodecTypeAudio {
if err := pub.SetSubscribed(false); err != nil {
log.Println(err)
logger.Errorw("Cannot unsubscribe from the track", err)
}
return
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (r *Room) SetOutput(out media.Writer[media.LPCM16Sample]) {
}

func (r *Room) Close() error {
if r.room == nil {
if r.room != nil {
r.room.Disconnect()
r.room = nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -144,6 +145,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error {
sipErrorResponse(tx, req)
return
}
logger.Infow("BYE", "tag", tag)

s.cmu.RLock()
c := s.activeCalls[tag]
Expand All @@ -158,6 +160,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error {
// Ignore ACKs
s.sipSrv.OnAck(func(req *sip.Request, tx sip.ServerTransaction) {})

// TODO: pass proper context here
go func() {
panic(s.sipSrv.ListenAndServe(context.TODO(), "udp", fmt.Sprintf("0.0.0.0:%d", s.conf.SIPPort)))
}()
Expand Down
18 changes: 18 additions & 0 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@
package sip

import (
"errors"
"log"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/rs/zerolog"
zlog "github.com/rs/zerolog/log"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)

func init() {
zlog.Logger = zerolog.New(nil).Hook(zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) {
switch level {
case zerolog.DebugLevel:
logger.Debugw(message)
case zerolog.InfoLevel:
logger.Infow(message)
case zerolog.WarnLevel:
logger.Warnw(message, errors.New(message))
case zerolog.ErrorLevel:
logger.Errorw(message, errors.New(message))
}
}))
}

type Service struct {
cli *Client
srv *Server
Expand Down
3 changes: 2 additions & 1 deletion test/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
username = flag.String("username", "", "")
password = flag.String("password", "", "")
sipUri = flag.String("sip-uri", "example.pstn.twilio.com", "")
filePath = flag.String("play", "audio.mkv", "")
)

func startMediaListener() *net.UDPConn {
Expand Down Expand Up @@ -140,7 +141,7 @@ func parseAnswer(in []byte) (string, int) {
func sendAudioPackets(conn *net.UDPConn, body []byte) {
ip, port := parseAnswer(body)

r, err := os.Open("audio.mkv")
r, err := os.Open(*filePath)
if err != nil {
panic(err)
}
Expand Down
Loading