qps/rate limit & reduce expire seek range & expire-key hash #204

Open
wants to merge 61 commits into base: master

piaoairy219
Collaborator

1. Limit QPS/rate per namespace and command, taking all Titan servers into account.
2. Reduce the expire seek range to avoid the RocksDB tombstone problem.
3. Hash expire keys into 256 prefixes to speed up expire handling and to prevent expire-key writes from concentrating on a single node.
4. Handle empty/illegal commands.
5. If the connection has been closed by the client, drop the remaining commands instead of processing them.
6. Limit the maximum number of connections.
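
Item 3 can be illustrated roughly as follows. This is a minimal sketch, not the PR's actual code: the CRC32 hash, the helper names `expireBucket` and `expireKeyFor`, and the key layout `$sys:0:at:<bucket>:<timestamp>:<key>` are all assumptions made for illustration. Only the bucket count (256) and the `%04d` bucket formatting come from the PR.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// expireHashNum is the number of buckets; 256 comes from the PR description.
const expireHashNum = 256

// expireBucket maps a key to one of expireHashNum buckets.
// CRC32 is an assumption; the PR may use a different hash function.
func expireBucket(key []byte) int {
	return int(crc32.ChecksumIEEE(key) % expireHashNum)
}

// expireKeyFor builds a hypothetical expire key of the form
// $sys:0:at:<bucket>:<timestamp>:<key>. The timestamp is zero-padded so that
// byte order matches numeric order for non-negative values.
func expireKeyFor(key []byte, at int64) []byte {
	return []byte(fmt.Sprintf("$sys:0:at:%04d:%019d:%s", expireBucket(key), at, key))
}

func main() {
	fmt.Printf("%s\n", expireKeyFor([]byte("user:42"), 1590000000))
}
```

Because the bucket comes before the timestamp, keys that expire at the same moment are spread over up to 256 key ranges, and each range can be scanned by its own worker.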

xufangping added 30 commits September 3, 2019 22:56
…ash. just after read the command we check if it is in command list we support, else will skip it
…new limit

2 just balance limit base in active titan server num
3 unit test of rate limit passed
2 change qps/rate metrics 3 labels: namespace, command, localip
xufangping added 4 commits April 21, 2020 20:44
…and prevent that current expire-keys region writing node have higher load than other nodes
2 fix expire_test.go: gc key can get before runExpire/doExpire if expireat < now; add 2 testCases for hash/string expire: check key is deleted and gc key exists after runExpire
…ad for dashboard, replace 1.4 with 2, buckets num decrease 50%
expireKeyPrefix = []byte("$sys:0:at:")
sysExpireLeader = []byte("$sys:0:EXL:EXLeader")
expireKeyPrefix = []byte("$sys:0:at:")
hashExpireKeyPrefix = expireKeyPrefix[:len(expireKeyPrefix)-1]
Contributor

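Storing the hash value directly in the expire key is indeed a good approach, but is it necessary to define a new variable?
Could the hash value be written directly into the position of the digit 0 in expireKeyPrefix = []byte("$sys:0:at:")? That way old data would stay compatible, and the changes to the existing key concatenation would be smaller.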

}

type CommandLimiter struct {
localIp string
Contributor

Use localIP instead

return nil, errors.New(rateLimit.InterfaceName + " adds is empty")
}

if rateLimit.LimiterNamespace == "" {
Contributor

If no namespace is supplied, can we just treat it as a global limit?
@piaoairy219

if rateLimit.LimiterNamespace == "" {
return nil, errors.New("limiter-namespace is configured with empty")
}
if rateLimit.WeightChangeFactor <= 1 {
Contributor

A configuration validator can be used here.

Comment on lines +134 to +140
if !(rateLimit.UsageToDivide > 0 && rateLimit.UsageToDivide < rateLimit.UsageToMultiply && rateLimit.UsageToMultiply < 1) {
return nil, errors.New("should config 0 < usage-to-divide < usage-to-multiply < 1")
}
if rateLimit.InitialPercent > 1 || rateLimit.InitialPercent <= 0 {
return nil, errors.New("initial-percent should in (0, 1]")
}

Contributor

ditto
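
One way to read the "configuration validator" suggestion is to gather the checks into a single method. The sketch below is hypothetical: the `RateLimit` struct lists only the fields visible in the diff, their types are assumed, and the `Validate` method name and the wording of the weight-change-factor error are not from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// RateLimit mirrors only the fields visible in the diff; the types are assumed.
type RateLimit struct {
	LimiterNamespace   string
	WeightChangeFactor float64
	UsageToDivide      float64
	UsageToMultiply    float64
	InitialPercent     float64
}

// Validate is a hypothetical helper that keeps every range check in one place.
func (r *RateLimit) Validate() error {
	if r.LimiterNamespace == "" {
		return errors.New("limiter-namespace is configured with empty")
	}
	if r.WeightChangeFactor <= 1 {
		// Message wording is assumed; the original is not shown in the diff.
		return errors.New("weight-change-factor should be > 1")
	}
	if !(r.UsageToDivide > 0 && r.UsageToDivide < r.UsageToMultiply && r.UsageToMultiply < 1) {
		return errors.New("should config 0 < usage-to-divide < usage-to-multiply < 1")
	}
	if r.InitialPercent <= 0 || r.InitialPercent > 1 {
		return errors.New("initial-percent should in (0, 1]")
	}
	return nil
}

func main() {
	cfg := RateLimit{
		LimiterNamespace:   "sys_ratelimit",
		WeightChangeFactor: 1.5,
		UsageToDivide:      0.4,
		UsageToMultiply:    0.7,
		InitialPercent:     0.5,
	}
	fmt.Println("validation error:", cfg.Validate())
}
```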

strUnit = limitStr[len(limitStr)-1]
if strUnit == 'k' || strUnit == 'K' {
unit = 1024
limitStr = limitStr[:len(limitStr)-1]
Contributor

Wrap this into a function
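
A possible shape for such a function is sketched below. The name `parseLimit` is hypothetical. The `k`/`K` multiplier of 1024 is taken from the diff; the `m`/`M` suffix and the rejection of negative values are assumptions added for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// parseLimit converts strings such as "100", "10k" or "2M" into a number.
// The k/K multiplier (1024) comes from the diff; m/M is an assumption.
func parseLimit(s string) (int64, error) {
	if s == "" {
		return 0, errors.New("limit is empty")
	}
	unit := int64(1)
	switch s[len(s)-1] {
	case 'k', 'K':
		unit = 1024
		s = s[:len(s)-1]
	case 'm', 'M':
		unit = 1024 * 1024
		s = s[:len(s)-1]
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid limit %q: %w", s, err)
	}
	if n < 0 {
		return 0, fmt.Errorf("limit %d is negative", n)
	}
	return n * unit, nil
}

func main() {
	for _, in := range []string{"100", "10k", "2M", "k"} {
		v, err := parseLimit(in)
		fmt.Println(in, v, err)
	}
}
```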

v, ok := l.limiters.Load(limiterName)
var commandLimiter *CommandLimiter
if !ok {
commandLimiter = l.init(limiterName)
Contributor

The name init seems too generic for what this function does.

limiterName := k.(string)
commandLimiter := v.(*CommandLimiter)
if commandLimiter != nil {
averageQps := commandLimiter.reportLocalStat(l.conf.GlobalBalancePeriod)
Contributor

Use averageQPS instead

return key
}

func getNamespaceAndCmd(limiterName string) []string {
Contributor

Returning two named values (namespace, command string) instead of a slice would be friendlier to callers.
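
As a rough illustration of that signature, assuming the limiter name joins the two parts with a separator: the `@` separator and the name `splitLimiterName` are hypothetical, since the diff does not show how limiter names are built.

```go
package main

import (
	"fmt"
	"strings"
)

// limiterNameSep is an assumed separator; the real format is not shown in the diff.
const limiterNameSep = "@"

// splitLimiterName returns the namespace and command encoded in limiterName.
// ok is false when the name does not contain the separator.
func splitLimiterName(limiterName string) (namespace, command string, ok bool) {
	parts := strings.SplitN(limiterName, limiterNameSep, 2)
	if len(parts) != 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}

func main() {
	ns, cmd, ok := splitLimiterName("default@get")
	fmt.Println(ns, cmd, ok)
}
```

Named results document what each position means, and the extra `ok` lets callers handle a malformed name without indexing into a slice.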

}
}

func (l *LimitersMgr) getLimit(limiterName string, isQps bool) (int64, int) {
Contributor

Avoid using a boolean as an argument. Refactor it to getQPSLimit and getRateLimit.

"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}},
//"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}},
//"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}},
//"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}},

Contributor

Is there a reason for commenting out commands that are already implemented?

} else {
continue
}
}
Contributor

The CanExecute logic is well designed, but could the returned data be wrong in some cases? For example:

Contributor

multi
lpush key 1
xxx zz
exec

Contributor

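Can I take this as a security consideration? I have two questions:

  1. Why is this feature needed? If the user sends a well-formed protocol message and only the command is wrong, can they really be considered an abnormal user?
  2. Disconnecting after 3 accumulated invalid commands seems a bit strict. If this feature is necessary, it would be best to make it configurable.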

"bitcount": BitCount,
//"getbit": GetBit,
//"bitpos": BitPos,
//"bitcount": BitCount,
Contributor

These commands are already implemented and can be enabled.

//so, if expire at a ts <= now, delete it at once
return kv.txn.Destory(obj, key)
}

if err := expireAt(kv.txn.t, mkey, obj.ID, obj.Type, obj.ExpireAt, at); err != nil {
Contributor

Is the IsExpired check already performed inside this function? If not, implementing it inside this function would be more elegant.

Contributor

If the key has already expired, should a special err be returned so that the user knows the operation failed?

if i == 0 {
isQps = true
}
limit, burst := l.getLimit(limiterName, isQps)
Contributor

Smart but weird. Maybe we should define the closure first and reuse it.

Comment on lines +45 to +46
qpsLw LimiterWrapper
rateLw LimiterWrapper
Contributor

QPS means the same thing as request rate, so the two names are easy to confuse. We should use friendlier names for the limit types, such as Command and DataFlow.

for i := 0; i < EXPIRE_HASH_NUM; i++ {
expireHash := fmt.Sprintf("%04d", i)
go startExpire(sysdb, &conf.Expire, ls, expireHash)
}
Contributor

Could the expire-related logic be wrapped in a separate function in expire.go?

Contributor

Agreed.

expireLogFlag = "[Expire]"
metricsLabel = expire_unhash_worker
}

Contributor

Could the KeyPrefix concatenation here be wrapped in a separate function?

@@ -135,94 +200,166 @@ func toTikvScorePrefix(namespace []byte, id DBID, key []byte) []byte {
return b
}

func runExpire(db *DB, batchLimit int) {
func runExpire(db *DB, batchLimit int, expireHash string, lastExpireEndTs int64) int64 {
Contributor

How large is the performance difference between passing in lastExpireEndTs as the iterator start position each time and always starting from expireKeyPrefix?

Contributor

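In practice the impact is large, and it depends heavily on the usage scenario. In some scenarios, for example importing new data every day with a 1-day expiration, most of the data expires at the same time. Once deleted, the expired entries become tombstones. RocksDB iterates over and skips tombstones when looking up data, so if there are too many of them, for example millions, iteration performance degrades severely.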
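
The narrowed scan can be sketched as below. This is only an illustration under assumptions: the helper name `expireSeekRange`, the key layout `$sys:0:at:<bucket>:<timestamp>`, and the zero-padded timestamp are hypothetical. The half-open range `[start, end)` with `now+1` as the upper bound follows the code comment quoted later in this review.

```go
package main

import "fmt"

// expireSeekRange returns the [start, end) bounds for scanning one hash bucket.
// Starting at lastExpireEndTs instead of the bare prefix skips the key range
// that earlier rounds already deleted (and that is now full of tombstones).
// Using now+1 as the exclusive end still includes keys stamped with now.
// The key layout is an assumption made for this sketch.
func expireSeekRange(bucket string, lastExpireEndTs, now int64) (start, end []byte) {
	prefix := "$sys:0:at:" + bucket + ":"
	start = []byte(fmt.Sprintf("%s%019d", prefix, lastExpireEndTs))
	end = []byte(fmt.Sprintf("%s%019d", prefix, now+1))
	return start, end
}

func main() {
	start, end := expireSeekRange("0007", 1590000000, 1590000060)
	fmt.Printf("start=%s\nend=%s\n", start, end)
}
```

The caller would remember the timestamp where the previous round stopped and pass it back in as `lastExpireEndTs`, which is what the new `runExpire` return value appears to be for.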

txn.Rollback()
zap.L().Error("[Expire] commit failed", zap.Error(err))
zap.L().Error(expireLogFlag+" commit failed", zap.Error(err))
Contributor

If the commit returns an err, should we return 0?


func (cl *CommandLimiter) checkLimit(cmdName string, cmdArgs []string) {
d := cl.qpsLw.waitTime(1)
time.Sleep(d)
Contributor

Why not use the Wait method?

return limit, int(burst)
}

func (l *LimitersMgr) CheckLimit(namespace string, cmdName string, cmdArgs []string) {
Contributor

Add a comment for the public method

Comment on lines +230 to +232
//iter get keys [key, upperBound), so using now+1 as 2nd parameter will get "at:now:" prefixed keys
//we seek end in "at:<now>" replace in "at;" , it can reduce the seek range and seek the deleted expired keys as little as possible.
//the behavior should reduce the expire delay in days and get/mget timeout, which are caused by rocksdb tomstone problem
Contributor

LGTM

Comment on lines +84 to +85
LimitConnection bool
MaxConnection int64
Contributor

Using two variables here is too verbose. Maybe MaxConnection alone is enough, with a value of 0 meaning unlimited.

ListZipThreshold int
LimitConnection bool
MaxConnection int64
MaxConnectionWait int64
Contributor
@shafreeck, May 22, 2020

How is this variable used? Why sleep for a while when the limit is exceeded? Why not close the connection immediately?

LimitConnection bool
MaxConnection int64
MaxConnectionWait int64
ClientsNum int64
Contributor
@shafreeck, May 22, 2020

Use an atomic variable to avoid locks

@@ -92,6 +91,10 @@ type TxnCommand func(ctx *Context, txn *db.Transaction) (OnCommit, error)
func Call(ctx *Context) {
ctx.Name = strings.ToLower(ctx.Name)

if _, ok := txnCommands[ctx.Name]; ok && ctx.Server.LimitersMgr != nil {
Contributor

txnCommands is not used anymore; use commands instead.

4 participants