Skip to content

Commit

Permalink
Merge pull request #254 from signalfx/dupesfxclient
Browse files Browse the repository at this point in the history
sfxclient: Add ability to panic if we're about to emit duplicate data…
  • Loading branch information
mdubbyap authored Aug 14, 2023
2 parents 89d8886 + bb26c85 commit 4ab4277
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/signalfx/sapm-proto v0.12.0
github.com/smartystreets/goconvey v1.6.4
github.com/stretchr/testify v1.8.4
github.com/twmb/murmur3 v1.1.7
github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec
google.golang.org/grpc v1.56.1
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03O
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/twmb/murmur3 v1.1.7 h1:ULWBiM04n/XoN3YMSJ6Z2pHDFLf+MeIVQU71ZPrvbWg=
github.com/twmb/murmur3 v1.1.7/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
Expand Down
44 changes: 44 additions & 0 deletions sfxclient/sfxclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ import (
"context"
"expvar"
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
Expand All @@ -75,6 +77,7 @@ import (
"github.com/signalfx/golib/v3/errors"
"github.com/signalfx/golib/v3/log"
"github.com/signalfx/golib/v3/timekeeper"
"github.com/twmb/murmur3"
)

const (
Expand All @@ -84,6 +87,8 @@ const (
DefaultReportingTimeout = time.Second * 5
// used when group name is ""
defaultCallbackGroup = "default-callback-group"
// used if you want to check if you're emitting the same datapoint twice and causing inaccurate metrics
panicOnDupes = "SFXCLIENT_PANIC_ON_DUPES"
)

// DefaultErrorHandler is the default way to handle errors by a scheduler. It simply prints them to stdout
Expand Down Expand Up @@ -236,6 +241,7 @@ func (s *Scheduler) collectDatapoints() []*datapoint.Datapoint {
ret = append(ret, p.getDatapoints(now, s.SendZeroTime)...)
}
}
s.checkDatapoints(ret)
return ret
}

Expand Down Expand Up @@ -372,3 +378,41 @@ func (s *Scheduler) Schedule(ctx context.Context) error {
}
}
}

func (s *Scheduler) checkDatapoints(ret []*datapoint.Datapoint) {
if len(os.Getenv(panicOnDupes)) > 0 {
var dupes []*datapoint.Datapoint
m := make(map[uint32]*datapoint.Datapoint, len(ret))
buff := bytes.Buffer{}
for _, d := range ret {
buff.Reset()

buff.WriteString(d.Metric)
mk := make([]string, len(d.Dimensions))
i := 0
for k := range d.Dimensions {
mk[i] = k
i++
}
sort.Strings(mk)
for k := range mk {
buff.WriteString(mk[k])
buff.WriteString(d.Dimensions[mk[k]])
}

murmHash := murmur3.Sum32(buff.Bytes())
if _, ok := m[murmHash]; ok {
dupes = append(dupes, d)
} else {
m[murmHash] = d
}
}
if len(dupes) > 0 {
msg := make([]string, len(dupes))
for i, d := range dupes {
msg[i] = fmt.Sprintf("Found duplicate datapoint: %s", d)
}
panic(strings.Join(msg, "\n"))
}
}
}
15 changes: 15 additions & 0 deletions sfxclient/sfxclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sfxclient
import (
"context"
"fmt"
"os"
"runtime"
"strconv"
"strings"
Expand All @@ -12,6 +13,7 @@ import (
"time"

"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/datapoint/dptest"
"github.com/signalfx/golib/v3/errors"
"github.com/signalfx/golib/v3/timekeeper/timekeepertest"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -123,6 +125,19 @@ func TestScheduler_ReportingTimeout(t *testing.T) {
})
}

func TestPanicOnDupes(t *testing.T) {
Convey("testing panic on dupes", t, func() {
s := &Scheduler{}

os.Setenv(panicOnDupes, "true")
defer os.Setenv(panicOnDupes, "")
dp := dptest.DP()
s.checkDatapoints([]*datapoint.Datapoint{dp})

So(func() { s.checkDatapoints([]*datapoint.Datapoint{dp, dp}) }, ShouldPanic)
})
}

func TestCollectDatapointDebug(t *testing.T) {
Convey("testing collect datapoints with debug mode enabled", t, func() {
var handleErrRet error
Expand Down

0 comments on commit 4ab4277

Please sign in to comment.