Skip to content

Commit

Permalink
perf: invoker cache and recycle frame
Browse files Browse the repository at this point in the history
perf: mux trans pool

perf: recycle frame and rm err in stream io (#1570)

perf: invoker cache
  • Loading branch information
joway committed Oct 14, 2024
1 parent 3f36585 commit 4873336
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 199 deletions.
74 changes: 54 additions & 20 deletions pkg/streamx/provider/ttstream/client_trans_pool_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ttstream
import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cloudwego/netpoll"
Expand All @@ -29,43 +30,76 @@ import (

var _ transPool = (*muxTransPool)(nil)

type muxTransList struct {
L sync.RWMutex
size int
cursor uint32
transports []*transport
}

func newMuxTransList(size int) *muxTransList {
tl := new(muxTransList)
tl.size = size
tl.transports = make([]*transport, size)
return tl
}

func (tl *muxTransList) Get(sinfo *serviceinfo.ServiceInfo, network string, addr string) (*transport, error) {
idx := atomic.AddUint32(&tl.cursor, 1) % uint32(tl.size)
tl.L.RLock()
trans := tl.transports[idx]
tl.L.RUnlock()
if trans != nil && trans.IsActive() {
return trans, nil
}

conn, err := netpoll.DialConnection(network, addr, time.Second)
if err != nil {
return nil, err
}
trans = newTransport(clientTransport, sinfo, conn)
_ = conn.AddCloseCallback(func(connection netpoll.Connection) error {
// peer close
_ = trans.Close()
return nil
})
runtime.SetFinalizer(trans, func(trans *transport) {
// self close when not hold by user
_ = trans.Close()
})
tl.L.Lock()
tl.transports[idx] = trans
tl.L.Unlock()
return trans, nil
}

func newMuxTransPool() transPool {
t := new(muxTransPool)
t.poolSize = runtime.GOMAXPROCS(0)
return t
}

type muxTransPool struct {
pool sync.Map // addr:*transport
sflight singleflight.Group
poolSize int
pool sync.Map // addr:*muxTransList
sflight singleflight.Group
}

func (m *muxTransPool) Get(sinfo *serviceinfo.ServiceInfo, network string, addr string) (trans *transport, err error) {
v, ok := m.pool.Load(addr)
if ok {
return v.(*transport), nil
return v.(*muxTransList).Get(sinfo, network, addr)
}

v, err, _ = m.sflight.Do(addr, func() (interface{}, error) {
conn, err := netpoll.DialConnection(network, addr, time.Second)
if err != nil {
return nil, err
}
trans = newTransport(clientTransport, sinfo, conn)
_ = conn.AddCloseCallback(func(connection netpoll.Connection) error {
// peer close
_ = trans.Close()
return nil
})
m.pool.Store(addr, trans)
runtime.SetFinalizer(trans, func(trans *transport) {
// self close when not hold by user
_ = trans.Close()
})
return trans, nil
transList := newMuxTransList(m.poolSize)
m.pool.Store(addr, transList)
return transList, nil
})
if err != nil {
return nil, err
}
return v.(*transport), nil
return v.(*muxTransList).Get(sinfo, network, addr)
}

func (m *muxTransPool) Put(trans *transport) {
Expand Down
22 changes: 3 additions & 19 deletions pkg/streamx/provider/ttstream/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
gopkgthrift "github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/ttheader"

"github.com/cloudwego/kitex/pkg/remote/codec/thrift"
"github.com/cloudwego/kitex/pkg/streamx"
)

Expand All @@ -52,7 +51,6 @@ type Frame struct {
meta IntHeader
typ int32
payload []byte
err error
}

func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr *Frame) {
Expand All @@ -69,23 +67,11 @@ func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr
return fr
}

func newErrFrame(err error) (fr *Frame) {
v := framePool.Get()
if v == nil {
fr = new(Frame)
} else {
fr = v.(*Frame)
}
fr.err = err
return fr
}

func recycleFrame(frame *Frame) {
frame.streamFrame = streamFrame{}
frame.meta = nil
frame.typ = 0
frame.payload = nil
frame.err = nil
framePool.Put(frame)
}

Expand Down Expand Up @@ -184,15 +170,13 @@ func DecodeFrame(ctx context.Context, reader bufiox.Reader) (fr *Frame, err erro
return fr, nil
}

var thriftCodec = thrift.NewThriftCodec()

func EncodePayload(ctx context.Context, msg any) ([]byte, error) {
payload, err := thrift.MarshalThriftData(ctx, thriftCodec, msg)
return payload, err
payload := gopkgthrift.FastMarshal(msg.(gopkgthrift.FastCodec))
return payload, nil
}

func DecodePayload(ctx context.Context, payload []byte, msg any) error {
return thrift.UnmarshalThriftData(ctx, thriftCodec, "", payload, msg)
return gopkgthrift.FastUnmarshal(payload, msg.(gopkgthrift.FastCodec))
}

func EncodeException(ctx context.Context, method string, seq int32, ex tException) ([]byte, error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/streamx/provider/ttstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type stream struct {
peerEOF int32
headerSig chan int32
trailerSig chan int32
err error

StreamMeta
metaHandler MetaFrameHandler
Expand Down Expand Up @@ -166,16 +165,18 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) {
return fmt.Errorf("stream read a unexcept trailer")
}

var exception error
// when server-side returns non-biz error, it will be wrapped as ApplicationException stored in trailer frame payload
if len(fr.payload) > 0 {
_, _, ex := thrift.UnmarshalFastMsg(fr.payload, nil)
s.err = ex.(*thrift.ApplicationException)
} else { // when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header
// exception is type of (*thrift.ApplicationException)
_, _, exception = thrift.UnmarshalFastMsg(fr.payload, nil)
} else {
// when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header
bizErr, err := transmeta.ParseBizStatusErr(fr.trailer)
if err != nil {
s.err = err
exception = err
} else if bizErr != nil {
s.err = bizErr
exception = bizErr
}
}
s.trailer = fr.trailer
Expand All @@ -190,8 +191,8 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) {
default:
}

klog.Debugf("stream[%d] recv trailer: %v, err: %v", s.sid, s.trailer, s.err)
return s.trans.streamCloseRecv(s)
klog.Debugf("stream[%d] recv trailer: %v, exception: %v", s.sid, s.trailer, exception)
return s.trans.streamCloseRecv(s, exception)
}

