Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decrease success rate if validator fails to relay #1295

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ func New(
consensusRegistry,
&app.TreasuryKeeper,
)
app.ConsensusKeeper.AddMessageConsensusAttestedListener(&app.MetrixKeeper)

app.EvmKeeper = *evmmodulekeeper.NewKeeper(
appCodec,
Expand Down
76 changes: 65 additions & 11 deletions x/consensus/keeper/concensus_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/palomachain/paloma/x/consensus/keeper/filters"
"github.com/palomachain/paloma/x/consensus/types"
evmtypes "github.com/palomachain/paloma/x/evm/types"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
valsettypes "github.com/palomachain/paloma/x/valset/types"
)

Expand Down Expand Up @@ -287,35 +288,88 @@ func (k Keeper) GetMessagesFromQueue(ctx context.Context, queueTypeName string,
// Any message that actually reached consensus will be removed from the queue during
// attestation, other messages like superfluous valset updates will get removed
// in their respective logic flows, but none of them should be using this function.
func (k Keeper) PruneJob(ctx sdk.Context, queueTypeName string, id uint64) (err error) {
if err := k.jailValidatorsWhichMissedAttestation(ctx, queueTypeName, id); err != nil {
liblog.FromSDKLogger(k.Logger(ctx)).
func (k Keeper) PruneJob(sdkCtx sdk.Context, queueTypeName string, id uint64) error {
err := k.jailValidatorsIfNecessary(sdkCtx, queueTypeName, id)
if err != nil {
liblog.FromSDKLogger(k.Logger(sdkCtx)).
WithError(err).
WithFields("msg-id", id).
WithFields("queue-type-name", queueTypeName).
Error("Failed to jail validators that missed attestation.")
Error("Failed to jail validators.")
}

return k.DeleteJob(ctx, queueTypeName, id)
return k.DeleteJob(sdkCtx, queueTypeName, id)
}

func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeName string, id uint64) error {
cq, err := k.getConsensusQueue(ctx, queueTypeName)
func (k Keeper) jailValidatorsIfNecessary(
sdkCtx sdk.Context,
queueTypeName string,
id uint64,
) error {
cq, err := k.getConsensusQueue(sdkCtx, queueTypeName)
if err != nil {
return fmt.Errorf("getConsensusQueue: %w", err)
}

msg, err := cq.GetMsgByID(ctx, id)
msg, err := cq.GetMsgByID(sdkCtx, id)
if err != nil {
return fmt.Errorf("getMsgByID: %w", err)
}

if msg.GetPublicAccessData() == nil && msg.GetErrorData() == nil {
// This message was never successfully handled, attestation flock
// should not be punished for this.
// The message was never delivered, so we need to update the validator
// metrics with a failure
return k.punishValidatorForMissingRelay(sdkCtx, msg)
}

// Otherwise, there was a delivery attempt, so only jail validators that
// missed attestation
return k.jailValidatorsWhichMissedAttestation(sdkCtx, msg)
}

func (k Keeper) punishValidatorForMissingRelay(
sdkCtx sdk.Context,
msg types.QueuedSignedMessageI,
) error {
if msg.GetGasEstimate() == 0 {
// If we don't have a gas estimate, this was probably not the
// validator's fault, so we do nothing
return nil
}

consensusMsg, err := msg.ConsensusMsg(k.cdc)
if err != nil {
return err
}

message, ok := consensusMsg.(*evmtypes.Message)
if !ok {
// If this is not a turnstone message, we don't want it
return nil
}

valAddr, err := sdk.ValAddressFromBech32(message.Assignee)
if err != nil {
return err
}

for _, v := range k.onMessageAttestedListeners {
v.OnConsensusMessageAttested(sdkCtx, metrixtypes.MessageAttestedEvent{
AssignedAtBlockHeight: message.AssignedAtBlockHeight,
HandledAtBlockHeight: math.NewInt(sdkCtx.BlockHeight()),
Assignee: valAddr,
MessageID: msg.GetId(),
WasRelayedSuccessfully: false,
})
}

return nil
}

func (k Keeper) jailValidatorsWhichMissedAttestation(
ctx sdk.Context,
msg types.QueuedSignedMessageI,
) error {
r, err := k.consensusChecker.VerifyEvidence(ctx,
slice.Map(msg.GetEvidence(), func(evidence *types.Evidence) libcons.Evidence {
return evidence
Expand Down Expand Up @@ -371,7 +425,7 @@ func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeN
// This validator is part of the active valset but did not supply evidence.
// That's not very nice. Let's jail them.
if err := k.valset.Jail(ctx, v.GetAddress(), fmt.Sprintf("No evidence supplied for contentious message %d", msg.GetId())); err != nil {
liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", id).Error("Failed to jail validator.")
liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", msg.GetId()).Error("Failed to jail validator.")
}
}
}
Expand Down
51 changes: 37 additions & 14 deletions x/consensus/keeper/concensus_keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func TestEndToEndTestingOfPuttingAndGettingMessagesOfTheConsensusQueue(t *testin
})
}

func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
func TestJailValidatorsIfNecessary(t *testing.T) {
queue := types.Queue(defaultQueueName, chainType, chainReferenceID)
keeper, ms, ctx := newConsensusKeeper(t)
msgType := &types.SimpleMessage{}
msgType := &evmtypes.Message{}
serializedTx, err := hex.DecodeString("02f87201108405f5e100850b68a0aa00825208941f9c2e67dbbe4c457a5e2be0bc31e67ce5953a2d87470de4df82000080c001a0e05de0771f8d577ec5aa440612c0e8f560d732d5162db0187cfaf56ac50c3716a0147565f4b0924a5adda25f55330c385448e0507d1219d4dac0950e2872682124")
require.NoError(t, err)

Expand All @@ -111,21 +111,24 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
)

t.Run("with unsupported queue name", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, "no support", 0)
err := keeper.jailValidatorsIfNecessary(ctx, "no support", 0)
require.ErrorContains(t, err, "getConsensusQueue", "returns an error")
})

t.Run("with unknown message ID", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42)
err := keeper.jailValidatorsIfNecessary(ctx, queue, 42)
require.ErrorContains(t, err, "getMsgByID", "returns an error")
})

