Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: layer2 websocket #4753

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/layer2"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/pingpong"
Expand Down Expand Up @@ -170,6 +171,7 @@ type Service struct {

topologyDriver topology.Driver
p2p p2p.DebugService
l2p2p layer2.IP2pService
accounting accounting.Interface
chequebook chequebook.Service
pseudosettle settlement.Interface
Expand Down Expand Up @@ -249,6 +251,7 @@ type ExtraOptions struct {
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
Layer2P2p layer2.IP2pService
}

func New(
Expand Down Expand Up @@ -337,6 +340,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.lightNodes = e.LightNodes
s.pseudosettle = e.Pseudosettle
s.blockTime = e.BlockTime
s.l2p2p = e.Layer2P2p

s.statusSem = semaphore.NewWeighted(1)
s.postageSem = semaphore.NewWeighted(1)
Expand Down
79 changes: 79 additions & 0 deletions pkg/api/layer2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/layer2"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
)

func (s *Service) layer2WsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("layer2").Build()

paths := struct {
StreamName string `map:"streamName" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

headers := struct {
KeepAlive time.Duration `map:"Swarm-Keep-Alive"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

if s.beeMode == DevMode {
logger.Warning("layer2 endpoint is disabled in dev mode")
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkWithSpanSize * 128,
WriteBufferSize: swarm.ChunkWithSpanSize * 128,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Debug("upgrade failed", "error", err)
logger.Error(nil, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

pingPeriod := headers.KeepAlive * time.Second

Check failure on line 57 in pkg/api/layer2.go

View workflow job for this annotation

GitHub Actions / Lint

Multiplication of durations: `headers.KeepAlive * time.Second` (durationcheck)
if pingPeriod == 0 {
pingPeriod = 30 * time.Second
}

logger.Info("pingPeriod", pingPeriod)
ctx, cancel := context.WithCancel(context.Background())
protocolService := s.l2p2p.GetProtocol(ctx, paths.StreamName)
err = s.p2p.AddProtocol(protocolService.Protocol())
if err != nil {
logger.Error(err, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

s.wsWg.Add(1)
go func() {
layer2.ListeningWs(ctx, conn, layer2.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, protocolService)
_ = conn.Close()
s.wsWg.Done()
cancel()
}()
}
4 changes: 4 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (s *Service) mountAPI() {
})),
)

handle("/layer2/subscribe/{streamName}", web.ChainHandlers(
web.FinalHandlerFunc(s.layer2WsHandler),
))

handle("/pss/subscribe/{topic}", web.ChainHandlers(
web.FinalHandlerFunc(s.pssWsHandler),
))
Expand Down
212 changes: 212 additions & 0 deletions pkg/layer2/layer2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package pingpong exposes the simple ping-pong protocol
// which measures round-trip-time with other peers.
package layer2

import (
"context"
"encoding/binary"
"fmt"
"reflect"
"sync"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/swarm"
"golang.org/x/sync/errgroup"
)

const (
loggerName = "layer2"
protocolName = "layer2"
p2pMessageHeader = "layer2-message-data"
protocolVersion = "1.0.0"
)

var p2pMessageHeaderBytes = []byte(p2pMessageHeader)

Check failure on line 29 in pkg/layer2/layer2.go

View workflow job for this annotation

GitHub Actions / Lint

var `p2pMessageHeaderBytes` is unused (unused)

// connection is an active p2p connection with a node with all layer 2 handlers and write interface
type connection struct {
stream p2p.Stream
handlers []msgHandler

Check failure on line 34 in pkg/layer2/layer2.go

View workflow job for this annotation

GitHub Actions / Lint

field `handlers` is unused (unused)
}

func (conn *connection) SendMessage(ctx context.Context, data []byte) error {
c := 0
lenBytes := make([]byte, 4)
binary.BigEndian.PutUint32(lenBytes, uint32(len(data)))
n, err := conn.stream.Write(lenBytes)

Check failure on line 41 in pkg/layer2/layer2.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to n (ineffassign)
if err != nil {
return err
}

for c < len(data) {
n, err = conn.stream.Write(data[c:])
if err != nil {
return err
}
c += n
}

return nil
}

type msgHandler func(swarm.Address, []byte)

type protocolService struct {
mu sync.RWMutex

streamName string
streamer p2p.Streamer
logger log.Logger
conns map[[32]byte]*connection // byte overlay to connections
handlers []msgHandler
}

func NewProtocolService(streamName string, streamer p2p.Streamer, logger log.Logger) protocolService {
return protocolService{
streamName: streamName,
streamer: streamer,
logger: logger.WithName(loggerName + "_" + streamName).Register(),
conns: make(map[[32]byte]*connection),
handlers: make([]msgHandler, 0),
}
}

// GetConnection creates stream to the passed peer address
func (s *protocolService) GetConnection(ctx context.Context, overlay swarm.Address) (conn connection, err error) {
s.mu.Lock()
defer s.mu.Unlock()

if v, ok := s.conns[[32]byte(overlay.Bytes())]; ok {
conn = *v
} else {
stream, err := s.streamer.NewStream(ctx, overlay, nil, protocolName, protocolVersion, s.streamName)
if err != nil {
return connection{}, err
}
conn = connection{
stream: stream,
}
s.conns[[32]byte(overlay.Bytes())] = &conn
}

return conn, nil
}

func (p *protocolService) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: p.streamName,
Handler: p.handler,
},
},
}
}

