Skip to content

Commit

Permalink
Merge pull request #73 from werbenhu/main
Browse files Browse the repository at this point in the history
Replace herolog with slog and add the inline subscription feature
  • Loading branch information
wind-c authored Oct 15, 2023
2 parents 1c0b22a + 2796d31 commit 68d0489
Show file tree
Hide file tree
Showing 105 changed files with 3,093 additions and 1,881 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/runtests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.21'

- name: Build
run: go build ./...
Expand Down
60 changes: 24 additions & 36 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ package cluster
import (
"bytes"
"context"
"net"
"path"
"path/filepath"
"strconv"

"github.com/panjf2000/ants/v2"
"github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/discovery/mlist"
"github.com/wind-c/comqtt/v2/cluster/discovery/serf"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/cluster/message"
"github.com/wind-c/comqtt/v2/cluster/raft"
"github.com/wind-c/comqtt/v2/cluster/raft/etcd"
Expand All @@ -21,10 +26,6 @@ import (
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"github.com/wind-c/comqtt/v2/mqtt/packets"
"net"
"path"
"path/filepath"
"strconv"
)

const (
Expand Down Expand Up @@ -119,7 +120,7 @@ func (a *Agent) Start() (err error) {
if err := a.grpcService.StartRpcServer(); err != nil {
return err
}
zero.Info().Str("addr", net.JoinHostPort(a.Config.BindAddr, strconv.Itoa(a.Config.GrpcPort))).Msg("grpc listen at")
log.Info("grpc listen at", "addr", net.JoinHostPort(a.Config.BindAddr, strconv.Itoa(a.Config.GrpcPort)))
}

// init goroutine pool
Expand Down Expand Up @@ -176,17 +177,16 @@ func (a *Agent) Stop() {
}

// stop raft
zero.Info().Msg("stopping raft...")
log.Info("stopping raft...")
a.raftPeer.Stop()
zero.Info().Msg("raft stopped")
log.Info("raft stopped")

// stop node
zero.Info().Msg("stopping node...")
log.Info("stopping node...")
a.membership.Stop()
a.grpcService.StopRpcServer()
zero.Info().Msg("grpc server stopped")
zero.Info().Msg("node stopped")
zero.Close()
log.Info("grpc server stopped")
log.Info("node stopped")
}

func (a *Agent) BindMqttServer(server *mqtt.Server) {
Expand Down Expand Up @@ -264,7 +264,7 @@ func (a *Agent) raftApplyListener() {
} else {
continue
}
zero.Info().Str("from", msg.NodeID).Str("filter", filter).Uint8("type", msg.Type).Msg("apply listening")
log.Info("apply listening", "from", msg.NodeID, "filter", filter, "type", msg.Type)
case <-a.ctx.Done():
return
}
Expand Down Expand Up @@ -306,10 +306,10 @@ func (a *Agent) getPeersFile() string {

func (a *Agent) genNodesFile() {
if err := discovery.GenNodesFile(a.getNodesFile(), a.membership.Members()); err != nil {
zero.Error().Err(err).Msg("gen nodes file")
log.Error("gen nodes file", "error", err)
}
if err := a.raftPeer.GenPeersFile(a.getPeersFile()); err != nil {
zero.Error().Err(err).Msg("gen peers file")
log.Error("gen peers file", "error", err)
}
}

Expand Down Expand Up @@ -461,39 +461,27 @@ func (a *Agent) processOutboundPacket(pk *packets.Packet) {
}

func OnJoinLog(nodeId, addr, prompt string, err error) {
logEvent := zero.Info()
if err != nil {
logEvent.Err(err)
}
logEvent.Str("node", nodeId).Str("addr", addr).Msg(prompt)
log.Info(prompt, "error", err, "addr", addr)
}

func OnApplyLog(leaderId, nodeId string, tp byte, filter []byte, prompt string, err error) {
logEvent := zero.Info()
if err != nil {
logEvent.Err(err)
}
logEvent.Str("leader", leaderId).Str("from", nodeId).Uint8("type", tp).Bytes("filter", filter).Msg(prompt)
log.Info(prompt, "error", err, "leader", leaderId, "from", nodeId, "type", tp, "filter", filter)
}

func OnPublishPacketLog(direction byte, nodeId, cid, topic string, pid uint16) {
logEvent := zero.Info()
if direction == DirectionInbound {
logEvent.Str("d", "inbound").Str("from", nodeId)
log.Info("publish message", "d", "inbound", "from", nodeId, "cid", cid, "pid", pid, "topic", topic)
} else {
logEvent.Str("d", "outbound").Str("to", nodeId)
log.Info("publish message", "d", "outbound", "to", nodeId, "cid", cid, "pid", pid, "topic", topic)
}
logEvent.Str("cid", cid).Uint16("pid", pid).Str("topic", topic).Msg("publish message")
}

func OnConnectPacketLog(direction byte, node, clientId string) {
logEvent := zero.Info()
if direction == DirectionInbound {
logEvent.Str("d", "inbound").Str("from", node)
log.Info("connection notification", "d", "inbound", "from", node, "cid", clientId)
} else {
logEvent.Str("d", "outbound").Str("to", node)
log.Info("connection notification", "d", "outbound", "to", node, "cid", clientId)
}
logEvent.Str("cid", clientId).Msg("connection notification")
}

func (a *Agent) Join(nodeName, addr string) error {
Expand All @@ -513,15 +501,15 @@ func (a *Agent) Leave() error {

func (a *Agent) AddRaftPeer(id, addr string) {
a.raftPeer.Join(id, addr)
zero.Info().Str("nid", id).Str("addr", addr).Msg("add peer")
log.Info("add peer", "nid", id, "addr", addr)
}

func (a *Agent) RemoveRaftPeer(id string) {
a.raftPeer.Leave(id)
zero.Info().Str("nid", id).Msg("remove peer")
log.Info("remove peer", "nid", id)
}

func (a *Agent) GetValue(key string) []string {
zero.Info().Str("key", key).Msg("get value")
log.Info("get value", "key", key)
return a.raftPeer.Lookup(key)
}
9 changes: 5 additions & 4 deletions cluster/discovery/mlist/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package mlist

import (
"encoding/json"
"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
mqtt "github.com/wind-c/comqtt/v2/mqtt"
"sync"
"time"

"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/log"
mqtt "github.com/wind-c/comqtt/v2/mqtt"
)

// Maximum number of messages to be held in the queue.
Expand Down Expand Up @@ -126,7 +127,7 @@ func (d *Delegate) handleQueueDepth() {
case <-time.After(15 * time.Minute):
n := d.Broadcasts.NumQueued()
if n > maxQueueSize {
zero.Info().Int("current", n).Int("limit", maxQueueSize).Msg("delete messages")
log.Info("delete messages", "current", n, "limit", maxQueueSize)
d.Broadcasts.Prune(maxQueueSize)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/discovery/mlist/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package mlist
import (
"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
)

//type Event struct {
Expand Down Expand Up @@ -50,5 +50,5 @@ func (n *NodeEvents) NotifyUpdate(node *memberlist.Node) {
}

func onLog(node *memberlist.Node, prompt string) {
zero.Info().Str("node", node.Name).Str("addr", node.Addr.String()).Msg(prompt)
log.Info(prompt, "node", node.Name, "addr", node.Addr.String())
}
15 changes: 8 additions & 7 deletions cluster/discovery/mlist/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package mlist

import (
"net"
"time"

"github.com/hashicorp/memberlist"
mb "github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"net"
"time"
)

type Membership struct {
Expand All @@ -25,7 +26,7 @@ type Membership struct {

func wrapOptions(conf *config.Cluster) *memberlist.Config {
opts := make([]Option, 3)
opts[0] = WithLogOutput(zero.Logger(), LogLevelInfo) //Used to filter memberlist logs
opts[0] = WithLogOutput(log.Writer(), LogLevelInfo) //Used to filter memberlist logs
opts[1] = WithBindPort(conf.BindPort)
opts[2] = WithHandoffQueueDepth(conf.QueueDepth)
if conf.NodeName != "" {
Expand Down Expand Up @@ -62,7 +63,7 @@ func (m *Membership) Setup() error {
return err
}
}
zero.Info().Str("addr", m.LocalAddr()).Int("port", m.config.BindPort).Msg("local member")
log.Info("local member", "addr", m.LocalAddr(), "port", m.config.BindPort)

return nil
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (m *Membership) SendToOthers(msg []byte) {
continue // skip self
}
if err := m.send(node, msg); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", node.Name).Msg("send to others")
log.Error("send to others", "error", err, "from", m.config.NodeName, "to", node.Name)
}
}
}
Expand All @@ -163,7 +164,7 @@ func (m *Membership) SendToNode(nodeName string, msg []byte) error {
for _, node := range m.aliveMembers() {
if node.Name == nodeName {
if err := m.send(node, msg); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", nodeName).Msg("send to node")
log.Error("send to others", "error", err, "from", m.config.NodeName, "to", nodeName)
return err
}
}
Expand Down
28 changes: 10 additions & 18 deletions cluster/discovery/serf/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
package serf

import (
"strconv"

"github.com/hashicorp/logutils"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
mb "github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"io"
"os"
"strconv"
)

const (
Expand Down Expand Up @@ -42,16 +41,11 @@ func wrapOptions(conf *config.Cluster, ech chan serf.Event) *serf.Config {
if conf.QueueDepth != 0 {
config.MaxQueueDepth = conf.QueueDepth
}
var logger io.Writer
if zero.Logger() != nil {
logger = zero.Logger()
} else {
logger = os.Stderr
}

filter := &logutils.LevelFilter{
Levels: []logutils.LogLevel{LogLevelDebug, LogLevelWarn, LogLevelError, LogLevelInfo},
MinLevel: logutils.LogLevel(LogLevelError),
Writer: logger,
Writer: log.Writer(),
}
config.MemberlistConfig.LogOutput = filter
config.LogOutput = filter
Expand Down Expand Up @@ -93,8 +87,7 @@ func (m *Membership) Setup() (err error) {
}
}
}
zero.Info().Str("addr", m.LocalAddr()).Int("port", m.config.BindPort).Msg("local member")

log.Info("local member", "addr", m.LocalAddr(), "port", m.config.BindPort)
return
}

Expand Down Expand Up @@ -125,11 +118,11 @@ func (m *Membership) Stat() map[string]int64 {
func (m *Membership) Stop() {
err := m.serf.Leave()
if err != nil {
zero.Error().Err(err).Msg("serf leave")
log.Error("serf leave", "error", err)
}
err = m.serf.Shutdown()
if err != nil {
zero.Error().Err(err).Msg("serf shutdown")
log.Error("serf shutdown", "error", err)
}
// this shuts down the event loop, note that this can't be called multiple times
// if we need to do so, we could use a bool, sync.Once or recover from the panic
Expand Down Expand Up @@ -189,7 +182,6 @@ func (m *Membership) eventLoop() {
continue
}
m.msgCh <- ue.Payload
//zero.Info().Str("name", ue.Name).Msg("serf message")
case serf.EventQuery:
q := e.(*serf.Query)
if q.SourceNode() == m.config.NodeName {
Expand Down Expand Up @@ -217,7 +209,7 @@ func (m *Membership) SendToNode(nodeName string, msg []byte) error {
qp := m.serf.DefaultQueryParams()
qp.FilterNodes = m.otherNames(nodeName)
if _, err := m.serf.Query(m.config.NodeName, msg, qp); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", nodeName).Msg("send to node")
log.Error("send to node", "error", err, "from", m.config.NodeName, "to", nodeName)
return err
}

Expand All @@ -229,7 +221,7 @@ func (m *Membership) Broadcast(msg []byte) {
}

func onLog(node *serf.Member, prompt string) {
zero.Info().Str("node", node.Name).Str("addr", node.Addr.String()).Msg(prompt)
log.Info(prompt, "node", node.Name, "addr", node.Addr.String())
}

func (m *Membership) isLocal(member serf.Member) bool {
Expand Down
Loading

0 comments on commit 68d0489

Please sign in to comment.