Skip to content

Commit

Permalink
Use HTTP 202 (Accepted) for API instead of HTTP 200 (Ok)
Browse files Browse the repository at this point in the history
  • Loading branch information
louisroyer committed Dec 17, 2024
1 parent 72fd067 commit a46e5bb
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 28 deletions.
13 changes: 13 additions & 0 deletions internal/amf/amf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Amf struct {
smf *smf.Smf
srv *http.Server
closed chan struct{}

// not exported because must not be modified
ctx context.Context
}

func NewAmf(bindAddr netip.AddrPort, control jsonapi.ControlURI, userAgent string, smf *smf.Smf) *Amf {
Expand Down Expand Up @@ -56,6 +59,9 @@ func NewAmf(bindAddr netip.AddrPort, control jsonapi.ControlURI, userAgent strin
}

func (amf *Amf) Start(ctx context.Context) error {
if ctx == nil {
return ErrNilCtx
}
l, err := net.Listen("tcp", amf.srv.Addr)
if err != nil {
return err
Expand Down Expand Up @@ -97,3 +103,10 @@ func Status(c *gin.Context) {
c.Header("Cache-Control", "no-cache")
c.JSON(http.StatusOK, status)
}

func (amf *Amf) Context() context.Context {
if amf.ctx != nil {
return amf.ctx
}
return context.Background()
}
14 changes: 14 additions & 0 deletions internal/amf/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2024 Louis Royer and the NextMN contributors. All rights reserved.
// Use of this source code is governed by a MIT-style license that can be
// found in the LICENSE file.
// SPDX-License-Identifier: MIT

package amf

import (
"errors"
)

var (
ErrNilCtx = errors.New("nil context")
)
24 changes: 13 additions & 11 deletions internal/amf/establishment_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ func (amf *Amf) EstablishmentRequest(c *gin.Context) {
"gnb": ps.Gnb.String(),
"dnn": ps.Dnn,
}).Info("New PDU Session establishment Request")
go amf.HandleEstablishmentRequest(ps)
c.JSON(http.StatusAccepted, jsonapi.Message{Message: "please refer to logs for more information"})
}

pduSession, err := amf.smf.CreateSessionUplink(c, ps.Ue, ps.Gnb, ps.Dnn)
func (amf *Amf) HandleEstablishmentRequest(ps n1n2.PduSessionEstabReqMsg) {
ctx := amf.Context()
// TODO: use ctx.WithTimeout()
pduSession, err := amf.smf.CreateSessionUplinkContext(ctx, ps.Ue, ps.Gnb, ps.Dnn)
if err != nil {
c.JSON(http.StatusInternalServerError, jsonapi.MessageWithError{Message: "could not create pdu session uplink", Error: err})
return
logrus.WithError(err).Error("Could not create PDU Session Uplink")
}

// send PseAccept to UE
Expand All @@ -48,20 +53,17 @@ func (amf *Amf) EstablishmentRequest(c *gin.Context) {
}
reqBody, err := json.Marshal(n2PsReq)
if err != nil {
c.JSON(http.StatusInternalServerError, jsonapi.MessageWithError{Message: "could not marshal json", Error: err})
logrus.WithError(err).Error("Could not marshal n1n2.N2PduSessionReqMsg")
return
}
req, err := http.NewRequestWithContext(c, http.MethodPost, ps.Gnb.JoinPath("ps/n2-establishment-request").String(), bytes.NewBuffer(reqBody))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ps.Gnb.JoinPath("ps/n2-establishment-request").String(), bytes.NewBuffer(reqBody))
if err != nil {
c.JSON(http.StatusInternalServerError, jsonapi.MessageWithError{Message: "could not create request", Error: err})
logrus.WithError(err).Error("Could not create request for ps/n2-establishment-request")
return
}
req.Header.Set("User-Agent", amf.userAgent)
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
resp, err := amf.client.Do(req)
if err != nil {
c.JSON(http.StatusInternalServerError, jsonapi.MessageWithError{Message: "no http response", Error: err})
return
if _, err := amf.client.Do(req); err != nil {
logrus.WithError(err).Error("Could not send ps/n2-establishment-request")
}
defer resp.Body.Close()
}
9 changes: 7 additions & 2 deletions internal/amf/n2_establishment_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ func (amf *Amf) N2EstablishmentResponse(c *gin.Context) {
c.JSON(http.StatusBadRequest, jsonapi.MessageWithError{Message: "could not deserialize", Error: err})
return
}
pduSession, err := amf.smf.CreateSessionDownlink(c, ps.UeInfo.Header.Ue, ps.UeInfo.Header.Dnn, ps.Gnb, ps.DownlinkTeid)
go amf.HandleN2EstablishmentResponse(ps)
c.JSON(http.StatusAccepted, jsonapi.Message{Message: "please refer to logs for more information"})
}

