Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qps/rate limit & reduce expire seek range & expire-key hash #204

Open
wants to merge 61 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
db84a39
fix empty command which cause titan to crash in parsing command
Sep 3, 2019
3e4b89e
change empty command's reply error from ErrEmptyArray to ErrEmptyCommand
Sep 3, 2019
c89b27f
close the connection when meet empty command
Sep 3, 2019
43bd6c3
some illegal clients send unreadable command which may cause titan cr…
Sep 6, 2019
95bd04b
if a connection send unknown commands 3 times, close it
Sep 6, 2019
63b131d
add expire left seconds metrics
Sep 18, 2019
a9c28fe
fix expire left time calculating
Sep 18, 2019
e5ff501
when set left/delay current seconds(expire), also set other seconds to 0
Sep 19, 2019
fb11c67
delete zadd args output
Oct 25, 2019
586b30d
first version of rate limit
Nov 7, 2019
27f54e7
1 use rate.limiter to implement limit and read tikv key/value to get …
Nov 21, 2019
e5018e4
qps also can be set burst, its limit also support k/K/m/M suffix
Nov 21, 2019
ecd78a2
1 change log level--limit not set, to debug
Nov 21, 2019
0648ab8
add limit default config items and fix error in config.go
Nov 21, 2019
f86245a
in startSyncNewLimit, just read all match limit once for every comman…
Nov 21, 2019
157d70a
change got limit log trace
Nov 22, 2019
fd0d8cc
add limit cleared log trace, just log limit is trigger when delay > 0
Nov 22, 2019
756a0e6
change limit/commandFunc cost seconds factor from 2 to 1.4
Nov 22, 2019
46bb6da
fix titan active time decoding bug
Nov 22, 2019
a839995
fix limit local percent bug
Nov 22, 2019
815efa4
fix titan active time parsing bug
Nov 22, 2019
9612571
add updateLimitPercent log
Nov 22, 2019
cc4255a
when create commandLimiter, also use localPercent to set limit
Nov 22, 2019
f00d8c5
fix limitersMgr localPercent using bug and lock before use it
Nov 25, 2019
da49bfa
reportLocalStat every globalBalancePeriod,even the commandLimiter is nil
Nov 25, 2019
30f7d2c
limit can also work on auth-disabled titan server
Nov 26, 2019
bcbd5a7
decrease the lock range in reportStat
Nov 26, 2019
1a50695
reportLocalStat run in itself go routine
Nov 26, 2019
a7753c4
1 change how to balance
Nov 29, 2019
fda1e66
update some config item's description and log
Dec 2, 2019
a474656
add limit changed log
Dec 2, 2019
8ed0698
commandLimiter store globalQpsLimit/globalRateLimit in int64, use it …
Dec 2, 2019
6ae93be
when all titan'a limiter qps is < devideUsage of its weight percent, …
Dec 3, 2019
70b3f15
add showlimit.sh and add a conf parameter for setlimit.sh
Dec 4, 2019
a22ca0f
1 resolve the problem that titan still process command for remote clo…
Dec 6, 2019
8403a21
1 BytesArray add judge for write eror
Dec 6, 2019
2d59108
refactor commandLimiter's qpsLimiter/ratelimiter and their globalLimi…
Dec 7, 2019
b034581
fix balance limit's bug
Dec 7, 2019
3b71e69
1 in Exec(), add transaction begin, cmd args num, cmd func, reply fun…
Dec 11, 2019
0fb99b7
simplify the msg recv/send log in debug level
Dec 11, 2019
37c1c68
fix bug of setting cmd arg num metric in Exec()
Dec 11, 2019
9de2ae9
add expire/gc round/seek/commit cost metrics
Dec 12, 2019
6c2328d
prevent connection more and more: when tikv command cost high, client…
Dec 22, 2019
bbfce19
fix bug of connection limit
Dec 25, 2019
93cd7df
1 add metrics range, fix cost exceed max value problem
Feb 18, 2020
7e2521b
Merge branch 'fpxu_dev' into 'master'
Feb 18, 2020
ad7cff1
1 add example for limit-connection and max-connection-wait in titan.toml
Feb 19, 2020
aeca4b3
1 fix unlock bug in limit-connection
Feb 19, 2020
bb617b0
don't lock s.serverctx.lock when limit-connection disabled
Feb 19, 2020
003e6b6
expire seek end in at:now keys to reduce rocksdb tomstone problem
Feb 25, 2020
4423eaf
fix a failed case in tests/redis/unit/expire.tcl
Feb 26, 2020
cc15799
expire seek start in last processed expireKeys to reduce rocksdb toms…
Feb 27, 2020
d9a1baf
fix expire log
Feb 27, 2020
83a370d
also reduce expire seek range when no expire keys or all expire keys …
Feb 27, 2020
d75dd34
also reduce expire seek range when no expire keys or all expire keys …
Feb 28, 2020
3519ff9
comment unsupported string functions in command list, and delete its …
Mar 2, 2020
c4eea83
comment unit test of unsupported string functions in strings_test.go
Mar 2, 2020
bd43ec2
hash expire-key, it will improve the keys num handled every seconds, …
Apr 21, 2020
fe6e84b
1 fix runExpire's problem that unhashed goroutine scan hashed expire-key
Apr 24, 2020
977b457
unhash expire goroutine use individual limit configuration item、expir…
Apr 26, 2020
9161d7e
buckets of comand/limit/commandFunc/txn commit cost is too many to l…
Apr 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions bin/titan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,19 @@ func main() {
}

