From 87c9513398e500abbff140ecdd5da40f2c0781d9 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Mon, 4 Dec 2023 19:23:18 +0200 Subject: [PATCH] Fix RTP session termination for inbound. Improve logging. --- pkg/service/service.go | 3 +-- pkg/sip/inbound.go | 54 ++++++++++++++++++++++++++++++------------ pkg/sip/room.go | 6 ++--- pkg/sip/server.go | 3 +++ pkg/sip/service.go | 18 ++++++++++++++ test/client/main.go | 3 ++- 6 files changed, 66 insertions(+), 21 deletions(-) diff --git a/pkg/service/service.go b/pkg/service/service.go index f7b22345..8aa478a0 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -16,7 +16,6 @@ package service import ( "context" - "log" "github.com/frostbyte73/core" "github.com/livekit/protocol/logger" @@ -99,7 +98,7 @@ func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, s }) if err != nil { - log.Println(err) + logger.Warnw("SIP handle dispatch rule error", err) return "", "", false, true } diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 3689c77e..19eef59d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -16,12 +16,12 @@ package sip import ( "fmt" - "log" "sync/atomic" "time" "github.com/emiago/sipgo/sip" "github.com/icholy/digest" + "github.com/livekit/protocol/logger" "github.com/pion/sdp/v2" "github.com/livekit/sip/pkg/config" @@ -92,6 +92,8 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr } func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { + _ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil)) + tag, err := getTagValue(req) if err != nil { sipErrorResponse(tx, req) @@ -111,9 +113,11 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { } src := req.Source() + logger.Infow("INVITE", "tag", tag, "from", from, "to", to) + username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src) if err != nil { - log.Printf("Rejecting inbound call, doesn't match any Trunks %q %q %q %q\n", from.Address.User, to.Address.User, to.Address.Host, src) + logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host) sipErrorResponse(tx, req) return } @@ -128,6 +132,8 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { type inboundCall struct { s *Server tag string + inviteReq *sip.Request + inviteResp *sip.Response from *sip.FromHeader to *sip.ToHeader src string @@ -148,6 +154,9 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead dtmf: make(chan byte, 10), lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts } + s.cmu.Lock() + s.activeCalls[tag] = c + s.cmu.Unlock() return c } @@ -156,30 +165,30 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c // Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required. roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false) if rejectInvite { - log.Printf("Rejecting inbound call, doesn't match any Dispatch Rules %q %q %q %q\n", c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src) + logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src) sipErrorResponse(tx, req) + c.Close() return } // We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF. answerData, err := c.runMediaConn(req.Body(), conf) - if err != nil { sipErrorResponse(tx, req) + c.Close() return } - c.s.cmu.Lock() - c.s.activeCalls[c.tag] = c - c.s.cmu.Unlock() res := sip.NewResponseFromRequest(req, 200, "OK", answerData) res.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp, Port: c.s.conf.SIPPort}}) res.AppendHeader(&contentTypeHeaderSDP) if err = tx.Respond(res); err != nil { - log.Println(err) + logger.Errorw("Cannot respond to INVITE", err) // TODO: should we close the call in this case? return } + c.inviteReq = req + c.inviteResp = res // We own this goroutine, so can freely block. if requirePin { c.pinPrompt() @@ -188,6 +197,16 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c } } +func (c *inboundCall) sendBye() { + if c.inviteReq == nil { + return + } + res := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil) + c.s.sipSrv.TransportLayer().WriteMsg(res) + c.inviteReq = nil + c.inviteResp = nil +} + func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) { conn := NewMediaConn() conn.OnRTP(c) @@ -209,7 +228,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe } func (c *inboundCall) pinPrompt() { - log.Printf("Requesting Pin for SIP call %q -> %q\n", c.from.Address.User, c.to.Address.User) + logger.Infow("Requesting Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User) const pinLimit = 16 c.playAudio(c.s.res.enterPin) pin := "" @@ -228,9 +247,10 @@ func (c *inboundCall) pinPrompt() { // End of the pin noPin = pin == "" - log.Printf("Checking Pin for SIP call %q -> %q = %q (noPin = %v)\n", c.from.Address.User, c.to.Address.User, pin, noPin) + logger.Infow("Checking Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin) roomName, identity, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, pin, noPin) if reject || requirePin || roomName == "" { + logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin) c.playAudio(c.s.res.wrongPin) c.Close() return @@ -250,14 +270,15 @@ func (c *inboundCall) pinPrompt() { } func (c *inboundCall) Close() error { - if c.done.CompareAndSwap(false, true) { + if !c.done.CompareAndSwap(false, true) { return nil } + logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User) c.s.cmu.Lock() delete(c.s.activeCalls, c.tag) c.s.cmu.Unlock() c.closeMedia() - // FIXME: drop the actual call + c.sendBye() return nil } @@ -267,7 +288,10 @@ func (c *inboundCall) closeMedia() { p.Close() c.lkRoom = nil } - c.rtpConn.Close() + if c.rtpConn != nil { + c.rtpConn.Close() + c.rtpConn = nil + } close(c.dtmf) } @@ -304,10 +328,10 @@ func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity str } func (c *inboundCall) joinRoom(roomName, identity string) { - log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity) + logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity) c.playAudio(c.s.res.roomJoin) if err := c.createLiveKitParticipant(roomName, identity); err != nil { - log.Println(err) + logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag) } } diff --git a/pkg/sip/room.go b/pkg/sip/room.go index c8e25cf1..bcae9172 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -15,9 +15,9 @@ package sip import ( - "log" "sync/atomic" + "github.com/livekit/protocol/logger" lksdk "github.com/livekit/server-sdk-go" "github.com/pion/webrtc/v3" @@ -66,7 +66,7 @@ func (r *Room) Connect(conf *config.Config, roomName string, identity string) er OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { if track.Kind() != webrtc.RTPCodecTypeAudio { if err := pub.SetSubscribed(false); err != nil { - log.Println(err) + logger.Errorw("Cannot unsubscribe from the track", err) } return } @@ -119,7 +119,7 @@ func (r *Room) SetOutput(out media.Writer[media.LPCM16Sample]) { } func (r *Room) Close() error { - if r.room == nil { + if r.room != nil { r.room.Disconnect() r.room = nil } diff --git a/pkg/sip/server.go b/pkg/sip/server.go index a60c8032..7deb540d 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -23,6 +23,7 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/icholy/digest" + "github.com/livekit/protocol/logger" "golang.org/x/exp/maps" "github.com/livekit/sip/pkg/config" @@ -144,6 +145,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error { sipErrorResponse(tx, req) return } + logger.Infow("BYE", "tag", tag) s.cmu.RLock() c := s.activeCalls[tag] @@ -158,6 +160,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error { // Ignore ACKs s.sipSrv.OnAck(func(req *sip.Request, tx sip.ServerTransaction) {}) + // TODO: pass proper context here go func() { panic(s.sipSrv.ListenAndServe(context.TODO(), "udp", fmt.Sprintf("0.0.0.0:%d", s.conf.SIPPort))) }() diff --git a/pkg/sip/service.go b/pkg/sip/service.go index bb05cb5d..a3be2847 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -15,16 +15,34 @@ package sip import ( + "errors" "log" "github.com/emiago/sipgo" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/rs/zerolog" + zlog "github.com/rs/zerolog/log" "github.com/livekit/sip/pkg/config" "github.com/livekit/sip/version" ) +func init() { + zlog.Logger = zerolog.New(nil).Hook(zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { + switch level { + case zerolog.DebugLevel: + logger.Debugw(message) + case zerolog.InfoLevel: + logger.Infow(message) + case zerolog.WarnLevel: + logger.Warnw(message, errors.New(message)) + case zerolog.ErrorLevel: + logger.Errorw(message, errors.New(message)) + } + })) +} + type Service struct { cli *Client srv *Server diff --git a/test/client/main.go b/test/client/main.go index 73dfe100..9dd5d2d9 100644 --- a/test/client/main.go +++ b/test/client/main.go @@ -40,6 +40,7 @@ var ( username = flag.String("username", "", "") password = flag.String("password", "", "") sipUri = flag.String("sip-uri", "example.pstn.twilio.com", "") + filePath = flag.String("play", "audio.mkv", "") ) func startMediaListener() *net.UDPConn { @@ -140,7 +141,7 @@ func parseAnswer(in []byte) (string, int) { func sendAudioPackets(conn *net.UDPConn, body []byte) { ip, port := parseAnswer(body) - r, err := os.Open("audio.mkv") + r, err := os.Open(*filePath) if err != nil { panic(err) }