From a3194a75730183a2bbeaebb2bfd79640ce73f935 Mon Sep 17 00:00:00 2001 From: Sylvain Bossut Date: Wed, 28 Feb 2024 10:04:29 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20add=20validator=20retention=20by=20?= =?UTF-8?q?pubkey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 22 ++++++++++++++++++ main.go | 1 + .../postgresql/validatorepochsummaries.go | 22 ++++++++++++++---- services/chaindb/postgresql/validators.go | 22 ++++++++++++++---- services/chaindb/service.go | 4 ++-- services/summarizer/standard/handler.go | 2 +- services/summarizer/standard/parameters.go | 23 +++++++++++++++++++ services/summarizer/standard/prune.go | 4 ++-- services/summarizer/standard/service.go | 2 ++ 9 files changed, 87 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index ac8ccdd..6c24451 100644 --- a/README.md +++ b/README.md @@ -114,11 +114,16 @@ These tables, along with their indices, take over 90% of the space required for Pruning data removes data older than a certain age from the two database tables mentioned above. Pruning is set for each table individually and so it is possible to prune either or both of the tables, and to have different retentions for each. For example, the following configuration: +It is also possible to retain data for specific validator pubkey. ```yaml summarizer: validators: balance-retention: "P6M" epoch-retention: "P1Y" + retain: + - 0xab0bdda0f85f842f431beaccf1250bf1fd7ba51b4100fd64364b6401fda85bb0069b3e715b58819684e7fc0b10a72a34 + - 0x876dd4705157eb66dc71bc2e07fb151ea53e1a62a0bb980a7ce72d15f58944a8a3752d754f52f4a60dbfc7b18169f268 + - 0x9314c6de0386635e2799af798884c2ea09c63b9f079e572acc00b06a7faccce501ea4dfc0b1a23b8603680a5e3481327 ``` This will store 6 month's worth of balances, and 1 year's worth of epoch summaries. Retention periods are [ISO 8601 durations](https://en.wikipedia.org/wiki/ISO_8601#Durations). Note that if it is not desired to retain any balance or epoch summary data then the retention can be set to "PT0s". @@ -204,6 +209,23 @@ eth1deposits: # keep track of this itself, however if you wish to start from a different block this # can be set. # start-block: 500 +summarizer: + enable: true + epochs: + enable: true + blocks: + enable: true + validators: + enable: true + # retention period for validator balances (ISO_8601 duration format) + balance-retention: "P6M" + # retention period for validator epoch summaries (ISO_8601 duration format) + epoch-retention: "P1Y" + # validator indices to retain (it won't prune validator balances and summaries) + retain: + - 0xab0bdda0f85f842f431beaccf1250bf1fd7ba51b4100fd64364b6401fda85bb0069b3e715b58819684e7fc0b10a72a34 + - 0x876dd4705157eb66dc71bc2e07fb151ea53e1a62a0bb980a7ce72d15f58944a8a3752d754f52f4a60dbfc7b18169f268 + - 0x9314c6de0386635e2799af798884c2ea09c63b9f079e572acc00b06a7faccce501ea4dfc0b1a23b8603680a5e3481327 ``` ## Support diff --git a/main.go b/main.go index fe24aaf..e290f7d 100644 --- a/main.go +++ b/main.go @@ -578,6 +578,7 @@ func startSummarizer( standardsummarizer.WithMaxDaysPerRun(viper.GetUint64("summarizer.max-days-per-run")), standardsummarizer.WithValidatorEpochRetention(viper.GetString("summarizer.validators.epoch-retention")), standardsummarizer.WithValidatorBalanceRetention(viper.GetString("summarizer.validators.balance-retention")), + standardsummarizer.WithvalidatorRetainPubkeys(viper.GetStringSlice("summarizer.validators.retain")), ) if err != nil { return nil, errors.Wrap(err, "failed to create summarizer service") diff --git a/services/chaindb/postgresql/validatorepochsummaries.go b/services/chaindb/postgresql/validatorepochsummaries.go index e14d32f..f79ebda 100644 --- a/services/chaindb/postgresql/validatorepochsummaries.go +++ b/services/chaindb/postgresql/validatorepochsummaries.go @@ -518,7 +518,7 @@ WHERE f_validator_index = $1 } // PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. -func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error { +func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retainPubkeys []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorEpochSummaries") defer span.End() @@ -533,15 +533,27 @@ func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Ep queryBuilder.WriteString(` DELETE FROM t_validator_epoch_summaries -WHERE f_epoch <= $1 +USING t_validators +WHERE t_validator_epoch_summaries.f_validator_index = t_validators.f_index +AND f_epoch <= $1 `) queryVals = append(queryVals, to) - if len(retain) > 0 { + if len(retainPubkeys) > 0 { queryBuilder.WriteString(` -AND f_validator_index NOT IN($2) +AND NOT (t_validators.f_public_key = ANY($2)) `) - queryVals = append(queryVals, retain) + + retainPubkeysBytes := make([][]byte, 0, len(retainPubkeys)) + + for _, pubkey := range retainPubkeys { + pubkeyBytes := make([]byte, len(pubkey)) + copy(pubkeyBytes, pubkey[:]) + + retainPubkeysBytes = append(retainPubkeysBytes, pubkeyBytes) + } + + queryVals = append(queryVals, retainPubkeysBytes) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/postgresql/validators.go b/services/chaindb/postgresql/validators.go index 573be6b..a755f4f 100644 --- a/services/chaindb/postgresql/validators.go +++ b/services/chaindb/postgresql/validators.go @@ -716,7 +716,7 @@ func validatorBalanceFromRow(rows pgx.Rows) (*chaindb.ValidatorBalance, error) { } // PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. -func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error { +func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retainPubkeys []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorBalances") defer span.End() @@ -731,15 +731,27 @@ func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, r queryBuilder.WriteString(` DELETE FROM t_validator_balances -WHERE f_epoch <= $1 +USING t_validators +WHERE t_validator_balances.f_validator_index = t_validators.f_index +AND f_epoch <= $1 `) queryVals = append(queryVals, to) - if len(retain) > 0 { + if len(retainPubkeys) > 0 { queryBuilder.WriteString(` -AND f_validator_index NOT IN($2) +AND NOT (t_validators.f_public_key = ANY($2)) `) - queryVals = append(queryVals, retain) + + retainPubkeysBytes := make([][]byte, 0, len(retainPubkeys)) + + for _, pubkey := range retainPubkeys { + pubkeyBytes := make([]byte, len(pubkey)) + copy(pubkeyBytes, pubkey[:]) + + retainPubkeysBytes = append(retainPubkeysBytes, pubkeyBytes) + } + + queryVals = append(queryVals, retainPubkeysBytes) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/service.go b/services/chaindb/service.go index c780e9a..4142cdc 100644 --- a/services/chaindb/service.go +++ b/services/chaindb/service.go @@ -343,7 +343,7 @@ type AggregateValidatorBalancesProvider interface { // ValidatorBalancesPruner defines functions to prune validator balances. type ValidatorBalancesPruner interface { // PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. - PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error + PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } // ValidatorsSetter defines functions to create and update validator information. @@ -410,7 +410,7 @@ type ValidatorEpochSummariesProvider interface { // ValidatorEpochSummariesPruner defines functions to prune validator epoch summaries. type ValidatorEpochSummariesPruner interface { // PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. - PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error + PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } // ValidatorEpochSummariesSetter defines functions to create and update validator epoch summaries. diff --git a/services/summarizer/standard/handler.go b/services/summarizer/standard/handler.go index dd93c8c..0173766 100644 --- a/services/summarizer/standard/handler.go +++ b/services/summarizer/standard/handler.go @@ -222,7 +222,7 @@ func (s *Service) summarizeValidators(ctx context.Context, targetEpoch phase0.Ep log.Trace().Uint64("first_epoch", uint64(firstEpoch)).Uint64("target_epoch", uint64(targetEpoch)).Msg("Validators catchup bounds") for epoch := firstEpoch; epoch <= targetEpoch; epoch++ { - log.Trace().Uint64("epoch", uint64(epoch)).Msg("Summarizing epoch") + log.Trace().Uint64("epoch", uint64(epoch)).Msg("Summarizing validators in epoch") if err := s.summarizeValidatorsInEpoch(ctx, md, epoch); err != nil { return errors.Wrap(err, fmt.Sprintf("failed to update validator summaries in epoch %d", epoch)) } diff --git a/services/summarizer/standard/parameters.go b/services/summarizer/standard/parameters.go index 7e33acf..6c2a7a3 100644 --- a/services/summarizer/standard/parameters.go +++ b/services/summarizer/standard/parameters.go @@ -14,7 +14,10 @@ package standard import ( + "encoding/hex" "errors" + "github.com/attestantio/go-eth2-client/spec/phase0" + "strings" eth2client "github.com/attestantio/go-eth2-client" "github.com/rs/zerolog" @@ -32,6 +35,7 @@ type parameters struct { epochSummaries bool blockSummaries bool validatorSummaries bool + validatorRetainPubkeys []phase0.BLSPubKey validatorEpochRetention string maxDaysPerRun uint64 validatorBalanceRetention string @@ -104,6 +108,25 @@ func WithValidatorSummaries(enabled bool) Parameter { }) } +// WithvalidatorRetainPubkeys states if the module should retain balance and epoch summaries for a subset of validator. +func WithvalidatorRetainPubkeys(validatorRetainPubkeys []string) Parameter { + return parameterFunc(func(p *parameters) { + var validatorRetainPubkeysTyped []phase0.BLSPubKey + for _, stringPubkey := range validatorRetainPubkeys { + stringPubkey = strings.TrimPrefix(stringPubkey, "0x") + + bytes, _ := hex.DecodeString(stringPubkey) + + var pubKey phase0.BLSPubKey + copy(pubKey[:], bytes) + + validatorRetainPubkeysTyped = append(validatorRetainPubkeysTyped, pubKey) + + } + p.validatorRetainPubkeys = validatorRetainPubkeysTyped + }) +} + // WithMaxDaysPerRun provides the maximum number of days to process in a single run of the summarizer. func WithMaxDaysPerRun(maxDaysPerRun uint64) Parameter { return parameterFunc(func(p *parameters) { diff --git a/services/summarizer/standard/prune.go b/services/summarizer/standard/prune.go index 529ea76..22970d9 100644 --- a/services/summarizer/standard/prune.go +++ b/services/summarizer/standard/prune.go @@ -70,7 +70,7 @@ func (s *Service) pruneBalances(ctx context.Context, summaryEpoch phase0.Epoch) return errors.Wrap(err, "failed to begin transaction to prune validator balances") } - if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, nil); err != nil { + if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, s.validatorRetainPubkeys); err != nil { cancel() return errors.Wrap(err, "failed to prune validator balances") } @@ -114,7 +114,7 @@ func (s *Service) pruneEpochs(ctx context.Context, summaryEpoch phase0.Epoch) er return errors.Wrap(err, "failed to begin transaction to prune validator epoch summaries") } - if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, nil); err != nil { + if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, s.validatorRetainPubkeys); err != nil { cancel() return errors.Wrap(err, "failed to prune validator epoch summaries") } diff --git a/services/summarizer/standard/service.go b/services/summarizer/standard/service.go index 8a38b2d..2656176 100644 --- a/services/summarizer/standard/service.go +++ b/services/summarizer/standard/service.go @@ -50,6 +50,7 @@ type Service struct { blockSummaries bool validatorSummaries bool maxDaysPerRun uint64 + validatorRetainPubkeys []phase0.BLSPubKey validatorEpochRetention *util.CalendarDuration validatorBalanceRetention *util.CalendarDuration activitySem *semaphore.Weighted @@ -172,6 +173,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { blockSummaries: parameters.blockSummaries, validatorSummaries: parameters.validatorSummaries, maxDaysPerRun: parameters.maxDaysPerRun, + validatorRetainPubkeys: parameters.validatorRetainPubkeys, validatorEpochRetention: validatorEpochRetention, validatorBalanceRetention: validatorBalanceRetention, activitySem: semaphore.NewWeighted(1),