svr := metrics.NewServer(&config.Status)

limitersMgr, err := db.NewLimitersMgr(store, &config.Tikv.RateLimit)
if err != nil {
zap.L().Fatal("create limitersMgr failed", zap.Error(err))
os.Exit(1)
}
serv := titan.New(&context.ServerContext{
RequirePass: config.Server.Auth,
Store: store,
ListZipThreshold: config.Server.ListZipThreshold,
RequirePass: config.Server.Auth,
Store: store,
ListZipThreshold: config.Server.ListZipThreshold,
LimitersMgr: limitersMgr,
LimitConnection: config.Server.LimitConnection,
MaxConnection: config.Server.MaxConnection,
MaxConnectionWait: config.Server.MaxConnectionWait,
})

var servOpts, statusOpts []continuous.ServerOption
Expand Down
76 changes: 46 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/distributedio/titan/command"
Expand All @@ -22,54 +21,43 @@ type client struct {
exec *command.Executor
r *bufio.Reader

eofLock sync.Mutex //the lock of reading_writing 'eof'
eof bool //is over when read data from socket
remoteClosed bool //is the connection closed by remote peer?
}

func newClient(cliCtx *context.ClientContext, s *Server, exec *command.Executor) *client {
return &client{
cliCtx: cliCtx,
server: s,
exec: exec,
eof: false,
cliCtx: cliCtx,
server: s,
exec: exec,
remoteClosed: false,
}
}

func (c *client) readEof() {
c.eofLock.Lock()
defer c.eofLock.Unlock()

c.eof = true
}

func (c *client) isEof() bool {
c.eofLock.Lock()
defer c.eofLock.Unlock()

return c.eof
}

// Write to conn and log error if needed
func (c *client) Write(p []byte) (int, error) {
zap.L().Debug("write to client", zap.Int64("clientid", c.cliCtx.ID), zap.String("msg", string(p)))
n, err := c.conn.Write(p)
if err != nil {
c.conn.Close()
if err == io.EOF {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
zap.L().Info("connection was half-closed by remote peer", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
} else {
//may be unknown error with message "connection reset by peer"
zap.L().Error("write net failed", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID),
zap.String("namespace", c.cliCtx.Namespace),
zap.Bool("multi", c.cliCtx.Multi),
zap.Bool("watching", c.cliCtx.Txn != nil),
zap.String("command", c.cliCtx.LastCmd),
zap.String("error", err.Error()))
return 0, err
}
//client.serve() will get the channel close event, close the connection, exit current go routine
//if the remote client use pipeline to invoke command, then close the connection(timeout etc), titan still get command from client.bufio.Reader and process
//setting client.remoteClosed to true will help client.serve() to interrupt command processing
c.remoteClosed = true
}
return n, nil
//return err for above write() error, then replying many times command can break its sending to a half-closed connection, etc BytesArray(lrange invoke it).
return n, err
}

