Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update: #386

Merged
merged 12 commits into from
Mar 1, 2024
5 changes: 5 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ const (
UnixSockProtocolFlag = "unix://"
)

// metrics request application
const (
MetricsReqApplication = "metricsReqApp"
)

// attachment keys
const (
XForwardedForLower = "x-forwarded-for" // used as motan default proxy key
Expand Down
6 changes: 3 additions & 3 deletions core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"errors"
"flag"
"fmt"
cfg "github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/log"
"os"
"reflect"
"strings"

cfg "github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/log"
)

const (
Expand Down Expand Up @@ -250,6 +249,7 @@ func (c *Context) Initialize() {
c.parserBasicServices()
c.parseServices()
c.parseHTTPClients()
initSwitcher(c)
}

func (c *Context) parseSingleConfiguration() (*cfg.Config, error) {
Expand Down
22 changes: 18 additions & 4 deletions core/switcher.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package core

import (
"sync"

"github.com/weibocom/motan-go/log"
"sync"
)

var (
Expand All @@ -19,10 +18,25 @@ type SwitcherListener interface {
Notify(value bool)
}

type switcherConfig map[string]bool

func GetSwitcherManager() *SwitcherManager {
return manager
}

func initSwitcher(c *Context) {
var sc switcherConfig
err := c.Config.GetStruct("switchers", &sc)
if err != nil {
vlog.Warningf("init switcher config fail: %v", err)
return
}
for k, v := range sc {
GetSwitcherManager().Register(k, v)
}

}

func (s *SwitcherManager) Register(name string, value bool, listeners ...SwitcherListener) {
if name == "" {
vlog.Warningln("[switcher] register failed: switcher name is empty")
Expand Down Expand Up @@ -51,7 +65,7 @@ func (s *SwitcherManager) GetAllSwitchers() map[string]bool {
return result
}

//GetSwitcher returns the switcher with the given name, or nil if not found.
// GetSwitcher returns the switcher with the given name, or nil if not found.
func (s *SwitcherManager) GetSwitcher(name string) *Switcher {
s.switcherLock.RLock()
defer s.switcherLock.RUnlock()
Expand All @@ -62,7 +76,7 @@ func (s *SwitcherManager) GetSwitcher(name string) *Switcher {
return nil
}

//GetOrRegister returns the switcher with the given name if it's already registered, otherwise registers and returns the new switcher.
// GetOrRegister returns the switcher with the given name if it's already registered, otherwise registers and returns the new switcher.
func (s *SwitcherManager) GetOrRegister(name string, value bool, listeners ...SwitcherListener) *Switcher {
sw := s.GetSwitcher(name)
if sw == nil {
Expand Down
18 changes: 17 additions & 1 deletion filter/clusterMetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (c *ClusterMetricsFilter) GetIndex() int {
}

func (c *ClusterMetricsFilter) NewFilter(url *motan.URL) motan.Filter {
initReqAppSwitcher()
return &ClusterMetricsFilter{}
}

Expand Down Expand Up @@ -56,7 +57,22 @@ func (c *ClusterMetricsFilter) Filter(haStrategy motan.HaStrategy, loadBalance m
if ctx != nil && ctx.Proxy {
role = "motan-client-agent"
}
keys := []string{role, request.GetAttachment(protocol.MSource), request.GetMethod()}
var application string
url := haStrategy.GetURL()
if url != nil {
application = url.GetParam(motan.ApplicationKey, "")
}
// to support application in caller URL
if metricsReqAppSwitcher.IsOpen() {
reqApplication := request.GetAttachment(protocol.MSource)
if application != reqApplication {
keys := []string{role, reqApplication, request.GetMethod()}
addMetricWithKeys(request.GetAttachment(protocol.MGroup), ".cluster", request.GetServiceName(),
keys, time.Since(start).Nanoseconds()/1e6, response)
}
}

keys := []string{role, application, request.GetMethod()}
addMetricWithKeys(request.GetAttachment(protocol.MGroup), ".cluster",
request.GetServiceName(), keys, time.Since(start).Nanoseconds()/1e6, response)
return response
Expand Down
85 changes: 85 additions & 0 deletions filter/clusterMetrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package filter

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/weibocom/motan-go/config"
motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/metrics"
"github.com/weibocom/motan-go/protocol"
"testing"
"time"
)

func TestClusterMetricsFilter(t *testing.T) {
application := "testApplication"
url := mockURL()
url.PutParam(motan.ApplicationKey, application)
url.PutParam("haStrategy", "test")
mf := (&ClusterMetricsFilter{}).NewFilter(url).(motan.ClusterFilter)
assert.NotNil(t, mf, "new filter")
assert.Equal(t, motan.ClusterFilterType, int(mf.GetType()), "filter type")
assert.Equal(t, ClusterMetrics, mf.GetName(), "filter name")

metrics.StartReporter(&motan.Context{Config: config.NewConfig()})
// test filter
factory := initFactory()
mf = factory.GetFilter(ClusterMetrics).(motan.ClusterFilter)
mf.SetNext(motan.GetLastClusterFilter())
request := defaultRequest()
var getKeysStr = func(keys []string) string {
return metrics.Escape(keys[0]) + ":" + metrics.Escape(keys[1]) + ":" + metrics.Escape(keys[2])
}
request.GetRPCContext(true).Proxy = true
request.SetAttachment(protocol.MSource, application)
request.SetAttachment(protocol.MPath, testService)
assert.Nil(t, metrics.GetStatItem(testGroup, "", testService), "metric stat")
haStrategy := &testHA{url}
lb := factory.GetLB(url)
// test different client application
motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(true)
request3 := request.Clone().(motan.Request)
request3.SetAttachment(protocol.MSource, "test")
time.Sleep(10 * time.Millisecond)
mf.Filter(haStrategy, lb, request3)
time.Sleep(1000 * time.Millisecond)
snapShot := metrics.GetStatItem(testGroup, ".cluster", testService).SnapshotAndClear()
// The metrics filter has do escape
assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count")
assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count")
// test switcher
motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(false)
request4 := request.Clone().(motan.Request)
time.Sleep(10 * time.Millisecond)
mf.Filter(haStrategy, lb, request4)
time.Sleep(1000 * time.Millisecond)
snapShot1 := metrics.GetStatItem(testGroup, ".cluster", testService).SnapshotAndClear()
// The metrics filter has do escape
assert.Equal(t, 1, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count")
assert.Equal(t, 0, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count")
}

type testHA struct {
url *motan.URL
}

func (f *testHA) GetName() string {
return "test"
}

func (f *testHA) GetURL() *motan.URL {
return f.url
}

func (f *testHA) SetURL(url *motan.URL) {
f.url = url
}

func (f *testHA) Call(request motan.Request, loadBalance motan.LoadBalance) motan.Response {
errorResponse := getErrorResponseWithCode(request.GetRequestID(), 500, fmt.Sprintf("test"))
return errorResponse
}

func getErrorResponseWithCode(requestID uint64, errCode int, errMsg string) *motan.MotanResponse {
return motan.BuildExceptionResponse(requestID, &motan.Exception{ErrCode: errCode, ErrMsg: errMsg, ErrType: motan.ServiceException})
}
28 changes: 25 additions & 3 deletions filter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/metrics"
"github.com/weibocom/motan-go/protocol"
"sync"
"time"
)

Expand All @@ -16,6 +17,11 @@ const (
MetricsSlowCountSuffix = ".slow_count"
)

var (
metricOnce = sync.Once{}
metricsReqAppSwitcher *motan.Switcher
)

type MetricsFilter struct {
next motan.EndPointFilter
}
Expand All @@ -25,6 +31,7 @@ func (m *MetricsFilter) GetIndex() int {
}

func (m *MetricsFilter) NewFilter(url *motan.URL) motan.Filter {
initReqAppSwitcher()
return &MetricsFilter{}
}

Expand Down Expand Up @@ -80,9 +87,15 @@ func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan
}
}
//get application
application := request.GetAttachment(protocol.MSource)
if provider {
application = caller.GetURL().GetParam(motan.ApplicationKey, "")
application := caller.GetURL().GetParam(motan.ApplicationKey, "")
if !provider && metricsReqAppSwitcher.IsOpen() {
// to support application in caller URL
reqApplication := request.GetAttachment(protocol.MSource)
if application != reqApplication {
keys := []string{role, reqApplication, request.GetMethod()}
addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetServiceName(),
keys, time.Since(start).Nanoseconds()/1e6, response)
}
}
keys := []string{role, application, request.GetMethod()}
addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetServiceName(),
Expand Down Expand Up @@ -111,3 +124,12 @@ func addMetricWithKeys(group, groupSuffix string, service string, keys []string,
func (m *MetricsFilter) SetContext(context *motan.Context) {
metrics.StartReporter(context)
}

func initReqAppSwitcher() {
// registry default switcher value here
// if the switcher has already been registered in Context.Initialize,
// the default value will not overwrite it.
metricOnce.Do(func() {
metricsReqAppSwitcher = motan.GetSwitcherManager().GetOrRegister(motan.MetricsReqApplication, false)
})
}
21 changes: 21 additions & 0 deletions filter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ func TestMetricsFilter(t *testing.T) {
assert.Equal(t, 1, int(metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear().Count(getKeysStr(test.keys)+MetricsTotalCountSuffix)), "metric count")
})
}
// test different client application
motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(true)
request3 := request.Clone().(motan.Request)
request3.SetAttachment(protocol.MSource, "test")
time.Sleep(10 * time.Millisecond)
mf.Filter(ep, request3)
time.Sleep(1000 * time.Millisecond)
snapShot := metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear()
// The metrics filter has do escape
assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count")
assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count")
// test switcher
motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(false)
request4 := request.Clone().(motan.Request)
time.Sleep(10 * time.Millisecond)
mf.Filter(ep, request4)
time.Sleep(1000 * time.Millisecond)
snapShot1 := metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear()
// The metrics filter has do escape
assert.Equal(t, 1, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count")
assert.Equal(t, 0, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count")
}

func TestAddMetric(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion ha/backupRequestHA_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ func TestBackupRequestHA_Call3(t *testing.T) {

func getEP(processTime int64) motan.EndPoint {
caller := &motan.TestEndPoint{ProcessTime: processTime}
caller.SetURL(&motan.URL{})
motan.Initialize(caller)
fep := &motan.FilterEndPoint{Caller: caller}
mf := &filter.MetricsFilter{}
metricsFilter := &filter.MetricsFilter{}
mf := metricsFilter.NewFilter(nil).(*filter.MetricsFilter)
mf.SetContext(&motan.Context{Config: config.NewConfig()})
mf.SetNext(motan.GetLastEndPointFilter())
fep.Filter = mf
Expand Down
Loading