Skip to content

Commit

Permalink
add mutex in websocket connection, in case of concurrent write (#887)
Browse files Browse the repository at this point in the history
* add mutex in websocket connection, in case of concurrent write

* adpat the header format

* inital refactor

* add mutex in EventSystem

Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com>
  • Loading branch information
Ray Green and KamiD authored May 27, 2021
1 parent f45594e commit 4b67a15
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 143 deletions.
96 changes: 8 additions & 88 deletions app/rpc/namespaces/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package filters
import (
"context"
"fmt"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/server"
Expand Down Expand Up @@ -42,27 +43,12 @@ type EventSystem struct {
// light client mode
lightMode bool

index filterIndex

// Subscriptions
txsSub *Subscription // Subscription for new transaction event
logsSub *Subscription // Subscription for new log event
// rmLogsSub *Subscription // Subscription for removed log event

pendingLogsSub *Subscription // Subscription for pending log event
chainSub *Subscription // Subscription for new chain event
index filterIndex
indexMux *sync.RWMutex

// Channels
install chan *Subscription // install filter for event notification
uninstall chan *Subscription // remove filter for event notification

// Unidirectional channels to receive Tendermint ResultEvents
txsCh <-chan coretypes.ResultEvent // Channel to receive new pending transactions event
logsCh <-chan coretypes.ResultEvent // Channel to receive new log event
pendingLogsCh <-chan coretypes.ResultEvent // Channel to receive new pending log event
// rmLogsCh <-chan coretypes.ResultEvent // Channel to receive removed log event

chainCh <-chan coretypes.ResultEvent // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -83,13 +69,9 @@ func NewEventSystem(client rpcclient.Client) *EventSystem {
channelLength: viper.GetInt(server.FlagWsSubChannelLength),
lightMode: false,
index: index,
indexMux: new(sync.RWMutex),
install: make(chan *Subscription),
uninstall: make(chan *Subscription),
txsCh: make(<-chan coretypes.ResultEvent),
logsCh: make(<-chan coretypes.ResultEvent),
pendingLogsCh: make(<-chan coretypes.ResultEvent),
// rmLogsCh: make(<-chan coretypes.ResultEvent),
chainCh: make(<-chan coretypes.ResultEvent),
}

go es.eventLoop()
Expand Down Expand Up @@ -293,93 +275,31 @@ func (es *EventSystem) handleChainEvent(ev coretypes.ResultEvent) {

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
err error
cancelPendingTxsSubs, cancelLogsSubs, cancelPendingLogsSubs, cancelHeaderSubs context.CancelFunc
)

// Subscribe events
es.txsSub, cancelPendingTxsSubs, err = es.SubscribePendingTxs()
if err != nil {
panic(fmt.Errorf("failed to subscribe pending txs: %w", err))
}

defer cancelPendingTxsSubs()

es.logsSub, cancelLogsSubs, err = es.SubscribeLogs(filters.FilterCriteria{})
if err != nil {
panic(fmt.Errorf("failed to subscribe logs: %w", err))
}

defer cancelLogsSubs()

es.pendingLogsSub, cancelPendingLogsSubs, err = es.subscribePendingLogs(filters.FilterCriteria{})
if err != nil {
panic(fmt.Errorf("failed to subscribe pending logs: %w", err))
}

defer cancelPendingLogsSubs()

es.chainSub, cancelHeaderSubs, err = es.SubscribeNewHeads()
if err != nil {
panic(fmt.Errorf("failed to subscribe headers: %w", err))
}

defer cancelHeaderSubs()

// Ensure all subscriptions get cleaned up
defer func() {
es.txsSub.Unsubscribe(es)
es.logsSub.Unsubscribe(es)
// es.rmLogsSub.Unsubscribe(es)
es.pendingLogsSub.Unsubscribe(es)
es.chainSub.Unsubscribe(es)
}()

for {
select {
case txEvent := <-es.txsSub.eventCh:
es.handleTxsEvent(txEvent)
case headerEv := <-es.chainSub.eventCh:
es.handleChainEvent(headerEv)
case logsEv := <-es.logsSub.eventCh:
es.handleLogs(logsEv)
// TODO: figure out how to handle removed logs
// case logsEv := <-es.rmLogsSub.eventCh:
// es.handleLogs(logsEv)
case logsEv := <-es.pendingLogsSub.eventCh:
es.handleLogs(logsEv)

case f := <-es.install:
es.indexMux.Lock()
if f.typ == filters.MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
es.index[filters.LogsSubscription][f.id] = f
es.index[filters.PendingLogsSubscription][f.id] = f
} else {
es.index[f.typ][f.id] = f
}
es.indexMux.Unlock()
close(f.installed)

case f := <-es.uninstall:
es.indexMux.Lock()
if f.typ == filters.MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(es.index[filters.LogsSubscription], f.id)
delete(es.index[filters.PendingLogsSubscription], f.id)
} else {
delete(es.index[f.typ], f.id)
}
es.indexMux.Unlock()
close(f.err)
// System stopped
case <-es.txsSub.Err():
return
case <-es.logsSub.Err():
return
// case <-es.rmLogsSub.Err():
// return
case <-es.pendingLogsSub.Err():
return
case <-es.chainSub.Err():
return
}
}
// }()
Expand Down
5 changes: 0 additions & 5 deletions app/rpc/namespaces/eth/filters/subscription.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filters