func (c *client) serve(conn net.Conn) error {
Expand All @@ -78,21 +66,27 @@ func (c *client) serve(conn net.Conn) error {

var cmd []string
var err error
unknownCmdTimes := int(0)
for {
select {
case <-c.cliCtx.Done:
return c.conn.Close()
default:
if c.remoteClosed {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
return c.conn.Close()
}
cmd, err = c.readCommand()
if err != nil {
c.conn.Close()
if err == io.EOF {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
return nil
}
zap.L().Error("read command failed", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.Error(err))
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace), zap.Error(err))
return err
}
}
Expand All @@ -102,8 +96,30 @@ func (c *client) serve(conn net.Conn) error {
c.server.servCtx.Pause = 0
}

if len(cmd) <= 0 {
err := command.ErrEmptyCommand
zap.L().Error(err.Error(), zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
resp.ReplyError(c, err.Error())
c.conn.Close()
return nil
}

c.cliCtx.Updated = time.Now()
c.cliCtx.LastCmd = cmd[0]
if !c.exec.CanExecute(c.cliCtx.LastCmd) {
err := command.ErrUnKnownCommand(c.cliCtx.LastCmd)
zap.L().Error(err.Error(), zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
resp.ReplyError(c, err.Error())
unknownCmdTimes++
if unknownCmdTimes >= 3 {
c.conn.Close()
return nil
} else {
continue
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanExecute 这个逻辑设计的很好,在一些case 下返回的数据是否有问题。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi
lpush key 1
xxx zz
exec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以理解这个是安全性方面的考虑?我又两个疑问:

  1. 为什么需要这个功能?如果用户发送的是正确的协议,只是命令不对,可以认为是异常用户吗?
  2. 累加 3 次错误命令就要断开连接,是否有点太严格?如果这个功能是必要的,最好能有个配置选项。


ctx := &command.Context{
Name: cmd[0],
Expand All @@ -114,7 +130,6 @@ func (c *client) serve(conn net.Conn) error {
}

ctx.Context = context.New(c.cliCtx, c.server.servCtx)
zap.L().Debug("recv msg", zap.String("command", ctx.Name), zap.Strings("arguments", ctx.Args))

// Skip reply if necessary
if c.cliCtx.SkipN != 0 {
Expand All @@ -127,7 +142,8 @@ func (c *client) serve(conn net.Conn) error {
env.Write(zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID),
zap.String("traceid", ctx.TraceID),
zap.String("command", ctx.Name))
zap.String("command", ctx.Name),
zap.Strings("arguments", ctx.Args))
}

c.exec.Execute(ctx)
Expand Down
51 changes: 29 additions & 22 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,27 @@ func Integer(w io.Writer, v int64) OnCommit {
// BytesArray replies a [][]byte when commit
func BytesArray(w io.Writer, a [][]byte) OnCommit {
return func() {
start := time.Now()
resp.ReplyArray(w, len(a))
zap.L().Debug("reply array size", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
start = time.Now()
if _, err := resp.ReplyArray(w, len(a)); err != nil {
return
}
for i := range a {
if a[i] == nil {
resp.ReplyNullBulkString(w)
if err := resp.ReplyNullBulkString(w); err != nil {
return
}
continue
}
resp.ReplyBulkString(w, string(a[i]))
if i % 10 == 9 {
zap.L().Debug("reply 10 bulk string", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
start = time.Now()
if err := resp.ReplyBulkString(w, string(a[i])); err != nil {
return
}
}
}
}

func BytesArrayOnce(w io.Writer, a [][]byte) OnCommit {
return func() {
resp.ReplyStringArray(w, a)
}
return func() {
resp.ReplyStringArray(w, a)
}
}

// TxnCommand runs a command in transaction
Expand All @@ -92,6 +91,10 @@ type TxnCommand func(ctx *Context, txn *db.Transaction) (OnCommit, error)
func Call(ctx *Context) {
ctx.Name = strings.ToLower(ctx.Name)

if _, ok := txnCommands[ctx.Name]; ok && ctx.Server.LimitersMgr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

txnCommands is not used anymore, use commands instead.

ctx.Server.LimitersMgr.CheckLimit(ctx.Client.Namespace, ctx.Name, ctx.Args)
}

if ctx.Name != "auth" &&
ctx.Server.RequirePass != "" &&
ctx.Client.Authenticated == false {
Expand Down Expand Up @@ -182,18 +185,16 @@ func AutoCommit(cmd TxnCommand) Command {
return func(ctx *Context) {
retry.Ensure(ctx, func() error {
mt := metrics.GetMetrics()
start := time.Now()
start := time.Now()
txn, err := ctx.Client.DB.Begin()
key := ""
if len(ctx.Args) > 0 {
key = ctx.Args[0]
if len(ctx.Args) > 1 {
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args)-1))
}
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args)))
}
cost := time.Since(start).Seconds()
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc()
resp.ReplyError(ctx.Out, "ERR "+err.Error())
Expand All @@ -208,8 +209,8 @@ func AutoCommit(cmd TxnCommand) Command {
start = time.Now()
onCommit, err := cmd(ctx, txn)
cost = time.Since(start).Seconds()
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.CommandFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc()
resp.ReplyError(ctx.Out, err.Error())
Expand Down Expand Up @@ -257,8 +258,8 @@ func AutoCommit(cmd TxnCommand) Command {
onCommit()
}
cost = time.Since(start).Seconds()
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.ReplyFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mtFunc()
return nil
})
Expand All @@ -277,9 +278,9 @@ func feedMonitors(ctx *Context) {
id := strconv.FormatInt(int64(ctx.Client.DB.ID), 10)

line := ts + " [" + id + " " + ctx.Client.RemoteAddr + "]" + " " + ctx.Name + " " + strings.Join(ctx.Args, " ")
start := time.Now()
start := time.Now()
err := resp.ReplySimpleString(mCtx.Out, line)
zap.L().Debug("feedMonitors reply", zap.String("name", ctx.Name), zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
zap.L().Debug("feedMonitors reply", zap.String("name", ctx.Name), zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
if err != nil {
ctx.Server.Monitors.Delete(k)
}
Expand All @@ -299,6 +300,12 @@ func NewExecutor() *Executor {
return &Executor{txnCommands: txnCommands, commands: commands}
}

func (e *Executor) CanExecute(cmd string) bool {
lowerName := strings.ToLower(cmd)
_, ok := commands[lowerName]
return ok
}

// Execute a command
func (e *Executor) Execute(ctx *Context) {
start := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions command/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var (
// ErrEmptyArray error
ErrEmptyArray = errors.New("EmptyArray error")

ErrEmptyCommand = errors.New("command is empty")

//ErrExec exec without multi
ErrExec = errors.New("ERR EXEC without MULTI")

Expand Down
48 changes: 25 additions & 23 deletions command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ func init() {
"getset": GetSet,
"getrange": GetRange,
// "msetnx": MSetNx,
"setnx": SetNx,
"setex": SetEx,
"psetex": PSetEx,
"setrange": SetRange,
"setbit": SetBit,
"setnx": SetNx,
"setex": SetEx,
"psetex": PSetEx,
//"setrange": SetRange,
//"setbit": SetBit,
// "bitop": BitOp,
// "bitfield": BitField,
"getbit": GetBit,
"bitpos": BitPos,
"bitcount": BitCount,
//"getbit": GetBit,
//"bitpos": BitPos,
//"bitcount": BitCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

命令已经实现,可以开放使用

"incr": Incr,
"incrby": IncrBy,
"decr": Decr,
Expand Down Expand Up @@ -117,6 +117,8 @@ func init() {

// transactions, exec and discard should called explicitly, so they are registered here
"multi": Desc{Proc: Multi, Cons: Constraint{1, flags("sF"), 0, 0, 0}},
"exec": Desc{Proc: Exec, Cons: Constraint{1, flags("sF"), 0, 0, 0}},
"discard": Desc{Proc: Discard, Cons: Constraint{1, flags("sF"), 0, 0, 0}},
"watch": Desc{Proc: Watch, Cons: Constraint{-2, flags("sF"), 1, -1, 1}},
"unwatch": Desc{Proc: Unwatch, Cons: Constraint{1, flags("sF"), 0, 0, 0}},

Expand All @@ -133,29 +135,29 @@ func init() {
"rpushx": Desc{Proc: AutoCommit(RPushx), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}},

// strings
"get": Desc{Proc: AutoCommit(Get), Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"set": Desc{Proc: AutoCommit(Set), Cons: Constraint{-3, flags("wm"), 1, 1, 1}},
"setnx": Desc{Proc: AutoCommit(SetNx), Cons: Constraint{3, flags("wmF"), 1, 1, 1}},
"setex": Desc{Proc: AutoCommit(SetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
"psetex": Desc{Proc: AutoCommit(PSetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
"mget": Desc{Proc: AutoCommit(MGet), Cons: Constraint{-2, flags("rF"), 1, -1, 1}},
"mset": Desc{Proc: AutoCommit(MSet), Cons: Constraint{-3, flags("wm"), 1, -1, 2}},
"msetnx": Desc{Proc: AutoCommit(MSetNx), Cons: Constraint{-3, flags("wm"), 1, -1, 2}},
"strlen": Desc{Proc: AutoCommit(Strlen), Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"append": Desc{Proc: AutoCommit(Append), Cons: Constraint{3, flags("wm"), 1, 1, 1}},
"setrange": Desc{Proc: AutoCommit(SetRange), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
"get": Desc{Proc: AutoCommit(Get), Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"set": Desc{Proc: AutoCommit(Set), Cons: Constraint{-3, flags("wm"), 1, 1, 1}},
"setnx": Desc{Proc: AutoCommit(SetNx), Cons: Constraint{3, flags("wmF"), 1, 1, 1}},
"setex": Desc{Proc: AutoCommit(SetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
"psetex": Desc{Proc: AutoCommit(PSetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
"mget": Desc{Proc: AutoCommit(MGet), Cons: Constraint{-2, flags("rF"), 1, -1, 1}},
"mset": Desc{Proc: AutoCommit(MSet), Cons: Constraint{-3, flags("wm"), 1, -1, 2}},
//"msetnx": Desc{Proc: AutoCommit(MSetNx), Cons: Constraint{-3, flags("wm"), 1, -1, 2}}, //run test in tests/redis/unit/type/string failed
"strlen": Desc{Proc: AutoCommit(Strlen), Cons: Constraint{2, flags("rF"), 1, 1, 1}},
"append": Desc{Proc: AutoCommit(Append), Cons: Constraint{3, flags("wm"), 1, 1, 1}},
//"setrange": Desc{Proc: AutoCommit(SetRange), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, //run test in tests/redis/unit/type/string failed
"getrange": Desc{Proc: AutoCommit(GetRange), Cons: Constraint{4, flags("r"), 1, 1, 1}},
"incr": Desc{Proc: AutoCommit(Incr), Cons: Constraint{2, flags("wmF"), 1, 1, 1}},
"decr": Desc{Proc: AutoCommit(Decr), Cons: Constraint{2, flags("wmF"), 1, 1, 1}},
"incrby": Desc{Proc: AutoCommit(IncrBy), Cons: Constraint{3, flags("wmF"), 1, 1, 1}},
"decrby": Desc{Proc: AutoCommit(DecrBy), Cons: Constraint{3, flags("wmF"), 1, 1, 1}},
"incrbyfloat": Desc{Proc: AutoCommit(IncrByFloat), Cons: Constraint{3, flags("wmF"), 1, 1, 1}},
"setbit": Desc{Proc: AutoCommit(SetBit), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
//"setbit": Desc{Proc: AutoCommit(SetBit), Cons: Constraint{4, flags("wm"), 1, 1, 1}},
// "bitop": Desc{Proc: AutoCommit(BitOp), Cons: Constraint{-4, flags("wm"), 2, -1, 1}},
// "bitfield": Desc{Proc: AutoCommit(BitField), Cons: Constraint{-2, flags("wm"), 1, 1, 1}},
"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}},
"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}},
"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}},
//"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}},
//"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}},
//"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}},

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

把实现的命令注释掉有什么考虑吗?

// keys
"type": Desc{Proc: AutoCommit(Type), Cons: Constraint{2, flags("rF"), 1, 1, 1}},
Expand Down
3 changes: 3 additions & 0 deletions command/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func PSetEx(ctx *Context, txn *db.Transaction) (OnCommit, error) {
if err != nil {
return nil, ErrInteger
}
if ui <= 0 {
return nil, ErrExpireSetEx
}
unit := ui * uint64(time.Millisecond)
if err := s.Set([]byte(ctx.Args[2]), int64(unit)); err != nil {
return nil, errors.New("ERR " + err.Error())
Expand Down
Loading