Skip to content

Commit

Permalink
Added restfull apis
Browse files Browse the repository at this point in the history
Added restfull apis of mqtt and cluster.
  • Loading branch information
wind-c committed Aug 8, 2024
1 parent dc99116 commit f05f581
Show file tree
Hide file tree
Showing 16 changed files with 531 additions and 119 deletions.
12 changes: 12 additions & 0 deletions cluster/rest/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package rest

type result struct {
Url string `json:"url"`
Data string `json:"data"`
Err string `json:"err"`
}

type node struct {
Name string `json:"name"`
Addr string `json:"addr"`
}
85 changes: 85 additions & 0 deletions cluster/rest/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package rest

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"sync"
"time"
)

const (
HttpGet = "GET"
HttpPost = "POST"
HttpDelete = "DELETE"
Timeout = 3 * time.Second
)

func fetch(ctx context.Context, method string, url string, data []byte) result {
rs := result{Url: url}
var req *http.Request
var err error
var body io.Reader
if data != nil {
body = bytes.NewBuffer(data)
}
if ctx != nil {
req, err = http.NewRequestWithContext(ctx, method, url, body)
} else {
req, err = http.NewRequest(method, url, body)
}
if err != nil {
rs.Err = err.Error()
return rs
}

req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.DeadlineExceeded) {
rs.Err = "Request timeout"
} else {
rs.Err = err.Error()
}
return rs
}
defer resp.Body.Close()
if data, err := io.ReadAll(resp.Body); err != nil {
rs.Err = err.Error()
} else {
if resp.StatusCode == http.StatusOK {
rs.Data = string(data)
} else {
rs.Err = string(data)
}
}
return rs
}

func fetchM(method string, urls []string, body []byte) []result {
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
defer cancel()

var wg sync.WaitGroup
ch := make(chan result, len(urls))

wg.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer wg.Done()
ch <- fetch(ctx, method, url, body)
}(url)
}

wg.Wait()
close(ch)
results := make([]result, len(urls))
index := 0
for rs := range ch {
results[index] = rs
index++
}
return results
}
177 changes: 177 additions & 0 deletions cluster/rest/rest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package rest

import (
"encoding/json"
"fmt"
cs "github.com/wind-c/comqtt/v2/cluster"
"github.com/wind-c/comqtt/v2/cluster/discovery"
rt "github.com/wind-c/comqtt/v2/mqtt/rest"
"net/http"
"net/netip"
"strings"
)

type rest struct {
agent *cs.Agent
}

func New(agent *cs.Agent) *rest {
return &rest{
agent: agent,
}
}

func (s *rest) GenHandlers() map[string]rt.Handler {
return map[string]rt.Handler{
"GET /api/v1/node/config": s.viewConfig,
"DELETE /api/v1/node/{name}": s.leave,
"GET /api/v1/cluster/nodes": s.getNodes,
"POST /api/v1/cluster/nodes": s.join,
"POST /api/v1/cluster/peers": s.addRaftPeer,
"DELETE /api/v1/cluster/peers/{name}": s.removeRaftPeer,
"GET /api/v1/cluster/stat/online": s.getOnlineCount,
"GET /api/v1/cluster/clients/{id}": s.getClient,
"POST /api/v1/cluster/blacklist/{id}": s.kickClient,
"DELETE /api/v1/cluster/blacklist/{id}": s.blanchClient,
}
}

// viewConfig return the configuration parameters of this node
// GET api/v1/node/config
func (s *rest) viewConfig(w http.ResponseWriter, r *http.Request) {
rt.Ok(w, s.agent.Config)
}

// getMembers return all nodes in the cluster
// GET api/v1/cluster/nodes
func (s *rest) getNodes(w http.ResponseWriter, r *http.Request) {
rt.Ok(w, s.agent.GetMemberList())
}

// join add a node to the cluster
// POST api/v1/cluster/nodes
func (s *rest) join(w http.ResponseWriter, r *http.Request) {
var n node
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
rt.Error(w, http.StatusBadRequest, err.Error())
return
}
n.Name = strings.TrimSpace(n.Name)
n.Addr = strings.TrimSpace(n.Addr)
if n.Name == "" || n.Addr == "" {
rt.Error(w, http.StatusBadRequest, "name and addr cannot be empty")
return
}
if _, err := netip.ParseAddrPort(n.Addr); err != nil {
rt.Error(w, http.StatusBadRequest, "invalid address")
return
}

if err := s.agent.Join(n.Name, n.Addr); err != nil {
rt.Error(w, http.StatusInternalServerError, err.Error())
} else {
rt.Ok(w, n)
}
}

// leave local node gracefully exits the cluster
// DELETE api/v1/node/{name}
func (s *rest) leave(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
localName := s.agent.GetLocalName()
if name != localName {
rt.Error(w, http.StatusBadRequest, fmt.Sprintf("cannot remove not local node %s", localName))
return
}

if err := s.agent.Leave(); err != nil {
rt.Error(w, http.StatusInternalServerError, err.Error())
return
} else {
rt.Ok(w, name)
}
}

// addRaftPeer add peer to raft cluster
// POST api/v1/cluster/peers
func (s *rest) addRaftPeer(w http.ResponseWriter, r *http.Request) {
var p node
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
rt.Error(w, http.StatusBadRequest, err.Error())
return
}
p.Name = strings.TrimSpace(p.Name)
p.Addr = strings.TrimSpace(p.Addr)
if p.Name == "" || p.Addr == "" {
rt.Error(w, http.StatusBadRequest, "name and addr cannot be empty")
return
}
if _, err := netip.ParseAddrPort(p.Addr); err != nil {
rt.Error(w, http.StatusBadRequest, "invalid address")
return
}

