Skip to content

Commit

Permalink
🎨 review related tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
aBMania committed Mar 1, 2024
1 parent 728083b commit 1c26370
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 37 deletions.
24 changes: 23 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package main

import (
"context"
"encoding/hex"
"fmt"
"github.com/attestantio/go-eth2-client/spec/phase0"
"net/http"

// #nosec G108
Expand Down Expand Up @@ -566,6 +568,26 @@ func startSummarizer(
return nil, nil
}

validatorRetainPubkeys := viper.GetStringSlice("summarizer.validators.retain")

var validatorRetain []phase0.BLSPubKey

for _, pubkey := range validatorRetainPubkeys {
pubkey = strings.TrimPrefix(pubkey, "0x")

bytes, err := hex.DecodeString(pubkey)

if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("failed to parse pubkey %v", pubkey))
}

var pubKey phase0.BLSPubKey
copy(pubKey[:], bytes)

validatorRetain = append(validatorRetain, pubKey)

}

standardSummarizer, err := standardsummarizer.New(ctx,
standardsummarizer.WithLogLevel(util.LogLevel("summarizer")),
standardsummarizer.WithMonitor(monitor),
Expand All @@ -578,7 +600,7 @@ func startSummarizer(
standardsummarizer.WithMaxDaysPerRun(viper.GetUint64("summarizer.max-days-per-run")),
standardsummarizer.WithValidatorEpochRetention(viper.GetString("summarizer.validators.epoch-retention")),
standardsummarizer.WithValidatorBalanceRetention(viper.GetString("summarizer.validators.balance-retention")),
standardsummarizer.WithvalidatorRetainPubkeys(viper.GetStringSlice("summarizer.validators.retain")),
standardsummarizer.WithValidatorRetain(validatorRetain),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create summarizer service")
Expand Down
18 changes: 12 additions & 6 deletions services/chaindb/postgresql/validatorepochsummaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ WHERE f_validator_index = $1
}

// PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point.
func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retainPubkeys []phase0.BLSPubKey) error {
func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error {
ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorEpochSummaries")
defer span.End()

Expand All @@ -533,27 +533,33 @@ func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Ep

queryBuilder.WriteString(`
DELETE FROM t_validator_epoch_summaries
`)
if len(retain) > 0 {
queryBuilder.WriteString(`
USING t_validators
`)
}
queryBuilder.WriteString(`
WHERE t_validator_epoch_summaries.f_validator_index = t_validators.f_index
AND f_epoch <= $1
`)
queryVals = append(queryVals, to)

if len(retainPubkeys) > 0 {
if len(retain) > 0 {
queryBuilder.WriteString(`
AND NOT (t_validators.f_public_key = ANY($2))
`)

retainPubkeysBytes := make([][]byte, 0, len(retainPubkeys))
pubkeysBytes := make([][]byte, 0, len(retain))

for _, pubkey := range retainPubkeys {
for _, pubkey := range retain {
pubkeyBytes := make([]byte, len(pubkey))
copy(pubkeyBytes, pubkey[:])

retainPubkeysBytes = append(retainPubkeysBytes, pubkeyBytes)
pubkeysBytes = append(pubkeysBytes, pubkeyBytes)
}

queryVals = append(queryVals, retainPubkeysBytes)
queryVals = append(queryVals, pubkeysBytes)
}

if e := log.Trace(); e.Enabled() {
Expand Down
18 changes: 12 additions & 6 deletions services/chaindb/postgresql/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func validatorBalanceFromRow(rows pgx.Rows) (*chaindb.ValidatorBalance, error) {
}

// PruneValidatorBalances prunes validator balances up to (but not including) the given epoch.
func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retainPubkeys []phase0.BLSPubKey) error {
func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error {
ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorBalances")
defer span.End()

Expand All @@ -731,27 +731,33 @@ func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, r

queryBuilder.WriteString(`
DELETE FROM t_validator_balances
`)
if len(retain) > 0 {
queryBuilder.WriteString(`
USING t_validators
`)
}
queryBuilder.WriteString(`
WHERE t_validator_balances.f_validator_index = t_validators.f_index
AND f_epoch <= $1
`)
queryVals = append(queryVals, to)

if len(retainPubkeys) > 0 {
if len(retain) > 0 {
queryBuilder.WriteString(`
AND NOT (t_validators.f_public_key = ANY($2))
`)

retainPubkeysBytes := make([][]byte, 0, len(retainPubkeys))
pubkeysBytes := make([][]byte, 0, len(retain))

for _, pubkey := range retainPubkeys {
for _, pubkey := range retain {
pubkeyBytes := make([]byte, len(pubkey))
copy(pubkeyBytes, pubkey[:])

retainPubkeysBytes = append(retainPubkeysBytes, pubkeyBytes)
pubkeysBytes = append(pubkeysBytes, pubkeyBytes)
}

queryVals = append(queryVals, retainPubkeysBytes)
queryVals = append(queryVals, pubkeysBytes)
}

if e := log.Trace(); e.Enabled() {
Expand Down
25 changes: 5 additions & 20 deletions services/summarizer/standard/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
package standard

import (
"encoding/hex"
"errors"
"github.com/attestantio/go-eth2-client/spec/phase0"
"strings"

eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/rs/zerolog"
"github.com/wealdtech/chaind/services/chaindb"
"github.com/wealdtech/chaind/services/chaintime"
Expand All @@ -35,7 +32,7 @@ type parameters struct {
epochSummaries bool
blockSummaries bool
validatorSummaries bool
validatorRetainPubkeys []phase0.BLSPubKey
validatorRetain []phase0.BLSPubKey
validatorEpochRetention string
maxDaysPerRun uint64
validatorBalanceRetention string
Expand Down Expand Up @@ -108,22 +105,10 @@ func WithValidatorSummaries(enabled bool) Parameter {
})
}

// WithvalidatorRetainPubkeys states if the module should retain balance and epoch summaries for a subset of validator.
func WithvalidatorRetainPubkeys(validatorRetainPubkeys []string) Parameter {
// WithValidatorRetain states if the module should retain balance and epoch summaries for a subset of validator.
func WithValidatorRetain(validatorRetain []phase0.BLSPubKey) Parameter {
return parameterFunc(func(p *parameters) {
var validatorRetainPubkeysTyped []phase0.BLSPubKey
for _, stringPubkey := range validatorRetainPubkeys {
stringPubkey = strings.TrimPrefix(stringPubkey, "0x")

bytes, _ := hex.DecodeString(stringPubkey)

var pubKey phase0.BLSPubKey
copy(pubKey[:], bytes)

validatorRetainPubkeysTyped = append(validatorRetainPubkeysTyped, pubKey)

}
p.validatorRetainPubkeys = validatorRetainPubkeysTyped
p.validatorRetain = validatorRetain
})
}

Expand Down
4 changes: 2 additions & 2 deletions services/summarizer/standard/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Service) pruneBalances(ctx context.Context, summaryEpoch phase0.Epoch)
return errors.Wrap(err, "failed to begin transaction to prune validator balances")
}

if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, s.validatorRetainPubkeys); err != nil {
if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, s.validatorRetain); err != nil {
cancel()
return errors.Wrap(err, "failed to prune validator balances")
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *Service) pruneEpochs(ctx context.Context, summaryEpoch phase0.Epoch) er
return errors.Wrap(err, "failed to begin transaction to prune validator epoch summaries")
}

if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, s.validatorRetainPubkeys); err != nil {
if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, s.validatorRetain); err != nil {
cancel()
return errors.Wrap(err, "failed to prune validator epoch summaries")
}
Expand Down
4 changes: 2 additions & 2 deletions services/summarizer/standard/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Service struct {
blockSummaries bool
validatorSummaries bool
maxDaysPerRun uint64
validatorRetainPubkeys []phase0.BLSPubKey
validatorRetain []phase0.BLSPubKey
validatorEpochRetention *util.CalendarDuration
validatorBalanceRetention *util.CalendarDuration
activitySem *semaphore.Weighted
Expand Down Expand Up @@ -173,7 +173,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
blockSummaries: parameters.blockSummaries,
validatorSummaries: parameters.validatorSummaries,
maxDaysPerRun: parameters.maxDaysPerRun,
validatorRetainPubkeys: parameters.validatorRetainPubkeys,
validatorRetain: parameters.validatorRetain,
validatorEpochRetention: validatorEpochRetention,
validatorBalanceRetention: validatorBalanceRetention,
activitySem: semaphore.NewWeighted(1),
Expand Down

0 comments on commit 1c26370

Please sign in to comment.