Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add eth 68 protocol #117

Merged
merged 11 commits into from
Sep 11, 2024
23 changes: 18 additions & 5 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,17 +755,20 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
}

// generate 50 txs
hashMap, _, err := generateTxs(s, 50)
_, txs, err := generateTxs(s, 50)
if err != nil {
t.Fatalf("failed to generate transactions: %v", err)
}

// create new pooled tx hashes announcement
hashes := make([]common.Hash, 0)
for _, hash := range hashMap {
hashes = append(hashes, hash)
hashes := make([]common.Hash, len(txs))
types := make([]byte, len(txs))
sizes := make([]uint32, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
types[i] = tx.Type()
sizes[i] = uint32(tx.Size())
}
announce := NewPooledTransactionHashes(hashes)

// send announcement
conn, err := s.dial66()
Expand All @@ -776,6 +779,14 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
if err = conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}

var announce Message
if conn.negotiatedProtoVersion == eth.ETH68 {
announce = NewPooledTransactionHashes68{Types: types}
} else {
announce = NewPooledTransactionHashes(hashes)
}

if err = conn.Write(announce); err != nil {
t.Fatalf("failed to write to connection: %v", err)
}
Expand All @@ -792,6 +803,8 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *NewPooledTransactionHashes68:
continue
// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
4 changes: 4 additions & 0 deletions cmd/devp2p/internal/ethtest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket

func (nb NewPooledTransactionHashes) Code() int { return 24 }

type NewPooledTransactionHashes68 eth.NewPooledTransactionHashesPacket68

func (nb NewPooledTransactionHashes68) Code() int { return 24 }

type GetPooledTransactions eth.GetPooledTransactionsPacket

func (gpt GetPooledTransactions) Code() int { return 25 }
Expand Down
3 changes: 3 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
case *eth.NewPooledTransactionHashesPacket:
return h.txFetcher.Notify(peer.ID(), *packet)

case *eth.NewPooledTransactionHashesPacket68:
return h.txFetcher.Notify(peer.ID(), packet.Hashes)

case *eth.TransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

Expand Down
11 changes: 10 additions & 1 deletion eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
h.txAnnounces.Send(([]common.Hash)(*packet))
return nil

case *eth.NewPooledTransactionHashesPacket68:
h.txAnnounces.Send(packet.Hashes)
return nil

case *eth.TransactionsPacket:
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
return nil
Expand All @@ -82,6 +86,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// fork IDs in the protocol handshake.
func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) }
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }

func testForkIDSplit(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -240,6 +245,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
// Tests that received transactions are added to the local pool.
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) }
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }

func testRecvTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -297,6 +303,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {

// This test checks that pending transactions are sent.
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }

func testSendTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -355,7 +362,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
seen := make(map[common.Hash]struct{})
for len(seen) < len(insert) {
switch protocol {
case 66:
case 66, 68:
select {
case hashes := <-anns:
for _, hash := range hashes {
Expand Down Expand Up @@ -383,6 +390,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) }
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }

func testTransactionPropagation(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -690,6 +698,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) }
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }

func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
25 changes: 18 additions & 7 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
if done == nil && len(queue) > 0 {
// Pile transaction hashes until we reach our allowed network limit
var (
count int
pending []common.Hash
size common.StorageSize
count int
pending []common.Hash
pendingTypes []byte
pendingSizes []uint32
size common.StorageSize
)
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
if p.txpool.Get(queue[count]) != nil {
if tx := p.txpool.Get(queue[count]); tx != nil {
pending = append(pending, queue[count])
pendingTypes = append(pendingTypes, tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Size()))
size += common.HashLength
}
}
Expand All @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
if p.version >= ETH68 {
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
fail <- err
return
}
} else {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
}
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
Expand Down
26 changes: 24 additions & 2 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2
for i, version := range ProtocolVersions {
version := version // Closure

protocolName := ProtocolName
if version == ETH68 {
protocolName = Protocol68Name
}
protocols[i] = p2p.Protocol{
Name: ProtocolName,
Name: protocolName,
Version: version,
Length: protocolLengths[version],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
Expand Down Expand Up @@ -216,6 +220,21 @@ var eth66 = map[uint64]msgHandler{
TransactionsExMsg: handleTransactionsEx,
}

var eth68 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
Expand All @@ -230,9 +249,12 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()

var handlers = eth65
if peer.Version() >= ETH66 {
if peer.Version() == ETH66 {
handlers = eth66
}
if peer.Version() == ETH68 {
jennwiederholen marked this conversation as resolved.
Show resolved Hide resolved
handlers = eth68
}

// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
Expand Down
21 changes: 16 additions & 5 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error {
// Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) }
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }

func testGetBlockHeaders(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -312,6 +313,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
// Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) }
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }

func testGetBlockBodies(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -401,10 +403,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}

// Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65, false) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }

func testGetNodeData(t *testing.T, protocol uint) {
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
t.Parallel()

// Define three accounts to simulate transactions with
Expand Down Expand Up @@ -469,8 +472,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
})
}
msg, err := peer.app.ReadMsg()
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
if !drop {
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
}
} else {
if err != nil {
return
}
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
}
if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
Expand Down Expand Up @@ -524,6 +534,7 @@ func testGetNodeData(t *testing.T, protocol uint) {
// Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) }
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }

func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
27 changes: 27 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,33 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return nil
}

func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket68)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
}
f := func() error {
// Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes {
peer.markTransaction(hash)
}
return backend.Handle(peer, ann)
}
if params.ConsensusMethod == params.ConsensusPoW {
return f()
}
go f()
return nil
}

func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
}

func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
}

// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
// announce to a remote peer. The number of pending sends are capped (new ones
// will force old sends to be dropped)
Expand Down
2 changes: 2 additions & 0 deletions eth/protocols/eth/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
errc := make(chan error, 1)
go func() {
defer app.Close()

errc <- backend.RunPeer(peer, func(peer *Peer) error {
return Handle(backend, peer)
})
Expand Down
16 changes: 14 additions & 2 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@ import (
const (
ETH65 = 65
ETH66 = 66
ETH68 = 68
)

// ProtocolName is the official short name of the `eth` protocol used during
// devp2p capability negotiation.
const ProtocolName = "mir"
const Protocol68Name = "eth"

// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH66, ETH65}
var ProtocolVersions = []uint{ETH68, ETH66, ETH65}

// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23}
var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23, ETH68: 17}

// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 100 * 1024 * 1024
Expand Down Expand Up @@ -316,6 +318,13 @@ type ReceiptsRLPPacket66 struct {
// NewPooledTransactionHashesPacket represents a transaction announcement packet.
type NewPooledTransactionHashesPacket []common.Hash

// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer.
type NewPooledTransactionHashesPacket68 struct {
Types []byte
Sizes []uint32
Hashes []common.Hash
}

// GetPooledTransactionsPacket represents a transaction query.
type GetPooledTransactionsPacket []common.Hash

Expand Down Expand Up @@ -421,6 +430,9 @@ func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }
func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" }
func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg }

func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }
func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg }

func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }

Expand Down
Loading