Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nexus): msg dispatcher for routing messages from the wasm connection router to the nexus module #2000

11 changes: 5 additions & 6 deletions x/axelarnet/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
ibctypes "github.com/cosmos/ibc-go/v4/modules/apps/transfer/types"
clienttypes "github.com/cosmos/ibc-go/v4/modules/core/02-client/types"
channeltypes "github.com/cosmos/ibc-go/v4/modules/core/04-channel/types"
ibcclient "github.com/cosmos/ibc-go/v4/modules/core/exported"
ibcexported "github.com/cosmos/ibc-go/v4/modules/core/exported"
ibc "github.com/cosmos/ibc-go/v4/modules/core/exported"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"

Expand Down Expand Up @@ -89,15 +88,15 @@ type IBCTransferKeeper interface {
// ChannelKeeper defines the expected IBC channel keeper
type ChannelKeeper interface {
GetNextSequenceSend(ctx sdk.Context, portID, channelID string) (uint64, bool)
GetChannelClientState(ctx sdk.Context, portID, channelID string) (string, ibcclient.ClientState, error)
GetChannelClientState(ctx sdk.Context, portID, channelID string) (string, ibc.ClientState, error)

GetChannel(ctx sdk.Context, srcPort string, srcChan string) (channel channeltypes.Channel, found bool) // used in module_test
SendPacket(ctx sdk.Context, channelCap *capabilitytypes.Capability, packet ibcexported.PacketI) error // used in module_test
SendPacket(ctx sdk.Context, channelCap *capabilitytypes.Capability, packet ibc.PacketI) error // used in module_test
WriteAcknowledgement(
ctx sdk.Context,
chanCap *capabilitytypes.Capability,
packet ibcexported.PacketI,
ack ibcexported.Acknowledgement,
packet ibc.PacketI,
ack ibc.Acknowledgement,
) error
GetAppVersion(ctx sdk.Context, portID string, channelID string) (string, bool) // used in module_test
}
Expand Down
48 changes: 48 additions & 0 deletions x/nexus/keeper/general_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/query"

"github.com/axelarnetwork/axelar-core/utils"
Expand Down Expand Up @@ -35,6 +36,53 @@ func (k Keeper) GenerateMessageID(ctx sdk.Context) (string, []byte, uint64) {
return fmt.Sprintf("0x%s-%d", hex.EncodeToString(hash[:]), nonce), hash[:], nonce
}

// SetNewMessageFromWasm sets the given general message from a wasm contract.
func (k Keeper) SetNewMessageFromWasm(ctx sdk.Context, msg exported.GeneralMessage) error {
if msg.Asset != nil {
return fmt.Errorf("asset transfer is not supported")
}

if _, ok := k.GetChain(ctx, msg.GetDestinationChain()); !ok {
return fmt.Errorf("destination chain %s is not a registered chain", msg.GetDestinationChain())
}

if !k.IsChainActivated(ctx, msg.Recipient.Chain) {
return fmt.Errorf("destination chain %s is not activated", msg.GetDestinationChain())
}

if err := k.ValidateAddress(ctx, msg.Recipient); err != nil {
return sdkerrors.Wrap(err, "invalid recipient address")
}

if _, ok := k.GetMessage(ctx, msg.ID); ok {
return fmt.Errorf("general message %s already exists", msg.ID)
}

if err := k.setMessage(ctx, msg); err != nil {
return err
}

switch msg.Status {
case exported.Approved:
funcs.MustNoErr(ctx.EventManager().EmitTypedEvent(&types.MessageReceived{
ID: msg.ID,
PayloadHash: msg.PayloadHash,
Sender: msg.Sender,
Recipient: msg.Recipient,
}))
case exported.Processing:
if err := k.setProcessingMessageID(ctx, msg); err != nil {
return err
}

funcs.MustNoErr(ctx.EventManager().EmitTypedEvent(&types.MessageProcessing{ID: msg.ID}))
cgorenflo marked this conversation as resolved.
Show resolved Hide resolved
default:
return fmt.Errorf("invalid message status %s", msg.Status)
}

return nil
}

// SetNewMessage sets the given general message. If the messages is approved, adds the message ID to approved messages store
func (k Keeper) SetNewMessage(ctx sdk.Context, m exported.GeneralMessage) error {
sourceChain, ok := k.GetChain(ctx, m.GetSourceChain())
Expand Down
120 changes: 119 additions & 1 deletion x/nexus/keeper/general_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/axelarnetwork/axelar-core/testutils/rand"
"github.com/axelarnetwork/axelar-core/utils"
axelarnet "github.com/axelarnetwork/axelar-core/x/axelarnet/exported"
evm "github.com/axelarnetwork/axelar-core/x/evm/exported"
evmtypes "github.com/axelarnetwork/axelar-core/x/evm/types"
evmtestutils "github.com/axelarnetwork/axelar-core/x/evm/types/testutils"
"github.com/axelarnetwork/axelar-core/x/nexus/exported"
Expand All @@ -24,6 +25,124 @@ import (
. "github.com/axelarnetwork/utils/test"
)

func randMsgFromWasm(status exported.GeneralMessage_Status) exported.GeneralMessage {
return exported.GeneralMessage{
ID: rand.NormalizedStr(10),
Sender: exported.CrossChainAddress{
Chain: nexustestutils.RandomChain(),
Address: rand.NormalizedStr(42),
},
Recipient: exported.CrossChainAddress{
Chain: evm.Ethereum,
Address: evmtestutils.RandomAddress().Hex(),
},
PayloadHash: evmtestutils.RandomHash().Bytes(),
Status: status,
Asset: nil,
SourceTxID: evmtestutils.RandomHash().Bytes(),
SourceTxIndex: uint64(rand.I64Between(0, 100)),
}
}

func TestSetNewMessageFromWasm(t *testing.T) {
var (
msg exported.GeneralMessage
ctx sdk.Context
keeper nexus.Keeper
)

cfg := app.MakeEncodingConfig()
givenKeeper := Given("the keeper", func() {
keeper, ctx = setup(cfg)
})

givenKeeper.
When("the message is valid", func() {
msg = randMsgFromWasm(exported.Approved)
}).
Branch(
When("the message contains token transfer", func() {
coin := rand.Coin()
msg.Asset = &coin
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "asset transfer is not supported")
}),

When("the destination chain is not registered", func() {
msg.Recipient.Chain = nexustestutils.RandomChain()
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "is not a registered chain")
}),

When("the destination chain is not activated", func() {
keeper.DeactivateChain(ctx, msg.Recipient.Chain)
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "is not activated")
}),

When("the recipient address is invalid", func() {
msg.Recipient.Address = rand.Str(20)
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "invalid recipient address")
}),

