-
Notifications
You must be signed in to change notification settings - Fork 23
/
raft_server.go
134 lines (119 loc) · 3.08 KB
/
raft_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package basalt
import (
"bytes"
"encoding/gob"
"log"
"strings"
"github.com/rpcxio/etcd/etcdserver/api/snap"
"github.com/rpcxio/etcd/raft/raftpb"
)
type ConfChange interface {
AddNode(id uint64, addr []byte) error
RemoveNode(id uint64) error
}
type RaftServer struct {
proposeC chan<- string
confChangeC chan raftpb.ConfChange
bmServer *Server
snapshotter *snap.Snapshotter
}
type operaton struct {
OP OP
Val string
}
func NewRaftServer(bmServer *Server, snapshotter *snap.Snapshotter, confChangeC chan raftpb.ConfChange, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *RaftServer {
s := &RaftServer{proposeC: proposeC, confChangeC: confChangeC, bmServer: bmServer, snapshotter: snapshotter}
bmServer.bitmaps.writeCallback = s.Propose
s.readCommits(commitC, errorC)
go s.readCommits(commitC, errorC)
return s
}
func (s *RaftServer) Propose(op OP, value string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(operaton{op, value}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}
func (s *RaftServer) readCommits(commitC <-chan *string, errorC <-chan error) {
for data := range commitC {
if data == nil {
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return
}
if err != nil {
log.Panic(err)
}
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
continue
}
var op operaton
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&op); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.processOP(op)
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
func (s *RaftServer) processOP(op operaton) {
switch op.OP {
case BmOpAdd:
items := strings.SplitN(op.Val, ",", 2)
if len(items) != 2 {
log.Printf("wrong request: %+v", op)
return
}
s.bmServer.add(items[0], items[1], false)
case BmOpAddMany:
items := strings.SplitN(op.Val, ",", 2)
if len(items) != 2 {
log.Printf("wrong request: %+v", op)
return
}
s.bmServer.addMany(items[0], items[1], false)
case BmOpRemove:
items := strings.SplitN(op.Val, ",", 2)
if len(items) != 2 {
log.Printf("wrong request: %+v", op)
return
}
s.bmServer.remove(items[0], items[1], false)
case BmOpDrop:
s.bmServer.drop(op.Val, false)
case BmOpClear:
s.bmServer.clear(op.Val, false)
}
}
func (s *RaftServer) GetSnapshot() ([]byte, error) {
var buf bytes.Buffer
err := s.bmServer.bitmaps.Save(&buf)
return buf.Bytes(), err
}
func (s *RaftServer) recoverFromSnapshot(snapshot []byte) error {
var buf = bytes.NewBuffer(snapshot)
return s.bmServer.bitmaps.Read(buf)
}
func (s *RaftServer) AddNode(id uint64, addr []byte) error {
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: addr,
}
s.confChangeC <- cc
return nil
}
func (s *RaftServer) RemoveNode(id uint64) error {
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
s.confChangeC <- cc
return nil
}