Skip to content

Commit

Permalink
Remove fields about concurrency limiting in FlowRule
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting authored and sczyh30 committed Sep 23, 2020
1 parent 4162e79 commit b65899d
Show file tree
Hide file tree
Showing 20 changed files with 54 additions and 117 deletions.
6 changes: 2 additions & 4 deletions adapter/echo/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ func initSentinel(t *testing.T) {
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "GET:/ping",
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
{
Resource: "/api/:uid",
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
6 changes: 2 additions & 4 deletions adapter/gin/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ func initSentinel(t *testing.T) {
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "GET:/ping",
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
{
Resource: "/api/users/:id",
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
12 changes: 4 additions & 8 deletions adapter/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func TestUnaryClientIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "client:" + method,
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -45,8 +44,7 @@ func TestUnaryClientIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "client:" + method,
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -72,8 +70,7 @@ func TestStreamClientIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "client:/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -93,8 +90,7 @@ func TestStreamClientIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "client:/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
12 changes: 4 additions & 8 deletions adapter/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func TestStreamServerIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "/grpc.testing.TestService/StreamingInputCall",
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -53,8 +52,7 @@ func TestStreamServerIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "/grpc.testing.TestService/StreamingInputCall",
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -78,8 +76,7 @@ func TestUnaryServerIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down Expand Up @@ -107,8 +104,7 @@ func TestUnaryServerIntercept(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
MetricType: flow.QPS,
Count: 0,
Threshold: 0,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
3 changes: 1 addition & 2 deletions adapter/micro/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func TestClientLimiter(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: req.Method(),
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
6 changes: 2 additions & 4 deletions adapter/micro/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func TestServerLimiter(t *testing.T) {
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: req.Method(),
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand All @@ -73,8 +72,7 @@ func TestServerLimiter(t *testing.T) {
var _, err = flow.LoadRules([]*flow.Rule{
{
Resource: req.Method(),
MetricType: flow.QPS,
Count: 1,
Threshold: 1,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
2 changes: 1 addition & 1 deletion api/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// {
// Resource: "some-test",
// MetricType: flow.QPS,
// Count: 10,
// Threshold: 10,
// ControlBehavior: flow.Reject,
// },
// })
Expand Down
2 changes: 1 addition & 1 deletion core/flow/doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package flow implements the flow shaping control.
//
// flow module supports two statistic metric: QPS and Concurrency.
// flow module is based on QPS statistic metric
//
// The TrafficShapingController consists of two part: TrafficShapingCalculator and TrafficShapingChecker
//
Expand Down
34 changes: 6 additions & 28 deletions core/flow/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,6 @@ import (
"fmt"
)

// MetricType represents the target metric type.
type MetricType int32

const (
// Concurrency represents concurrency count.
Concurrency MetricType = iota
// QPS represents request count per second.
QPS
)

func (s MetricType) String() string {
switch s {
case Concurrency:
return "Concurrency"
case QPS:
return "QPS"
default:
return "Undefined"
}
}

// RelationStrategy indicates the flow control strategy based on the relation of invocations.
type RelationStrategy int32

Expand Down Expand Up @@ -83,17 +62,15 @@ func (s ControlBehavior) String() string {
}
}

// Rule describes the strategy of flow control.
// Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
type Rule struct {
// ID represents the unique ID of the rule (optional).
ID string `json:"id,omitempty"`

// Resource represents the resource name.
Resource string `json:"resource"`
MetricType MetricType `json:"metricType"`
TokenCalculateStrategy TokenCalculateStrategy `json:"tokenCalculateStrategy"`
ControlBehavior ControlBehavior `json:"controlBehavior"`
Count float64 `json:"count"`
Threshold float64 `json:"threshold"`
RelationStrategy RelationStrategy `json:"relationStrategy"`
RefResource string `json:"refResource"`
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
Expand All @@ -105,9 +82,10 @@ func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("Rule{Resource=%s, MetricType=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+
"Count=%.2f, RelationStrategy=%s, WarmUpPeriodSec=%d, WarmUpColdFactor=%d, MaxQueueingTimeMs=%d}",
r.Resource, r.MetricType, r.TokenCalculateStrategy, r.ControlBehavior, r.Count, r.RelationStrategy, r.WarmUpPeriodSec, r.WarmUpColdFactor, r.MaxQueueingTimeMs)
return fmt.Sprintf("Rule{Resource=%s, TokenCalculateStrategy=%s, ControlBehavior=%s, "+
"Threshold=%.2f, RelationStrategy=%s, RefResource=%s, MaxQueueingTimeMs=%d, WarmUpPeriodSec=%d, WarmUpColdFactor=%d}",
r.Resource, r.TokenCalculateStrategy, r.ControlBehavior, r.Threshold, r.RelationStrategy,
r.RefResource, r.MaxQueueingTimeMs, r.WarmUpPeriodSec, r.WarmUpColdFactor)
}
return string(b)
}
Expand Down
9 changes: 3 additions & 6 deletions core/flow/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func init() {
tokenCalculateStrategy: Direct,
controlBehavior: Reject,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule)
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Threshold), NewDefaultTrafficShapingChecker(rule), rule)
}
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: Direct,
controlBehavior: Throttling,
}] = func(rule *Rule) *TrafficShapingController {
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)
return NewTrafficShapingController(NewDirectTrafficShapingCalculator(rule.Threshold), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)
}
tcGenFuncMap[trafficControllerGenKey{
tokenCalculateStrategy: WarmUp,
Expand Down Expand Up @@ -270,12 +270,9 @@ func IsValidRule(rule *Rule) error {
if rule.Resource == "" {
return errors.New("empty resource name")
}
if rule.Count < 0 {
if rule.Threshold < 0 {
return errors.New("negative threshold")
}
if rule.MetricType < 0 {
return errors.New("invalid metric type")
}
if rule.RelationStrategy < 0 {
return errors.New("invalid relation strategy")
}
Expand Down
23 changes: 9 additions & 14 deletions core/flow/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) {
resource := "test-customized-tc"
_, err = LoadRules([]*Rule{
{
Count: 20,
MetricType: QPS,
Threshold: 20,
Resource: resource,
TokenCalculateStrategy: TokenCalculateStrategy(111),
ControlBehavior: ControlBehavior(112),
Expand All @@ -50,10 +49,10 @@ func TestSetAndRemoveTrafficShapingGenerator(t *testing.T) {
}

func TestIsValidFlowRule(t *testing.T) {
badRule1 := &Rule{Count: 1, MetricType: QPS, Resource: ""}
badRule2 := &Rule{Count: -1.9, MetricType: QPS, Resource: "test"}
badRule3 := &Rule{Count: 5, MetricType: QPS, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Reject}
goodRule1 := &Rule{Count: 10, MetricType: QPS, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling, WarmUpPeriodSec: 10}
badRule1 := &Rule{Threshold: 1, Resource: ""}
badRule2 := &Rule{Threshold: -1.9, Resource: "test"}
badRule3 := &Rule{Threshold: 5, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Reject}
goodRule1 := &Rule{Threshold: 10, Resource: "test", TokenCalculateStrategy: WarmUp, ControlBehavior: Throttling, WarmUpPeriodSec: 10}

assert.Error(t, IsValidRule(badRule1))
assert.Error(t, IsValidRule(badRule2))
Expand All @@ -68,8 +67,7 @@ func TestGetRules(t *testing.T) {
}
r1 := &Rule{
Resource: "abc1",
MetricType: 0,
Count: 0,
Threshold: 0,
RelationStrategy: 0,
TokenCalculateStrategy: Direct,
ControlBehavior: Reject,
Expand All @@ -79,8 +77,7 @@ func TestGetRules(t *testing.T) {
}
r2 := &Rule{
Resource: "abc2",
MetricType: 0,
Count: 0,
Threshold: 0,
RelationStrategy: 0,
TokenCalculateStrategy: Direct,
ControlBehavior: Throttling,
Expand Down Expand Up @@ -112,8 +109,7 @@ func TestGetRules(t *testing.T) {
t.Run("getRules", func(t *testing.T) {
r1 := &Rule{
Resource: "abc1",
MetricType: 0,
Count: 0,
Threshold: 0,
RelationStrategy: 0,
TokenCalculateStrategy: Direct,
ControlBehavior: Reject,
Expand All @@ -123,8 +119,7 @@ func TestGetRules(t *testing.T) {
}
r2 := &Rule{
Resource: "abc2",
MetricType: 0,
Count: 0,
Threshold: 0,
RelationStrategy: 0,
TokenCalculateStrategy: Direct,
ControlBehavior: Throttling,
Expand Down
5 changes: 2 additions & 3 deletions core/flow/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

type Slot struct {
Expand All @@ -15,9 +16,6 @@ func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
res := ctx.Resource.Name()
tcs := getTrafficControllerListFor(res)
result := ctx.RuleCheckResult
if len(tcs) == 0 {
return result
}

// Check rules in order
for _, tc := range tcs {
Expand Down Expand Up @@ -62,6 +60,7 @@ func selectNodeByRelStrategy(rule *Rule, node base.StatNode) base.StatNode {
func checkInLocal(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
actual := selectNodeByRelStrategy(tc.rule, node)
if actual == nil {
logging.Error(errors.Errorf("nil resource node"), "no resource node for flow rule", "rule", tc.rule)
return base.NewTokenResultPass()
}
return tc.PerformChecking(node, acquireCount, flag)
Expand Down
7 changes: 1 addition & 6 deletions core/flow/tc_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ func (d *DefaultTrafficShapingChecker) DoCheck(node base.StatNode, acquireCount
if node == nil {
return nil
}
var curCount float64
if d.rule.MetricType == Concurrency {
curCount = float64(node.CurrentGoroutineNum())
} else {
curCount = node.GetQPS(base.MetricEventPass)
}
curCount := node.GetQPS(base.MetricEventPass)
if curCount+float64(acquireCount) > threshold {
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
}
Expand Down
8 changes: 4 additions & 4 deletions core/flow/tc_warm_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ func NewWarmUpTrafficShapingCalculator(rule *Rule) *WarmUpTrafficShapingCalculat
logging.Warn("[NewWarmUpTrafficShapingCalculator] No set WarmUpColdFactor,use default warm up cold factor value", "defaultWarmUpColdFactor", config.DefaultWarmUpColdFactor)
}

warningToken := uint64((float64(rule.WarmUpPeriodSec) * rule.Count) / float64(rule.WarmUpColdFactor-1))
warningToken := uint64((float64(rule.WarmUpPeriodSec) * rule.Threshold) / float64(rule.WarmUpColdFactor-1))

maxToken := warningToken + uint64(2*float64(rule.WarmUpPeriodSec)*rule.Count/float64(1.0+rule.WarmUpColdFactor))
maxToken := warningToken + uint64(2*float64(rule.WarmUpPeriodSec)*rule.Threshold/float64(1.0+rule.WarmUpColdFactor))

slope := float64(rule.WarmUpColdFactor-1.0) / rule.Count / float64(maxToken-warningToken)
slope := float64(rule.WarmUpColdFactor-1.0) / rule.Threshold / float64(maxToken-warningToken)

warmUpTrafficShapingCalculator := &WarmUpTrafficShapingCalculator{
warmUpPeriodInSec: rule.WarmUpPeriodSec,
coldFactor: rule.WarmUpColdFactor,
warningToken: warningToken,
maxToken: maxToken,
slope: slope,
threshold: rule.Count,
threshold: rule.Threshold,
storedTokens: 0,
lastFilledTime: 0,
}
Expand Down
3 changes: 1 addition & 2 deletions example/qps/qps_limit_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func main() {
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "some-test",
MetricType: flow.QPS,
Count: 10,
Threshold: 10,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
},
Expand Down
Loading

0 comments on commit b65899d

Please sign in to comment.