Skip to content

Commit

Permalink
throttling feedback loop
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto-bayardo committed Oct 30, 2024
1 parent cface7b commit 92c6c45
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)

replace github.com/ethereum/go-ethereum v1.14.11 => github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843
replace github.com/ethereum/go-ethereum v1.14.11 => github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207

//replace github.com/ethereum/go-ethereum => ../op-geth

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtD
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843 h1:Kzf4Pw7lKqluu/CZKbr7MVux3MdqZUks5NofgZr6KOo=
github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4=
github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207 h1:ozQjBXbRk9vpv9Uh4xB6W4rsJEVI2Nq6rg6LaDkuvIo=
github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
6 changes: 6 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,9 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
// to pick up the new ChannelConfig
s.defaultCfg = newCfg
}

// PendingBytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
// but not yet in a channel).
func (s *channelManager) PendingBytes() int64 {
return int64(s.metr.PendingBytes())
}
16 changes: 14 additions & 2 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,15 @@ type CLIConfig struct {
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration

// ThrottleThreshold is the number of pending bytes beyond which the batcher will start
// throttling future bytes written to DA. 0 means never throttle.
// ThrottleThreshold is the number of pending bytes beyond which the batcher will start throttling future bytes
// written to DA. 0 means never throttle.
ThrottleThreshold uint64
// ThrottleInterval is the interval between performing DA throttling actions.
ThrottleInterval time.Duration
// ThrottleTxSize is the DA size of a transaction to start throttling when we are over the throttling threshold.
ThrottleTxSize uint64
// ThrottleBlockSize is the total per-block DA limit to start imposing on block building when we are over the throttling threshold.
ThrottleBlockSize uint64

// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
Expand Down Expand Up @@ -149,6 +155,9 @@ func (c *CLIConfig) Check() error {
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
return fmt.Errorf("unknown data availability type: %q", c.DataAvailabilityType)
}
if c.ThrottleThreshold != 0 && c.ThrottleInterval == 0 {
return errors.New("throttling threshold is specified, but throttling poll interval is zero")
}
// we want to enforce it for both blobs and auto
if c.DataAvailabilityType != flags.CalldataType && c.TargetNumFrames > eth.MaxBlobsPerBlobTx {
return fmt.Errorf("too many frames for blob transactions, max %d", eth.MaxBlobsPerBlobTx)
Expand Down Expand Up @@ -200,5 +209,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
RPC: oprpc.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name),
ThrottleInterval: ctx.Duration(flags.ThrottleIntervalFlag.Name),
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
}
}
59 changes: 58 additions & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batcher

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -34,6 +36,7 @@ var (
},
},
}
SetMaxDASizeMethod = "miner_setMaxDASize"
)

