Skip to content

Commit

Permalink
refactor(nexus): add the new methods for storing general messages in …
Browse files Browse the repository at this point in the history
…Nexus and set them to be processing (#2006)

* refactor(nexus): add the new methods for storing general messages in Nexus and set them to be processing

* add tests

* add more comments
  • Loading branch information
fish-sammy authored Oct 25, 2023
1 parent c4dd592 commit 1d0bf42
Show file tree
Hide file tree
Showing 3 changed files with 384 additions and 19 deletions.
117 changes: 103 additions & 14 deletions x/nexus/keeper/general_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"

"github.com/CosmWasm/wasmd/x/wasm"
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -37,6 +38,7 @@ func (k Keeper) GenerateMessageID(ctx sdk.Context) (string, []byte, uint64) {
}

// SetNewWasmMessage sets the given general message from a wasm contract.
// Deprecated: use SetNewMessage_ instead
func (k Keeper) SetNewWasmMessage(ctx sdk.Context, msg exported.GeneralMessage) error {
if msg.Asset != nil {
return fmt.Errorf("asset transfer is not supported")
Expand Down Expand Up @@ -84,6 +86,7 @@ func (k Keeper) SetNewWasmMessage(ctx sdk.Context, msg exported.GeneralMessage)
}

// SetNewMessage sets the given general message. If the messages is approved, adds the message ID to approved messages store
// Deprecated: use SetNewMessage_ instead
func (k Keeper) SetNewMessage(ctx sdk.Context, m exported.GeneralMessage) error {
sourceChain, ok := k.GetChain(ctx, m.GetSourceChain())
if !ok {
Expand All @@ -104,11 +107,11 @@ func (k Keeper) SetNewMessage(ctx sdk.Context, m exported.GeneralMessage) error
}

if m.Asset != nil {
if err := k.validateTransferAsset(ctx, sourceChain, m.Asset.Denom); err != nil {
if err := k.validateAsset(ctx, sourceChain, m.Asset.Denom); err != nil {
return err
}

if err := k.validateTransferAsset(ctx, destChain, m.Asset.Denom); err != nil {
if err := k.validateAsset(ctx, destChain, m.Asset.Denom); err != nil {
return err
}
}
Expand Down Expand Up @@ -142,6 +145,7 @@ func (k Keeper) SetNewMessage(ctx sdk.Context, m exported.GeneralMessage) error
*/

// SetMessageProcessing sets the general message as processing
// Deprecated: use SetMessageProcessing_ instead
func (k Keeper) SetMessageProcessing(ctx sdk.Context, id string) error {
m, found := k.GetMessage(ctx, id)
if !found {
Expand Down Expand Up @@ -174,7 +178,7 @@ func (k Keeper) SetMessageExecuted(ctx sdk.Context, id string) error {
return fmt.Errorf("general message is not processing")
}

k.deleteSentMessageID(ctx, m)
k.deleteProcessingMessageID(ctx, m)

m.Status = exported.Executed

Expand All @@ -194,7 +198,7 @@ func (k Keeper) SetMessageFailed(ctx sdk.Context, id string) error {
return fmt.Errorf("general message is not processing")
}

k.deleteSentMessageID(ctx, m)
k.deleteProcessingMessageID(ctx, m)

m.Status = exported.Failed

Expand All @@ -203,15 +207,6 @@ func (k Keeper) SetMessageFailed(ctx sdk.Context, id string) error {
return k.setMessage(ctx, m)
}

// DeleteMessage deletes the general message with associated ID, and also deletes the message ID from the approved messages store
func (k Keeper) DeleteMessage(ctx sdk.Context, id string) {
m, found := k.GetMessage(ctx, id)
if found {
k.deleteSentMessageID(ctx, m)
k.getStore(ctx).DeleteNew(getMessageKey(id))
}
}

// GetMessage returns the general message by ID
func (k Keeper) GetMessage(ctx sdk.Context, id string) (m exported.GeneralMessage, found bool) {
return m, k.getStore(ctx).GetNew(getMessageKey(id), &m)
Expand All @@ -227,10 +222,11 @@ func (k Keeper) setProcessingMessageID(ctx sdk.Context, m exported.GeneralMessag
}

k.getStore(ctx).SetRawNew(getProcessingMessageKey(m.GetDestinationChain(), m.ID), []byte(m.ID))

return nil
}

func (k Keeper) deleteSentMessageID(ctx sdk.Context, m exported.GeneralMessage) {
func (k Keeper) deleteProcessingMessageID(ctx sdk.Context, m exported.GeneralMessage) {
k.getStore(ctx).DeleteNew(getProcessingMessageKey(m.GetDestinationChain(), m.ID))
}

Expand Down Expand Up @@ -272,3 +268,96 @@ func (k Keeper) GetProcessingMessages(ctx sdk.Context, chain exported.ChainName,
return funcs.MustOk(k.GetMessage(ctx, id))
})
}

// SetNewMessage_ sets the given general messsage as approved
func (k Keeper) SetNewMessage_(ctx sdk.Context, msg exported.GeneralMessage) error {
if _, ok := k.GetMessage(ctx, msg.ID); ok {
return fmt.Errorf("general message %s already exists", msg.ID)
}

if !msg.Is(exported.Approved) {
return fmt.Errorf("new general message has to be approved")
}

funcs.MustNoErr(ctx.EventManager().EmitTypedEvent(&types.MessageReceived{
ID: msg.ID,
PayloadHash: msg.PayloadHash,
Sender: msg.Sender,
Recipient: msg.Recipient,
}))

return k.setMessage(ctx, msg)
}

// SetMessageProcessing_ sets the given general message as processing and perform
// validations on the message
func (k Keeper) SetMessageProcessing_(ctx sdk.Context, id string) error {
msg, ok := k.GetMessage(ctx, id)
if !ok {
return fmt.Errorf("general message %s not found", id)
}

if !(msg.Is(exported.Approved) || msg.Is(exported.Failed)) {
return fmt.Errorf("general message has to be approved or failed")
}

if err := k.validateMessage(ctx, msg); err != nil {
return err
}

msg.Status = exported.Processing
if err := k.setMessage(ctx, msg); err != nil {
return err
}

funcs.MustNoErr(k.setProcessingMessageID(ctx, msg))
funcs.MustNoErr(ctx.EventManager().EmitTypedEvent(&types.MessageProcessing{ID: msg.ID}))

return nil
}

func (k Keeper) validateMessage(ctx sdk.Context, msg exported.GeneralMessage) error {
// only validate sender and asset if it's not from wasm.
// the nexus module doesn't know how to validate wasm chains and addresses.
if !msg.Sender.Chain.IsFrom(wasm.ModuleName) {
if err := k.validateAddressAndAsset(ctx, msg.Sender, msg.Asset); err != nil {
return err
}
}

// only validate recipient and asset if it's not to wasm.
// the nexus module doesn't know how to validate wasm chains and addresses.
if !msg.Recipient.Chain.IsFrom(wasm.ModuleName) {
if err := k.validateAddressAndAsset(ctx, msg.Recipient, msg.Asset); err != nil {
return err
}
}

// asset is not supported for wasm messages
if (msg.Sender.Chain.IsFrom(wasm.ModuleName) || msg.Recipient.Chain.IsFrom(wasm.ModuleName)) && msg.Asset != nil {
return fmt.Errorf("asset transfer is not supported for wasm messages")
}

return nil
}

// validateAddressAndAsset validates 1) chain existence, 2) chain activation, 3) address, 4) asset
func (k Keeper) validateAddressAndAsset(ctx sdk.Context, address exported.CrossChainAddress, asset *sdk.Coin) error {
if _, ok := k.GetChain(ctx, address.Chain.Name); !ok {
return fmt.Errorf("chain %s is not registered", address.Chain.Name)
}

if !k.IsChainActivated(ctx, address.Chain) {
return fmt.Errorf("chain %s is not activated", address.Chain.Name)
}

if err := k.ValidateAddress(ctx, address); err != nil {
return err
}

if asset == nil {
return nil
}

return k.validateAsset(ctx, address.Chain, asset.Denom)
}
Loading

0 comments on commit 1d0bf42

Please sign in to comment.