func (amf *Amf) HandleN2EstablishmentResponse(ps n1n2.N2PduSessionRespMsg) {
ctx := amf.Context()
pduSession, err := amf.smf.CreateSessionDownlinkContext(ctx, ps.UeInfo.Header.Ue, ps.UeInfo.Header.Dnn, ps.Gnb, ps.DownlinkTeid)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"ue-ip-addr": ps.UeInfo.Addr,
"ue": ps.UeInfo.Header.Ue,
"gnb": ps.UeInfo.Header.Gnb,
"dnn": ps.UeInfo.Header.Dnn,
}).Error("could not create downlink path")
c.JSON(http.StatusInternalServerError, jsonapi.MessageWithError{Message: "could not create downlink path", Error: err})
return
}
logrus.WithFields(logrus.Fields{
Expand Down
4 changes: 4 additions & 0 deletions internal/smf/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ var (
ErrInterfaceNotFound = errors.New("interface not found")
ErrNoPFCPRule = errors.New("no PFCP rule to push")
ErrNoIpAvailableInPool = errors.New("no IP address available in pool")

ErrNilCtx = errors.New("nil context")
ErrSmfNotStarted = errors.New("SMF not started")
ErrSmfAlreadyStarted = errors.New("SMF already started")
)
78 changes: 71 additions & 7 deletions internal/smf/smf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Smf struct {
srv *pfcp.PFCPEntityCP
started bool
closed chan struct{}

// not exported because must not be modified
ctx context.Context
}

func NewSmf(addr netip.Addr, slices map[string]config.Slice) *Smf {
Expand All @@ -37,13 +40,24 @@ func NewSmf(addr netip.Addr, slices map[string]config.Slice) *Smf {
slices: s,
upfs: upfs,
closed: make(chan struct{}),
ctx: nil,
}
}

func (smf *Smf) Start(ctx context.Context) error {
if smf.started {
return ErrSmfAlreadyStarted
}
if ctx == nil {
return ErrNilCtx
}
smf.ctx = ctx
logrus.Info("Starting PFCP Server")
go func() {
defer close(smf.closed)
defer func() {
smf.started = false
close(smf.closed)
}()
if err := smf.srv.ListenAndServeContext(ctx); err != nil {
logrus.WithError(err).Info("PFCP server stopped")
}
Expand All @@ -66,7 +80,10 @@ func (smf *Smf) Start(ctx context.Context) error {
failure = err
return false
}
upf.Associate(association)
if err := upf.Associate(ctx, association); err != nil {
failure = err
return false
}
return true
})
if failure != nil {
Expand All @@ -77,7 +94,33 @@ func (smf *Smf) Start(ctx context.Context) error {
return nil
}

func (smf *Smf) CreateSessionDownlink(ctx context.Context, ueCtrl jsonapi.ControlURI, dnn string, gnb netip.Addr, gnb_teid uint32) (*PduSessionN3, error) {
func (smf *Smf) Context() context.Context {
if smf.ctx != nil {
return smf.ctx
}
return context.Background()
}

func (smf *Smf) CreateSessionDownlink(ueCtrl jsonapi.ControlURI, dnn string, gnb netip.Addr, gnb_teid uint32) (*PduSessionN3, error) {
return smf.CreateSessionDownlinkContext(smf.ctx, ueCtrl, dnn, gnb, gnb_teid)
}

func (smf *Smf) CreateSessionDownlinkContext(ctx context.Context, ueCtrl jsonapi.ControlURI, dnn string, gnb netip.Addr, gnb_teid uint32) (*PduSessionN3, error) {
if !smf.started {
return nil, ErrSmfNotStarted
}
if ctx == nil {
return nil, ErrNilCtx
}
select {
case <-ctx.Done():
// if ctx is over, abort
return nil, ctx.Err()
case <-smf.ctx.Done():
// if smf.ctx is over, abort
return nil, smf.ctx.Err()
default:
}
// check for existing session
s, ok := smf.slices.Load(dnn)
if !ok {
Expand Down Expand Up @@ -116,7 +159,7 @@ func (smf *Smf) CreateSessionDownlink(ctx context.Context, ueCtrl jsonapi.Contro
if i == len(slice.Upfs)-1 {
upf.UpdateDownlinkAnchor(session.UeIpAddr, dnn, last_fteid)
} else {
last_fteid, err = upf.UpdateDownlinkIntermediate(ctx, session.UeIpAddr, dnn, upf_iface, last_fteid)
last_fteid, err = upf.UpdateDownlinkIntermediateContext(ctx, session.UeIpAddr, dnn, upf_iface, last_fteid)
if err != nil {
return nil, err
}
Expand All @@ -127,7 +170,27 @@ func (smf *Smf) CreateSessionDownlink(ctx context.Context, ueCtrl jsonapi.Contro
}
return session, nil
}
func (smf *Smf) CreateSessionUplink(ctx context.Context, ueCtrl jsonapi.ControlURI, gnbCtrl jsonapi.ControlURI, dnn string) (*PduSessionN3, error) {

func (smf *Smf) CreateSessionUplink(ueCtrl jsonapi.ControlURI, gnbCtrl jsonapi.ControlURI, dnn string) (*PduSessionN3, error) {
return smf.CreateSessionUplinkContext(smf.ctx, ueCtrl, gnbCtrl, dnn)
}

func (smf *Smf) CreateSessionUplinkContext(ctx context.Context, ueCtrl jsonapi.ControlURI, gnbCtrl jsonapi.ControlURI, dnn string) (*PduSessionN3, error) {
if !smf.started {
return nil, ErrSmfNotStarted
}
if ctx == nil {
return nil, ErrNilCtx
}
select {
case <-ctx.Done():
// if ctx is over, abort
return nil, ctx.Err()
case <-smf.ctx.Done():
// if smf.ctx is over, abort
return nil, smf.ctx.Err()
default:
}
// check for existing session
s, ok := smf.slices.Load(dnn)
if !ok {
Expand Down Expand Up @@ -177,7 +240,7 @@ func (smf *Smf) CreateSessionUplink(ctx context.Context, ueCtrl jsonapi.ControlU
return nil, err
}
}
last_fteid, err := upfa.CreateUplinkAnchor(ctx, ueIpAddr, dnn, upfa_iface)
last_fteid, err := upfa.CreateUplinkAnchorContext(ctx, ueIpAddr, dnn, upfa_iface)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -215,7 +278,7 @@ func (smf *Smf) CreateSessionUplink(ctx context.Context, ueCtrl jsonapi.ControlU
return nil, err
}
}
last_fteid, err = upf.CreateUplinkIntermediate(ctx, ueIpAddr, dnn, upf_iface, last_fteid)
last_fteid, err = upf.CreateUplinkIntermediateContext(ctx, ueIpAddr, dnn, upf_iface, last_fteid)
if err != nil {
logrus.WithError(err).Error("Could not create uplink intermediate")
return nil, err
Expand All @@ -234,6 +297,7 @@ func (smf *Smf) CreateSessionUplink(ctx context.Context, ueCtrl jsonapi.ControlU
slice.sessions.Store(ueCtrl, &session)
return &session, nil
}

func (smf *Smf) WaitShutdown(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down
18 changes: 18 additions & 0 deletions internal/smf/teids_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
type TEIDsPool struct {
teids map[uint32]struct{}
sync.Mutex

// not exported because must not be modified
ctx context.Context
}

func NewTEIDsPool() *TEIDsPool {
Expand All @@ -22,14 +25,29 @@ func NewTEIDsPool() *TEIDsPool {
}
}

func (t *TEIDsPool) Init(ctx context.Context) error {
if ctx == nil {
return ErrNilCtx
}
t.ctx = ctx
return nil
}

// Returns next TEID from the pool.
// warning: the pool must first be initialized using `Init(ctx)`
func (t *TEIDsPool) Next(ctx context.Context) (uint32, error) {
if t.ctx == nil || ctx == nil {
return 0, ErrNilCtx
}
t.Lock()
defer t.Unlock()
var teid uint32 = 0
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-t.ctx.Done():
return 0, t.ctx.Err()
default:
teid = rand.Uint32()
if teid == 0 {
Expand Down
Loading

0 comments on commit a46e5bb

Please sign in to comment.