t.Run("with unknown message ID", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42)
err := keeper.jailValidatorsIfNecessary(ctx, queue, 42)
require.ErrorContains(t, err, "getMsgByID", "returns an error")
})

testMsg := types.SimpleMessage{Sender: "user", Hello: "foo", World: "bar"}
assignee, _ := sdk.ValAddressFromBech32("palomavaloper1tsu8nthuspe4zlkejtj3v27rtq8qz7q6983zt2")
testMsg := evmtypes.Message{
Assignee: assignee.String(),
}
t.Run("with message that actually forms consensus", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -159,7 +162,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(4000),
}, nil).Times(1)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.Error(t, err)
require.ErrorContains(t, err, "unexpected message with valid consensus found, skipping jailing steps")
})
Expand Down Expand Up @@ -198,9 +201,10 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(10000),
}, nil).Times(2)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})

t.Run("with expected validators missing", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -237,17 +241,35 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
ms.ValsetKeeper.On("Jail", mock.Anything, validators[0].GetAddress(), mock.Anything).Return(nil)
ms.ValsetKeeper.On("Jail", mock.Anything, validators[1].GetAddress(), mock.Anything).Return(nil)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
t.Run("with neither error nor public access data set without gas estimate", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg,
&consensus.PutOptions{RequireGasEstimation: true})
require.NoError(t, err)

err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
t.Run("with neither error nor public access data set", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, nil)
t.Run("with neither error nor public access data set with gas estimate", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg,
&consensus.PutOptions{RequireGasEstimation: true})
require.NoError(t, err)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
cq, err := keeper.getConsensusQueue(ctx, queue)
require.NoError(t, err)

err = cq.SetElectedGasEstimate(ctx, mID, 1)
require.NoError(t, err)

ms.MetrixKeeper.On("OnConsensusMessageAttested", mock.Anything, mock.Anything).Return(nil)

err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
})

