diff --git a/cluster/agent_test.go b/cluster/agent_test.go index eddb9bc..847df72 100644 --- a/cluster/agent_test.go +++ b/cluster/agent_test.go @@ -134,6 +134,128 @@ func TestCluster_Hashicorp_Memberlist(t *testing.T) { testCluster(t, conf1, conf2, conf3) } +func TestCluster_Etcd_Serf(t *testing.T) { + bindPort1, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node1") + raftPort1, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node1 Raft") + + bindPort2, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node2") + raftPort2, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node2 Raft") + + bindPort3, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node3") + raftPort3, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node3 Raft") + + members := []string{ + "127.0.0.1:" + strconv.Itoa(bindPort1), + "127.0.0.1:" + strconv.Itoa(bindPort2), + "127.0.0.1:" + strconv.Itoa(bindPort3), + } + + conf1 := &config.Cluster{ + NodeName: "1", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort1, + RaftPort: raftPort1, + RaftBootstrap: true, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + NodesFileDir: t.TempDir(), + } + conf2 := &config.Cluster{ + NodeName: "2", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort2, + RaftPort: raftPort2, + RaftBootstrap: false, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + NodesFileDir: t.TempDir(), + } + conf3 := &config.Cluster{ + NodeName: "3", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort3, + RaftPort: raftPort3, + RaftBootstrap: false, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + NodesFileDir: t.TempDir(), + } + testCluster(t, conf1, conf2, conf3) +} + +func TestCluster_Etcd_Memberlist(t *testing.T) { + bindPort1, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node1") + + bindPort2, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node2") + + bindPort3, err := utils.GetFreePort() + require.NoError(t, err, "Failed to get free port for node3") + + members := []string{ + "127.0.0.1:" + strconv.Itoa(bindPort1), + "127.0.0.1:" + strconv.Itoa(bindPort2), + "127.0.0.1:" + strconv.Itoa(bindPort3), + } + + conf1 := &config.Cluster{ + NodeName: "1", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort1, + RaftPort: mlist.GetRaftPortFromBindPort(bindPort1), + RaftBootstrap: true, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWayMemberlist, + NodesFileDir: t.TempDir(), + } + conf2 := &config.Cluster{ + NodeName: "2", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort2, + RaftPort: mlist.GetRaftPortFromBindPort(bindPort2), + RaftBootstrap: false, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWayMemberlist, + NodesFileDir: t.TempDir(), + } + conf3 := &config.Cluster{ + NodeName: "3", + RaftImpl: config.RaftImplEtcd, + BindAddr: "127.0.0.1", + BindPort: bindPort3, + RaftPort: mlist.GetRaftPortFromBindPort(bindPort3), + RaftBootstrap: false, + RaftDir: t.TempDir(), + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWayMemberlist, + NodesFileDir: t.TempDir(), + } + testCluster(t, conf1, conf2, conf3) +} + func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, conf3 *config.Cluster) { log.Init(log.DefaultOptions()) @@ -141,11 +263,15 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con err := agent1.Start() require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName) + time.Sleep(1 * time.Second) + agent2 := NewAgent(conf2) err = agent2.Start() defer agent2.Stop() require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName) + time.Sleep(1 * time.Second) + agent3 := NewAgent(conf3) err = agent3.Start() defer agent3.Stop() @@ -157,9 +283,9 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con _, leader2 := agent2.raftPeer.GetLeader() _, leader3 := agent3.raftPeer.GetLeader() - require.Equal(t, leader1, "node1") - require.Equal(t, leader2, "node1") - require.Equal(t, leader3, "node1") + require.Equal(t, leader1, conf1.NodeName) + require.Equal(t, leader2, conf1.NodeName) + require.Equal(t, leader3, conf1.NodeName) members1 := agent1.GetMemberList() members2 := agent2.GetMemberList() @@ -177,13 +303,14 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con _, newLeader3 := agent3.raftPeer.GetLeader() // Check that either agent2 or agent3 becomes the new leader - if newLeader2 == "node2" || newLeader2 == "node3" { + if newLeader2 == conf2.NodeName || newLeader3 == conf3.NodeName { require.Equal(t, newLeader2, newLeader3, "Leaders should be the same for agent2 and agent3") } else { require.Fail(t, "New leader should be either node2 or node3") } // Restart agent1 and verify it is a follower + t.Log("Restarting agent1") restartedAgent1 := NewAgent(conf1) err = restartedAgent1.Start() require.NoError(t, err, "Agent restart failed for node: %s", conf1.NodeName) @@ -198,7 +325,7 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con require.Equal(t, leaderAfterRestart2, leaderAfterRestart1) require.Equal(t, leaderAfterRestart3, leaderAfterRestart1) - require.NotEqual(t, leaderAfterRestart1, "node1", "After restart, node1 should not be the leader") + require.NotEqual(t, leaderAfterRestart1, conf1.NodeName, "After restart, node1 should not be the leader") t.Log("Test completed successfully") } diff --git a/cluster/raft/etcd/peer.go b/cluster/raft/etcd/peer.go index 0241b68..48f9708 100644 --- a/cluster/raft/etcd/peer.go +++ b/cluster/raft/etcd/peer.go @@ -368,7 +368,7 @@ func (p *Peer) serveChannels() { return case <-p.stopC: - p.Stop() + //p.Stop() return } } @@ -603,6 +603,8 @@ func (p *Peer) writeError(err error) { func (p *Peer) Stop() { p.stopHTTP() + close(p.proposeC) + close(p.confChangeC) close(p.commitC) close(p.errorC) p.node.Stop()