type txRef struct {
Expand Down Expand Up @@ -402,6 +405,13 @@ func (l *BatchSubmitter) loop() {
receiptsCh := make(chan txmgr.TxReceipt[txRef])
go l.processReceiptsLoop(receiptsCh, receiptsLoopDone)

// start the DA throttling loop
if l.Config.ThrottleThreshold > 0 {
throttlingLoopDone := make(chan struct{})
defer close(throttlingLoopDone)
go l.throttlingLoop(throttlingLoopDone)
}

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -470,6 +480,7 @@ func (l *BatchSubmitter) loop() {
}

func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) {
l.Log.Info("Starting receipts processing loop")
for {
select {
case r := <-receiptsCh:
Expand All @@ -488,7 +499,53 @@ func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txR
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptsLoopDone:
l.Log.Info("Receipt processing loop done")
l.Log.Info("Receipts processing loop done")
return
}
}
}

// throttlingLoop monitors the backlog in bytes we need to make available, and throttles incoming
// data appropriately to keep it under a threshold.
func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()

ctx, cancel := context.WithTimeout(context.Background(), l.Config.NetworkTimeout)
defer cancel()

for {
select {
case <-ticker.C:
cl, err := l.EndpointProvider.EthClient(ctx)
if err != nil {
l.Log.Error("Can't reach sequencer execution RPC", "err", err)
continue
}
pendingBytes := l.state.PendingBytes()
var maxTxSize, maxBlockSize uint64
if pendingBytes > int64(l.Config.ThrottleThreshold) {
l.Log.Warn("Pending bytes over limit, throttling DA", "bytes", pendingBytes, "limit", l.Config.ThrottleThreshold)
maxTxSize = l.Config.ThrottleTxSize
maxBlockSize = l.Config.ThrottleBlockSize
}
var raw json.RawMessage
if err := cl.Client().CallContext(
ctx, &raw, SetMaxDASizeMethod, hexutil.Uint64(maxTxSize), hexutil.Uint64(maxBlockSize)); err != nil {
l.Log.Error("SetMaxDASize rpc failed:", err)
}
var success bool
if err := json.Unmarshal(raw, &success); err != nil {
l.Log.Error("Can't unmarshall result of SetMaxDASize", "err", err)
continue
}
if !success {
l.Log.Error("Result of SetMaxDASize was false")
continue
}
case <-throttlingLoopDone:
l.Log.Info("DA throttling loop done")
return
}
}
Expand Down
10 changes: 9 additions & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type BatcherConfig struct {

WaitNodeSync bool
CheckRecentTxsDepth int

// For throttling DA. See CLIConfig in config.go for details on these parameters.
ThrottleThreshold, ThrottleTxSize, ThrottleBlockSize uint64
ThrottleInterval time.Duration
}

// BatcherService represents a full batch-submitter instance and its resources,
Expand Down Expand Up @@ -101,6 +105,11 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync

bs.ThrottleThreshold = cfg.ThrottleThreshold
bs.ThrottleTxSize = cfg.ThrottleTxSize
bs.ThrottleBlockSize = cfg.ThrottleBlockSize

if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
Expand Down Expand Up @@ -440,7 +449,6 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
if bs.EndpointProvider != nil {
bs.EndpointProvider.Close()
}

if result == nil {
bs.stopped.Store(true)
bs.Log.Info("Batch Submitter stopped")
Expand Down
18 changes: 18 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ var (
Value: 0, // never throttle
EnvVars: prefixEnvVars("THROTTLE_THRESHOLD"),
}
ThrottleTxSizeFlag = &cli.IntFlag{
Name: "throttle-tx-size",
Usage: "The DA size of transactions to start throttling when we are over the throttling threshold",
Value: 300, // most transactions compress to under 300 bytes. TODO: compute exact distribution
EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"),
}
ThrottleBlockSizeFlag = &cli.IntFlag{
Name: "throttle-block-size",
Usage: "The total DA limit to start imposing on block building when we are over the throttling threshold",
Value: 21_000, // at least 70 transactions per block of up to 300 compressed bytes each.
EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"),
}
ThrottleIntervalFlag = &cli.DurationFlag{
Name: "throttle-interval",
Usage: "Interval between potential DA throttling actions",
Value: 12 * time.Second, // default to L1 block interval
EnvVars: prefixEnvVars("THROTTLE_POLL_INTERVAL"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
Expand Down
4 changes: 3 additions & 1 deletion op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"io"
"math/big"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -340,7 +341,8 @@ func estimateBatchSize(block *types.Block) uint64 {
if tx.IsDepositTx() {
continue
}
bigSize := types.EstimatedL1Size(tx.RollupCostData())
bigSize := types.EstimatedL1SizeScaled(tx.RollupCostData())
bigSize = bigSize.Div(bigSize, big.NewInt(1e6)) // unscale it
if bigSize.IsUint64() {
size += bigSize.Uint64()
} else {
Expand Down
2 changes: 2 additions & 0 deletions op-service/dial/ethclient_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

// EthClientInterface is an interface for providing an ethclient.Client
// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers
type EthClientInterface interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Client() *rpc.Client

Close()
}

0 comments on commit 92c6c45

Please sign in to comment.