From 92c6c451181be7f4d85dda94c6cb7cfd797cb9e7 Mon Sep 17 00:00:00 2001 From: Roberto Bayardo Date: Tue, 29 Oct 2024 18:11:02 -0700 Subject: [PATCH] throttling feedback loop --- go.mod | 2 +- go.sum | 4 +- op-batcher/batcher/channel_manager.go | 6 +++ op-batcher/batcher/config.go | 16 ++++++- op-batcher/batcher/driver.go | 59 +++++++++++++++++++++++++- op-batcher/batcher/service.go | 10 ++++- op-batcher/flags/flags.go | 18 ++++++++ op-batcher/metrics/metrics.go | 4 +- op-service/dial/ethclient_interface.go | 2 + 9 files changed, 113 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index a662ff044ac6..9d8bad479f68 100644 --- a/go.mod +++ b/go.mod @@ -250,7 +250,7 @@ require ( rsc.io/tmplfunc v0.0.3 // indirect ) -replace github.com/ethereum/go-ethereum v1.14.11 => github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843 +replace github.com/ethereum/go-ethereum v1.14.11 => github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207 //replace github.com/ethereum/go-ethereum => ../op-geth diff --git a/go.sum b/go.sum index d81d3b6e41f7..770052d1afc5 100644 --- a/go.sum +++ b/go.sum @@ -709,8 +709,8 @@ github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtD github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843 h1:Kzf4Pw7lKqluu/CZKbr7MVux3MdqZUks5NofgZr6KOo= -github.com/roberto-bayardo/op-geth v0.0.0-20241028225047-706b5a3ee843/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4= +github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207 h1:ozQjBXbRk9vpv9Uh4xB6W4rsJEVI2Nq6rg6LaDkuvIo= +github.com/roberto-bayardo/op-geth v0.0.0-20241029195704-9c45d4792207/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 887c51f4ebf2..dfc14ac053d2 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -517,3 +517,9 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) { // to pick up the new ChannelConfig s.defaultCfg = newCfg } + +// PendingBytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2 +// but not yet in a channel). +func (s *channelManager) PendingBytes() int64 { + return int64(s.metr.PendingBytes()) +} diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 22edd3ca3b3f..7b7ae89aaf95 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -96,9 +96,15 @@ type CLIConfig struct { // ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint. ActiveSequencerCheckDuration time.Duration - // ThrottleThreshold is the number of pending bytes beyond which the batcher will start - // throttling future bytes written to DA. 0 means never throttle. + // ThrottleThreshold is the number of pending bytes beyond which the batcher will start throttling future bytes + // written to DA. 0 means never throttle. ThrottleThreshold uint64 + // ThrottleInterval is the interval between performing DA throttling actions. + ThrottleInterval time.Duration + // ThrottleTxSize is the DA size of a transaction to start throttling when we are over the throttling threshold. + ThrottleTxSize uint64 + // ThrottleBlockSize is the total per-block DA limit to start imposing on block building when we are over the throttling threshold. + ThrottleBlockSize uint64 // TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize. // Should only be used for testing purposes. @@ -149,6 +155,9 @@ func (c *CLIConfig) Check() error { if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) { return fmt.Errorf("unknown data availability type: %q", c.DataAvailabilityType) } + if c.ThrottleThreshold != 0 && c.ThrottleInterval == 0 { + return errors.New("throttling threshold is specified, but throttling poll interval is zero") + } // we want to enforce it for both blobs and auto if c.DataAvailabilityType != flags.CalldataType && c.TargetNumFrames > eth.MaxBlobsPerBlobTx { return fmt.Errorf("too many frames for blob transactions, max %d", eth.MaxBlobsPerBlobTx) @@ -200,5 +209,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig { RPC: oprpc.ReadCLIConfig(ctx), AltDA: altda.ReadCLIConfig(ctx), ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name), + ThrottleInterval: ctx.Duration(flags.ThrottleIntervalFlag.Name), + ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name), + ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name), } } diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 41b3d118c448..9e4625702a6f 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -2,6 +2,7 @@ package batcher import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -18,6 +19,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" @@ -34,6 +36,7 @@ var ( }, }, } + SetMaxDASizeMethod = "miner_setMaxDASize" ) type txRef struct { @@ -402,6 +405,13 @@ func (l *BatchSubmitter) loop() { receiptsCh := make(chan txmgr.TxReceipt[txRef]) go l.processReceiptsLoop(receiptsCh, receiptsLoopDone) + // start the DA throttling loop + if l.Config.ThrottleThreshold > 0 { + throttlingLoopDone := make(chan struct{}) + defer close(throttlingLoopDone) + go l.throttlingLoop(throttlingLoopDone) + } + ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() @@ -470,6 +480,7 @@ func (l *BatchSubmitter) loop() { } func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) { + l.Log.Info("Starting receipts processing loop") for { select { case r := <-receiptsCh: @@ -488,7 +499,53 @@ func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txR l.Log.Info("Handling receipt", "id", r.ID) l.handleReceipt(r) case <-receiptsLoopDone: - l.Log.Info("Receipt processing loop done") + l.Log.Info("Receipts processing loop done") + return + } + } +} + +// throttlingLoop monitors the backlog in bytes we need to make available, and throttles incoming +// data appropriately to keep it under a threshold. +func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) { + l.Log.Info("Starting DA throttling loop") + ticker := time.NewTicker(l.Config.ThrottleInterval) + defer ticker.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), l.Config.NetworkTimeout) + defer cancel() + + for { + select { + case <-ticker.C: + cl, err := l.EndpointProvider.EthClient(ctx) + if err != nil { + l.Log.Error("Can't reach sequencer execution RPC", "err", err) + continue + } + pendingBytes := l.state.PendingBytes() + var maxTxSize, maxBlockSize uint64 + if pendingBytes > int64(l.Config.ThrottleThreshold) { + l.Log.Warn("Pending bytes over limit, throttling DA", "bytes", pendingBytes, "limit", l.Config.ThrottleThreshold) + maxTxSize = l.Config.ThrottleTxSize + maxBlockSize = l.Config.ThrottleBlockSize + } + var raw json.RawMessage + if err := cl.Client().CallContext( + ctx, &raw, SetMaxDASizeMethod, hexutil.Uint64(maxTxSize), hexutil.Uint64(maxBlockSize)); err != nil { + l.Log.Error("SetMaxDASize rpc failed:", err) + } + var success bool + if err := json.Unmarshal(raw, &success); err != nil { + l.Log.Error("Can't unmarshall result of SetMaxDASize", "err", err) + continue + } + if !success { + l.Log.Error("Result of SetMaxDASize was false") + continue + } + case <-throttlingLoopDone: + l.Log.Info("DA throttling loop done") return } } diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 90a85cc4ee48..8ffa53e8c75d 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -44,6 +44,10 @@ type BatcherConfig struct { WaitNodeSync bool CheckRecentTxsDepth int + + // For throttling DA. See CLIConfig in config.go for details on these parameters. + ThrottleThreshold, ThrottleTxSize, ThrottleBlockSize uint64 + ThrottleInterval time.Duration } // BatcherService represents a full batch-submitter instance and its resources, @@ -101,6 +105,11 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth bs.WaitNodeSync = cfg.WaitNodeSync + + bs.ThrottleThreshold = cfg.ThrottleThreshold + bs.ThrottleTxSize = cfg.ThrottleTxSize + bs.ThrottleBlockSize = cfg.ThrottleBlockSize + if err := bs.initRPCClients(ctx, cfg); err != nil { return err } @@ -440,7 +449,6 @@ func (bs *BatcherService) Stop(ctx context.Context) error { if bs.EndpointProvider != nil { bs.EndpointProvider.Close() } - if result == nil { bs.stopped.Store(true) bs.Log.Info("Batch Submitter stopped") diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 1ae93f728c44..68f8101db124 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -162,6 +162,24 @@ var ( Value: 0, // never throttle EnvVars: prefixEnvVars("THROTTLE_THRESHOLD"), } + ThrottleTxSizeFlag = &cli.IntFlag{ + Name: "throttle-tx-size", + Usage: "The DA size of transactions to start throttling when we are over the throttling threshold", + Value: 300, // most transactions compress to under 300 bytes. TODO: compute exact distribution + EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"), + } + ThrottleBlockSizeFlag = &cli.IntFlag{ + Name: "throttle-block-size", + Usage: "The total DA limit to start imposing on block building when we are over the throttling threshold", + Value: 21_000, // at least 70 transactions per block of up to 300 compressed bytes each. + EnvVars: prefixEnvVars("THROTTLE_TX_SIZE"), + } + ThrottleIntervalFlag = &cli.DurationFlag{ + Name: "throttle-interval", + Usage: "Interval between potential DA throttling actions", + Value: 12 * time.Second, // default to L1 block interval + EnvVars: prefixEnvVars("THROTTLE_POLL_INTERVAL"), + } // Legacy Flags SequencerHDPathFlag = txmgr.SequencerHDPathFlag ) diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index f612c0608882..ae074c042df6 100644 --- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "io" + "math/big" "sync/atomic" "github.com/prometheus/client_golang/prometheus" @@ -340,7 +341,8 @@ func estimateBatchSize(block *types.Block) uint64 { if tx.IsDepositTx() { continue } - bigSize := types.EstimatedL1Size(tx.RollupCostData()) + bigSize := types.EstimatedL1SizeScaled(tx.RollupCostData()) + bigSize = bigSize.Div(bigSize, big.NewInt(1e6)) // unscale it if bigSize.IsUint64() { size += bigSize.Uint64() } else { diff --git a/op-service/dial/ethclient_interface.go b/op-service/dial/ethclient_interface.go index 58a2070974ee..292c7b07fa4c 100644 --- a/op-service/dial/ethclient_interface.go +++ b/op-service/dial/ethclient_interface.go @@ -5,12 +5,14 @@ import ( "math/big" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" ) // EthClientInterface is an interface for providing an ethclient.Client // It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers type EthClientInterface interface { BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + Client() *rpc.Client Close() }