Skip to content

Commit

Permalink
Merge branch 'main' into message_handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel authored May 17, 2024
2 parents 4f1bc9c + b3e91a8 commit 41df235
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
4 changes: 2 additions & 2 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func desiredPubSubBaseTopics() []string {
p2p.GossipContributionAndProofMessage,
p2p.GossipSyncCommitteeMessage,
p2p.GossipBlsToExecutionChangeMessage,
// p2p.GossipBlobSidecarMessage,
p2p.GossipBlobSidecarMessage,
}
}

Expand Down Expand Up @@ -441,7 +441,7 @@ func hasSubnets(topic string) (subnets uint64, hasSubnets bool) {
}

func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string {
if subnet > 1 { // as far as I know, there aren't subnets with index 0
if subnet > 0 { // as far as I know, there aren't subnets with index 0
return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix()
} else {
return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix()
Expand Down
46 changes: 46 additions & 0 deletions eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
return p.handleSyncCommitteeMessage
case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage):
return p.handleBlsToExecutionChangeMessage
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return p.handleBlobSidecar
default:
return p.host.TracedTopicHandler(host.NoopHandler)
}
Expand Down Expand Up @@ -428,6 +430,50 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err))
}

return nil
}

func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error {
switch p.cfg.ForkVersion {
case DenebForkVersion:
var blob ethtypes.BlobSidecar
err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob)
if err != nil {
slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err))
return err
}

slot := blob.GetSignedBlockHeader().GetHeader().GetSlot()
slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot)
proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex()

now := time.Now()
evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Slot": slot,
"ValIdx": proposerIndex,
"TimeInSlot": now.Sub(slotStart).Seconds(),
"StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()),
"BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()),
"ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()),
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting topic handler event", tele.LogAttrError(err))
}
default:
return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:])
}

return nil
Expand Down
1 change: 1 addition & 0 deletions host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) {

func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler {
return func(ctx context.Context, msg *pubsub.Message) error {
slog.Debug("Handling gossip message", "topic", msg.GetTopic())
evt := &TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: h.ID(),
Expand Down

0 comments on commit 41df235

Please sign in to comment.