From 4e8f5a0b9e3d764a94ebcac67c5879c399ce108d Mon Sep 17 00:00:00 2001 From: louyuting Date: Fri, 25 Sep 2020 23:23:08 +0800 Subject: [PATCH] Support arbitrary statistic duration for flow control and refactor internal implementation (#200) * Support arbitrary statistic duration for flow control. Add `StatIntervalInMs` attribute in flow.Rule. When StatIntervalInMs > globalInterval or < bucketLength, we create a new sliding window for it. * Add stat reuse mechanism for flow rules * Make threshold of flow rule "request amount per interval" rather than QPS. * Refine internal implementation --- adapter/echo/middleware_test.go | 2 + adapter/gin/middleware_test.go | 2 + api/slot_chain.go | 1 + core/base/stat.go | 49 +++- core/base/stat_test.go | 25 +- core/config/config.go | 19 ++ core/config/entity.go | 33 +++ core/flow/rule.go | 50 +++- core/flow/rule_manager.go | 267 +++++++++++++++--- core/flow/rule_manager_test.go | 240 +++++++++++++++- core/flow/slot.go | 6 +- core/flow/slot_test.go | 54 ++++ core/flow/standalone_stat_slot.go | 31 ++ core/flow/tc_default.go | 37 ++- core/flow/tc_throttling.go | 13 +- core/flow/tc_throttling_test.go | 6 +- core/flow/tc_warm_up.go | 16 +- core/flow/traffic_shaping.go | 35 ++- core/stat/base/sliding_window_metric.go | 29 +- core/stat/base/sliding_window_metric_test.go | 94 +----- core/stat/base_node.go | 15 +- core/stat/resource_node.go | 4 +- core/stat/stat_slot.go | 10 +- tests/testdata/extension/helper/FlowRule.json | 9 +- 24 files changed, 834 insertions(+), 213 deletions(-) create mode 100644 core/flow/slot_test.go create mode 100644 core/flow/standalone_stat_slot.go diff --git a/adapter/echo/middleware_test.go b/adapter/echo/middleware_test.go index b94bd512f..71bf2a3b0 100644 --- a/adapter/echo/middleware_test.go +++ b/adapter/echo/middleware_test.go @@ -24,12 +24,14 @@ func initSentinel(t *testing.T) { Threshold: 1, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, + StatIntervalInMs: 1000, }, { Resource: "/api/:uid", Threshold: 0, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, + StatIntervalInMs: 1000, }, }) if err != nil { diff --git a/adapter/gin/middleware_test.go b/adapter/gin/middleware_test.go index 0bd1fbf03..5ea2fc871 100644 --- a/adapter/gin/middleware_test.go +++ b/adapter/gin/middleware_test.go @@ -24,12 +24,14 @@ func initSentinel(t *testing.T) { Threshold: 1, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, + StatIntervalInMs: 1000, }, { Resource: "/api/users/:id", Threshold: 0, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, + StatIntervalInMs: 1000, }, }) if err != nil { diff --git a/api/slot_chain.go b/api/slot_chain.go index 6fa0556de..90c9411be 100644 --- a/api/slot_chain.go +++ b/api/slot_chain.go @@ -38,5 +38,6 @@ func BuildDefaultSlotChain() *base.SlotChain { sc.AddStatSlotLast(&log.Slot{}) sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{}) sc.AddStatSlotLast(&hotspot.ConcurrencyStatSlot{}) + sc.AddStatSlotLast(&flow.StandaloneStatSlot{}) return sc } diff --git a/core/base/stat.go b/core/base/stat.go index bd9404d8a..2a410ded8 100644 --- a/core/base/stat.go +++ b/core/base/stat.go @@ -1,5 +1,9 @@ package base +import ( + "github.com/pkg/errors" +) + type TimePredicate func(uint64) bool type MetricEvent int8 @@ -31,7 +35,7 @@ type ReadStat interface { } type WriteStat interface { - AddMetric(event MetricEvent, count uint64) + AddCount(event MetricEvent, count int64) } // StatNode holds real-time statistics for resources. @@ -46,4 +50,47 @@ type StatNode interface { DecreaseGoroutineNum() Reset() + + // GenerateReadStat generates the readonly metric statistic based on resource level global statistic + // If parameters, sampleCount and intervalInMs, are not suitable for resource level global statistic, return (nil, error) + GenerateReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error) +} + +var ( + IllegalGlobalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for resource's global statistic") + IllegalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for metric statistic") + GlobalStatisticNonReusableError = errors.New("The parameters, sampleCount and interval, mismatch for reusing between resource's global statistic and readonly metric statistic.") +) + +func CheckValidityForStatistic(sampleCount, intervalInMs uint32) error { + if intervalInMs == 0 || sampleCount == 0 || intervalInMs%sampleCount != 0 { + return IllegalStatisticParamsError + } + return nil +} + +// CheckValidityForReuseStatistic check the compliance whether readonly metric statistic can be built based on resource's global statistic +// The parameters, sampleCount and intervalInMs, are the parameters of the metric statistic you want to build +// The parameters, parentSampleCount and parentIntervalInMs, are the parameters of the resource's global statistic +// If compliance passes, return nil, if not returns specific error +func CheckValidityForReuseStatistic(sampleCount, intervalInMs uint32, parentSampleCount, parentIntervalInMs uint32) error { + if intervalInMs == 0 || sampleCount == 0 || intervalInMs%sampleCount != 0 { + return IllegalStatisticParamsError + } + bucketLengthInMs := intervalInMs / sampleCount + + if parentIntervalInMs == 0 || parentSampleCount == 0 || parentIntervalInMs%parentSampleCount != 0 { + return IllegalGlobalStatisticParamsError + } + parentBucketLengthInMs := parentIntervalInMs / parentSampleCount + + //SlidingWindowMetric's intervalInMs is not divisible by BucketLeapArray's intervalInMs + if parentIntervalInMs%intervalInMs != 0 { + return GlobalStatisticNonReusableError + } + // BucketLeapArray's BucketLengthInMs is not divisible by SlidingWindowMetric's BucketLengthInMs + if bucketLengthInMs%parentBucketLengthInMs != 0 { + return GlobalStatisticNonReusableError + } + return nil } diff --git a/core/base/stat_test.go b/core/base/stat_test.go index 3111a39dc..aef75a614 100644 --- a/core/base/stat_test.go +++ b/core/base/stat_test.go @@ -1,12 +1,17 @@ package base -import "github.com/stretchr/testify/mock" +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) type StatNodeMock struct { mock.Mock } -func (m *StatNodeMock) AddMetric(event MetricEvent, count uint64) { +func (m *StatNodeMock) AddCount(event MetricEvent, count int64) { m.Called(event, count) } @@ -64,3 +69,19 @@ func (m *StatNodeMock) Reset() { m.Called() return } + +func (m *StatNodeMock) GenerateReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error) { + args := m.Called(sampleCount, intervalInMs) + return args.Get(0).(ReadStat), args.Error(1) +} + +func TestCheckValidityForReuseStatistic(t *testing.T) { + assert.Equal(t, CheckValidityForReuseStatistic(3, 1000, 20, 10000), IllegalStatisticParamsError) + assert.Equal(t, CheckValidityForReuseStatistic(0, 1000, 20, 10000), IllegalStatisticParamsError) + assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 21, 10000), IllegalGlobalStatisticParamsError) + assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 0, 10000), IllegalGlobalStatisticParamsError) + assert.Equal(t, CheckValidityForReuseStatistic(2, 8000, 20, 10000), GlobalStatisticNonReusableError) + assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 10, 10000), GlobalStatisticNonReusableError) + assert.Equal(t, CheckValidityForReuseStatistic(1, 1000, 100, 10000), nil) + assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 20, 10000), nil) +} diff --git a/core/config/config.go b/core/config/config.go index f2beec2e8..c6ae3049d 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -205,3 +205,22 @@ func SystemStatCollectIntervalMs() uint32 { func UseCacheTime() bool { return globalCfg.UseCacheTime() } + +func GlobalStatisticIntervalMsTotal() uint32 { + return globalCfg.GlobalStatisticIntervalMsTotal() +} + +func GlobalStatisticSampleCountTotal() uint32 { + return globalCfg.GlobalStatisticSampleCountTotal() +} + +func GlobalStatisticBucketLengthInMs() uint32 { + return globalCfg.GlobalStatisticIntervalMsTotal() / GlobalStatisticSampleCountTotal() +} + +func MetricStatisticIntervalMs() uint32 { + return globalCfg.MetricStatisticIntervalMs() +} +func MetricStatisticSampleCount() uint32 { + return globalCfg.MetricStatisticSampleCount() +} diff --git a/core/config/entity.go b/core/config/entity.go index a6b97c96a..44c24c2dc 100644 --- a/core/config/entity.go +++ b/core/config/entity.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/logging" "github.com/pkg/errors" ) @@ -52,6 +53,15 @@ type MetricLogConfig struct { // StatConfig represents the configuration items of statistics. type StatConfig struct { + // GlobalStatisticSampleCountTotal and GlobalStatisticIntervalMsTotal is the per resource's global default statistic sliding window config + GlobalStatisticSampleCountTotal uint32 `yaml:"globalStatisticSampleCountTotal"` + GlobalStatisticIntervalMsTotal uint32 `yaml:"globalStatisticIntervalMsTotal"` + + // MetricStatisticSampleCount and MetricStatisticIntervalMs is the per resource's default readonly metric statistic + // This default readonly metric statistic must be reusable based on global statistic. + MetricStatisticSampleCount uint32 `yaml:"metricStatisticSampleCount"` + MetricStatisticIntervalMs uint32 `yaml:"metricStatisticIntervalMs"` + System SystemStatConfig `yaml:"system"` } @@ -84,6 +94,10 @@ func NewDefaultConfig() *Entity { }, }, Stat: StatConfig{ + GlobalStatisticSampleCountTotal: base.DefaultSampleCountTotal, + GlobalStatisticIntervalMsTotal: base.DefaultIntervalMsTotal, + MetricStatisticSampleCount: base.DefaultSampleCount, + MetricStatisticIntervalMs: base.DefaultIntervalMs, System: SystemStatConfig{ CollectIntervalMs: DefaultSystemStatCollectIntervalMs, }, @@ -117,6 +131,10 @@ func checkConfValid(conf *SentinelConfig) error { if mc.SingleFileMaxSize <= 0 { return errors.New("Illegal metric log globalCfg: singleFileMaxSize <= 0") } + if err := base.CheckValidityForReuseStatistic(conf.Stat.MetricStatisticSampleCount, conf.Stat.MetricStatisticIntervalMs, + conf.Stat.GlobalStatisticSampleCountTotal, conf.Stat.GlobalStatisticIntervalMsTotal); err != nil { + return err + } return nil } @@ -168,3 +186,18 @@ func (entity *Entity) SystemStatCollectIntervalMs() uint32 { func (entity *Entity) UseCacheTime() bool { return entity.Sentinel.UseCacheTime } + +func (entity *Entity) GlobalStatisticIntervalMsTotal() uint32 { + return entity.Sentinel.Stat.GlobalStatisticIntervalMsTotal +} + +func (entity *Entity) GlobalStatisticSampleCountTotal() uint32 { + return entity.Sentinel.Stat.GlobalStatisticSampleCountTotal +} + +func (entity *Entity) MetricStatisticIntervalMs() uint32 { + return entity.Sentinel.Stat.MetricStatisticIntervalMs +} +func (entity *Entity) MetricStatisticSampleCount() uint32 { + return entity.Sentinel.Stat.MetricStatisticSampleCount +} diff --git a/core/flow/rule.go b/core/flow/rule.go index a3ce62efe..524637e6d 100644 --- a/core/flow/rule.go +++ b/core/flow/rule.go @@ -70,12 +70,44 @@ type Rule struct { Resource string `json:"resource"` TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"` ControlBehavior ControlBehavior `json:"controlBehavior"` - Threshold float64 `json:"threshold"` - RelationStrategy RelationStrategy `json:"relationStrategy"` - RefResource string `json:"refResource"` - MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"` - WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"` - WarmUpColdFactor uint32 `json:"warmUpColdFactor"` + // Threshold means the threshold during StatIntervalInMs + // If StatIntervalInMs is 1000(1 second), Threshold means QPS + Threshold float64 `json:"threshold"` + RelationStrategy RelationStrategy `json:"relationStrategy"` + RefResource string `json:"refResource"` + MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"` + WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"` + WarmUpColdFactor uint32 `json:"warmUpColdFactor"` + // StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule. + // If user doesn't set StatIntervalInMs, that means using default metric statistic of resource. + // If the StatIntervalInMs user specifies can not reuse the global statistic of resource, + // sentinel will generate independent statistic structure for this rule. + StatIntervalInMs uint32 `json:"statIntervalInMs"` +} + +func (r *Rule) isEqualsTo(newRule *Rule) bool { + if newRule == nil { + return false + } + if !(r.Resource == newRule.Resource && r.RelationStrategy == newRule.RelationStrategy && + r.RefResource == newRule.RefResource && r.StatIntervalInMs == newRule.StatIntervalInMs && + r.TokenCalculateStrategy == newRule.TokenCalculateStrategy && r.ControlBehavior == newRule.ControlBehavior && r.Threshold == newRule.Threshold && + r.MaxQueueingTimeMs == newRule.MaxQueueingTimeMs && r.WarmUpPeriodSec == newRule.WarmUpPeriodSec && r.WarmUpColdFactor == newRule.WarmUpColdFactor) { + return false + } + return true +} + +func (r *Rule) isStatReusable(newRule *Rule) bool { + if newRule == nil { + return false + } + return r.Resource == newRule.Resource && r.RelationStrategy == newRule.RelationStrategy && + r.RefResource == newRule.RefResource && r.StatIntervalInMs == newRule.StatIntervalInMs +} + +func (r *Rule) needStatistic() bool { + return !(r.TokenCalculateStrategy == Direct && r.ControlBehavior == Throttling) } func (r *Rule) String() string { @@ -83,9 +115,9 @@ func (r *Rule) String() string { if err != nil { // Return the fallback string return fmt.Sprintf("Rule{Resource=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+ - "Threshold=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d}", - r.Resource, r.TokenCalculateStrategy, r.ControlBehavior, r.Threshold, r.RelationStrategy, - r.RefResource, r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor) + "Threshold=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, StatIntervalInMs=%d}", + r.Resource, r.TokenCalculateStrategy, r.ControlBehavior, r.Threshold, r.RelationStrategy, r.RefResource, + r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.StatIntervalInMs) } return string(b) } diff --git a/core/flow/rule_manager.go b/core/flow/rule_manager.go index 37f3fe5de..e90d2c51f 100644 --- a/core/flow/rule_manager.go +++ b/core/flow/rule_manager.go @@ -4,13 +4,17 @@ import ( "fmt" "sync" + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/config" + "github.com/alibaba/sentinel-golang/core/stat" + sbase "github.com/alibaba/sentinel-golang/core/stat/base" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) // TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior. -type TrafficControllerGenFunc func(*Rule) *TrafficShapingController +type TrafficControllerGenFunc func(*Rule, *standaloneStatistic) (*TrafficShapingController, error) type trafficControllerGenKey struct { tokenCalculateStrategy TokenCalculateStrategy @@ -31,26 +35,78 @@ func init() { tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: Direct, controlBehavior: Reject, - }] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Threshold), NewDefaultTrafficShapingChecker(rule), rule) + }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { + if boundStat == nil { + var err error + boundStat, err = generateStatFor(rule) + if err != nil { + return nil, err + } + } + tsc, err := NewTrafficShapingController(rule, boundStat) + if err != nil || tsc == nil { + return nil, err + } + tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Threshold) + tsc.flowChecker = NewRejectTrafficShapingChecker(tsc, rule) + return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: Direct, controlBehavior: Throttling, - }] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Threshold), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) + }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { + if boundStat == nil { + var err error + boundStat, err = generateStatFor(rule) + if err != nil { + return nil, err + } + } + tsc, err := NewTrafficShapingController(rule, boundStat) + if err != nil || tsc == nil { + return nil, err + } + tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Threshold) + tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs) + return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: WarmUp, controlBehavior: Reject, - }] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewDefaultTrafficShapingChecker(rule), rule) + }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { + if boundStat == nil { + var err error + boundStat, err = generateStatFor(rule) + if err != nil { + return nil, err + } + } + tsc, err := NewTrafficShapingController(rule, boundStat) + if err != nil || tsc == nil { + return nil, err + } + tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule) + tsc.flowChecker = NewRejectTrafficShapingChecker(tsc, rule) + return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: WarmUp, controlBehavior: Throttling, - }] = func(rule *Rule) *TrafficShapingController { - return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule) + }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { + if boundStat == nil { + var err error + boundStat, err = generateStatFor(rule) + if err != nil { + return nil, err + } + } + tsc, err := NewTrafficShapingController(rule, boundStat) + if err != nil || tsc == nil { + return nil, err + } + tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule) + tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs) + return tsc, nil } } @@ -74,8 +130,19 @@ func onRuleUpdate(rules []*Rule) (err error) { } }() - m := buildFlowMap(rules) - + resRulesMap := make(map[string][]*Rule) + for _, rule := range rules { + if err := IsValidRule(rule); err != nil { + logging.Warn("ignoring invalid flow rule", "rule", rule, "reason", err) + continue + } + resRules, exist := resRulesMap[rule.Resource] + if !exist { + resRules = make([]*Rule, 0, 1) + } + resRulesMap[rule.Resource] = append(resRules, rule) + } + m := make(TrafficControllerMap, len(resRulesMap)) start := util.CurrentTimeNano() tcMux.Lock() defer func() { @@ -86,7 +153,9 @@ func onRuleUpdate(rules []*Rule) (err error) { logging.Debug("time statistic(ns) for updating flow rule", "timeCost", util.CurrentTimeNano()-start) logRuleUpdate(m) }() - + for res, rulesOfRes := range resRulesMap { + m[res] = buildRulesOfRes(res, rulesOfRes) + } tcMap = m return nil } @@ -119,7 +188,7 @@ func getRulesOfResource(res string) []*Rule { } ret := make([]*Rule, 0, len(resTcs)) for _, tc := range resTcs { - ret = append(ret, tc.Rule()) + ret = append(ret, tc.BoundRule()) } return ret } @@ -162,14 +231,74 @@ func rulesFrom(m TrafficControllerMap) []*Rule { continue } for _, r := range rs { - if r != nil && r.Rule() != nil { - rules = append(rules, r.Rule()) + if r != nil && r.BoundRule() != nil { + rules = append(rules, r.BoundRule()) } } } return rules } +func generateStatFor(rule *Rule) (*standaloneStatistic, error) { + intervalInMs := rule.StatIntervalInMs + + var retStat standaloneStatistic + + var resNode *stat.ResourceNode + if rule.RelationStrategy == AssociatedResource { + // use associated statistic + resNode = stat.GetOrCreateResourceNode(rule.RefResource, base.ResTypeCommon) + } else { + resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon) + } + if intervalInMs == 0 || intervalInMs == config.MetricStatisticIntervalMs() { + // default case, use the resource's default statistic + readStat := resNode.DefaultMetric() + retStat.reuseResourceStat = true + retStat.readOnlyMetric = readStat + retStat.writeOnlyMetric = nil + return &retStat, nil + } + + sampleCount := uint32(0) + //calculate the sample count + if intervalInMs > config.GlobalStatisticIntervalMsTotal() { + sampleCount = 1 + } else if intervalInMs < config.GlobalStatisticBucketLengthInMs() { + sampleCount = 1 + } else { + if intervalInMs%config.GlobalStatisticBucketLengthInMs() == 0 { + sampleCount = intervalInMs / config.GlobalStatisticBucketLengthInMs() + } else { + sampleCount = 1 + } + } + err := base.CheckValidityForReuseStatistic(sampleCount, intervalInMs, config.GlobalStatisticSampleCountTotal(), config.GlobalStatisticIntervalMsTotal()) + if err == nil { + // global statistic reusable + readStat, e := resNode.GenerateReadStat(sampleCount, intervalInMs) + if e != nil { + return nil, e + } + retStat.reuseResourceStat = true + retStat.readOnlyMetric = readStat + retStat.writeOnlyMetric = nil + return &retStat, nil + } else if err == base.GlobalStatisticNonReusableError { + logging.Info("flow rule couldn't reuse global statistic and will generate independent statistic", "rule", rule) + retStat.reuseResourceStat = false + realLeapArray := sbase.NewBucketLeapArray(sampleCount, intervalInMs) + metricStat, e := sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, realLeapArray) + if e != nil { + return nil, errors.Errorf("fail to generate statistic for warm up rule: %+v, err: %+v", rule, e) + } + retStat.readOnlyMetric = metricStat + retStat.writeOnlyMetric = realLeapArray + return &retStat, nil + } + return nil, errors.Wrapf(err, "fail to new standalone statistic because of invalid StatIntervalInMs in flow.Rule, StatIntervalInMs: %d", intervalInMs) +} + // SetTrafficShapingGenerator sets the traffic controller generator for the given TokenCalculateStrategy and ControlBehavior. // Note that modifying the generator of default control strategy is not allowed. func SetTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior, generator TrafficControllerGenFunc) error { @@ -217,40 +346,85 @@ func getTrafficControllerListFor(name string) []*TrafficShapingController { return tcMap[name] } -// NotThreadSafe (should be guarded by the lock) -func buildFlowMap(rules []*Rule) TrafficControllerMap { - m := make(TrafficControllerMap) - if len(rules) == 0 { - return m +func calculateReuseIndexFor(r *Rule, oldResCbs []*TrafficShapingController) (equalIdx, reuseStatIdx int) { + // the index of equivalent rule in old circuit breaker slice + equalIdx = -1 + // the index of statistic reusable rule in old circuit breaker slice + reuseStatIdx = -1 + + for idx, oldTc := range oldResCbs { + oldRule := oldTc.BoundRule() + if oldRule.isEqualsTo(r) { + // break if there is equivalent rule + equalIdx = idx + break + } + // search the index of first stat reusable rule + if !oldRule.isStatReusable(r) { + continue + } + if reuseStatIdx >= 0 { + // had find reuse rule. + continue + } + reuseStatIdx = idx } + return equalIdx, reuseStatIdx +} - for _, rule := range rules { - if err := IsValidRule(rule); err != nil { - logging.Warn("Ignoring invalid flow rule", "rule", rule, "err", err) +// buildRulesOfRes builds TrafficShapingController slice from rules. the resource of rules must be equals to res +func buildRulesOfRes(res string, rulesOfRes []*Rule) []*TrafficShapingController { + newTcsOfRes := make([]*TrafficShapingController, 0, len(rulesOfRes)) + emptyTcs := make([]*TrafficShapingController, 0, 0) + for _, rule := range rulesOfRes { + if res != rule.Resource { + logging.Error(errors.Errorf("unmatched resource name, expect: %s, actual: %s", res, rule.Resource), "FlowManager: unmatched resource name ", "rule", rule) continue } + oldResTcs, exist := tcMap[res] + if !exist { + oldResTcs = emptyTcs + } + + equalIdx, reuseStatIdx := calculateReuseIndexFor(rule, oldResTcs) + + // First check equals scenario + if equalIdx >= 0 { + // reuse the old cb + equalOldTc := oldResTcs[equalIdx] + newTcsOfRes = append(newTcsOfRes, equalOldTc) + // remove old cb from oldResCbs + tcMap[res] = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...) + continue + } + generator, supported := tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: rule.TokenCalculateStrategy, controlBehavior: rule.ControlBehavior, }] - if !supported { - logging.Warn("Ignoring the rule due to unsupported control behavior", "rule", rule) + if !supported || generator == nil { + logging.Error(errors.New("unsupported flow control strategy"), "ignoring the rule due to unsupported control behavior", "rule", rule) continue } - tsc := generator(rule) - if tsc == nil { - logging.Warn("Ignoring the rule due to bad generated traffic controller.", "rule", rule) - continue + var tc *TrafficShapingController + var e error + if reuseStatIdx >= 0 { + tc, e = generator(rule, &(oldResTcs[reuseStatIdx].boundStat)) + } else { + tc, e = generator(rule, nil) } - rulesOfRes, exists := m[rule.Resource] - if !exists { - m[rule.Resource] = []*TrafficShapingController{tsc} - } else { - m[rule.Resource] = append(rulesOfRes, tsc) + if tc == nil || e != nil { + logging.Error(errors.New("bad generated traffic controller"), "ignoring the rule due to bad generated traffic controller.", "rule", rule) + continue + } + if reuseStatIdx >= 0 { + // remove old cb from oldResCbs + tcMap[res] = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...) } + newTcsOfRes = append(newTcsOfRes, tc) } - return m + return newTcsOfRes } // IsValidRule checks whether the given Rule is valid. @@ -264,24 +438,31 @@ func IsValidRule(rule *Rule) error { if rule.Threshold < 0 { return errors.New("negative threshold") } - if rule.RelationStrategy < 0 { - return errors.New("invalid relation strategy") + if int32(rule.TokenCalculateStrategy) < 0 { + return errors.New("invalid token calculate strategy") } - if rule.TokenCalculateStrategy < 0 || rule.ControlBehavior < 0 { - return errors.New("invalid control strategy") + if int32(rule.ControlBehavior) < 0 { + return errors.New("invalid control behavior") + } + if !(rule.RelationStrategy >= CurrentResource && rule.RelationStrategy <= AssociatedResource) { + return errors.New("invalid relation strategy") } - if rule.RelationStrategy == AssociatedResource && rule.RefResource == "" { - return errors.New("Bad flow rule: invalid control behavior") + return errors.New("Bad flow rule: invalid relation strategy") } - if rule.TokenCalculateStrategy == WarmUp { if rule.WarmUpPeriodSec <= 0 { - return errors.New("invalid warmUpPeriodSec") + return errors.New("invalid WarmUpPeriodSec") } if rule.WarmUpColdFactor == 1 { return errors.New("WarmUpColdFactor must be great than 1") } } + if rule.ControlBehavior == Throttling && rule.MaxQueueingTimeMs == 0 { + return errors.New("invalid MaxQueueingTimeMs") + } + if rule.StatIntervalInMs > config.GlobalStatisticIntervalMsTotal()*60 { + return errors.New("StatIntervalInMs must be less than 10 minutes") + } return nil } diff --git a/core/flow/rule_manager_test.go b/core/flow/rule_manager_test.go index 4768f8e25..1910bda0f 100644 --- a/core/flow/rule_manager_test.go +++ b/core/flow/rule_manager_test.go @@ -4,21 +4,23 @@ import ( "reflect" "testing" + "github.com/alibaba/sentinel-golang/core/stat" + sbase "github.com/alibaba/sentinel-golang/core/stat/base" "github.com/stretchr/testify/assert" ) func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) { tsc := &TrafficShapingController{} - err := SetTrafficShapingGenerator(Direct, Reject, func(_ *Rule) *TrafficShapingController { - return tsc + err := SetTrafficShapingGenerator(Direct, Reject, func(_ *Rule, _ *standaloneStatistic) (*TrafficShapingController, error) { + return tsc, nil }) assert.Error(t, err, "default control behaviors are not allowed to be modified") err = RemoveTrafficShapingGenerator(Direct, Reject) assert.Error(t, err, "default control behaviors are not allowed to be removed") - err = SetTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112), func(_ *Rule) *TrafficShapingController { - return tsc + err = SetTrafficShapingGenerator(TokenCalculateStrategy(111), ControlBehavior(112), func(_ *Rule, _ *standaloneStatistic) (*TrafficShapingController, error) { + return tsc, nil }) assert.NoError(t, err) @@ -52,12 +54,14 @@ func TestIsValidFlowRule(t *testing.T) { badRule1 := &Rule{Threshold: 1, Resource: ""} badRule2 := &Rule{Threshold: -1.9, Resource: "test"} badRule3 := &Rule{Threshold: 5, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Reject} - goodRule1 := &Rule{Threshold: 10, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling, WarmUpPeriodSec: 10} + goodRule1 := &Rule{Threshold: 10, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling, WarmUpPeriodSec: 10, MaxQueueingTimeMs: 10} + badRule4 := &Rule{Threshold: 5, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Reject, StatIntervalInMs: 6000000} assert.Error(t, IsValidRule(badRule1)) assert.Error(t, IsValidRule(badRule2)) assert.Error(t, IsValidRule(badRule3)) assert.NoError(t, IsValidRule(goodRule1)) + assert.Error(t, IsValidRule(badRule4)) } func TestGetRules(t *testing.T) { @@ -83,7 +87,7 @@ func TestGetRules(t *testing.T) { ControlBehavior: Throttling, RefResource: "", WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + MaxQueueingTimeMs: 10, } if _, err := LoadRules([]*Rule{r1, r2}); err != nil { t.Fatal(err) @@ -125,7 +129,7 @@ func TestGetRules(t *testing.T) { ControlBehavior: Throttling, RefResource: "", WarmUpPeriodSec: 0, - MaxQueueingTimeMs: 0, + MaxQueueingTimeMs: 10, } if _, err := LoadRules([]*Rule{r1, r2}); err != nil { t.Fatal(err) @@ -147,3 +151,225 @@ func TestGetRules(t *testing.T) { } }) } + +func Test_generateStatFor(t *testing.T) { + t.Run("generateStatFor_reuse_default_metric_stat", func(t *testing.T) { + r1 := &Rule{ + Resource: "abc", + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + StatIntervalInMs: 0, + Threshold: 100, + RelationStrategy: CurrentResource, + } + // global: 10000ms, 20 sample, bucketLen: 500ms + // metric: 1000ms, 2 sample, bucketLen: 500ms + boundStat, err := generateStatFor(r1) + if err != nil { + t.Fatal(err) + } + assert.True(t, boundStat.reuseResourceStat && boundStat.writeOnlyMetric == nil) + ps, succ := boundStat.readOnlyMetric.(*sbase.SlidingWindowMetric) + assert.True(t, succ) + + resNode := stat.GetResourceNode("abc") + assert.True(t, reflect.DeepEqual(ps, resNode.DefaultMetric())) + }) + + t.Run("generateStatFor_reuse_global_stat", func(t *testing.T) { + // global: 10000ms, 20 sample, bucketLen: 500ms + // metric: 1000ms, 2 sample, bucketLen: 500ms + r1 := &Rule{ + Resource: "abc", + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + StatIntervalInMs: 5000, + Threshold: 100, + RelationStrategy: CurrentResource, + RefResource: "", + } + // global: 10000ms, 20 sample, bucketLen: 500ms + // metric: 1000ms, 2 sample, bucketLen: 500ms + boundStat, err := generateStatFor(r1) + if err != nil { + t.Fatal(err) + } + assert.True(t, boundStat.reuseResourceStat && boundStat.writeOnlyMetric == nil) + ps, succ := boundStat.readOnlyMetric.(*sbase.SlidingWindowMetric) + assert.True(t, succ) + resNode := stat.GetResourceNode("abc") + assert.True(t, !reflect.DeepEqual(ps, resNode.DefaultMetric())) + }) + + t.Run("generateStatFor_standalone_stat", func(t *testing.T) { + // global: 10000ms, 20 sample, bucketLen: 500ms + // metric: 1000ms, 2 sample, bucketLen: 500ms + r1 := &Rule{ + Resource: "abc", + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + StatIntervalInMs: 50000, + Threshold: 100, + RelationStrategy: CurrentResource, + } + // global: 10000ms, 20 sample, bucketLen: 500ms + // metric: 1000ms, 2 sample, bucketLen: 500ms + boundStat, err := generateStatFor(r1) + if err != nil { + t.Fatal(err) + } + assert.True(t, boundStat.reuseResourceStat == false && boundStat.writeOnlyMetric != nil) + }) +} + +func Test_buildRulesOfRes(t *testing.T) { + t.Run("Test_buildRulesOfRes_no_reuse_stat", func(t *testing.T) { + r1 := &Rule{ + Resource: "abc1", + Threshold: 100, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + } + r2 := &Rule{ + Resource: "abc1", + Threshold: 200, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + MaxQueueingTimeMs: 10, + } + assert.True(t, len(tcMap["abc1"]) == 0) + tcs := buildRulesOfRes("abc1", []*Rule{r1, r2}) + assert.True(t, len(tcs) == 2) + assert.True(t, tcs[0].BoundRule() == r1) + assert.True(t, tcs[1].BoundRule() == r2) + assert.True(t, reflect.DeepEqual(tcs[0].BoundRule(), r1)) + assert.True(t, reflect.DeepEqual(tcs[1].BoundRule(), r2)) + }) + + t.Run("Test_buildRulesOfRes_reuse_stat", func(t *testing.T) { + // reuse + r1 := &Rule{ + Resource: "abc1", + Threshold: 100, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + StatIntervalInMs: 1000, + } + // reuse + r2 := &Rule{ + Resource: "abc1", + Threshold: 200, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 2000, + } + // reuse + r3 := &Rule{ + Resource: "abc1", + Threshold: 300, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 5000, + } + // independent statistic + r4 := &Rule{ + Resource: "abc1", + Threshold: 400, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 50000, + } + + s1, err := generateStatFor(r1) + if err != nil { + t.Fatal(err) + } + fakeTc1 := &TrafficShapingController{flowCalculator: nil, flowChecker: nil, rule: r1, boundStat: *s1} + s2, err := generateStatFor(r2) + if err != nil { + t.Fatal(err) + } + fakeTc2 := &TrafficShapingController{flowCalculator: nil, flowChecker: nil, rule: r2, boundStat: *s2} + s3, err := generateStatFor(r3) + if err != nil { + t.Fatal(err) + } + fakeTc3 := &TrafficShapingController{flowCalculator: nil, flowChecker: nil, rule: r3, boundStat: *s3} + s4, err := generateStatFor(r4) + if err != nil { + t.Fatal(err) + } + fakeTc4 := &TrafficShapingController{flowCalculator: nil, flowChecker: nil, rule: r4, boundStat: *s4} + tcMap["abc1"] = []*TrafficShapingController{fakeTc1, fakeTc2, fakeTc3, fakeTc4} + assert.True(t, len(tcMap["abc1"]) == 4) + stat1 := tcMap["abc1"][0].boundStat + stat2 := tcMap["abc1"][1].boundStat + oldTc3 := tcMap["abc1"][2] + assert.True(t, tcMap["abc1"][3].boundStat.writeOnlyMetric != nil) + assert.True(t, !tcMap["abc1"][3].boundStat.reuseResourceStat) + stat4 := tcMap["abc1"][3].boundStat + + // reuse stat + r12 := &Rule{ + Resource: "abc1", + Threshold: 300, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + StatIntervalInMs: 1000, + } + // not reusable, generate from resource's global statistic + r22 := &Rule{ + Resource: "abc1", + Threshold: 400, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Throttling, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 10000, + } + // equals + r32 := &Rule{ + Resource: "abc1", + Threshold: 300, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 5000, + } + // reuse independent stat + r42 := &Rule{ + Resource: "abc1", + Threshold: 4000, + RelationStrategy: CurrentResource, + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + MaxQueueingTimeMs: 10, + StatIntervalInMs: 50000, + } + tcs := buildRulesOfRes("abc1", []*Rule{r12, r22, r32, r42}) + assert.True(t, len(tcs) == 4) + assert.True(t, tcs[0].BoundRule() == r12) + assert.True(t, tcs[1].BoundRule() == r22) + assert.True(t, tcs[2].BoundRule() == r3) + assert.True(t, tcs[3].BoundRule() == r42) + assert.True(t, reflect.DeepEqual(tcs[0].BoundRule(), r12)) + assert.True(t, reflect.DeepEqual(tcs[1].BoundRule(), r22)) + assert.True(t, reflect.DeepEqual(tcs[2].BoundRule(), r32) && reflect.DeepEqual(tcs[2].BoundRule(), r3)) + assert.True(t, reflect.DeepEqual(tcs[3].BoundRule(), r42)) + assert.True(t, tcs[0].boundStat == stat1) + assert.True(t, tcs[1].boundStat != stat2) + assert.True(t, tcs[2] == oldTc3) + assert.True(t, tcs[3].boundStat == stat4) + }) +} diff --git a/core/flow/slot.go b/core/flow/slot.go index 1352cabf5..5776c0314 100644 --- a/core/flow/slot.go +++ b/core/flow/slot.go @@ -57,13 +57,13 @@ func selectNodeByRelStrategy(rule *Rule, node base.StatNode) base.StatNode { return node } -func checkInLocal(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult { - actual := selectNodeByRelStrategy(tc.rule, node) +func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult { + actual := selectNodeByRelStrategy(tc.rule, resStat) if actual == nil { logging.FrequentErrorOnce.Do(func() { logging.Error(errors.Errorf("nil resource node"), "no resource node for flow rule", "rule", tc.rule) }) return base.NewTokenResultPass() } - return tc.PerformChecking(node, acquireCount, flag) + return tc.PerformChecking(actual, acquireCount, flag) } diff --git a/core/flow/slot_test.go b/core/flow/slot_test.go new file mode 100644 index 000000000..fda898159 --- /dev/null +++ b/core/flow/slot_test.go @@ -0,0 +1,54 @@ +package flow + +import ( + "testing" + + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/stat" + "github.com/alibaba/sentinel-golang/logging" + "github.com/stretchr/testify/assert" +) + +func Test_FlowSlot_StandaloneStat(t *testing.T) { + slot := &Slot{} + statSLot := &StandaloneStatSlot{} + res := base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound) + resNode := stat.GetOrCreateResourceNode("abc", base.ResTypeCommon) + ctx := &base.EntryContext{ + Resource: res, + StatNode: resNode, + Input: &base.SentinelInput{ + AcquireCount: 1, + }, + RuleCheckResult: nil, + Data: nil, + } + + slot.Check(ctx) + + r1 := &Rule{ + Resource: "abc", + TokenCalculateStrategy: Direct, + ControlBehavior: Reject, + // Use standalone statistic, using single-bucket-sliding-windows + StatIntervalInMs: 20000, + Threshold: 100, + RelationStrategy: CurrentResource, + } + _, e := LoadRules([]*Rule{r1}) + if e != nil { + logging.Error(e, "") + t.Fail() + return + } + + for i := 0; i < 50; i++ { + ret := slot.Check(ctx) + if ret != nil { + t.Fail() + return + } + statSLot.OnEntryPassed(ctx) + } + assert.True(t, getTrafficControllerListFor("abc")[0].boundStat.readOnlyMetric.GetSum(base.MetricEventPass) == 50) +} diff --git a/core/flow/standalone_stat_slot.go b/core/flow/standalone_stat_slot.go new file mode 100644 index 000000000..a69f5d31a --- /dev/null +++ b/core/flow/standalone_stat_slot.go @@ -0,0 +1,31 @@ +package flow + +import ( + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/logging" + "github.com/pkg/errors" +) + +type StandaloneStatSlot struct { +} + +func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) { + res := ctx.Resource.Name() + for _, tc := range getTrafficControllerListFor(res) { + if !tc.boundStat.reuseResourceStat { + if tc.boundStat.writeOnlyMetric != nil { + tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount)) + } else { + logging.Error(errors.New("nil independent write statistic"), "flow module: nil statistic for traffic control", "rule", tc.rule) + } + } + } +} + +func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) { + // Do nothing +} + +func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) { + // Do nothing +} diff --git a/core/flow/tc_default.go b/core/flow/tc_default.go index c11c1c21b..9e120961f 100644 --- a/core/flow/tc_default.go +++ b/core/flow/tc_default.go @@ -5,30 +5,47 @@ import ( ) type DirectTrafficShapingCalculator struct { + owner *TrafficShapingController threshold float64 } -func NewDirectTrafficShapingCalculator(threshold float64) *DirectTrafficShapingCalculator { - return &DirectTrafficShapingCalculator{threshold: threshold} +func NewDirectTrafficShapingCalculator(owner *TrafficShapingController, threshold float64) *DirectTrafficShapingCalculator { + return &DirectTrafficShapingCalculator{ + owner: owner, + threshold: threshold, + } } -func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(base.StatNode, uint32, int32) float64 { +func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 { return d.threshold } -type DefaultTrafficShapingChecker struct { - rule *Rule +func (d *DirectTrafficShapingCalculator) BoundOwner() *TrafficShapingController { + return d.owner +} + +type RejectTrafficShapingChecker struct { + owner *TrafficShapingController + rule *Rule +} + +func NewRejectTrafficShapingChecker(owner *TrafficShapingController, rule *Rule) *RejectTrafficShapingChecker { + return &RejectTrafficShapingChecker{ + owner: owner, + rule: rule, + } } -func NewDefaultTrafficShapingChecker(rule *Rule) *DefaultTrafficShapingChecker { - return &DefaultTrafficShapingChecker{rule: rule} +func (d *RejectTrafficShapingChecker) BoundOwner() *TrafficShapingController { + return d.owner } -func (d *DefaultTrafficShapingChecker) DoCheck(node base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult { - if node == nil { +func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult { + metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric + if metricReadonlyStat == nil { return nil } - curCount := node.GetQPS(base.MetricEventPass) + curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass)) if curCount+float64(acquireCount) > threshold { return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount) } diff --git a/core/flow/tc_throttling.go b/core/flow/tc_throttling.go index f6d0fea05..ebbb03abc 100644 --- a/core/flow/tc_throttling.go +++ b/core/flow/tc_throttling.go @@ -13,20 +13,21 @@ const nanoUnitOffset = time.Second / time.Nanosecond // ThrottlingChecker limits the time interval between two requests. type ThrottlingChecker struct { + owner *TrafficShapingController maxQueueingTimeNs uint64 - - lastPassedTime uint64 - - // TODO: support strict mode - strict bool + lastPassedTime uint64 } -func NewThrottlingChecker(timeoutMs uint32) *ThrottlingChecker { +func NewThrottlingChecker(owner *TrafficShapingController, timeoutMs uint32) *ThrottlingChecker { return &ThrottlingChecker{ + owner: owner, maxQueueingTimeNs: uint64(timeoutMs) * util.UnixTimeUnitOffset, lastPassedTime: 0, } } +func (c *ThrottlingChecker) BoundOwner() *TrafficShapingController { + return c.owner +} func (c *ThrottlingChecker) DoCheck(_ base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult { // Pass when acquire count is less or equal than 0. diff --git a/core/flow/tc_throttling_test.go b/core/flow/tc_throttling_test.go index 6481cc253..75676de2b 100644 --- a/core/flow/tc_throttling_test.go +++ b/core/flow/tc_throttling_test.go @@ -11,7 +11,7 @@ import ( ) func TestThrottlingChecker_DoCheckNoQueueingSingleThread(t *testing.T) { - tc := NewThrottlingChecker(0) + tc := NewThrottlingChecker(nil, 0) var qps float64 = 5 // The first request will pass. @@ -28,7 +28,7 @@ func TestThrottlingChecker_DoCheckNoQueueingSingleThread(t *testing.T) { } func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) { - tc := NewThrottlingChecker(1000) + tc := NewThrottlingChecker(nil, 1000) var qps float64 = 5 resultList := make([]*base.TokenResult, 0) for i := 0; i < 10; i++ { @@ -48,7 +48,7 @@ func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) { } func TestThrottlingChecker_DoCheckQueueingParallel(t *testing.T) { - tc := NewThrottlingChecker(1000) + tc := NewThrottlingChecker(nil, 1000) var qps float64 = 5 assert.True(t, tc.DoCheck(nil, 1, qps) == nil) diff --git a/core/flow/tc_warm_up.go b/core/flow/tc_warm_up.go index 110264f21..e15a3f4cf 100644 --- a/core/flow/tc_warm_up.go +++ b/core/flow/tc_warm_up.go @@ -4,14 +4,14 @@ import ( "math" "sync/atomic" - "github.com/alibaba/sentinel-golang/logging" - "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/config" + "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" ) type WarmUpTrafficShapingCalculator struct { + owner *TrafficShapingController threshold float64 warmUpPeriodInSec uint32 coldFactor uint32 @@ -22,7 +22,11 @@ type WarmUpTrafficShapingCalculator struct { lastFilledTime uint64 } -func NewWarmUpTrafficShapingCalculator(rule *Rule) *WarmUpTrafficShapingCalculator { +func (c *WarmUpTrafficShapingCalculator) BoundOwner() *TrafficShapingController { + return c.owner +} + +func NewWarmUpTrafficShapingCalculator(owner *TrafficShapingController, rule *Rule) TrafficShapingCalculator { if rule.WarmUpColdFactor <= 1 { rule.WarmUpColdFactor = config.DefaultWarmUpColdFactor logging.Warn("[NewWarmUpTrafficShapingCalculator] No set WarmUpColdFactor,use default warm up cold factor value", "defaultWarmUpColdFactor", config.DefaultWarmUpColdFactor) @@ -35,6 +39,7 @@ func NewWarmUpTrafficShapingCalculator(rule *Rule) *WarmUpTrafficShapingCalculat slope := float64(rule.WarmUpColdFactor-1.0) / rule.Threshold / float64(maxToken-warningToken) warmUpTrafficShapingCalculator := &WarmUpTrafficShapingCalculator{ + owner: owner, warmUpPeriodInSec: rule.WarmUpPeriodSec, coldFactor: rule.WarmUpColdFactor, warningToken: warningToken, @@ -48,8 +53,9 @@ func NewWarmUpTrafficShapingCalculator(rule *Rule) *WarmUpTrafficShapingCalculat return warmUpTrafficShapingCalculator } -func (c *WarmUpTrafficShapingCalculator) CalculateAllowedTokens(node base.StatNode, acquireCount uint32, flag int32) float64 { - previousQps := node.GetPreviousQPS(base.MetricEventPass) +func (c *WarmUpTrafficShapingCalculator) CalculateAllowedTokens(_ uint32, _ int32) float64 { + metricReadonlyStat := c.BoundOwner().boundStat.readOnlyMetric + previousQps := metricReadonlyStat.GetPreviousQPS(base.MetricEventPass) c.syncToken(previousQps) restToken := atomic.LoadInt64(&c.storedTokens) diff --git a/core/flow/traffic_shaping.go b/core/flow/traffic_shaping.go index 52b06df4e..0ece11fb8 100644 --- a/core/flow/traffic_shaping.go +++ b/core/flow/traffic_shaping.go @@ -7,13 +7,29 @@ import ( // TrafficShapingCalculator calculates the actual traffic shaping threshold // based on the threshold of rule and the traffic shaping strategy. type TrafficShapingCalculator interface { - CalculateAllowedTokens(node base.StatNode, acquireCount uint32, flag int32) float64 + BoundOwner() *TrafficShapingController + CalculateAllowedTokens(acquireCount uint32, flag int32) float64 } // TrafficShapingChecker performs checking according to current metrics and the traffic // shaping strategy, then yield the token result. type TrafficShapingChecker interface { - DoCheck(node base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult + BoundOwner() *TrafficShapingController + DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult +} + +// standaloneStatistic indicates the independent statistic for each TrafficShapingController +type standaloneStatistic struct { + // reuseResourceStat indicates whether current standaloneStatistic reuse the current resource's global statistic + reuseResourceStat bool + // readOnlyMetric is the readonly metric statistic. + // if reuseResourceStat is true, it would be the reused SlidingWindowMetric + // if reuseResourceStat is false, it would be the BucketLeapArray + readOnlyMetric base.ReadStat + // writeOnlyMetric is the write only metric statistic. + // if reuseResourceStat is true, it would be nil + // if reuseResourceStat is false, it would be the BucketLeapArray + writeOnlyMetric base.WriteStat } type TrafficShapingController struct { @@ -21,14 +37,15 @@ type TrafficShapingController struct { flowChecker TrafficShapingChecker rule *Rule + // boundStat is the statistic of current TrafficShapingController + boundStat standaloneStatistic } -// NewTrafficShapingController creates a TrafficShapingController wrapped with the given checker and flow rule. -func NewTrafficShapingController(flowCalculator TrafficShapingCalculator, flowChecker TrafficShapingChecker, rule *Rule) *TrafficShapingController { - return &TrafficShapingController{flowCalculator: flowCalculator, flowChecker: flowChecker, rule: rule} +func NewTrafficShapingController(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { + return &TrafficShapingController{rule: rule, boundStat: *boundStat}, nil } -func (t *TrafficShapingController) Rule() *Rule { +func (t *TrafficShapingController) BoundRule() *Rule { return t.rule } @@ -40,7 +57,7 @@ func (t *TrafficShapingController) FlowCalculator() TrafficShapingCalculator { return t.flowCalculator } -func (t *TrafficShapingController) PerformChecking(node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult { - allowedTokens := t.flowCalculator.CalculateAllowedTokens(node, acquireCount, flag) - return t.flowChecker.DoCheck(node, acquireCount, allowedTokens) +func (t *TrafficShapingController) PerformChecking(resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult { + allowedTokens := t.flowCalculator.CalculateAllowedTokens(acquireCount, flag) + return t.flowChecker.DoCheck(resStat, acquireCount, allowedTokens) } diff --git a/core/stat/base/sliding_window_metric.go b/core/stat/base/sliding_window_metric.go index 7cdcd5a14..f1a2b0c74 100644 --- a/core/stat/base/sliding_window_metric.go +++ b/core/stat/base/sliding_window_metric.go @@ -1,7 +1,6 @@ package base import ( - "fmt" "reflect" "sync/atomic" @@ -23,37 +22,21 @@ type SlidingWindowMetric struct { } // It must pass the parameter point to the real storage entity -func NewSlidingWindowMetric(sampleCount, intervalInMs uint32, real *BucketLeapArray) *SlidingWindowMetric { - if real == nil || intervalInMs <= 0 || sampleCount <= 0 { - panic(fmt.Sprintf("Illegal parameters,intervalInMs=%d,sampleCount=%d,real=%+v.", intervalInMs, sampleCount, real)) +func NewSlidingWindowMetric(sampleCount, intervalInMs uint32, real *BucketLeapArray) (*SlidingWindowMetric, error) { + if real == nil { + return nil, errors.New("Nil BucketLeapArray") } - - if intervalInMs%sampleCount != 0 { - panic(fmt.Sprintf("Invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) + if err := base.CheckValidityForReuseStatistic(sampleCount, intervalInMs, real.SampleCount(), real.IntervalInMs()); err != nil { + return nil, err } bucketLengthInMs := intervalInMs / sampleCount - parentIntervalInMs := real.IntervalInMs() - parentBucketLengthInMs := real.BucketLengthInMs() - - // bucketLengthInMs of BucketLeapArray must be divisible by bucketLengthInMs of SlidingWindowMetric - // for example: bucketLengthInMs of BucketLeapArray is 500ms, and bucketLengthInMs of SlidingWindowMetric is 2000ms - // for example: bucketLengthInMs of BucketLeapArray is 500ms, and bucketLengthInMs of SlidingWindowMetric is 500ms - if bucketLengthInMs%parentBucketLengthInMs != 0 { - panic(fmt.Sprintf("BucketLeapArray's BucketLengthInMs(%d) is not divisible by SlidingWindowMetric's BucketLengthInMs(%d).", parentBucketLengthInMs, bucketLengthInMs)) - } - - if intervalInMs > parentIntervalInMs { - // todo if SlidingWindowMetric's intervalInMs is greater than BucketLeapArray. - panic(fmt.Sprintf("The interval(%d) of SlidingWindowMetric is greater than parent BucketLeapArray(%d).", intervalInMs, parentIntervalInMs)) - } - return &SlidingWindowMetric{ bucketLengthInMs: bucketLengthInMs, sampleCount: sampleCount, intervalInMs: intervalInMs, real: real, - } + }, nil } // Get the start time range of the bucket for the provided time. diff --git a/core/stat/base/sliding_window_metric_test.go b/core/stat/base/sliding_window_metric_test.go index 7767e9537..ca2c15e80 100644 --- a/core/stat/base/sliding_window_metric_test.go +++ b/core/stat/base/sliding_window_metric_test.go @@ -1,11 +1,10 @@ package base import ( - "reflect" - "strings" "testing" "github.com/alibaba/sentinel-golang/core/base" + "github.com/stretchr/testify/assert" ) func TestSlidingWindowMetric_getBucketStartRange(t *testing.T) { @@ -95,7 +94,9 @@ func TestSlidingWindowMetric_getBucketStartRange(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := NewSlidingWindowMetric(tt.args.sampleCount, tt.args.intervalInMs, NewBucketLeapArray(tt.args.realSampleCount, tt.args.realIntervalInMs)) + m, err := NewSlidingWindowMetric(tt.args.sampleCount, tt.args.intervalInMs, NewBucketLeapArray(tt.args.realSampleCount, tt.args.realIntervalInMs)) + assert.True(t, err == nil) + gotStart, gotEnd := m.getBucketStartRange(tt.args.now) if gotStart != tt.wantStart { t.Errorf("SlidingWindowMetric.getBucketStartRange() gotStart = %v, want %v", gotStart, tt.wantStart) @@ -108,81 +109,16 @@ func TestSlidingWindowMetric_getBucketStartRange(t *testing.T) { } func Test_NewSlidingWindowMetric(t *testing.T) { - type args struct { - sampleCount uint32 - intervalInMs uint32 - real *BucketLeapArray - } - tests := []struct { - name string - args args - want string - }{ - { - name: "Test_NewSlidingWindowMetric-1", - args: args{ - intervalInMs: 2000, - sampleCount: 4, - real: NewBucketLeapArray(SampleCount, IntervalInMs), - }, - want: "", - }, - { - name: "Test_NewSlidingWindowMetric-2", - args: args{ - intervalInMs: 0, - sampleCount: 0, - real: NewBucketLeapArray(SampleCount, IntervalInMs), - }, - want: "Illegal parameters,intervalInMs=0,sampleCount=0,real=", - }, - { - name: "Test_NewSlidingWindowMetric-3", - args: args{ - intervalInMs: 2001, - sampleCount: 4, - real: NewBucketLeapArray(SampleCount, IntervalInMs), - }, - want: "Invalid parameters, intervalInMs is 2001, sampleCount is 4.", - }, - { - name: "Test_NewSlidingWindowMetric-4", - args: args{ - intervalInMs: 2002, - sampleCount: 2, - real: NewBucketLeapArray(SampleCount, IntervalInMs), - }, - want: "BucketLeapArray's BucketLengthInMs(500) is not divisible by SlidingWindowMetric's BucketLengthInMs(1001).", - }, - { - name: "Test_NewSlidingWindowMetric-5", - args: args{ - intervalInMs: 200000, - sampleCount: 4, - real: NewBucketLeapArray(SampleCount, IntervalInMs), - }, - want: "The interval(200000) of SlidingWindowMetric is greater than parent BucketLeapArray(10000).", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer func() { - if err := recover(); err != nil { - errContent, ok := err.(string) - if !ok { - t.Errorf("Fail to assert err, except string, in fact:%+v", reflect.TypeOf(err)) - } - if !strings.Contains(errContent, tt.want) { - t.Errorf("Failed, except [%s],in fact:[%s]", tt.want, errContent) - } - } - }() - got := NewSlidingWindowMetric(tt.args.sampleCount, tt.args.intervalInMs, tt.args.real) - if got == nil || "" != tt.want { - t.Errorf("NewSlidingWindowMetric() = %v", got) - } - }) - } + got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, err == nil && got != nil) + got, err = NewSlidingWindowMetric(0, 0, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, got == nil && err != nil) + got, err = NewSlidingWindowMetric(4, 2001, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, got == nil && err != nil) + got, err = NewSlidingWindowMetric(2, 2002, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, got == nil && err != nil) + got, err = NewSlidingWindowMetric(4, 200000, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, got == nil && err != nil) } func TestSlidingWindowMetric_GetIntervalSumWithTime(t *testing.T) { @@ -223,7 +159,7 @@ func TestSlidingWindowMetric_GetIntervalSumWithTime(t *testing.T) { for i := 0; i < int(tt.fields.intervalInMs); i++ { tt.fields.real.addCountWithTime(tt.args.now-100-uint64(i), tt.args.event, 1) } - m := NewSlidingWindowMetric(tt.fields.sampleCount, tt.fields.intervalInMs, tt.fields.real) + m, _ := NewSlidingWindowMetric(tt.fields.sampleCount, tt.fields.intervalInMs, tt.fields.real) if got := m.getSumWithTime(tt.args.now, tt.args.event); got != tt.want { t.Errorf("SlidingWindowMetric.getSumWithTime() = %v, want %v", got, tt.want) } diff --git a/core/stat/base_node.go b/core/stat/base_node.go index 43b98ee0a..637ef1cee 100644 --- a/core/stat/base_node.go +++ b/core/stat/base_node.go @@ -4,6 +4,7 @@ import ( "sync/atomic" "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/config" sbase "github.com/alibaba/sentinel-golang/core/stat/base" ) @@ -18,8 +19,8 @@ type BaseStatNode struct { } func NewBaseStatNode(sampleCount uint32, intervalInMs uint32) *BaseStatNode { - la := sbase.NewBucketLeapArray(base.DefaultSampleCountTotal, base.DefaultIntervalMsTotal) - metric := sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, la) + la := sbase.NewBucketLeapArray(config.GlobalStatisticSampleCountTotal(), config.GlobalStatisticIntervalMsTotal()) + metric, _ := sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, la) return &BaseStatNode{ goroutineNum: 0, sampleCount: sampleCount, @@ -49,7 +50,7 @@ func (n *BaseStatNode) GetMaxAvg(event base.MetricEvent) float64 { return float64(n.metric.GetMaxOfSingleBucket(event)) * float64(n.sampleCount) / float64(n.intervalMs) * 1000 } -func (n *BaseStatNode) AddMetric(event base.MetricEvent, count uint64) { +func (n *BaseStatNode) AddCount(event base.MetricEvent, count int64) { n.arr.AddCount(event, int64(count)) } @@ -81,3 +82,11 @@ func (n *BaseStatNode) Reset() { // TODO: this should be thread-safe, or error may occur panic("to be implemented") } + +func (n *BaseStatNode) GenerateReadStat(sampleCount uint32, intervalInMs uint32) (base.ReadStat, error) { + return sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, n.arr) +} + +func (n *BaseStatNode) DefaultMetric() base.ReadStat { + return n.metric +} diff --git a/core/stat/resource_node.go b/core/stat/resource_node.go index 4ba78e506..261a3126e 100644 --- a/core/stat/resource_node.go +++ b/core/stat/resource_node.go @@ -2,6 +2,7 @@ package stat import ( "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/config" ) type ResourceNode struct { @@ -14,8 +15,7 @@ type ResourceNode struct { // NewResourceNode creates a new resource node with given name and classification. func NewResourceNode(resourceName string, resourceType base.ResourceType) *ResourceNode { return &ResourceNode{ - // TODO: make this configurable - BaseStatNode: *NewBaseStatNode(base.DefaultSampleCount, base.DefaultIntervalMs), + BaseStatNode: *NewBaseStatNode(config.MetricStatisticSampleCount(), config.MetricStatisticIntervalMs()), resourceName: resourceName, resourceType: resourceType, } diff --git a/core/stat/stat_slot.go b/core/stat/stat_slot.go index 1df3e04a2..dfa085b5d 100644 --- a/core/stat/stat_slot.go +++ b/core/stat/stat_slot.go @@ -36,14 +36,14 @@ func (s *Slot) recordPassFor(sn base.StatNode, count uint32) { return } sn.IncreaseGoroutineNum() - sn.AddMetric(base.MetricEventPass, uint64(count)) + sn.AddCount(base.MetricEventPass, int64(count)) } func (s *Slot) recordBlockFor(sn base.StatNode, count uint32) { if sn == nil { return } - sn.AddMetric(base.MetricEventBlock, uint64(count)) + sn.AddCount(base.MetricEventBlock, int64(count)) } func (s *Slot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64, err error) { @@ -51,9 +51,9 @@ func (s *Slot) recordCompleteFor(sn base.StatNode, count uint32, rt uint64, err return } if err != nil { - sn.AddMetric(base.MetricEventError, uint64(count)) + sn.AddCount(base.MetricEventError, int64(count)) } - sn.AddMetric(base.MetricEventRt, rt) - sn.AddMetric(base.MetricEventComplete, uint64(count)) + sn.AddCount(base.MetricEventRt, int64(rt)) + sn.AddCount(base.MetricEventComplete, int64(count)) sn.DecreaseGoroutineNum() } diff --git a/tests/testdata/extension/helper/FlowRule.json b/tests/testdata/extension/helper/FlowRule.json index c3c90c384..80baa98b1 100644 --- a/tests/testdata/extension/helper/FlowRule.json +++ b/tests/testdata/extension/helper/FlowRule.json @@ -7,7 +7,8 @@ "controlBehavior": 0, "refResource": "refDefault", "warmUpPeriodSec": 10, - "maxQueueingTimeMs": 1000 + "maxQueueingTimeMs": 1000, + "statIntervalInMs": 0 }, { "resource": "abc", @@ -17,7 +18,8 @@ "controlBehavior": 1, "refResource": "refDefault", "warmUpPeriodSec": 20, - "maxQueueingTimeMs": 2000 + "maxQueueingTimeMs": 2000, + "statIntervalInMs": 0 }, { "resource": "abc", @@ -27,6 +29,7 @@ "controlBehavior": 1, "refResource": "refDefault", "warmUpPeriodSec": 30, - "maxQueueingTimeMs": 3000 + "maxQueueingTimeMs": 3000, + "statIntervalInMs": 0 } ] \ No newline at end of file