func (p *protocolService) Broadcast(ctx context.Context, msg []byte) {
p.mu.Lock()
defer p.mu.Unlock()

group := errgroup.Group{}
for _, conn := range p.conns {
conn := conn
group.Go(func() error { return conn.SendMessage(ctx, msg) })
}

if err := group.Wait(); err != nil {
p.logger.Error(err, "broadcasting L2 message")
}
}

func (p *protocolService) AddHandler(handler msgHandler) {
p.mu.Lock()
defer p.mu.Unlock()

p.handlers = append(p.handlers, handler)
}

func (p *protocolService) RemoveHandler(handler msgHandler) {
p.mu.Lock()
defer p.mu.Unlock()
handlerValue := reflect.ValueOf(handler).Pointer()

for i, f := range p.handlers {
if reflect.ValueOf(f).Pointer() == handlerValue {
p.handlers = append(p.handlers[:i], p.handlers[i+1:]...)
return
}
}
}

// handler handles incoming messages from all connected peers and calls registered hook functions
func (s *protocolService) handler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
for {
msgSizeBytes := make([]byte, 4)
n, err := stream.Read(msgSizeBytes)
if err != nil {
s.logger.Error(err, "l2 protocol read")
return err
}
if n != 4 {
return fmt.Errorf("couldn't read 4 bytes for msgSize")
}
msgSize := binary.BigEndian.Uint32(msgSizeBytes)
msg := make([]byte, msgSize)
var c int
for c < int(msgSize) {
n, err := stream.Read(msg[c:])
if err != nil {
return err
}
c += n
}
s.mu.Lock()
for _, handler := range s.handlers {
go handler(peer.Address, msg)
}
s.mu.Unlock()
}
}

type IP2pService interface {
GetProtocol(ctx context.Context, streamName string) (protocol *protocolService)
}

type P2pService struct {
IP2pService
mu sync.RWMutex

streamer p2p.Streamer
logger log.Logger
protocols map[string]*protocolService // protocolName to its instance
}

func NewP2pService(streamer p2p.Streamer, logger log.Logger) P2pService {
return P2pService{
streamer: streamer,
logger: logger,
protocols: make(map[string]*protocolService),
}
}

// GetService either creates protocolService or returns initiated one
func (s *P2pService) GetProtocol(ctx context.Context, streamName string) (protocol *protocolService) {
s.mu.Lock()
defer s.mu.Unlock()

if v, ok := s.protocols[streamName]; ok {
protocol = v
} else {
p := NewProtocolService(streamName, s.streamer, s.logger)
protocol = &p
}

return protocol
}
Loading
Loading