Skip to content

Commit

Permalink
Refactor SIP service similar to ingress/egress.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Nov 30, 2023
1 parent 9ca88b9 commit ca0f9c3
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 53 deletions.
21 changes: 19 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (

"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/errors"
"github.com/livekit/sip/pkg/service"
"github.com/livekit/sip/pkg/sip"
"github.com/livekit/sip/version"
)

Expand Down Expand Up @@ -68,9 +70,9 @@ func runService(c *cli.Context) error {
if err != nil {
return err
}
bus := psrpc.NewRedisMessageBus(rc)

svc, err := service.NewService(conf, bus)
bus := psrpc.NewRedisMessageBus(rc)
psrpcClient, err := rpc.NewIOInfoClient(bus)
if err != nil {
return err
}
Expand All @@ -81,14 +83,29 @@ func runService(c *cli.Context) error {
killChan := make(chan os.Signal, 1)
signal.Notify(killChan, syscall.SIGINT)

sipsrv, err := sip.NewService(conf)
if err != nil {
return err
}

svc := service.NewService(conf, sipsrv.InternalServerImpl(), psrpcClient, bus)
sipsrv.SetServerHandler(svc)

if err = sipsrv.Start(); err != nil {
return err
}

go func() {
select {
case sig := <-stopChan:
logger.Infow("exit requested, finishing all SIP then shutting down", "signal", sig)
svc.Stop(false)
sipsrv.Stop(false)

case sig := <-killChan:
logger.Infow("exit requested, stopping all SIP and shutting down", "signal", sig)
svc.Stop(true)
sipsrv.Stop(true)
}
}()

Expand Down
41 changes: 6 additions & 35 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,73 +18,44 @@ import (
"context"
"log"

"github.com/emiago/sipgo"
"github.com/frostbyte73/core"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/sip"
"github.com/livekit/sip/version"
)

type Service struct {
conf *config.Config
cli *sip.Client
srv *sip.Server

psrpcServer rpc.SIPInternalServerImpl
psrpcClient rpc.IOInfoClient
bus psrpc.MessageBus

shutdown core.Fuse
}

func NewService(conf *config.Config, bus psrpc.MessageBus) (*Service, error) {
psrpcClient, err := rpc.NewIOInfoClient(bus)
if err != nil {
return nil, err
}
cli := sip.NewClient(conf)
func NewService(conf *config.Config, srv rpc.SIPInternalServerImpl, cli rpc.IOInfoClient, bus psrpc.MessageBus) *Service {
s := &Service{
conf: conf,
cli: cli,
psrpcClient: psrpcClient,
psrpcServer: srv,
psrpcClient: cli,
bus: bus,
shutdown: core.NewFuse(),
}
s.srv = sip.NewServer(conf, s.HandleTrunkAuthentication, s.HandleDispatchRules)
return s, nil
return s
}

func (s *Service) Stop(kill bool) {
s.shutdown.Break()
if kill {
if err := s.cli.Stop(); err != nil {
log.Println(err)
}
if err := s.srv.Stop(); err != nil {
log.Println(err)
}
}
}

func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(sip.UserAgent),
)
if err != nil {
return err
}
if err = s.cli.Start(ua); err != nil {
return err
}
if err = s.srv.Start(ua); err != nil {
return err
}

srv, err := rpc.NewSIPInternalServer(s.cli, s.bus)
srv, err := rpc.NewSIPInternalServer(s.psrpcServer, s.bus)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

username, password, err := s.authenticationHandler(from.Address.User, to.Address.User, src)
username, password, err := s.handler.HandleTrunkAuthentication(from.Address.User, to.Address.User, src)
if err != nil {
sipErrorResponse(tx, req)
return
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) {
// Send initial request. In the best case scenario, we will immediately get a room name to join.
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.src, "", false)
roomName, identity, requirePin, rejectInvite := c.s.handler.HandleDispatchRules(c.from.Address.User, c.to.Address.User, c.src, "", false)
if rejectInvite {
sipErrorResponse(tx, req)
return
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *inboundCall) pinPrompt() {
noPin = pin == ""

log.Printf("Checking Pin for SIP call %q -> %q = %q (noPin = %v)\n", c.from.Address.User, c.to.Address.User, pin, noPin)
roomName, identity, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.src, pin, noPin)
roomName, identity, requirePin, reject := c.s.handler.HandleDispatchRules(c.from.Address.User, c.to.Address.User, c.src, pin, noPin)
if reject || requirePin || roomName == "" {
c.playAudio(c.s.res.wrongPin)
c.Close()
Expand Down
37 changes: 24 additions & 13 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ var (
)

type (
authenticationHandlerFunc func(from, to, srcAddress string) (username, password string, err error)
dispatchRuleHandlerFunc func(callingNumber, calledNumber, srcAddress, pin string, skipPin bool) (joinRoom, identity string, pinRequired, hangup bool)

Server struct {
sipSrv *sipgo.Server
publicIp string
Expand All @@ -50,9 +47,8 @@ type (
cmu sync.RWMutex
activeCalls map[string]*inboundCall

authenticationHandler authenticationHandlerFunc
dispatchRuleHandler dispatchRuleHandlerFunc
conf *config.Config
handler ServerHandler
conf *config.Config

res mediaRes
}
Expand All @@ -63,19 +59,34 @@ type (
}
)

func NewServer(conf *config.Config, authenticationHandler authenticationHandlerFunc, dispatchRuleHandler dispatchRuleHandlerFunc) *Server {
type AuthHandler interface {
HandleTrunkAuthentication(from, to, srcAddress string) (username, password string, err error)
}

type DispatchRuleHandler interface {
HandleDispatchRules(callingNumber, calledNumber, srcAddress string, pin string, noPin bool) (joinRoom, identity string, requestPin, rejectInvite bool)
}

type ServerHandler interface {
AuthHandler
DispatchRuleHandler
}

func NewServer(conf *config.Config) *Server {
s := &Server{
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*inboundCall),
inProgressInvites: []*inProgressInvite{},
authenticationHandler: authenticationHandler,
dispatchRuleHandler: dispatchRuleHandler,
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*inboundCall),
inProgressInvites: []*inProgressInvite{},
}
s.initMediaRes()
return s
}

func (s *Server) SetHandler(handler ServerHandler) {
s.handler = handler
}

func getTagValue(req *sip.Request) (string, error) {
from, ok := req.From()
if !ok {
Expand Down
76 changes: 76 additions & 0 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"log"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)

type Service struct {
cli *Client
srv *Server
}

func NewService(conf *config.Config) (*Service, error) {
cli := NewClient(conf)
s := &Service{
cli: cli,
}
s.srv = NewServer(conf)
return s, nil
}

func (s *Service) Stop(kill bool) {
if kill {
if err := s.cli.Stop(); err != nil {
log.Println(err)
}
if err := s.srv.Stop(); err != nil {
log.Println(err)
}
}
}

func (s *Service) SetServerHandler(handler ServerHandler) {
s.srv.SetHandler(handler)
}

func (s *Service) InternalServerImpl() rpc.SIPInternalServerImpl {
return s.cli
}

func (s *Service) Start() error {
logger.Debugw("starting sip service", "version", version.Version)
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
if err = s.cli.Start(ua); err != nil {
return err
}
if err = s.srv.Start(ua); err != nil {
return err
}
logger.Debugw("sip service ready")
return nil
}

0 comments on commit ca0f9c3

Please sign in to comment.