func (s *stream) writeTrailer(tl streamx.Trailer) (err error) {
Expand Down
56 changes: 31 additions & 25 deletions pkg/streamx/provider/ttstream/stream_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,66 +26,72 @@ import (
"github.com/cloudwego/kitex/pkg/streamx/provider/ttstream/container"
)

type streamIOMsg struct {
payload []byte
exception error
}

type streamIO struct {
ctx context.Context
trigger chan struct{}
stream *stream
ctx context.Context
trigger chan struct{}
stream *stream
pipe *container.Pipe[streamIOMsg]
cache [1]streamIOMsg
exception error // once has exception, the stream should not work normally again
// eofFlag == 2 when both parties send trailers
eofFlag int32
// eofCallback will be called when eofFlag == 2
// eofCallback will not be called if stream is not be ended in a normal way
eofCallback func()
fpipe *container.Pipe[*Frame]
fcache [1]*Frame
err error
}

func newStreamIO(ctx context.Context, s *stream) *streamIO {
sio := new(streamIO)
sio.ctx = ctx
sio.trigger = make(chan struct{})
sio.stream = s
sio.fpipe = container.NewPipe[*Frame]()
sio.pipe = container.NewPipe[streamIOMsg]()
return sio
}

func (s *streamIO) setEOFCallback(f func()) {
s.eofCallback = f
}

func (s *streamIO) input(ctx context.Context, f *Frame) {
err := s.fpipe.Write(ctx, f)
func (s *streamIO) input(ctx context.Context, msg streamIOMsg) {
err := s.pipe.Write(ctx, msg)
if err != nil {
klog.Errorf("fpipe write failed: %v", err)
klog.Errorf("pipe write failed: %v", err)
}
}

func (s *streamIO) output(ctx context.Context) (f *Frame, err error) {
if s.err != nil {
return nil, s.err
func (s *streamIO) output(ctx context.Context) (msg streamIOMsg, err error) {
if s.exception != nil {
return msg, s.exception
}
n, err := s.fpipe.Read(ctx, s.fcache[:])

n, err := s.pipe.Read(ctx, s.cache[:])
if err != nil {
if errors.Is(err, container.ErrPipeEOF) {
err = io.EOF
}
s.err = err
return nil, s.err
s.exception = err
return msg, s.exception
}
if n == 0 {
s.err = io.EOF
return nil, s.err
s.exception = io.EOF
return msg, s.exception
}
f = s.fcache[0]
if f.err != nil {
s.err = f.err
return nil, s.err
msg = s.cache[0]
if msg.exception != nil {
s.exception = msg.exception
return msg, s.exception
}
return f, nil
return msg, nil
}

func (s *streamIO) closeRecv() {
s.fpipe.Close()
s.pipe.Close()
if atomic.AddInt32(&s.eofFlag, 1) == 2 && s.eofCallback != nil {
s.eofCallback()
}
Expand All @@ -98,5 +104,5 @@ func (s *streamIO) closeSend() {
}

func (s *streamIO) cancel() {
s.fpipe.Cancel()
s.pipe.Cancel()
}
Loading

0 comments on commit 4873336

Please sign in to comment.