s.agent.AddRaftPeer(p.Name, p.Addr)
rt.Ok(w, p)
}

// removeRaftPeer remove peer from raft cluster
// DELETE api/v1/cluster/peers/{name}
func (s *rest) removeRaftPeer(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
if strings.TrimSpace(name) == "" {
rt.Error(w, http.StatusBadRequest, "name cannot be empty")
return
}

s.agent.RemoveRaftPeer(name)
rt.Ok(w, name)
}

// getOnlineCount return online number from all nodes in the cluster
// GET api/v1/cluster/stat/online
func (s *rest) getOnlineCount(w http.ResponseWriter, r *http.Request) {
path := rt.MqttGetOnlinePath
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// getClient return a client information, search from all nodes in the cluster
// GET api/v1/cluster/clients/{id}
func (s *rest) getClient(w http.ResponseWriter, r *http.Request) {
cid := r.PathValue("id")
path := strings.Replace(rt.MqttGetClientPath, "{id}", cid, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// kickClient add it to the blacklist on all nodes in the cluster
// POST api/v1/cluster/blacklist/{id}
func (s *rest) kickClient(w http.ResponseWriter, r *http.Request) {
cid := r.PathValue("id")
path := strings.Replace(rt.MqttAddBlacklistPath, "{id}", cid, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpPost, urls, nil)
rt.Ok(w, rs)
}

// blanchClient remove from the blacklist on all nodes in the cluster
// DELETE api/v1/cluster/blacklist/{id}
func (s *rest) blanchClient(w http.ResponseWriter, r *http.Request) {
cid := r.PathValue("id")
path := strings.Replace(rt.MqttDelBlacklistPath, "{id}", cid, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpDelete, urls, nil)
rt.Ok(w, rs)
}

// genUrls generate urls
func genUrls(ms []discovery.Member, path string) []string {
urls := make([]string, len(ms))
for i, m := range ms {
urls[i] = "http://" + m.Addr + ":8080" + path
}
return urls
}
90 changes: 7 additions & 83 deletions cmd/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
csRt "github.com/wind-c/comqtt/v2/cluster/rest"
"maps"
"net"
"net/http"
_ "net/http/pprof"
Expand All @@ -27,6 +27,7 @@ import (
mqtt "github.com/wind-c/comqtt/v2/mqtt"
"github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
"github.com/wind-c/comqtt/v2/mqtt/listeners"
mqttRt "github.com/wind-c/comqtt/v2/mqtt/rest"
"github.com/wind-c/comqtt/v2/plugin"
hauth "github.com/wind-c/comqtt/v2/plugin/auth/http"
mauth "github.com/wind-c/comqtt/v2/plugin/auth/mysql"
Expand Down Expand Up @@ -139,11 +140,10 @@ func realMain(ctx context.Context) error {
onError(server.AddListener(ws), "add websocket listener")

// add http listener
handles := make(map[string]func(http.ResponseWriter, *http.Request), 1)
handles["/cluster/conf"] = ConfHandler
handles["/cluster/ms"] = MsHandler
handles["/cluster/peer/"] = PeerHandler //for test peer join and leave
http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, server.Info, handles)
csHls := csRt.New(agent).GenHandlers()
mqHls := mqttRt.New(server).GenHandlers()
maps.Copy(csHls, mqHls)
http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, csHls)
onError(server.AddListener(http), "add http listener")

errCh := make(chan error, 1)
Expand Down Expand Up @@ -248,79 +248,3 @@ func onError(err error, msg string) {
os.Exit(1)
}
}

func ConfHandler(w http.ResponseWriter, req *http.Request) {
body, err := json.MarshalIndent(agent.Config, "", "\t")
if err != nil {
io.WriteString(w, err.Error())
return
}

w.Write(body)
}

func MsHandler(w http.ResponseWriter, r *http.Request) {
body, err := json.MarshalIndent(agent.GetMemberList(), "", "\t")
if err != nil {
io.WriteString(w, err.Error())
return
}

w.Write(body)
}

func PeerHandler(w http.ResponseWriter, r *http.Request) {
key := strings.SplitN(r.RequestURI, "/", 4)[3]
defer r.Body.Close()
switch r.Method {
case http.MethodPut:
//val, err := io.ReadAll(r.Body)
//if err != nil {
// //logger.Errorf("[http] failed to read on PUT: %v", err)
// http.Error(w, "Failed to PUT", http.StatusBadRequest)
// return
//}

//agent.Propose(key, string(val))
w.WriteHeader(http.StatusNoContent)
case http.MethodGet:
if val := agent.GetValue(key); len(val) > 0 {
w.Write([]byte(strings.Join(val, ",")))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case http.MethodPost:
addr, err := io.ReadAll(r.Body)
if err != nil {
//logger.Errorf("[http] failed to read on POST: %v", err)
http.Error(w, "Failed to POST", http.StatusBadRequest)
return
}

nodeId, err := strconv.ParseUint(key, 0, 64)
if err != nil {
//logger.Errorf("[http] failed to convert ID for conf change: %v", err)
http.Error(w, "Failed to POST", http.StatusBadRequest)
return
}

agent.AddRaftPeer(fmt.Sprint(nodeId), string(addr))
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
nodeId, err := strconv.ParseUint(key, 0, 64)
if err != nil {
//logger.Errorf("[http] failed to convert ID for conf change: %v", err)
http.Error(w, "Failed to POST", http.StatusBadRequest)
return
}

agent.RemoveRaftPeer(fmt.Sprint(nodeId))
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Add("Allow", http.MethodPut)
w.Header().Add("Allow", http.MethodGet)
w.Header().Add("Allow", http.MethodPost)
w.Header().Add("Allow", http.MethodDelete)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
Loading

0 comments on commit f05f581

Please sign in to comment.