Skip to content

Commit

Permalink
wip/neighborMsg handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jimjbrettj committed Sep 24, 2024
1 parent 04540d1 commit ab353be
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
24 changes: 23 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,30 @@ func (s *Service) handleVoteMessage(from peer.ID, vote *VoteMessage) (err error)
return nil
}

func (s *Service) handleNeighborMessage(round uint64, setID uint64) error {
// TODO sender side of neighbor msg
highestHeader, err := s.blockState.GetHighestFinalisedHeader()
if err != nil {
return err
}
neighbourMessage := &NeighbourPacketV1{
Round: round,
SetID: setID,
Number: uint32(highestHeader.Number),
}

cm, err := neighbourMessage.ToConsensusMessage()
if err != nil {
return fmt.Errorf("converting neighbour message to network message: %w", err)
}

logger.Errorf("sending neighbour message: %v", neighbourMessage)
s.network.GossipMessage(cm)
return nil
}

func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error {
logger.Debugf("received commit message: %+v", commitMessage)
logger.Warnf("received commit message: %+v", commitMessage)

err := verifyBlockHashAgainstBlockNumber(s.blockState,
commitMessage.Vote.Hash, uint(commitMessage.Vote.Number))
Expand Down
25 changes: 24 additions & 1 deletion lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MessageHandler struct {
grandpa *Service
blockState BlockState
telemetry Telemetry

isStart bool // This is a temp hacky way
}

// NewMessageHandler returns a new MessageHandler
Expand All @@ -39,6 +41,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer
grandpa: grandpa,
blockState: blockState,
telemetry: telemetryMailer,
isStart: true,
}
}

Expand Down Expand Up @@ -82,8 +85,28 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
}
}

func (h *MessageHandler) handleNeighbourMessage(_ *NeighbourPacketV1) error {
func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1) error {
// TODO(#2931)
// This should be the receiver side of the handling messages, NOT GOSSIP
if h.isStart {
logger.Errorf("Received initial neighbor msg")
neighbourMessage := &NeighbourPacketV1{
Round: packet.Round,
SetID: packet.SetID,
Number: packet.Number,
}

cm, err := neighbourMessage.ToConsensusMessage()
if err != nil {
return fmt.Errorf("converting neighbour message to network message: %w", err)
}

logger.Errorf("sending neighbour message: %v", neighbourMessage)
h.grandpa.network.GossipMessage(cm)
h.isStart = false
}

// TODO handle in normal case?
return nil
}

Expand Down
50 changes: 44 additions & 6 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type tracker struct {
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}

neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message
stoppedNeighbor chan struct{}

catchUpResponseMessageMutex sync.Mutex
// round(uint64) is used as key and *CatchUpResponse as value
catchUpResponseMessages map[uint64]*CatchUpResponse
Expand All @@ -33,22 +36,28 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker {
commitsCapacity = 1000
)
return &tracker{
blockState: bs,
handler: handler,
votes: newVotesTracker(votesCapacity),
commits: newCommitsTracker(commitsCapacity),
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),
blockState: bs,
handler: handler,
votes: newVotesTracker(votesCapacity),
commits: newCommitsTracker(commitsCapacity),
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),

neighborIn: make(chan NeighbourPacketV1),
stoppedNeighbor: make(chan struct{}),

catchUpResponseMessages: make(map[uint64]*CatchUpResponse),
}
}

func (t *tracker) start() {
go t.handleBlocks()
go t.handleNeighborMessage()
}

func (t *tracker) stop() {
close(t.stopped)
close(t.stoppedNeighbor)
t.blockState.FreeImportedBlockNotifierChannel(t.in)
}

Expand All @@ -62,6 +71,11 @@ func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) {

func (t *tracker) addCommit(cm *CommitMessage) {
t.commits.add(cm)
t.neighborIn <- NeighbourPacketV1{
Round: cm.Round + 1,
SetID: cm.SetID, // need to hceck for set changes
Number: 0, // This gets modified later
}
}

func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) {
Expand Down Expand Up @@ -92,6 +106,30 @@ func (t *tracker) handleBlocks() {
}
}

func (t *tracker) handleNeighborMessage() {
// https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73
const duration = time.Minute * 2
ticker := time.NewTicker(duration)
defer ticker.Stop()

for {
select {
case msg := <-t.neighborIn:
logger.Warnf("Event Channel handleNeighborMessage Triggered")
err := t.handler.grandpa.handleNeighborMessage(msg.Round, msg.SetID)
if err != nil {
logger.Errorf("handling neighbor message: %v", err)
}

ticker.Reset(duration)
case <-ticker.C:
logger.Warnf("Tick handleNeighborMessage")
case <-t.stoppedNeighbor:
return
}
}
}

func (t *tracker) handleBlock(b *types.Block) {
h := b.Header.Hash()
vms := t.votes.messages(h)
Expand Down

0 comments on commit ab353be

Please sign in to comment.