Skip to content

Commit

Permalink
implement throttling feedback loop based on the amount of data pendin…
Browse files Browse the repository at this point in the history
…g for DA settlement
  • Loading branch information
roberto-bayardo committed Nov 5, 2024
1 parent 5e209a5 commit 4b6487b
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 51 deletions.
14 changes: 14 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math"
"sync"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
Expand Down Expand Up @@ -517,3 +518,16 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
// to pick up the new ChannelConfig
s.defaultCfg = newCfg
}

// PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
// but not yet in a channel).
func (s *channelManager) PendingDABytes() int64 {
f := s.metr.PendingDABytes()
if f >= math.MaxInt64 {
return math.MaxInt64
}
if f <= math.MinInt64 {
return math.MinInt64
}
return int64(f)
}
20 changes: 20 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ type CLIConfig struct {
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration

// ThrottleThreshold is the number of pending bytes beyond which the batcher will start throttling future bytes
// written to DA. 0 means never throttle.
ThrottleThreshold uint64
// ThrottleInterval is the interval between performing DA throttling actions.
ThrottleInterval time.Duration
// ThrottleTxSize is the DA size of a transaction to start throttling when we are over the throttling threshold.
ThrottleTxSize uint64
// ThrottleBlockSize is the total per-block DA limit to start imposing on block building when we are over the throttling threshold.
ThrottleBlockSize uint64
// ThrottleAlwaysBlockSize is the total per-block DA limit to always imposing on block building.
ThrottleAlwaysBlockSize uint64

// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
Expand Down Expand Up @@ -145,6 +157,9 @@ func (c *CLIConfig) Check() error {
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
return fmt.Errorf("unknown data availability type: %q", c.DataAvailabilityType)
}
if c.ThrottleInterval == 0 {
return errors.New("throttling interval must be non-zero")
}
// we want to enforce it for both blobs and auto
if c.DataAvailabilityType != flags.CalldataType && c.TargetNumFrames > eth.MaxBlobsPerBlobTx {
return fmt.Errorf("too many frames for blob transactions, max %d", eth.MaxBlobsPerBlobTx)
Expand Down Expand Up @@ -195,5 +210,10 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name),
ThrottleInterval: ctx.Duration(flags.ThrottleIntervalFlag.Name),
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
}
}
7 changes: 5 additions & 2 deletions op-batcher/batcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ func validBatcherConfig() batcher.CLIConfig {
MetricsConfig: metrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
// The compressor config is not checked in config.Check()
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
ThrottleThreshold: 0, // no DA throttling
ThrottleInterval: 12 * time.Second,
ThrottleTxSize: 0,
}
}

Expand Down
130 changes: 101 additions & 29 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -34,6 +35,7 @@ var (
},
},
}
SetMaxDASizeMethod = "miner_setMaxDASize"
)

type txRef struct {
Expand Down Expand Up @@ -100,6 +102,8 @@ type BatchSubmitter struct {
killCtx context.Context
cancelKillCtx context.CancelFunc

l2BlockAdded chan struct{} // notifies the throttling loop whenever an l2 block is added

mutex sync.Mutex
running bool

Expand Down Expand Up @@ -290,6 +294,12 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
return nil, fmt.Errorf("adding L2 block to state: %w", err)
}

// notify the throttling loop it may be time to initiate throttling without blocking
select {
case l.l2BlockAdded <- struct{}{}:
default:
}

l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
return block, nil
}
Expand Down Expand Up @@ -384,7 +394,6 @@ const (
func (l *BatchSubmitter) loop() {
defer l.wg.Done()

receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daGroup := &errgroup.Group{}
// errgroup with limit of 0 means no goroutine is able to run concurrently,
Expand All @@ -393,37 +402,22 @@ func (l *BatchSubmitter) loop() {
daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests))
}

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

l.txpoolMutex.Lock()
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
go func() {
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
l.Log.Info("Receipt processing loop done")
return
}
}
}()

// start the receipt/result processing loop
receiptsLoopDone := make(chan struct{})
defer close(receiptsLoopDone) // shut down receipt loop
l.l2BlockAdded = make(chan struct{})
defer close(l.l2BlockAdded)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
go l.processReceiptsLoop(receiptsCh, receiptsLoopDone)

// start the DA throttling loop
throttlingLoopDone := make(chan struct{})
defer close(throttlingLoopDone)
go l.throttlingLoop(throttlingLoopDone)

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -492,6 +486,84 @@ func (l *BatchSubmitter) loop() {
}
}

func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) {
l.Log.Info("Starting receipts processing loop")
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptsLoopDone:
l.Log.Info("Receipts processing loop done")
return
}
}
}

// throttlingLoop monitors the backlog in bytes we need to make available, and throttles incoming data appropriately to
// keep it under a threshold. Note that it's important to start this loop even if throttling is disabled
// (ThrottleThreshold == 0) just in case we fail over to another sequencer that was previously configured
// differently. By looping & calling the miner API setter continuously we ensure the engine currently in use is always
// going to be reset to the proper throttling settings.
func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()

