diff --git a/memberlist_test.go b/memberlist_test.go
index 93ebac5c5..3d8cf92d1 100644
--- a/memberlist_test.go
+++ b/memberlist_test.go
@@ -259,13 +259,8 @@ func TestCreate_checkBroadcastQueueMetrics(t *testing.T) {
 
 	time.Sleep(3 * time.Second)
 
-	intv := getIntervalMetrics(t, sink)
 	sampleName := "consul.usage.test.memberlist.queue.broadcasts"
-	actualSample := intv.Samples[sampleName]
-
-	if actualSample.Count == 0 {
-		t.Fatalf("%s sample not taken", sampleName)
-	}
+	verifySampleExists(t, sampleName, sink)
 }
 
 func TestCreate_keyringOnly(t *testing.T) {
diff --git a/net.go b/net.go
index a8291c4f3..c89d38c0f 100644
--- a/net.go
+++ b/net.go
@@ -1007,6 +1007,27 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
 	}
 	m.nodeLock.RUnlock()
 
+	// Tally the local nodes per state. Every known state is seeded with zero
+	// so that a gauge is emitted for it even when no node is in that state.
+	nodeStateCounts := make(map[string]int)
+	nodeStateCounts[StateAlive.metricsString()] = 0
+	nodeStateCounts[StateLeft.metricsString()] = 0
+	nodeStateCounts[StateDead.metricsString()] = 0
+	nodeStateCounts[StateSuspect.metricsString()] = 0
+
+	for _, n := range localNodes {
+		nodeStateCounts[n.State.metricsString()]++
+	}
+
+	// Clip the capacity of the shared label slice so that append always
+	// copies instead of writing into the backing array of m.metricLabels.
+	baseLabels := m.metricLabels[:len(m.metricLabels):len(m.metricLabels)]
+	for nodeState, cnt := range nodeStateCounts {
+		metrics.SetGaugeWithLabels([]string{"memberlist", "node", "instances"},
+			float32(cnt),
+			append(baseLabels, metrics.Label{Name: "node_state", Value: nodeState}))
+	}
+
 	// Get the delegate state
 	var userData []byte
 	if m.config.Delegate != nil {
@@ -1042,6 +1063,12 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
 		}
 	}
 
+	// NOTE(review): this mirrors the read in decryptRemoteState, which treats
+	// bytes 1-5 as a big-endian size. Confirm bufConn uses the same layout on
+	// this path; if not, the gauge does not reflect the payload size.
+	moreBytes := binary.BigEndian.Uint32(bufConn.Bytes()[1:5])
+	metrics.SetGaugeWithLabels([]string{"memberlist", "size", "local"}, float32(moreBytes), m.metricLabels)
+
 	// Get the send buffer
 	return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel)
 }
@@ -1088,6 +1115,10 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) (
 	// Ensure we aren't asked to download too much. This is to guard against
 	// an attack vector where a huge amount of state is sent
 	moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5])
+	// Sample the advertised size before the limit check below so that
+	// oversized states are recorded as well.
+	metrics.AddSampleWithLabels([]string{"memberlist", "size", "remote"}, float32(moreBytes), m.metricLabels)
+
 	if moreBytes > maxPushStateBytes {
 		return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)
 
diff --git a/net_test.go b/net_test.go
index ce287547e..8555d54bf 100644
--- a/net_test.go
+++ b/net_test.go
@@ -690,6 +690,7 @@ func TestEncryptDecryptState(t *testing.T) {
 		SecretKey:       []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		ProtocolVersion: ProtocolVersionMax,
 	}
+	sink := registerInMemorySink(t)
 
 	m, err := Create(config)
 	if err != nil {
@@ -710,6 +711,7 @@ func TestEncryptDecryptState(t *testing.T) {
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
+	verifySampleExists(t, "consul.usage.test.memberlist.size.remote", sink)
 
 	if !reflect.DeepEqual(state, plain) {
 		t.Fatalf("Decrypt failed: %v", plain)
diff --git a/state.go b/state.go
index a9ee88996..4e3590995 100644
--- a/state.go
+++ b/state.go
@@ -15,6 +15,24 @@ import (
 
 type NodeStateType int
 
+// metricsString returns the value used for the "node_state" metric label.
+// Values that are not one of the four known states are rendered as
+// "unhandled-value-<n>" rather than being dropped.
+func (t NodeStateType) metricsString() string {
+	switch t {
+	case StateAlive:
+		return "alive"
+	case StateDead:
+		return "dead"
+	case StateSuspect:
+		return "suspect"
+	case StateLeft:
+		return "left"
+	default:
+		return fmt.Sprintf("unhandled-value-%d", t)
+	}
+}
+
 const (
 	StateAlive NodeStateType = iota
 	StateSuspect
diff --git a/state_test.go b/state_test.go
index 0645aca50..9b1324b0f 100644
--- a/state_test.go
+++ b/state_test.go
@@ -2239,6 +2239,8 @@ func TestMemberlist_PushPull(t *testing.T) {
 	ip1 := []byte(addr1)
 	ip2 := []byte(addr2)
 
+	sink := registerInMemorySink(t)
+
 	ch := make(chan NodeEvent, 3)
 
 	m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
@@ -2270,6 +2272,13 @@ func TestMemberlist_PushPull(t *testing.T) {
 		if len(ch) < 2 {
 			failf("expected 2 messages from pushPull")
 		}
+
+		instancesMetricName := "consul.usage.test.memberlist.node.instances"
+		verifyGaugeExists(t, "consul.usage.test.memberlist.size.local", sink)
+		verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateAlive.metricsString()), sink)
+		verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateDead.metricsString()), sink)
+		verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateLeft.metricsString()), sink)
+		verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateSuspect.metricsString()), sink)
 	})
 }
 
@@ -2412,3 +2421,28 @@ func getIntervalMetrics(t *testing.T, sink *metrics.InmemSink) *metrics.Interval
 	intv := intervals[0]
 	return intv
 }
+
+// verifyGaugeExists fails the test immediately unless the interval returned
+// by getIntervalMetrics holds a gauge stored under name.
+func verifyGaugeExists(t *testing.T, name string, sink *metrics.InmemSink) {
+	t.Helper()
+	interval := getIntervalMetrics(t, sink)
+	interval.RLock()
+	defer interval.RUnlock()
+	if _, ok := interval.Gauges[name]; !ok {
+		t.Fatalf("%s gauge not emitted", name)
+	}
+}
+
+// verifySampleExists fails the test immediately unless the interval returned
+// by getIntervalMetrics holds a sample stored under name.
+func verifySampleExists(t *testing.T, name string, sink *metrics.InmemSink) {
+	t.Helper()
+	interval := getIntervalMetrics(t, sink)
+	interval.RLock()
+	defer interval.RUnlock()
+
+	if _, ok := interval.Samples[name]; !ok {
+		t.Fatalf("%s sample not emitted", name)
+	}
+}