When("the message already exists", func() {
keeper.SetNewMessageFromWasm(ctx, msg)
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "already exists")
}),

When("the message is invalid", func() {
msg.Sender.Address = ""
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "invalid source chain: invalid address: string is empty")
}),

When("the message status is invalid", func() {
msg.Status = exported.Failed
}).
Then("should return error", func(t *testing.T) {
assert.ErrorContains(t, keeper.SetNewMessageFromWasm(ctx, msg), "invalid message status")
}),
).
Run(t)

givenKeeper.
Branch(
When("the message status is approved", func() {
msg = randMsgFromWasm(exported.Approved)
}).
Then("should be stored as approved and emit MessageReceived event", func(t *testing.T) {
assert.NoError(t, keeper.SetNewMessageFromWasm(ctx, msg))

actual, ok := keeper.GetMessage(ctx, msg.ID)
assert.True(t, ok)
assert.Equal(t, msg, actual)
assert.Equal(t, "axelar.nexus.v1beta1.MessageReceived", ctx.EventManager().Events()[len(ctx.EventManager().Events())-1].Type)
}),

When("the message status is processing", func() {
msg = randMsgFromWasm(exported.Processing)
}).
Then("should be stored as processing and emit MessageProcessing event", func(t *testing.T) {
assert.NoError(t, keeper.SetNewMessageFromWasm(ctx, msg))

actual, ok := keeper.GetMessage(ctx, msg.ID)
assert.True(t, ok)
assert.Equal(t, msg, actual)
assert.Equal(t, "axelar.nexus.v1beta1.MessageProcessing", ctx.EventManager().Events()[len(ctx.EventManager().Events())-1].Type)
assert.Equal(t, msg, keeper.GetProcessingMessages(ctx, msg.GetDestinationChain(), 1)[0])
}),
).
Run(t)
}

