Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] add outlier ejection #579

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
16 changes: 15 additions & 1 deletion api/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package api

import (
"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

// TraceError records the provided error to the given SentinelEntry.
Expand All @@ -34,3 +35,16 @@ func TraceError(entry *base.SentinelEntry, err error) {

entry.SetError(err)
}

func TraceCallee(entry *base.SentinelEntry, address string) {
defer func() {
if e := recover(); e != nil {
logging.Error(errors.Errorf("%+v", e), "Failed to api.TraceCallee()")
return
}
}()
if entry == nil || address == "" {
return
}
entry.SetPair("address", address)
}
16 changes: 16 additions & 0 deletions core/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ func (ctx *EntryContext) Rt() uint64 {
return ctx.rt
}

func (ctx *EntryContext) FilterNodes() []string {
return ctx.RuleCheckResult.FilterNodes()
}

func (ctx *EntryContext) HalfOpenNodes() []string {
return ctx.RuleCheckResult.HalfOpenNodes()
}

func (ctx *EntryContext) SetPair(key, val interface{}) {
ctx.Data[key] = val
}

func (ctx *EntryContext) GetPair(key interface{}) interface{} {
return ctx.Data[key]
}

func NewEmptyEntryContext() *EntryContext {
return &EntryContext{}
}
Expand Down
9 changes: 8 additions & 1 deletion core/base/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package base
import (
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/logging"
)

type ExitHandler func(entry *SentinelEntry, ctx *EntryContext) error
Expand Down Expand Up @@ -55,6 +56,12 @@ func (e *SentinelEntry) SetError(err error) {
}
}

func (e *SentinelEntry) SetPair(key, val interface{}) {
if e.ctx != nil {
e.ctx.SetPair(key, val)
}
}

