Skip to content

Commit

Permalink
Support arbitrary statistic duration for flow control and refactor in…
Browse files Browse the repository at this point in the history
…ternal implementation (#200)

* Support arbitrary statistic duration for flow control. Add `StatIntervalInMs` attribute in flow.Rule. When StatIntervalInMs > globalInterval or < bucketLength, we create a new sliding window for it.
* Add stat reuse mechanism for flow rules
* Make threshold of flow rule "request amount per interval" rather than QPS.
* Refine internal implementation
  • Loading branch information
louyuting authored Sep 25, 2020
1 parent 7ddba92 commit 4e8f5a0
Show file tree
Hide file tree
Showing 24 changed files with 834 additions and 213 deletions.
2 changes: 2 additions & 0 deletions adapter/echo/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ func initSentinel(t *testing.T) {
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
{
Resource: "/api/:uid",
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions adapter/gin/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ func initSentinel(t *testing.T) {
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
{
Resource: "/api/users/:id",
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
StatIntervalInMs: 1000,
},
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddStatSlotLast(&log.Slot{})
sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{})
sc.AddStatSlotLast(&hotspot.ConcurrencyStatSlot{})
sc.AddStatSlotLast(&flow.StandaloneStatSlot{})
return sc
}
49 changes: 48 additions & 1 deletion core/base/stat.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package base

import (
"github.com/pkg/errors"
)

type TimePredicate func(uint64) bool

type MetricEvent int8
Expand Down Expand Up @@ -31,7 +35,7 @@ type ReadStat interface {
}

type WriteStat interface {
AddMetric(event MetricEvent, count uint64)
AddCount(event MetricEvent, count int64)
}

// StatNode holds real-time statistics for resources.
Expand All @@ -46,4 +50,47 @@ type StatNode interface {
DecreaseGoroutineNum()

Reset()

// GenerateReadStat generates the readonly metric statistic based on resource level global statistic
// If parameters, sampleCount and intervalInMs, are not suitable for resource level global statistic, return (nil, error)
GenerateReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error)
}

var (
IllegalGlobalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for resource's global statistic")
IllegalStatisticParamsError = errors.New("Invalid parameters, sampleCount or interval, for metric statistic")
GlobalStatisticNonReusableError = errors.New("The parameters, sampleCount and interval, mismatch for reusing between resource's global statistic and readonly metric statistic.")
)

func CheckValidityForStatistic(sampleCount, intervalInMs uint32) error {
if intervalInMs == 0 || sampleCount == 0 || intervalInMs%sampleCount != 0 {
return IllegalStatisticParamsError
}
return nil
}

// CheckValidityForReuseStatistic check the compliance whether readonly metric statistic can be built based on resource's global statistic
// The parameters, sampleCount and intervalInMs, are the parameters of the metric statistic you want to build
// The parameters, parentSampleCount and parentIntervalInMs, are the parameters of the resource's global statistic
// If compliance passes, return nil, if not returns specific error
func CheckValidityForReuseStatistic(sampleCount, intervalInMs uint32, parentSampleCount, parentIntervalInMs uint32) error {
if intervalInMs == 0 || sampleCount == 0 || intervalInMs%sampleCount != 0 {
return IllegalStatisticParamsError
}
bucketLengthInMs := intervalInMs / sampleCount

if parentIntervalInMs == 0 || parentSampleCount == 0 || parentIntervalInMs%parentSampleCount != 0 {
return IllegalGlobalStatisticParamsError
}
parentBucketLengthInMs := parentIntervalInMs / parentSampleCount

//SlidingWindowMetric's intervalInMs is not divisible by BucketLeapArray's intervalInMs
if parentIntervalInMs%intervalInMs != 0 {
return GlobalStatisticNonReusableError
}
// BucketLeapArray's BucketLengthInMs is not divisible by SlidingWindowMetric's BucketLengthInMs
if bucketLengthInMs%parentBucketLengthInMs != 0 {
return GlobalStatisticNonReusableError
}
return nil
}
25 changes: 23 additions & 2 deletions core/base/stat_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package base

import "github.com/stretchr/testify/mock"
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type StatNodeMock struct {
mock.Mock
}

func (m *StatNodeMock) AddMetric(event MetricEvent, count uint64) {
func (m *StatNodeMock) AddCount(event MetricEvent, count int64) {
m.Called(event, count)
}

Expand Down Expand Up @@ -64,3 +69,19 @@ func (m *StatNodeMock) Reset() {
m.Called()
return
}

func (m *StatNodeMock) GenerateReadStat(sampleCount uint32, intervalInMs uint32) (ReadStat, error) {
args := m.Called(sampleCount, intervalInMs)
return args.Get(0).(ReadStat), args.Error(1)
}

func TestCheckValidityForReuseStatistic(t *testing.T) {
assert.Equal(t, CheckValidityForReuseStatistic(3, 1000, 20, 10000), IllegalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(0, 1000, 20, 10000), IllegalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 21, 10000), IllegalGlobalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 0, 10000), IllegalGlobalStatisticParamsError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 8000, 20, 10000), GlobalStatisticNonReusableError)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 10, 10000), GlobalStatisticNonReusableError)
assert.Equal(t, CheckValidityForReuseStatistic(1, 1000, 100, 10000), nil)
assert.Equal(t, CheckValidityForReuseStatistic(2, 1000, 20, 10000), nil)
}
19 changes: 19 additions & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,22 @@ func SystemStatCollectIntervalMs() uint32 {
func UseCacheTime() bool {
return globalCfg.UseCacheTime()
}

func GlobalStatisticIntervalMsTotal() uint32 {
return globalCfg.GlobalStatisticIntervalMsTotal()
}

func GlobalStatisticSampleCountTotal() uint32 {
return globalCfg.GlobalStatisticSampleCountTotal()
}

func GlobalStatisticBucketLengthInMs() uint32 {
return globalCfg.GlobalStatisticIntervalMsTotal() / GlobalStatisticSampleCountTotal()
}

func MetricStatisticIntervalMs() uint32 {
return globalCfg.MetricStatisticIntervalMs()
}
func MetricStatisticSampleCount() uint32 {
return globalCfg.MetricStatisticSampleCount()
}
33 changes: 33 additions & 0 deletions core/config/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -52,6 +53,15 @@ type MetricLogConfig struct {

// StatConfig represents the configuration items of statistics.
type StatConfig struct {
// GlobalStatisticSampleCountTotal and GlobalStatisticIntervalMsTotal is the per resource's global default statistic sliding window config
GlobalStatisticSampleCountTotal uint32 `yaml:"globalStatisticSampleCountTotal"`
GlobalStatisticIntervalMsTotal uint32 `yaml:"globalStatisticIntervalMsTotal"`

// MetricStatisticSampleCount and MetricStatisticIntervalMs is the per resource's default readonly metric statistic
// This default readonly metric statistic must be reusable based on global statistic.
MetricStatisticSampleCount uint32 `yaml:"metricStatisticSampleCount"`
MetricStatisticIntervalMs uint32 `yaml:"metricStatisticIntervalMs"`

System SystemStatConfig `yaml:"system"`
}

Expand Down Expand Up @@ -84,6 +94,10 @@ func NewDefaultConfig() *Entity {
},
},
Stat: StatConfig{
GlobalStatisticSampleCountTotal: base.DefaultSampleCountTotal,
GlobalStatisticIntervalMsTotal: base.DefaultIntervalMsTotal,
MetricStatisticSampleCount: base.DefaultSampleCount,
MetricStatisticIntervalMs: base.DefaultIntervalMs,
System: SystemStatConfig{
CollectIntervalMs: DefaultSystemStatCollectIntervalMs,
},
Expand Down Expand Up @@ -117,6 +131,10 @@ func checkConfValid(conf *SentinelConfig) error {
if mc.SingleFileMaxSize <= 0 {
return errors.New("Illegal metric log globalCfg: singleFileMaxSize <= 0")
}
if err := base.CheckValidityForReuseStatistic(conf.Stat.MetricStatisticSampleCount, conf.Stat.MetricStatisticIntervalMs,
conf.Stat.GlobalStatisticSampleCountTotal, conf.Stat.GlobalStatisticIntervalMsTotal); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -168,3 +186,18 @@ func (entity *Entity) SystemStatCollectIntervalMs() uint32 {
func (entity *Entity) UseCacheTime() bool {
return entity.Sentinel.UseCacheTime
}

func (entity *Entity) GlobalStatisticIntervalMsTotal() uint32 {
return entity.Sentinel.Stat.GlobalStatisticIntervalMsTotal
}

func (entity *Entity) GlobalStatisticSampleCountTotal() uint32 {
return entity.Sentinel.Stat.GlobalStatisticSampleCountTotal
}

func (entity *Entity) MetricStatisticIntervalMs() uint32 {
return entity.Sentinel.Stat.MetricStatisticIntervalMs
}
func (entity *Entity) MetricStatisticSampleCount() uint32 {
return entity.Sentinel.Stat.MetricStatisticSampleCount
}
50 changes: 41 additions & 9 deletions core/flow/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,54 @@ type Rule struct {
Resource string `json:"resource"`
TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"`
ControlBehavior ControlBehavior `json:"controlBehavior"`
Threshold float64 `json:"threshold"`
RelationStrategy RelationStrategy `json:"relationStrategy"`
RefResource string `json:"refResource"`
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"`
WarmUpColdFactor uint32 `json:"warmUpColdFactor"`
// Threshold means the threshold during StatIntervalInMs
// If StatIntervalInMs is 1000(1 second), Threshold means QPS
Threshold float64 `json:"threshold"`
RelationStrategy RelationStrategy `json:"relationStrategy"`
RefResource string `json:"refResource"`
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"`
WarmUpColdFactor uint32 `json:"warmUpColdFactor"`
// StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule.
// If user doesn't set StatIntervalInMs, that means using default metric statistic of resource.
// If the StatIntervalInMs user specifies can not reuse the global statistic of resource,
// sentinel will generate independent statistic structure for this rule.
StatIntervalInMs uint32 `json:"statIntervalInMs"`
}

func (r *Rule) isEqualsTo(newRule *Rule) bool {
if newRule == nil {
return false
}
if !(r.Resource == newRule.Resource && r.RelationStrategy == newRule.RelationStrategy &&
r.RefResource == newRule.RefResource && r.StatIntervalInMs == newRule.StatIntervalInMs &&
r.TokenCalculateStrategy == newRule.TokenCalculateStrategy && r.ControlBehavior == newRule.ControlBehavior && r.Threshold == newRule.Threshold &&
r.MaxQueueingTimeMs == newRule.MaxQueueingTimeMs && r.WarmUpPeriodSec == newRule.WarmUpPeriodSec && r.WarmUpColdFactor == newRule.WarmUpColdFactor) {
return false
}
return true
}

func (r *Rule) isStatReusable(newRule *Rule) bool {
if newRule == nil {
return false
}
return r.Resource == newRule.Resource && r.RelationStrategy == newRule.RelationStrategy &&
r.RefResource == newRule.RefResource && r.StatIntervalInMs == newRule.StatIntervalInMs
}

func (r *Rule) needStatistic() bool {
return !(r.TokenCalculateStrategy == Direct && r.ControlBehavior == Throttling)
}

func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("Rule{Resource=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+
"Threshold=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d}",
r.Resource, r.TokenCalculateStrategy, r.ControlBehavior, r.Threshold, r.RelationStrategy,
r.RefResource, r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor)
"Threshold=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, StatIntervalInMs=%d}",
r.Resource, r.TokenCalculateStrategy, r.ControlBehavior, r.Threshold, r.RelationStrategy, r.RefResource,
r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.StatIntervalInMs)
}
return string(b)
}
Expand Down
Loading

0 comments on commit 4e8f5a0

Please sign in to comment.