func TestSetNewGeneralMessage(t *testing.T) {
var (
generalMessage exported.GeneralMessage
Expand Down Expand Up @@ -53,7 +172,6 @@ func TestSetNewGeneralMessage(t *testing.T) {
PayloadHash: crypto.Keccak256Hash(rand.Bytes(int(rand.I64Between(1, 100)))).Bytes(),
Asset: &asset,
}

})

whenChainsAreRegistered := givenContractCallEvent.
Expand Down
97 changes: 97 additions & 0 deletions x/nexus/keeper/msg_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package keeper

import (
"encoding/json"
"fmt"

wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"
wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
wasmvmtypes "github.com/CosmWasm/wasmvm/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"

axelarnet "github.com/axelarnetwork/axelar-core/x/axelarnet/exported"
"github.com/axelarnetwork/axelar-core/x/nexus/exported"
"github.com/axelarnetwork/axelar-core/x/nexus/types"
tss "github.com/axelarnetwork/axelar-core/x/tss/exported"
)

var _ wasmkeeper.Messenger = (*Messenger)(nil)

type message struct {
SenderChain exported.ChainName `json:"sender_chain"`
SenderAddress string `json:"sender_address"`
RecipientChain exported.ChainName `json:"recipient_chain"`
RecipientAddress string `json:"recipient_address"`
PayloadHash []byte `json:"payload_hash"`
SourceTxID []byte `json:"source_tx_id"`
SourceTxIndex uint64 `json:"source_tx_index"`
}

type request = []message

type Messenger struct {
types.Nexus
}

// NewMessenger returns a new Messenger
func NewMessenger(nexus types.Nexus) Messenger {
return Messenger{nexus}
}

// DispatchMsg decodes the messages from the cosmowasm connection router and routes them to the nexus module if possible
func (m Messenger) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, _ string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
fish-sammy marked this conversation as resolved.
Show resolved Hide resolved
req := request{}
if err := json.Unmarshal(msg.Custom, &req); err != nil {
return nil, nil, sdkerrors.Wrap(wasmtypes.ErrUnknownMsg, err.Error())
}

connectionRouter := m.GetParams(ctx).ConnectionRouter

if len(connectionRouter) == 0 {
return nil, nil, fmt.Errorf("connection router is not set")
}

if !connectionRouter.Equals(contractAddr) {
return nil, nil, fmt.Errorf("contract address %s is not the connection router", contractAddr)
}

// TODO: consider routing messages that can be routed instead of failing the
// whole batch whenever one message fails and returning the ones that
// succeeded/failed in the response
// TODO: consider handling only one message at a time instead of a batch
fish-sammy marked this conversation as resolved.
Show resolved Hide resolved
for _, msg := range req {
recipientChain, ok := m.GetChain(ctx, msg.RecipientChain)
if !ok {
return nil, nil, fmt.Errorf("recipient chain %s is not a registered chain", msg.RecipientChain)
}

msgID, _, _ := m.GenerateMessageID(ctx)
senderChain := exported.Chain{Name: msg.SenderChain, SupportsForeignAssets: false, KeyType: tss.None, Module: wasmtypes.ModuleName}
sender := exported.CrossChainAddress{Chain: senderChain, Address: msg.SenderAddress}
recipient := exported.CrossChainAddress{Chain: recipientChain, Address: msg.RecipientAddress}

status := exported.Approved
if !recipientChain.IsFrom(axelarnet.ModuleName) {
status = exported.Processing
}
msg := exported.NewGeneralMessage(
msgID,
sender,
recipient,
msg.PayloadHash,
status,
msg.SourceTxID,
msg.SourceTxIndex,
nil,
)

if err := m.Nexus.SetNewMessageFromWasm(ctx, msg); err != nil {
return nil, nil, err
}

}

// TODO: return events
fish-sammy marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, nil
}
Loading
Loading