From c2082a84ce50342bf08794af34e22c85bfe097a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Fri, 9 Aug 2024 16:09:22 +0200 Subject: [PATCH 1/4] feat: layer2 websocket --- pkg/api/api.go | 4 + pkg/api/layer2.go | 65 ++++++++++++++ pkg/api/router.go | 4 + pkg/layer2/layer2.go | 198 +++++++++++++++++++++++++++++++++++++++++++ pkg/layer2/ws.go | 147 ++++++++++++++++++++++++++++++++ pkg/node/node.go | 4 + 6 files changed, 422 insertions(+) create mode 100644 pkg/api/layer2.go create mode 100644 pkg/layer2/layer2.go create mode 100644 pkg/layer2/ws.go diff --git a/pkg/api/api.go b/pkg/api/api.go index f5db1459623..63292dee00c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -33,6 +33,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/file/pipeline/builder" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/layer2" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/pingpong" @@ -170,6 +171,7 @@ type Service struct { topologyDriver topology.Driver p2p p2p.DebugService + l2p2p layer2.IP2pService accounting accounting.Interface chequebook chequebook.Service pseudosettle settlement.Interface @@ -249,6 +251,7 @@ type ExtraOptions struct { SyncStatus func() (bool, error) NodeStatus *status.Service PinIntegrity PinIntegrity + Layer2P2p layer2.IP2pService } func New( @@ -337,6 +340,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti s.lightNodes = e.LightNodes s.pseudosettle = e.Pseudosettle s.blockTime = e.BlockTime + s.l2p2p = e.Layer2P2p s.statusSem = semaphore.NewWeighted(1) s.postageSem = semaphore.NewWeighted(1) diff --git a/pkg/api/layer2.go b/pkg/api/layer2.go new file mode 100644 index 00000000000..2c95c87be68 --- /dev/null +++ b/pkg/api/layer2.go @@ -0,0 +1,65 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api + +import ( + "net/http" + "time" + + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/layer2" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "golang.org/x/net/context" +) + +func (s *Service) layer2WsHandler(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("layer2").Build() + + paths := struct { + StreamName string `map:"streamName" validate:"required"` + }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + if s.beeMode == DevMode { + logger.Warning("layer2 endpoint is disabled in dev mode") + jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) + } + + upgrader := websocket.Upgrader{ + ReadBufferSize: swarm.ChunkWithSpanSize * 128, + WriteBufferSize: swarm.ChunkWithSpanSize * 128, + CheckOrigin: s.checkOrigin, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Debug("upgrade failed", "error", err) + logger.Error(nil, "upgrade failed") + jsonhttp.InternalServerError(w, "upgrade failed") + return + } + + pingPeriod := 100 * time.Second //TODO pass it in header + ctx, cancel := context.WithCancel(context.Background()) + protocolService := s.l2p2p.GetProtocol(ctx, paths.StreamName) + err = s.p2p.AddProtocol(protocolService.Protocol()) + if err != nil { + logger.Error(err, "upgrade failed") + jsonhttp.InternalServerError(w, "upgrade failed") + return + } + + s.wsWg.Add(1) + go func() { + layer2.ListeningWs(ctx, conn, layer2.WsOptions{PingPeriod: pingPeriod}, logger, protocolService) + s.wsWg.Done() + cancel() + }() +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 2fc2b9afcd1..175c04dffb0 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -287,6 +287,10 @@ func (s *Service) mountAPI() { })), ) + handle("/layer2/subscribe/{streamName}", web.ChainHandlers( + web.FinalHandlerFunc(s.layer2WsHandler), + )) + handle("/pss/subscribe/{topic}", web.ChainHandlers( web.FinalHandlerFunc(s.pssWsHandler), )) diff --git a/pkg/layer2/layer2.go b/pkg/layer2/layer2.go new file mode 100644 index 00000000000..2e970b32992 --- /dev/null +++ b/pkg/layer2/layer2.go @@ -0,0 +1,198 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pingpong exposes the simple ping-pong protocol +// which measures round-trip-time with other peers. +package layer2 + +import ( + "context" + "encoding/binary" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/swarm" + "golang.org/x/sync/errgroup" +) + +const ( + loggerName = "layer2" + protocolName = "layer2" + p2pMessageHeader = "layer2-message-data" + protocolVersion = "1.0.0" +) + +var p2pMessageHeaderBytes = []byte(p2pMessageHeader) + +// connection is an active p2p connection with a node with all layer 2 handlers and write interface +type connection struct { + stream p2p.Stream + handlers []msgHandler +} + +func (conn *connection) SendMessage(ctx context.Context, data []byte) error { + c := 0 + lenBytes := make([]byte, 4) + binary.BigEndian.PutUint32(lenBytes, uint32(len(data))) + n, err := conn.stream.Write(lenBytes) + if err != nil { + return err + } + + for c < len(data) { + n, err = conn.stream.Write(data[c:]) + if err != nil { + return err + } + c += n + } + + return nil +} + +type msgHandler func(swarm.Address, []byte) + +type protocolService struct { + mu sync.RWMutex + + streamName string + streamer p2p.Streamer + logger log.Logger + conns map[[32]byte]*connection // byte overlay to connections + handlers []msgHandler +} + +func NewProtocolService(streamName string, streamer p2p.Streamer, logger log.Logger) protocolService { + return protocolService{ + streamName: streamName, + streamer: streamer, + logger: logger.WithName(loggerName + "_" + streamName).Register(), + conns: make(map[[32]byte]*connection), + handlers: make([]msgHandler, 0), + } +} + +// GetConnection creates stream to the passed peer address +func (s *protocolService) GetConnection(ctx context.Context, overlay swarm.Address) (conn connection, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if v, ok := s.conns[[32]byte(overlay.Bytes())]; ok { + conn = *v + } else { + stream, err := s.streamer.NewStream(ctx, overlay, nil, protocolName, protocolVersion, s.streamName) + if err != nil { + return connection{}, err + } + conn = connection{ + stream: stream, + } + s.conns[[32]byte(overlay.Bytes())] = &conn + } + + return conn, nil +} + +func (p *protocolService) Protocol() p2p.ProtocolSpec { + return p2p.ProtocolSpec{ + Name: protocolName, + Version: protocolVersion, + StreamSpecs: []p2p.StreamSpec{ + { + Name: p.streamName, + Handler: p.handler, + }, + }, + } +} + +func (p *protocolService) Broadcast(ctx context.Context, msg []byte) { + p.mu.Lock() + defer p.mu.Unlock() + + group := errgroup.Group{} + for _, conn := range p.conns { + conn := conn + group.Go(func() error { return conn.SendMessage(ctx, msg) }) + } + + if err := group.Wait(); err != nil { + p.logger.Error(err, "broadcasting L2 message") + } +} + +func (p *protocolService) AddHandler(handler msgHandler) { + p.mu.Lock() + defer p.mu.Unlock() + + p.handlers = append(p.handlers, handler) +} + +// handler handles incoming messages from all connected peers and calls registered hook functions +func (s *protocolService) handler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { + for { + msgSizeBytes := make([]byte, 4) + n, err := stream.Read(msgSizeBytes) + if err != nil { + s.logger.Error(err, "l2 protocol read") + return err + } + if n != 4 { + return fmt.Errorf("couldn't read 4 bytes for msgSize") + } + msgSize := binary.BigEndian.Uint32(msgSizeBytes) + msg := make([]byte, msgSize) + var c int + for c < int(msgSize) { + n, err := stream.Read(msg[c:]) + if err != nil { + return err + } + c += n + } + s.mu.Lock() + for _, handler := range s.handlers { + go handler(peer.Address, msg) + } + s.mu.Unlock() + } +} + +type IP2pService interface { + GetProtocol(ctx context.Context, streamName string) (protocol *protocolService) +} + +type P2pService struct { + IP2pService + mu sync.RWMutex + + streamer p2p.Streamer + logger log.Logger + protocols map[string]*protocolService // protocolName to its instance +} + +func NewP2pService(streamer p2p.Streamer, logger log.Logger) P2pService { + return P2pService{ + streamer: streamer, + logger: logger, + protocols: make(map[string]*protocolService), + } +} + +// GetService either creates protocolService or returns initiated one +func (s *P2pService) GetProtocol(ctx context.Context, streamName string) (protocol *protocolService) { + s.mu.Lock() + defer s.mu.Unlock() + + if v, ok := s.protocols[streamName]; ok { + protocol = v + } else { + p := NewProtocolService(streamName, s.streamer, s.logger) + protocol = &p + } + + return protocol +} diff --git a/pkg/layer2/ws.go b/pkg/layer2/ws.go new file mode 100644 index 00000000000..02845970ea8 --- /dev/null +++ b/pkg/layer2/ws.go @@ -0,0 +1,147 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pingpong exposes the simple ping-pong protocol +// which measures round-trip-time with other peers. +package layer2 + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/websocket" +) + +type WsOptions struct { + PingPeriod time.Duration +} + +type actionType uint8 + +const ( + atUnicast actionType = iota + atBroadcast + // TODO atConnect + // TODO atDisconnect +) + +type responseType uint8 + +const ( + atMsg responseType = iota +) + +func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, logger log.Logger, protocolService *protocolService) { + var ( + ticker = time.NewTicker(options.PingPeriod) + writeDeadline = options.PingPeriod + 100*time.Millisecond // write deadline. should be smaller than the shutdown timeout on api close + readDeadline = options.PingPeriod + 100*time.Millisecond // write deadline. should be smaller than the shutdown timeout on api close + err error + ) + + conn.SetCloseHandler(func(code int, text string) error { + logger.Debug("L2 ws: client gone", "protocol", protocolService.streamName, "code", code, "message", text) + // TODO remove handler + return nil + }) + + protocolService.AddHandler(func(a swarm.Address, b []byte) { + space := byte(' ') + msg := append([]byte{byte(atMsg + '0'), space}, ([]byte(a.String()))...) + msg = append(msg, append([]byte{space}, b...)...) + err := conn.WriteMessage(1, msg) + if err != nil { + logger.Error(err, "L2 ws write message") + } + // TODO messagetype 2 + }) + + go func() { + for { + err = conn.SetReadDeadline(time.Now().Add(readDeadline)) + if err != nil { + logger.Debug("L2 ws: set write deadline failed", "error", err) + return + } + messageType, p, err := conn.ReadMessage() + if err != nil { + logger.Error(err, "L2 ws read message") + return + } + if messageType == 1 { + action := actionType(p[0] - '0') + offset := 2 // + 1 delimeter + if action == atUnicast { + overlayBytes := p[offset : swarm.HashSize*2+offset] + overlay := swarm.NewAddress(common.HexToHash(string(overlayBytes)).Bytes()) + offset += swarm.HashSize*2 + 1 // + 1 delimeter + msg := p[offset:] + conn, err := protocolService.GetConnection(ctx, overlay) + if err != nil { + logger.Error(err, "L2 get connection") + } + err = conn.SendMessage(ctx, msg) + if err != nil { + logger.Error(err, "L2 write message") + } + } + } else if messageType == 2 { + action := actionType(p[0]) + offset := 1 + if action == atUnicast { + overlayBytes := p[offset : swarm.HashSize+offset] + overlay := swarm.NewAddress(overlayBytes) + offset += swarm.HashSize + msg := p[offset:] + conn, err := protocolService.GetConnection(ctx, overlay) + if err != nil { + logger.Error(err, "L2 get connection") + } + err = conn.SendMessage(ctx, msg) + if err != nil { + logger.Error(err, "L2 write message") + } + } else if action == atBroadcast { + msg := p[offset:] + + protocolService.Broadcast(ctx, msg) + } + } + } + }() + + defer func() { + ticker.Stop() + _ = conn.Close() + }() + + for { + err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + logger.Debug("L2 ws: set write deadline failed", "error", err) + return + } + select { + case <-ctx.Done(): + err = conn.WriteMessage(websocket.CloseMessage, []byte{}) + if err != nil { + logger.Debug("L2 ws: write close message failed", "error", err) + } + return + case <-ticker.C: + err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + logger.Debug("L2 ws: set write deadline failed", "error", err) + return + } + if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil { + // error encountered while pinging client. client probably gone + return + } + } + } +} diff --git a/pkg/node/node.go b/pkg/node/node.go index d2da0218179..0a32e909cb7 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -31,6 +31,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/feeds/factory" "github.com/ethersphere/bee/v2/pkg/hive" + "github.com/ethersphere/bee/v2/pkg/layer2" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" "github.com/ethersphere/bee/v2/pkg/p2p" @@ -668,6 +669,8 @@ func NewBee( return nil, err } + l2P2p := layer2.NewP2pService(p2ps, logger) + // Construct protocols. pingPong := pingpong.New(p2ps, logger, tracer) @@ -1068,6 +1071,7 @@ func NewBee( SyncStatus: syncStatusFn, NodeStatus: nodeStatus, PinIntegrity: localStore.PinIntegrity(), + Layer2P2p: &l2P2p, } if o.APIAddr != "" { From 622b49ddf512918ec2811a105fdcc34ac30c54ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 12 Aug 2024 14:07:54 +0200 Subject: [PATCH 2/4] feat: broadcast write --- pkg/layer2/ws.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/layer2/ws.go b/pkg/layer2/ws.go index 02845970ea8..fa8cdfff6fa 100644 --- a/pkg/layer2/ws.go +++ b/pkg/layer2/ws.go @@ -25,8 +25,6 @@ type actionType uint8 const ( atUnicast actionType = iota atBroadcast - // TODO atConnect - // TODO atDisconnect ) type responseType uint8 @@ -88,6 +86,9 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l if err != nil { logger.Error(err, "L2 write message") } + } else if action == atBroadcast { + msg := p[offset:] + protocolService.Broadcast(ctx, msg) } } else if messageType == 2 { action := actionType(p[0]) From 7021be82efc3b2afd05f8e8274e4dc0b91aeb30e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 12 Aug 2024 14:33:28 +0200 Subject: [PATCH 3/4] feat: removeHandler and byte based response --- pkg/layer2/layer2.go | 14 ++++++++++++++ pkg/layer2/ws.go | 37 +++++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/pkg/layer2/layer2.go b/pkg/layer2/layer2.go index 2e970b32992..ca74957dba5 100644 --- a/pkg/layer2/layer2.go +++ b/pkg/layer2/layer2.go @@ -10,6 +10,7 @@ import ( "context" "encoding/binary" "fmt" + "reflect" "sync" "github.com/ethersphere/bee/v2/pkg/log" @@ -131,6 +132,19 @@ func (p *protocolService) AddHandler(handler msgHandler) { p.handlers = append(p.handlers, handler) } +func (p *protocolService) RemoveHandler(handler msgHandler) { + p.mu.Lock() + defer p.mu.Unlock() + handlerValue := reflect.ValueOf(handler).Pointer() + + for i, f := range p.handlers { + if reflect.ValueOf(f).Pointer() == handlerValue { + p.handlers = append(p.handlers[:i], p.handlers[i+1:]...) + return + } + } +} + // handler handles incoming messages from all connected peers and calls registered hook functions func (s *protocolService) handler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { for { diff --git a/pkg/layer2/ws.go b/pkg/layer2/ws.go index fa8cdfff6fa..ac4dd82feb5 100644 --- a/pkg/layer2/ws.go +++ b/pkg/layer2/ws.go @@ -8,6 +8,7 @@ package layer2 import ( "context" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -41,23 +42,34 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l err error ) + respMessageType := atomic.Uint32{} // 1 textbase, 2 bytebase + respMessageType.Store(1) + protocolListener := func(a swarm.Address, b []byte) { + if respMessageType.Load() == 1 { + space := byte(' ') + msg := append([]byte{byte(atMsg + '0'), space}, ([]byte(a.String()))...) + msg = append(msg, append([]byte{space}, b...)...) + err := conn.WriteMessage(1, msg) + if err != nil { + logger.Error(err, "L2 ws write message") + } + } else { + msg := append([]byte{byte(atMsg + '0')}, a.Bytes()...) + msg = append(msg, b...) + err := conn.WriteMessage(2, msg) + if err != nil { + logger.Error(err, "L2 ws write message") + } + } + } + + protocolService.AddHandler(protocolListener) conn.SetCloseHandler(func(code int, text string) error { logger.Debug("L2 ws: client gone", "protocol", protocolService.streamName, "code", code, "message", text) - // TODO remove handler + protocolService.RemoveHandler(protocolListener) return nil }) - protocolService.AddHandler(func(a swarm.Address, b []byte) { - space := byte(' ') - msg := append([]byte{byte(atMsg + '0'), space}, ([]byte(a.String()))...) - msg = append(msg, append([]byte{space}, b...)...) - err := conn.WriteMessage(1, msg) - if err != nil { - logger.Error(err, "L2 ws write message") - } - // TODO messagetype 2 - }) - go func() { for { err = conn.SetReadDeadline(time.Now().Add(readDeadline)) @@ -70,6 +82,7 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l logger.Error(err, "L2 ws read message") return } + respMessageType.Store(uint32(messageType)) if messageType == 1 { action := actionType(p[0] - '0') offset := 2 // + 1 delimeter From 4098ad21cf92b495fd9b4d5434a864a9e0013536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 12 Aug 2024 17:20:19 +0200 Subject: [PATCH 4/4] feat: timeout --- pkg/api/layer2.go | 18 ++++++++++++++++-- pkg/layer2/ws.go | 8 +++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pkg/api/layer2.go b/pkg/api/layer2.go index 2c95c87be68..62f1bb16b48 100644 --- a/pkg/api/layer2.go +++ b/pkg/api/layer2.go @@ -27,6 +27,14 @@ func (s *Service) layer2WsHandler(w http.ResponseWriter, r *http.Request) { return } + headers := struct { + KeepAlive time.Duration `map:"Swarm-Keep-Alive"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + if s.beeMode == DevMode { logger.Warning("layer2 endpoint is disabled in dev mode") jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) @@ -46,7 +54,12 @@ func (s *Service) layer2WsHandler(w http.ResponseWriter, r *http.Request) { return } - pingPeriod := 100 * time.Second //TODO pass it in header + pingPeriod := headers.KeepAlive * time.Second + if pingPeriod == 0 { + pingPeriod = 30 * time.Second + } + + logger.Info("pingPeriod", pingPeriod) ctx, cancel := context.WithCancel(context.Background()) protocolService := s.l2p2p.GetProtocol(ctx, paths.StreamName) err = s.p2p.AddProtocol(protocolService.Protocol()) @@ -58,7 +71,8 @@ func (s *Service) layer2WsHandler(w http.ResponseWriter, r *http.Request) { s.wsWg.Add(1) go func() { - layer2.ListeningWs(ctx, conn, layer2.WsOptions{PingPeriod: pingPeriod}, logger, protocolService) + layer2.ListeningWs(ctx, conn, layer2.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, protocolService) + _ = conn.Close() s.wsWg.Done() cancel() }() diff --git a/pkg/layer2/ws.go b/pkg/layer2/ws.go index ac4dd82feb5..7df16f0e218 100644 --- a/pkg/layer2/ws.go +++ b/pkg/layer2/ws.go @@ -19,6 +19,7 @@ import ( type WsOptions struct { PingPeriod time.Duration + Cancel context.CancelFunc } type actionType uint8 @@ -75,12 +76,12 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l err = conn.SetReadDeadline(time.Now().Add(readDeadline)) if err != nil { logger.Debug("L2 ws: set write deadline failed", "error", err) - return + break } messageType, p, err := conn.ReadMessage() if err != nil { logger.Error(err, "L2 ws read message") - return + break } respMessageType.Store(uint32(messageType)) if messageType == 1 { @@ -94,6 +95,7 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l conn, err := protocolService.GetConnection(ctx, overlay) if err != nil { logger.Error(err, "L2 get connection") + break } err = conn.SendMessage(ctx, msg) if err != nil { @@ -126,6 +128,7 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l } } } + options.Cancel() }() defer func() { @@ -147,7 +150,6 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l } return case <-ticker.C: - err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) if err != nil { logger.Debug("L2 ws: set write deadline failed", "error", err) return