From bb26c8574e67603ff22056af3768781bad6e32e4 Mon Sep 17 00:00:00 2001 From: Matthew Pound Date: Mon, 14 Aug 2023 11:59:52 -0600 Subject: [PATCH] sfxclient: Add ability to panic if we're about to emit duplicate datapoints --- go.mod | 1 + go.sum | 2 ++ sfxclient/sfxclient.go | 44 +++++++++++++++++++++++++++++++++++++ sfxclient/sfxclient_test.go | 15 +++++++++++++ 4 files changed, 62 insertions(+) diff --git a/go.mod b/go.mod index b8b0257..2527a98 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/signalfx/sapm-proto v0.12.0 github.com/smartystreets/goconvey v1.6.4 github.com/stretchr/testify v1.8.4 + github.com/twmb/murmur3 v1.1.7 github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec google.golang.org/grpc v1.56.1 ) diff --git a/go.sum b/go.sum index 6cf30d4..91075c6 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03O github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= +github.com/twmb/murmur3 v1.1.7 h1:ULWBiM04n/XoN3YMSJ6Z2pHDFLf+MeIVQU71ZPrvbWg= +github.com/twmb/murmur3 v1.1.7/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= diff --git a/sfxclient/sfxclient.go b/sfxclient/sfxclient.go index 8c523e9..89cb271 100644 --- a/sfxclient/sfxclient.go +++ b/sfxclient/sfxclient.go @@ -64,7 +64,9 @@ import ( "context" "expvar" "fmt" + "os" "reflect" + "sort" "strings" "sync" "sync/atomic" @@ -75,6 +77,7 @@ import ( "github.com/signalfx/golib/v3/errors" "github.com/signalfx/golib/v3/log" "github.com/signalfx/golib/v3/timekeeper" + "github.com/twmb/murmur3" ) const ( @@ -84,6 +87,8 @@ const ( DefaultReportingTimeout = time.Second * 5 // used when group name is "" defaultCallbackGroup = "default-callback-group" + // used if you want to check if you're emitting the same datapoint twice and causing inaccurate metrics + panicOnDupes = "SFXCLIENT_PANIC_ON_DUPES" ) // DefaultErrorHandler is the default way to handle errors by a scheduler. It simply prints them to stdout @@ -236,6 +241,7 @@ func (s *Scheduler) collectDatapoints() []*datapoint.Datapoint { ret = append(ret, p.getDatapoints(now, s.SendZeroTime)...) } } + s.checkDatapoints(ret) return ret } @@ -372,3 +378,41 @@ func (s *Scheduler) Schedule(ctx context.Context) error { } } } + +func (s *Scheduler) checkDatapoints(ret []*datapoint.Datapoint) { + if len(os.Getenv(panicOnDupes)) > 0 { + var dupes []*datapoint.Datapoint + m := make(map[uint32]*datapoint.Datapoint, len(ret)) + buff := bytes.Buffer{} + for _, d := range ret { + buff.Reset() + + buff.WriteString(d.Metric) + mk := make([]string, len(d.Dimensions)) + i := 0 + for k := range d.Dimensions { + mk[i] = k + i++ + } + sort.Strings(mk) + for k := range mk { + buff.WriteString(mk[k]) + buff.WriteString(d.Dimensions[mk[k]]) + } + + murmHash := murmur3.Sum32(buff.Bytes()) + if _, ok := m[murmHash]; ok { + dupes = append(dupes, d) + } else { + m[murmHash] = d + } + } + if len(dupes) > 0 { + msg := make([]string, len(dupes)) + for i, d := range dupes { + msg[i] = fmt.Sprintf("Found duplicate datapoint: %s", d) + } + panic(strings.Join(msg, "\n")) + } + } +} diff --git a/sfxclient/sfxclient_test.go b/sfxclient/sfxclient_test.go index 9ff9dcd..e1a9f89 100644 --- a/sfxclient/sfxclient_test.go +++ b/sfxclient/sfxclient_test.go @@ -3,6 +3,7 @@ package sfxclient import ( "context" "fmt" + "os" "runtime" "strconv" "strings" @@ -12,6 +13,7 @@ import ( "time" "github.com/signalfx/golib/v3/datapoint" + "github.com/signalfx/golib/v3/datapoint/dptest" "github.com/signalfx/golib/v3/errors" "github.com/signalfx/golib/v3/timekeeper/timekeepertest" . "github.com/smartystreets/goconvey/convey" @@ -123,6 +125,19 @@ func TestScheduler_ReportingTimeout(t *testing.T) { }) } +func TestPanicOnDupes(t *testing.T) { + Convey("testing panic on dupes", t, func() { + s := &Scheduler{} + + os.Setenv(panicOnDupes, "true") + defer os.Setenv(panicOnDupes, "") + dp := dptest.DP() + s.checkDatapoints([]*datapoint.Datapoint{dp}) + + So(func() { s.checkDatapoints([]*datapoint.Datapoint{dp, dp}) }, ShouldPanic) + }) +} + func TestCollectDatapointDebug(t *testing.T) { Convey("testing collect datapoints with debug mode enabled", t, func() { var handleErrRet error