From 5c0b90a398d4e96b9c73e4bc4d0d5e13409f2a80 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 28 Dec 2023 10:59:05 +0800 Subject: [PATCH 1/7] add filter env --- core/constants.go | 1 + core/globalContext.go | 17 ++++++++++++++++- dynamicConfig.go | 1 + 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/core/constants.go b/core/constants.go index a3108104..ff271ddd 100644 --- a/core/constants.go +++ b/core/constants.go @@ -115,6 +115,7 @@ const ( const ( GroupEnvironmentName = "MESH_SERVICE_ADDITIONAL_GROUP" DirectRPCEnvironmentName = "MESH_DIRECT_RPC" + FilterEnvironmentName = "MESH_FILTERS" ) // meta keys diff --git a/core/globalContext.go b/core/globalContext.go index c910199c..6efd7b7f 100644 --- a/core/globalContext.go +++ b/core/globalContext.go @@ -397,10 +397,11 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL { newURL = url } - //final filters: defaultFilter + globalFilter + filters + //final filters: defaultFilter + globalFilter + filters + envFilter finalFilters := c.MergeFilterSet( c.GetDefaultFilterSet(newURL), c.GetGlobalFilterSet(newURL), + c.GetEnvGlobalFilterSet(), c.GetFilterSet(newURL.GetStringParamsWithDefault(FilterKey, ""), ""), ) if len(finalFilters) > 0 { @@ -474,6 +475,20 @@ func (c *Context) GetGlobalFilterSet(newURL *URL) map[string]bool { newURL.GetStringParamsWithDefault(DisableGlobalFilter, "")) } +func (c *Context) GetEnvGlobalFilterSet() map[string]bool { + res := make(map[string]bool) + if filters := os.Getenv(FilterEnvironmentName); filters != "" { + for _, k := range strings.Split(filters, ",") { + k = strings.TrimSpace(k) + if k == "" { + continue + } + res[k] = true + } + } + return res +} + // parseMultipleServiceGroup add motan-service group support of multiple comma split group name func (c *Context) parseMultipleServiceGroup(motanServiceMap map[string]*URL) { addMotanServiceMap := map[string]*URL{} diff --git a/dynamicConfig.go b/dynamicConfig.go index afc10cc5..10ffe417 100644 --- a/dynamicConfig.go +++ b/dynamicConfig.go @@ -327,6 +327,7 @@ func (h *DynamicConfigurerHandler) parseURL(url *core.URL) (*core.URL, error) { finalFilters := h.agent.Context.MergeFilterSet( h.agent.Context.GetDefaultFilterSet(url), h.agent.Context.GetGlobalFilterSet(url), + h.agent.Context.GetEnvGlobalFilterSet(), h.agent.Context.GetFilterSet(url.GetStringParamsWithDefault(core.FilterKey, ""), ""), ) if len(finalFilters) > 0 { From f3f46a9843d91f48967d2f623ee7c7cfb572e7ab Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Wed, 3 Jan 2024 15:57:12 +0800 Subject: [PATCH 2/7] feat motan v1 metrics attachments --- filter/clusterMetrics.go | 2 +- filter/metrics.go | 2 +- protocol/motan1Protocol.go | 5 +++-- server/motanserver.go | 21 ++++++++++++++------- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/filter/clusterMetrics.go b/filter/clusterMetrics.go index 8019f3ce..46b2678b 100644 --- a/filter/clusterMetrics.go +++ b/filter/clusterMetrics.go @@ -58,6 +58,6 @@ func (c *ClusterMetricsFilter) Filter(haStrategy motan.HaStrategy, loadBalance m } keys := []string{role, request.GetAttachment(protocol.MSource), request.GetMethod()} addMetricWithKeys(request.GetAttachment(protocol.MGroup), ".cluster", - request.GetAttachment(protocol.MPath), keys, time.Since(start).Nanoseconds()/1e6, response) + request.GetServiceName(), keys, time.Since(start).Nanoseconds()/1e6, response) return response } diff --git a/filter/metrics.go b/filter/metrics.go index c69b41fd..49cfd17a 100644 --- a/filter/metrics.go +++ b/filter/metrics.go @@ -85,7 +85,7 @@ func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan application = caller.GetURL().GetParam(motan.ApplicationKey, "") } keys := []string{role, application, request.GetMethod()} - addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetAttachment(protocol.MPath), + addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetServiceName(), keys, time.Since(start).Nanoseconds()/1e6, response) return response } diff --git a/protocol/motan1Protocol.go b/protocol/motan1Protocol.go index 19d9004c..65a37099 100644 --- a/protocol/motan1Protocol.go +++ b/protocol/motan1Protocol.go @@ -61,8 +61,9 @@ const ( ) const ( - V1Group = "group" - V1Version = "version" + V1Group = "group" + V1Version = "version" + V1Application = "application" ) const MAX_BLOCK_SIZE = 1024 diff --git a/server/motanserver.go b/server/motanserver.go index 23e0059b..7fdd21fb 100644 --- a/server/motanserver.go +++ b/server/motanserver.go @@ -303,13 +303,7 @@ func (m *MotanServer) processV1(msg *mpro.MotanV1Message, start time.Time, ip st var result []byte var reqCtx *motan.RPCContext req, err := mpro.DecodeMotanV1Request(msg) - // fill v2 attachment - if req.GetAttachment(mpro.MGroup) == "" { - req.SetAttachment(mpro.MGroup, req.GetAttachment(mpro.V1Group)) - } - if req.GetAttachment(mpro.MVersion) == "" { - req.SetAttachment(mpro.MVersion, req.GetAttachment(mpro.V1Version)) - } + setV1Attachments(req) if err != nil { vlog.Errorf("decode v1 request fail. conn: %s, err:%s", conn.RemoteAddr().String(), err.Error()) result = mpro.BuildV1ExceptionResponse(msg.Rid, err.Error()) @@ -377,3 +371,16 @@ func getRemoteIP(address string) string { } return ip } + +func setV1Attachments(req motan.Request) { + // fill v2 attachment + if req.GetAttachment(mpro.MGroup) == "" { + req.SetAttachment(mpro.MGroup, req.GetAttachment(mpro.V1Group)) + } + if req.GetAttachment(mpro.MVersion) == "" { + req.SetAttachment(mpro.MVersion, req.GetAttachment(mpro.V1Version)) + } + if req.GetAttachment(mpro.MSource) == "" { + req.SetAttachment(mpro.MSource, req.GetAttachment(mpro.V1Application)) + } +} From 236164d27d63333472573b3437673cca56d3f8f8 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 4 Jan 2024 15:00:19 +0800 Subject: [PATCH 3/7] add environment handler feature and relevant filter feature --- agent.go | 29 ++++++++++++++++++++++++++--- core/constants.go | 1 + core/globalContext.go | 15 ++++++++++++++- dynamicConfig.go | 1 + 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/agent.go b/agent.go index 6428ff09..93c53f13 100644 --- a/agent.go +++ b/agent.go @@ -13,6 +13,7 @@ import ( "path/filepath" "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -75,6 +76,7 @@ type Agent struct { httpProxyServer *mserver.HTTPProxyServer manageHandlers map[string]http.Handler + envHandlers map[string]map[string]http.Handler svcLock sync.Mutex clsLock sync.Mutex @@ -109,6 +111,7 @@ func NewAgent(extfactory motan.ExtensionFactory) *Agent { agent.agentPortServer = make(map[int]motan.Server) agent.serviceRegistries = motan.NewCopyOnWriteMap() agent.manageHandlers = make(map[string]http.Handler) + agent.envHandlers = make(map[string]map[string]http.Handler) agent.serviceMap = motan.NewCopyOnWriteMap() return agent } @@ -777,9 +780,13 @@ func fillDefaultReqInfo(r motan.Request, url *motan.URL) { } } else { if r.GetAttachment(mpro.MSource) == "" { - application := url.GetParam(motan.ApplicationKey, "") - if application != "" { - r.SetAttachment(mpro.MSource, application) + if app := r.GetAttachment(motan.ApplicationKey); app != "" { + r.SetAttachment(mpro.MSource, app) + } else { + application := url.GetParam(motan.ApplicationKey, "") + if application != "" { + r.SetAttachment(mpro.MSource, application) + } } } if r.GetAttachment(mpro.MGroup) == "" { @@ -1090,6 +1097,12 @@ func (a *Agent) RegisterManageHandler(path string, handler http.Handler) { } } +func (a *Agent) RegisterEnvHandlers(envStr string, handlers map[string]http.Handler) { + if envStr != "" && handlers != nil { + a.envHandlers[envStr] = handlers // override + } +} + func (a *Agent) startMServer() { handlers := make(map[string]http.Handler, 16) for k, v := range GetDefaultManageHandlers() { @@ -1098,6 +1111,16 @@ func (a *Agent) startMServer() { for k, v := range a.manageHandlers { handlers[k] = v } + // register env handlers + extHandelrs := os.Getenv(motan.HandlerEnvironmentName) + for _, k := range strings.Split(extHandelrs, ",") { + if v, ok := a.envHandlers[strings.TrimSpace(k)]; ok { + for kk, vv := range v { + handlers[kk] = vv + } + + } + } for k, v := range handlers { a.mhandle(k, v) } diff --git a/core/constants.go b/core/constants.go index ff271ddd..63799607 100644 --- a/core/constants.go +++ b/core/constants.go @@ -116,6 +116,7 @@ const ( GroupEnvironmentName = "MESH_SERVICE_ADDITIONAL_GROUP" DirectRPCEnvironmentName = "MESH_DIRECT_RPC" FilterEnvironmentName = "MESH_FILTERS" + HandlerEnvironmentName = "MESH_ADMIN_EXT_HANDLERS" ) // meta keys diff --git a/core/globalContext.go b/core/globalContext.go index 6efd7b7f..28550f9a 100644 --- a/core/globalContext.go +++ b/core/globalContext.go @@ -69,7 +69,8 @@ var ( defaultConfigPath = "./" defaultFileSuffix = ".yaml" - urlFields = map[string]bool{"protocol": true, "host": true, "port": true, "path": true, "group": true} + urlFields = map[string]bool{"protocol": true, "host": true, "port": true, "path": true, "group": true} + extFilters = make(map[string]bool) ) // all env flag in motan-go @@ -87,6 +88,17 @@ var ( Recover = flag.Bool("recover", false, "recover from accidental exit") ) +func AddRelevantFilter(filterStr string) { + k := strings.TrimSpace(filterStr) + if k != "" { + extFilters[k] = true + } +} + +func GetRelevantFilters() map[string]bool { + return extFilters +} + func (c *Context) confToURLs(section string) map[string]*URL { urls := map[string]*URL{} sectionConf, _ := c.Config.GetSection(section) @@ -402,6 +414,7 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL { c.GetDefaultFilterSet(newURL), c.GetGlobalFilterSet(newURL), c.GetEnvGlobalFilterSet(), + GetRelevantFilters(), c.GetFilterSet(newURL.GetStringParamsWithDefault(FilterKey, ""), ""), ) if len(finalFilters) > 0 { diff --git a/dynamicConfig.go b/dynamicConfig.go index 10ffe417..442fd019 100644 --- a/dynamicConfig.go +++ b/dynamicConfig.go @@ -328,6 +328,7 @@ func (h *DynamicConfigurerHandler) parseURL(url *core.URL) (*core.URL, error) { h.agent.Context.GetDefaultFilterSet(url), h.agent.Context.GetGlobalFilterSet(url), h.agent.Context.GetEnvGlobalFilterSet(), + core.GetRelevantFilters(), h.agent.Context.GetFilterSet(url.GetStringParamsWithDefault(core.FilterKey, ""), ""), ) if len(finalFilters) > 0 { From 430c35313f2c47e709d6bd05725ed4ad53029d26 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 4 Jan 2024 15:05:28 +0800 Subject: [PATCH 4/7] add environment handler feature and relevant filter feature --- core/globalContext.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/globalContext.go b/core/globalContext.go index 28550f9a..81ef149f 100644 --- a/core/globalContext.go +++ b/core/globalContext.go @@ -409,7 +409,7 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL { newURL = url } - //final filters: defaultFilter + globalFilter + filters + envFilter + //final filters: defaultFilter + globalFilter + filters + envFilter + relevantFilters finalFilters := c.MergeFilterSet( c.GetDefaultFilterSet(newURL), c.GetGlobalFilterSet(newURL), From a417ef3eb67bf2d34bba7f0c138d019ef8fc6aa0 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 4 Jan 2024 16:58:19 +0800 Subject: [PATCH 5/7] add environment handler feature and relevant filter feature --- agent_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/agent_test.go b/agent_test.go index 3942d5ad..d21166e7 100644 --- a/agent_test.go +++ b/agent_test.go @@ -91,6 +91,58 @@ motan-refer: assert.Equal(t, "Hello jack from motan server", resp.GetValue()) assert.Equal(t, 100, server.GetProcessPoolSize()) } + +func Test_envHandler(t *testing.T) { + t.Parallel() + time.Sleep(time.Second * 3) + // start client mesh + ext := GetDefaultExtFactory() + os.Remove("agent.sock") + config, _ := config.NewConfigFromReader(bytes.NewReader([]byte(` +motan-agent: + mport: 13500 + port: 14821 + eport: 14281 + htport: 25282 + +motan-registry: + direct: + protocol: direct + address: 127.0.0.1:22991 + +motan-refer: + recom-engine-refer: + group: hello + path: helloService + protocol: motan2 + registry: direct + asyncInitConnection: false + serialization: breeze`))) + agent := NewAgent(ext) + agent.RegisterEnvHandlers("testHandler", map[string]http.Handler{ + "/test/test": testHandler(), + }) + os.Setenv(core.HandlerEnvironmentName, "testHandler") + go agent.StartMotanAgentFromConfig(config) + time.Sleep(time.Second * 3) + client := http.Client{ + Timeout: time.Second, + } + resp, err := client.Get("http://127.0.0.1:13500/test/test") + assert.Nil(t, err) + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + assert.Nil(t, err) + assert.Equal(t, string(b), "OK") + os.Unsetenv(core.HandlerEnvironmentName) +} + +func testHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("OK")) + } +} + func Test_unixClientCall2(t *testing.T) { t.Parallel() startServer(t, "helloService", 22992) From 276d70f43217db6b3e2679a05a65f89c10cb60ef Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 29 Feb 2024 16:57:24 +0800 Subject: [PATCH 6/7] 1. change the default metrics application for request to the app in url 2. add whitelist url map for mport --- agent.go | 17 +++++++++++++++++ core/constants.go | 6 ++++++ filter/filter.go | 2 +- filter/metrics.go | 23 +++++++++++++++++++++-- filter/metrics_test.go | 22 ++++++++++++++++++++++ permission/whitelist.go | 15 +++++++++++++++ 6 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 permission/whitelist.go diff --git a/agent.go b/agent.go index 9c9052ac..92225de73 100644 --- a/agent.go +++ b/agent.go @@ -223,6 +223,23 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { a.startAgent() } +func (a *Agent) initSwitchers() { + // init metrics request application switcher + enableMetricsReqApp := a.agentURL.GetParam(motan.EnableMetricsReqApp, "false") + on, e := strconv.ParseBool(enableMetricsReqApp) + if e != nil { + vlog.Warningln("illegal value for enableMetricsReqApp, use default value: false") + } else { + switcher := motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication) + if switcher == nil { + vlog.Warningln("metrics request application switcher is nil, use default value: false") + } else { + vlog.Infoln("enableMetricsReqApp: ", on, ", set metrics request application switcher value") + switcher.SetValue(on) + } + } +} + func (a *Agent) startRegistryFailback() { vlog.Infoln("start agent failback") ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond) diff --git a/core/constants.go b/core/constants.go index 63799607..6098cbeb 100644 --- a/core/constants.go +++ b/core/constants.go @@ -70,6 +70,12 @@ const ( UnixSockProtocolFlag = "unix://" ) +// metrics request application +const ( + MetricsReqApplication = "metricsReqApp" + EnableMetricsReqApp = "enableMetricsReqApp" +) + // attachment keys const ( XForwardedForLower = "x-forwarded-for" // used as motan default proxy key diff --git a/filter/filter.go b/filter/filter.go index 38552ac8..9219ddf3 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -28,7 +28,7 @@ func RegistDefaultFilters(extFactory motan.ExtensionFactory) { }) extFactory.RegistExtFilter(Metrics, func() motan.Filter { - return &MetricsFilter{} + return &MetricsFilter{switcher: motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication)} }) extFactory.RegistExtFilter(CircuitBreaker, func() motan.Filter { diff --git a/filter/metrics.go b/filter/metrics.go index 49cfd17a..f67228c8 100644 --- a/filter/metrics.go +++ b/filter/metrics.go @@ -16,8 +16,13 @@ const ( MetricsSlowCountSuffix = ".slow_count" ) +func init() { + motan.GetSwitcherManager().Register(motan.MetricsReqApplication, false) +} + type MetricsFilter struct { - next motan.EndPointFilter + next motan.EndPointFilter + switcher *motan.Switcher } func (m *MetricsFilter) GetIndex() int { @@ -25,7 +30,9 @@ func (m *MetricsFilter) GetIndex() int { } func (m *MetricsFilter) NewFilter(url *motan.URL) motan.Filter { - return &MetricsFilter{} + return &MetricsFilter{ + switcher: motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication), + } } func (m *MetricsFilter) GetName() string { @@ -83,6 +90,18 @@ func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan application := request.GetAttachment(protocol.MSource) if provider { application = caller.GetURL().GetParam(motan.ApplicationKey, "") + } else { + // to support application in caller URL + callerApplication := caller.GetURL().GetParam(motan.ApplicationKey, "") + if m.switcher.IsOpen() { + if application != callerApplication { + keys := []string{role, callerApplication, request.GetMethod()} + addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetServiceName(), + keys, time.Since(start).Nanoseconds()/1e6, response) + } + } else { + application = callerApplication + } } keys := []string{role, application, request.GetMethod()} addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetServiceName(), diff --git a/filter/metrics_test.go b/filter/metrics_test.go index 63cc5cc9..3fa33422 100644 --- a/filter/metrics_test.go +++ b/filter/metrics_test.go @@ -59,6 +59,28 @@ func TestMetricsFilter(t *testing.T) { assert.Equal(t, 1, int(metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear().Count(getKeysStr(test.keys)+MetricsTotalCountSuffix)), "metric count") }) } + // test different client application + motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(true) + request3 := request.Clone().(motan.Request) + request3.SetAttachment(protocol.MSource, "test") + time.Sleep(10 * time.Millisecond) + mf.Filter(ep, request3) + time.Sleep(1000 * time.Millisecond) + snapShot := metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear() + // The metrics filter has do escape + assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count") + assert.Equal(t, 1, int(snapShot.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count") + // test switcher + motan.GetSwitcherManager().GetSwitcher(motan.MetricsReqApplication).SetValue(false) + request4 := request.Clone().(motan.Request) + time.Sleep(10 * time.Millisecond) + mf.Filter(ep, request4) + time.Sleep(1000 * time.Millisecond) + snapShot1 := metrics.GetStatItem(testGroup, "", testService).SnapshotAndClear() + // The metrics filter has do escape + assert.Equal(t, 1, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", application, testMethod})+MetricsTotalCountSuffix)), "metric count") + assert.Equal(t, 0, int(snapShot1.Count(getKeysStr([]string{"motan-client-agent", "test", testMethod})+MetricsTotalCountSuffix)), "metric count") + } func TestAddMetric(t *testing.T) { diff --git a/permission/whitelist.go b/permission/whitelist.go new file mode 100644 index 00000000..928c57e9 --- /dev/null +++ b/permission/whitelist.go @@ -0,0 +1,15 @@ +package permission + +var ( + whitelistMap = map[string]bool{ + "/version": true, + "/prometheus/metrics": true, + } +) + +func InWhiteList(url string) bool { + if _, ok := whitelistMap[url]; ok { + return true + } + return false +} From 7e87a86d0fbf1daeae66b03bea59e167e75770ef Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Thu, 29 Feb 2024 18:33:04 +0800 Subject: [PATCH 7/7] 1. change the default metrics application for request to the app in url 2. add whitelist url map for mport --- agent.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent.go b/agent.go index 92225de73..16757ed8 100644 --- a/agent.go +++ b/agent.go @@ -199,6 +199,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { // start metrics reporter early, here agent context has already initialized metrics.StartReporter(a.Context) a.registerStatusSampler() + a.initSwitchers() a.initStatus() a.initClusters() a.startServerAgent()