func (e *SentinelEntry) Context() *EntryContext {
return e.ctx
}
Expand Down
22 changes: 20 additions & 2 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (s TokenResultStatus) String() string {
type TokenResult struct {
status TokenResultStatus

blockErr *BlockError
nanosToWait time.Duration
blockErr *BlockError
nanosToWait time.Duration
filterNodes []string
halfOpenNodes []string
}

func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
Expand Down Expand Up @@ -154,6 +156,22 @@ func (r *TokenResult) NanosToWait() time.Duration {
return r.nanosToWait
}

func (r *TokenResult) FilterNodes() []string {
return r.filterNodes
}

func (r *TokenResult) HalfOpenNodes() []string {
return r.halfOpenNodes
}

func (r *TokenResult) SetFilterNodes(nodes []string) {
r.filterNodes = nodes
}

func (r *TokenResult) SetHalfOpenNodes(nodes []string) {
r.halfOpenNodes = nodes
}

func (r *TokenResult) String() string {
var blockMsg string
if r.blockErr == nil {
Expand Down
21 changes: 11 additions & 10 deletions core/circuitbreaker/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"reflect"
"sync"

"github.com/pkg/errors"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

type CircuitBreakerGenFunc func(r *Rule, reuseStat interface{}) (CircuitBreaker, error)
Expand Down Expand Up @@ -262,7 +263,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {

newBreakers := make(map[string][]CircuitBreaker, len(validResRulesMap))
for res, resRules := range validResRulesMap {
newCbsOfRes := buildResourceCircuitBreaker(res, resRules, breakersClone[res])
newCbsOfRes := BuildResourceCircuitBreaker(res, resRules, breakersClone[res])
if len(newCbsOfRes) > 0 {
newBreakers[res] = newCbsOfRes
}
Expand All @@ -275,7 +276,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
currentRules = rawResRulesMap

logging.Debug("[CircuitBreaker onRuleUpdate] Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
logRuleUpdate(validResRulesMap)
LogRuleUpdate(validResRulesMap)
return nil
}

Expand Down Expand Up @@ -305,7 +306,7 @@ func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
oldResCbs = append(oldResCbs, breakers[res]...)
updateMux.RUnlock()

newCbsOfRes := buildResourceCircuitBreaker(res, rawResRules, oldResCbs)
newCbsOfRes := BuildResourceCircuitBreaker(res, rawResRules, oldResCbs)

updateMux.Lock()
if len(newCbsOfRes) == 0 {
Expand Down Expand Up @@ -341,7 +342,7 @@ func rulesFrom(rm map[string][]*Rule) []*Rule {
return rules
}

func logRuleUpdate(m map[string][]*Rule) {
func LogRuleUpdate(m map[string][]*Rule) {
rs := rulesFrom(m)
if len(rs) == 0 {
logging.Info("[CircuitBreakerRuleManager] Circuit breaking rules were cleared")
Expand Down Expand Up @@ -399,12 +400,12 @@ func ClearRulesOfResource(res string) error {
return err
}

// buildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
// BuildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
func BuildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
newCbsOfRes := make([]CircuitBreaker, 0, len(rulesOfRes))
for _, r := range rulesOfRes {
if res != r.Resource {
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.buildResourceCircuitBreaker()", "rule", r)
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.BuildResourceCircuitBreaker()", "rule", r)
continue
}
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResCbs)
Expand All @@ -421,7 +422,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir

generator := cbGenFuncMap[r.Strategy]
if generator == nil {
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
continue
}

Expand All @@ -433,7 +434,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir
cb, e = generator(r, nil)
}
if cb == nil || e != nil {
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
continue
}

Expand Down
114 changes: 114 additions & 0 deletions core/outlier/recycler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package outlier

import (
"errors"
"fmt"
"sync"
"time"

"github.com/alibaba/sentinel-golang/logging"
)

const capacity = 200

var (
// resource name ---> node recycler
recyclers = make(map[string]*Recycler)
recyclerMutex = new(sync.Mutex)
recyclerCh = make(chan task, capacity)
)

type task struct {
nodes []string
resource string
}

func init() {
go func() {
defer func() {
if err := recover(); err != nil {
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming recyclerCh")
}
}()
for task := range recyclerCh {
recycler := getRecyclerOfResource(task.resource)
recycler.scheduleNodes(task.nodes)
}
}()
}

// Recycler recycles node instance that have been invalidated for a long time
type Recycler struct {
resource string
interval time.Duration
status map[string]bool
mtx sync.Mutex
}

func getRecyclerOfResource(resource string) *Recycler {
recyclerMutex.Lock()
defer recyclerMutex.Unlock()
if _, ok := recyclers[resource]; !ok {
recycler := &Recycler{
resource: resource,
status: make(map[string]bool),
}
rule := getOutlierRuleOfResource(resource)
if rule == nil {
logging.Error(errors.New("nil outlier rule"), "Nil outlier rule in getRecyclerOfResource()")
} else {
if rule.RecycleIntervalS == 0 {
recycler.interval = 10 * time.Minute
} else {
recycler.interval = time.Duration(rule.RecycleIntervalS * 1e9)
}
}
recyclers[resource] = recycler
}
return recyclers[resource]
}

func (r *Recycler) scheduleNodes(nodes []string) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, node := range nodes {
if _, ok := r.status[node]; !ok {
r.status[node] = false
nodeCopy := node // Copy values to correctly capture the closure for node.
time.AfterFunc(r.interval, func() {
r.recycle(nodeCopy)
})
}
}
}

func (r *Recycler) recover(node string) {
r.mtx.Lock()
defer r.mtx.Unlock()
if _, ok := r.status[node]; ok {
r.status[node] = true
}
}

func (r *Recycler) recycle(node string) {
r.mtx.Lock()
defer r.mtx.Unlock()
if v, ok := r.status[node]; ok && !v {
deleteNodeBreakerOfResource(r.resource, node)
}
delete(r.status, node)
}
Loading
Loading