import (
"log"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -39,10 +38,6 @@ func (s *Subscription) Unsubscribe(es *EventSystem) {
}

go func() {
defer func() {
log.Println("successfully unsubscribed to event", s.event)
}()

uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
Expand Down
14 changes: 8 additions & 6 deletions app/rpc/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func EthHeaderFromTendermint(header tmtypes.Header) *ethtypes.Header {
Coinbase: common.BytesToAddress(header.ProposerAddress),
Root: common.BytesToHash(header.AppHash),
TxHash: common.BytesToHash(header.DataHash),
ReceiptHash: common.Hash{},
ReceiptHash: ethtypes.EmptyRootHash,
Difficulty: nil,
Number: big.NewInt(header.Height),
Time: uint64(header.Time.Unix()),
Expand Down Expand Up @@ -258,11 +258,13 @@ func EthHeaderWithBlockHashFromTendermint(tmHeader *tmtypes.Header) (header *Eth
}

header = &EthHeaderWithBlockHash{
ParentHash: common.BytesToHash(tmHeader.LastBlockID.Hash.Bytes()),
Coinbase: common.BytesToAddress(tmHeader.ProposerAddress),
Root: common.BytesToHash(tmHeader.AppHash),
TxHash: common.BytesToHash(tmHeader.DataHash),
Number: (*hexutil.Big)(big.NewInt(tmHeader.Height)),
ParentHash: common.BytesToHash(tmHeader.LastBlockID.Hash.Bytes()),
UncleHash: ethtypes.EmptyUncleHash,
Coinbase: common.BytesToAddress(tmHeader.ProposerAddress),
Root: common.BytesToHash(tmHeader.AppHash),
TxHash: common.BytesToHash(tmHeader.DataHash),
ReceiptHash: ethtypes.EmptyRootHash,
Number: (*hexutil.Big)(big.NewInt(tmHeader.Height)),
// difficulty is not available for DPOS
Difficulty: defaultDifficulty,
GasLimit: defaultGasLimit,
Expand Down
67 changes: 31 additions & 36 deletions app/rpc/websockets/pubsub_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"sync"

"github.com/gorilla/websocket"

"github.com/tendermint/tendermint/libs/log"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
Expand All @@ -25,7 +23,7 @@ import (
type PubSubAPI struct {
clientCtx context.CLIContext
events *rpcfilters.EventSystem
filtersMu sync.Mutex
filtersMu *sync.RWMutex
filters map[rpc.ID]*wsSubscription
logger log.Logger
}
Expand All @@ -35,12 +33,13 @@ func NewAPI(clientCtx context.CLIContext, log log.Logger) *PubSubAPI {
return &PubSubAPI{
clientCtx: clientCtx,
events: rpcfilters.NewEventSystem(clientCtx.Client),
filtersMu: new(sync.RWMutex),
filters: make(map[rpc.ID]*wsSubscription),
logger: log.With("module", "websocket-client"),
}
}

func (api *PubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) {
func (api *PubSubAPI) subscribe(conn *wsConn, params []interface{}) (rpc.ID, error) {
method, ok := params[0].(string)
if !ok {
return "0", fmt.Errorf("invalid parameters")
Expand Down Expand Up @@ -82,7 +81,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool {
return true
}

func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
func (api *PubSubAPI) subscribeNewHeads(conn *wsConn) (rpc.ID, error) {
sub, _, err := api.events.SubscribeNewHeads()
if err != nil {
return "", fmt.Errorf("error creating block filter: %s", err.Error())
Expand Down Expand Up @@ -112,7 +111,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
continue
}

api.filtersMu.Lock()
api.filtersMu.RLock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Expand All @@ -131,17 +130,16 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number)
}
}
api.filtersMu.Unlock()
api.filtersMu.RUnlock()

if err != nil {
api.unsubscribe(sub.ID())
}
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
if err != nil {
api.unsubscribe(sub.ID())
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
}
return
case <-unsubscribed:
api.logger.Debug("NewHeads channel is closed", "ID", sub.ID())
Expand All @@ -153,7 +151,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
return sub.ID(), nil
}

func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) {
func (api *PubSubAPI) subscribeLogs(conn *wsConn, extra interface{}) (rpc.ID, error) {
crit := filters.FilterCriteria{}

if extra != nil {
Expand Down Expand Up @@ -239,7 +237,7 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp
return
}

api.filtersMu.Lock()
api.filtersMu.RLock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Expand All @@ -259,18 +257,17 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp
api.logger.Debug("successfully write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash)
}
}
api.filtersMu.Unlock()
api.filtersMu.RUnlock()

if err != nil {
api.unsubscribe(sub.ID())
}
}(event)
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
if err != nil {
api.unsubscribe(sub.ID())
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
}
return
case <-unsubscribed:
api.logger.Debug("Logs channel is closed", "ID", sub.ID())
Expand Down Expand Up @@ -356,7 +353,7 @@ func isHex(str string) bool {
return true
}

func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) {
func (api *PubSubAPI) subscribePendingTransactions(conn *wsConn) (rpc.ID, error) {
sub, _, err := api.events.SubscribePendingTxs()
if err != nil {
return "", fmt.Errorf("error creating block filter: %s", err.Error())
Expand All @@ -382,7 +379,7 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID
}
txHash := common.BytesToHash(data.Tx.Hash())

api.filtersMu.Lock()
api.filtersMu.RLock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Expand All @@ -401,17 +398,16 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID
api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash)
}
}
api.filtersMu.Unlock()
api.filtersMu.RUnlock()

if err != nil {
api.unsubscribe(sub.ID())
}
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
if err != nil {
api.unsubscribe(sub.ID())
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
}
return
case <-unsubscribed:
api.logger.Debug("PendingTransactions channel is closed", "ID", sub.ID())
Expand All @@ -423,7 +419,7 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID
return sub.ID(), nil
}

func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
func (api *PubSubAPI) subscribeSyncing(conn *wsConn) (rpc.ID, error) {
sub, _, err := api.events.SubscribeNewHeads()
if err != nil {
return "", fmt.Errorf("error creating block filter: %s", err.Error())
Expand Down Expand Up @@ -468,7 +464,7 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
}
}

api.filtersMu.Lock()
api.filtersMu.RLock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Expand All @@ -487,18 +483,17 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
api.logger.Debug("successfully write syncing status", "ID", sub.ID())
}
}
api.filtersMu.Unlock()
api.filtersMu.RUnlock()

if err != nil {
api.unsubscribe(sub.ID())
}

case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
if err != nil {
api.unsubscribe(sub.ID())
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
}
return
case <-unsubscribed:
api.logger.Debug("Syncing channel is closed", "ID", sub.ID())
Expand Down
Loading

0 comments on commit 4b67a15

Please sign in to comment.