t.Run("with expected validators missing, but less than 10% share supplied evidence", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -281,10 +303,11 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(11000),
}, nil).Times(1)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.Error(t, err, "should return error")
require.ErrorContains(t, err, "message consensus failure likely caused by faulty response data")
})

t.Run("with expected validators missing, but 10% of share or more supplied evidence", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -322,7 +345,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(10000),
}, nil).Times(2)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should jail but not return error")
})
}
Expand Down
27 changes: 17 additions & 10 deletions x/consensus/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/palomachain/paloma/util/libcons"
"github.com/palomachain/paloma/util/liblog"
"github.com/palomachain/paloma/x/consensus/types"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
)

type FeeProvider interface {
Expand All @@ -31,10 +32,11 @@ type (

valset types.ValsetKeeper

registry *registry
evmKeeper types.EvmKeeper
consensusChecker *libcons.ConsensusChecker
feeProvider FeeProvider
registry *registry
evmKeeper types.EvmKeeper
consensusChecker *libcons.ConsensusChecker
feeProvider FeeProvider
onMessageAttestedListeners []metrixtypes.OnConsensusMessageAttestedListener
}
)

Expand All @@ -47,12 +49,13 @@ func NewKeeper(
fp FeeProvider,
) *Keeper {
k := &Keeper{
cdc: cdc,
storeKey: storeKey,
paramstore: ps,
valset: valsetKeeper,
registry: reg,
feeProvider: fp,
cdc: cdc,
storeKey: storeKey,
paramstore: ps,
valset: valsetKeeper,
registry: reg,
feeProvider: fp,
onMessageAttestedListeners: make([]metrixtypes.OnConsensusMessageAttestedListener, 0),
}
ider := keeperutil.NewIDGenerator(k, nil)
k.ider = ider
Expand All @@ -61,6 +64,10 @@ func NewKeeper(
return k
}

func (k *Keeper) AddMessageConsensusAttestedListener(l metrixtypes.OnConsensusMessageAttestedListener) {
k.onMessageAttestedListeners = append(k.onMessageAttestedListeners, l)
}

func (k Keeper) Logger(ctx context.Context) log.Logger {
sdkCtx := sdk.UnwrapSDKContext(ctx)
return liblog.FromSDKLogger(sdkCtx.Logger()).With("module", fmt.Sprintf("x/%s", types.ModuleName))
Expand Down
11 changes: 11 additions & 0 deletions x/consensus/keeper/keeper_setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (

type mockedServices struct {
ValsetKeeper *mocks.ValsetKeeper
MetrixKeeper *mocks.MetrixKeeper
}

func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
config := sdk.GetConfig()
config.SetBech32PrefixForAccount("paloma", "pub")
config.SetBech32PrefixForValidator("palomavaloper", "valoperpub")

logger := log.NewNopLogger()

storeKey := storetypes.NewKVStoreKey(types.StoreKey)
Expand Down Expand Up @@ -52,8 +57,12 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
memStoreKey,
"ConsensusParams",
)

metrixKeeper := mocks.NewMetrixKeeper(t)

ms := mockedServices{
ValsetKeeper: mocks.NewValsetKeeper(t),
MetrixKeeper: metrixKeeper,
}
k := NewKeeper(
appCodec,
Expand All @@ -64,6 +73,8 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
nil,
)

k.AddMessageConsensusAttestedListener(metrixKeeper)

ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, logger)

// Initialize params
Expand Down
6 changes: 6 additions & 0 deletions x/consensus/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
xchain "github.com/palomachain/paloma/internal/x-chain"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
valsettypes "github.com/palomachain/paloma/x/valset/types"
)

Expand Down Expand Up @@ -33,3 +34,8 @@ type ValsetKeeper interface {
type EvmKeeper interface {
PickValidatorForMessage(ctx context.Context, chainReferenceID string, requirements *xchain.JobRequirements) (string, string, error)
}

//go:generate mockery --name=MetrixKeeper
type MetrixKeeper interface {
OnConsensusMessageAttested(context.Context, metrixtypes.MessageAttestedEvent)
}
34 changes: 34 additions & 0 deletions x/consensus/types/mocks/MetrixKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading