Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to github.com/go-redis/redis #235

Closed
wants to merge 13 commits into from
235 changes: 89 additions & 146 deletions broker_redis.go

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/centrifugal/protocol"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -78,7 +77,7 @@ func NewTestRedisBrokerWithPrefix(tb testing.TB, n *Node, prefix string, useStre

func NewTestRedisBrokerClusterWithPrefix(tb testing.TB, n *Node, prefix string, useStreams bool) *RedisBroker {
redisConf := RedisShardConfig{
ClusterAddresses: []string{"localhost:7000", "localhost:7001", "localhost:7002"},
ClusterAddresses: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
Password: testRedisPassword,
ReadTimeout: 100 * time.Second,
}
Expand Down Expand Up @@ -341,9 +340,8 @@ func TestRedisBrokerRecover(t *testing.T) {
}

func pubSubChannels(t *testing.T, e *RedisBroker) ([]string, error) {
conn := e.shards[0].pool.Get()
defer func() { require.NoError(t, conn.Close()) }()
return redis.Strings(conn.Do("PUBSUB", "channels", e.messagePrefix+"*"))
client := e.shards[0].client
return client.PubSubChannels(context.Background(), e.messagePrefix+"*").Result()
}

func TestRedisBrokerSubscribeUnsubscribe(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ go 1.17

require (
github.com/FZambia/eagle v0.0.2
github.com/FZambia/sentinel v1.1.0
github.com/centrifugal/protocol v0.8.11
github.com/gomodule/redigo v1.8.9
github.com/go-redis/redis/v9 v9.0.0-beta.3
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/igm/sockjs-go/v3 v3.0.2
github.com/mna/redisc v1.3.2
github.com/prometheus/client_golang v1.13.0
github.com/stretchr/testify v1.8.0
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
Expand All @@ -21,6 +19,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -32,6 +31,6 @@ require (
github.com/segmentio/asm v1.1.4 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
72 changes: 64 additions & 8 deletions go.sum

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions internal/util/safe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//go:build appengine

package util

func BytesToString(b []byte) string {
return string(b)
}

func StringToBytes(s string) []byte {
return []byte(s)
}
20 changes: 20 additions & 0 deletions internal/util/unsafe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//go:build !appengine

package util

import "unsafe"

// BytesToString converts byte slice to string.
func BytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// StringToBytes converts string to byte slice.
func StringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}
25 changes: 25 additions & 0 deletions internal/util/unsafe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package util

import "testing"

var tests = []string{
"abc",
"hello world",
"tests",
}

func TestBytesToString(t *testing.T) {
for _, want := range tests {
if got := BytesToString([]byte(want)); got != want {
t.Errorf("BytesToString() = %s, want %s", got, want)
}
}
}

func TestStringToBytes(t *testing.T) {
for _, want := range tests {
if got := StringToBytes(want); string(got) != want {
t.Errorf("StringToBytes() = %s, want %s", got, want)
}
}
}
3 changes: 2 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/centrifugal/centrifuge/internal/controlproto"
"github.com/centrifugal/centrifuge/internal/dissolve"
"github.com/centrifugal/centrifuge/internal/nowtime"
"github.com/centrifugal/centrifuge/internal/util"

"github.com/FZambia/eagle"
"github.com/centrifugal/protocol"
Expand Down Expand Up @@ -191,7 +192,7 @@ func index(s string, numBuckets int) int {
return 0
}
hash := fnv.New64a()
_, _ = hash.Write([]byte(s))
_, _ = hash.Write(util.StringToBytes(s))
return int(hash.Sum64() % uint64(numBuckets))
}

Expand Down
39 changes: 25 additions & 14 deletions presence_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

"github.com/centrifugal/protocol"
"github.com/gomodule/redigo/redis"
"github.com/go-redis/redis/v9"

"github.com/centrifugal/centrifuge/internal/util"
)

var _ PresenceManager = (*RedisPresenceManager)(nil)
Expand Down Expand Up @@ -112,9 +114,9 @@ func NewRedisPresenceManager(n *Node, config RedisPresenceManagerConfig) (*Redis
shards: config.Shards,
config: config,
sharding: len(config.Shards) > 1,
addPresenceScript: redis.NewScript(2, addPresenceSource),
remPresenceScript: redis.NewScript(2, remPresenceSource),
presenceScript: redis.NewScript(2, presenceSource),
addPresenceScript: redis.NewScript(addPresenceSource),
remPresenceScript: redis.NewScript(remPresenceSource),
presenceScript: redis.NewScript(presenceSource),
closeCh: make(chan struct{}),
}

Expand Down Expand Up @@ -157,8 +159,12 @@ func (m *RedisPresenceManager) addPresence(s *RedisShard, ch string, uid string,
expireAt := time.Now().Unix() + int64(expire)
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
dr := s.newDataRequest("", m.addPresenceScript, setKey, []interface{}{setKey, hashKey, expire, expireAt, uid, infoBytes})
dr := s.newDataRequest(m.addPresenceScript, []string{string(setKey), string(hashKey)}, []interface{}{expire, expireAt, uid, infoBytes})
resp := s.getDataResponse(dr, m.closeCh)
// redis.Nil is expected, since addPresenceScript doesn't have a return value.
if resp.err == redis.Nil {
return nil
}
return resp.err
}

Expand All @@ -170,8 +176,12 @@ func (m *RedisPresenceManager) RemovePresence(ch string, uid string) error {
func (m *RedisPresenceManager) removePresence(s *RedisShard, ch string, uid string) error {
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
dr := s.newDataRequest("", m.remPresenceScript, setKey, []interface{}{setKey, hashKey, uid})
dr := s.newDataRequest(m.remPresenceScript, []string{string(setKey), string(hashKey)}, []interface{}{uid})
resp := s.getDataResponse(dr, m.closeCh)
// redis.Nil is expected, since remPresenceScript doesn't have a return value.
if resp.err == redis.Nil {
return nil
}
return resp.err
}

Expand All @@ -185,16 +195,17 @@ func (m *RedisPresenceManager) presence(s *RedisShard, ch string) (map[string]*C
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
now := int(time.Now().Unix())
dr := s.newDataRequest("", m.presenceScript, setKey, []interface{}{setKey, hashKey, now})
dr := s.newDataRequest(m.presenceScript, []string{string(setKey), string(hashKey)}, []interface{}{now})
resp := s.getDataResponse(dr, m.closeCh)
if resp.err != nil {
return nil, resp.err
}
return mapStringClientInfo(resp.reply, nil)
return mapStringClientInfo(resp.reply)
}

func mapStringClientInfo(result interface{}, err error) (map[string]*ClientInfo, error) {
values, err := redis.Values(result, err)
func mapStringClientInfo(result interface{}) (map[string]*ClientInfo, error) {
cmd := redis.NewCmdResult(result, nil)
values, err := cmd.Slice()
if err != nil {
return nil, err
}
Expand All @@ -203,17 +214,17 @@ func mapStringClientInfo(result interface{}, err error) (map[string]*ClientInfo,
}
m := make(map[string]*ClientInfo, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, okKey := values[i].([]byte)
value, okValue := values[i+1].([]byte)
key, okKey := values[i].(string)
value, okValue := values[i+1].(string)
if !okKey || !okValue {
return nil, errors.New("scanMap key not a bulk string value")
}
var f protocol.ClientInfo
err = f.UnmarshalVT(value)
err = f.UnmarshalVT(util.StringToBytes(value))
if err != nil {
return nil, errors.New("can not unmarshal value to ClientInfo")
}
m[string(key)] = infoFromProto(&f)
m[key] = infoFromProto(&f)
}
return m, nil
}
Expand Down
8 changes: 4 additions & 4 deletions presence_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string

func NewTestRedisPresenceManagerClusterWithPrefix(tb testing.TB, n *Node, prefix string) *RedisPresenceManager {
redisConf := RedisShardConfig{
ClusterAddresses: []string{"localhost:7000", "localhost:7001", "localhost:7002"},
ClusterAddresses: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
Password: testRedisPassword,

ReadTimeout: 100 * time.Second,
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestRedisPresenceManager(t *testing.T) {
}

func BenchmarkRedisAddPresence_1Ch(b *testing.B) {
for _, tt := range benchRedisTests {
for _, tt := range redisPresenceTests {
b.Run(tt.Name, func(b *testing.B) {
node := testNode(b)
e := newTestRedisPresenceManager(b, node, tt.UseCluster)
Expand All @@ -120,7 +120,7 @@ func BenchmarkRedisAddPresence_1Ch(b *testing.B) {
}

func BenchmarkRedisPresence_1Ch(b *testing.B) {
for _, tt := range benchRedisTests {
for _, tt := range redisPresenceTests {
b.Run(tt.Name, func(b *testing.B) {
node := testNode(b)
e := newTestRedisPresenceManager(b, node, tt.UseCluster)
Expand All @@ -141,7 +141,7 @@ func BenchmarkRedisPresence_1Ch(b *testing.B) {
}

func BenchmarkRedisPresence_ManyCh(b *testing.B) {
for _, tt := range benchRedisTests {
for _, tt := range redisPresenceTests {
b.Run(tt.Name, func(b *testing.B) {
node := testNode(b)
e := newTestRedisPresenceManager(b, node, tt.UseCluster)
Expand Down
Loading