updateParams := func() {
ctx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
defer cancel()
cl, err := l.EndpointProvider.EthClient(ctx)
if err != nil {
l.Log.Error("Can't reach sequencer execution RPC", "err", err)
return
}
pendingBytes := l.state.PendingDABytes()
maxTxSize := uint64(0)
maxBlockSize := l.Config.ThrottleAlwaysBlockSize
if l.Config.ThrottleThreshold != 0 && pendingBytes > int64(l.Config.ThrottleThreshold) {
l.Log.Warn("Pending bytes over limit, throttling DA", "bytes", pendingBytes, "limit", l.Config.ThrottleThreshold)
maxTxSize = l.Config.ThrottleTxSize
if maxBlockSize == 0 || (l.Config.ThrottleBlockSize != 0 && l.Config.ThrottleBlockSize < maxBlockSize) {
maxBlockSize = l.Config.ThrottleBlockSize
}
}
var success bool
if err := cl.Client().CallContext(
ctx, &success, SetMaxDASizeMethod, hexutil.Uint64(maxTxSize), hexutil.Uint64(maxBlockSize)); err != nil {
l.Log.Error("SetMaxDASize rpc failed", "err", err)
return
}
if !success {
l.Log.Error("Result of SetMaxDASize was false")
}
}

for {
select {
case <-l.l2BlockAdded:
updateParams()
case <-ticker.C:
updateParams()
case <-throttlingLoopDone:
l.Log.Info("DA throttling loop done")
return
}
}
}

// waitNodeSync Check to see if there was a batcher tx sent recently that
// still needs more block confirmations before being considered finalized
func (l *BatchSubmitter) waitNodeSync() error {
Expand Down
22 changes: 22 additions & 0 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type BatcherConfig struct {

WaitNodeSync bool
CheckRecentTxsDepth int

// For throttling DA. See CLIConfig in config.go for details on these parameters.
ThrottleThreshold, ThrottleTxSize uint64
ThrottleBlockSize, ThrottleAlwaysBlockSize uint64
ThrottleInterval time.Duration
}

// BatcherService represents a full batch-submitter instance and its resources,
Expand Down Expand Up @@ -101,6 +106,13 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync

bs.ThrottleThreshold = cfg.ThrottleThreshold
bs.ThrottleTxSize = cfg.ThrottleTxSize
bs.ThrottleBlockSize = cfg.ThrottleBlockSize
bs.ThrottleAlwaysBlockSize = cfg.ThrottleAlwaysBlockSize
bs.ThrottleInterval = cfg.ThrottleInterval

if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
Expand Down Expand Up @@ -457,3 +469,13 @@ func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
BatchSubmitter: bs.driver,
}
}

// ThrottlingTestDriver returns a handler for the batch-submitter driver element that is in "always throttle" mode, for
// use only in testing.
func (bs *BatcherService) ThrottlingTestDriver() *TestBatchSubmitter {
tbs := &TestBatchSubmitter{
BatchSubmitter: bs.driver,
}
tbs.BatchSubmitter.state.metr = new(metrics.ThrottlingMetrics)
return tbs
}
35 changes: 35 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,36 @@ var (
Value: false,
EnvVars: prefixEnvVars("WAIT_NODE_SYNC"),
}
ThrottleThresholdFlag = &cli.IntFlag{
Name: "throttle-threshold",
Usage: "The threshold on pending-blocks-bytes-current beyond which the batcher will instruct the block builder to start throttling transactions with larger DA demands",
Value: 0, // never throttle
EnvVars: prefixEnvVars("THROTTLE_THRESHOLD"),
}
ThrottleTxSizeFlag = &cli.IntFlag{
Name: "throttle-tx-size",
Usage: "The DA size of transactions to start throttling when we are over the throttle threshold",
Value: 300, // most transactions compress to under 300 bytes. TODO: compute exact distribution
EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"),
}
ThrottleBlockSizeFlag = &cli.IntFlag{
Name: "throttle-block-size",
Usage: "The total DA limit to start imposing on block building when we are over the throttle threshold",
Value: 21_000, // at least 70 transactions per block of up to 300 compressed bytes each.
EnvVars: prefixEnvVars("THROTTLE_BLOCK_SIZE"),
}
ThrottleAlwaysBlockSizeFlag = &cli.IntFlag{
Name: "throttle-always-block-size",
Usage: "The total DA limit to start imposing on block building at all times",
Value: 130_000, // should be larger than the builder's max-l2-tx-size to prevent endlessly throttling some txs
EnvVars: prefixEnvVars("THROTTLE_ALWAYS_BLOCK_SIZE"),
}
ThrottleIntervalFlag = &cli.DurationFlag{
Name: "throttle-interval",
Usage: "Interval between potential DA throttling actions",
Value: 2 * time.Second, // should be equal to the l2 block interval
EnvVars: prefixEnvVars("THROTTLE_INTERVAL"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
Expand Down Expand Up @@ -184,6 +214,11 @@ var optionalFlags = []cli.Flag{
DataAvailabilityTypeFlag,
ActiveSequencerCheckDurationFlag,
CompressionAlgoFlag,
ThrottleThresholdFlag,
ThrottleIntervalFlag,
ThrottleTxSizeFlag,
ThrottleBlockSizeFlag,
ThrottleAlwaysBlockSizeFlag,
}

func init() {
Expand Down
Loading

0 comments on commit 4b6487b

Please sign in to comment.