Skip to content

Commit

Permalink
perf: recycle frame and rm err in stream io
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 10, 2024
1 parent 78b52c9 commit 4d23028
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 62 deletions.
22 changes: 3 additions & 19 deletions pkg/streamx/provider/ttstream/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
gopkgthrift "github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/ttheader"

"github.com/cloudwego/kitex/pkg/remote/codec/thrift"
"github.com/cloudwego/kitex/pkg/streamx"
)

Expand All @@ -52,7 +51,6 @@ type Frame struct {
meta IntHeader
typ int32
payload []byte
err error
}

func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr *Frame) {
Expand All @@ -69,23 +67,11 @@ func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr
return fr
}

func newErrFrame(err error) (fr *Frame) {
v := framePool.Get()
if v == nil {
fr = new(Frame)
} else {
fr = v.(*Frame)
}
fr.err = err
return fr
}

func recycleFrame(frame *Frame) {
frame.streamFrame = streamFrame{}
frame.meta = nil
frame.typ = 0
frame.payload = nil
frame.err = nil
framePool.Put(frame)
}

Expand Down Expand Up @@ -184,15 +170,13 @@ func DecodeFrame(ctx context.Context, reader bufiox.Reader) (fr *Frame, err erro
return fr, nil
}

var thriftCodec = thrift.NewThriftCodec()

func EncodePayload(ctx context.Context, msg any) ([]byte, error) {
payload, err := thrift.MarshalThriftData(ctx, thriftCodec, msg)
return payload, err
payload := gopkgthrift.FastMarshal(msg.(gopkgthrift.FastCodec))
return payload, nil
}

func DecodePayload(ctx context.Context, payload []byte, msg any) error {
return thrift.UnmarshalThriftData(ctx, thriftCodec, "", payload, msg)
return gopkgthrift.FastUnmarshal(payload, msg.(gopkgthrift.FastCodec))
}

func EncodeException(ctx context.Context, method string, seq int32, ex tException) ([]byte, error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/streamx/provider/ttstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type stream struct {
peerEOF int32
headerSig chan int32
trailerSig chan int32
err error

StreamMeta
metaHandler MetaFrameHandler
Expand Down Expand Up @@ -166,16 +165,18 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) {
return fmt.Errorf("stream read a unexcept trailer")
}

var exception error
// when server-side returns non-biz error, it will be wrapped as ApplicationException stored in trailer frame payload
if len(fr.payload) > 0 {
_, _, ex := thrift.UnmarshalFastMsg(fr.payload, nil)
s.err = ex.(*thrift.ApplicationException)
} else { // when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header
// exception is type of (*thrift.ApplicationException)
_, _, exception = thrift.UnmarshalFastMsg(fr.payload, nil)
} else {
// when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header
bizErr, err := transmeta.ParseBizStatusErr(fr.trailer)
if err != nil {
s.err = err
exception = err
} else if bizErr != nil {
s.err = bizErr
exception = bizErr
}
}
s.trailer = fr.trailer
Expand All @@ -190,8 +191,8 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) {
default:
}

klog.Debugf("stream[%d] recv trailer: %v, err: %v", s.sid, s.trailer, s.err)
return s.trans.streamCloseRecv(s)
klog.Debugf("stream[%d] recv trailer: %v, exception: %v", s.sid, s.trailer, exception)
return s.trans.streamCloseRecv(s, exception)
}

func (s *stream) writeTrailer(tl streamx.Trailer) (err error) {
Expand Down
56 changes: 31 additions & 25 deletions pkg/streamx/provider/ttstream/stream_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,66 +26,72 @@ import (
"github.com/cloudwego/kitex/pkg/streamx/provider/ttstream/container"
)

type streamIOMsg struct {
payload []byte
exception error
}

type streamIO struct {
ctx context.Context
trigger chan struct{}
stream *stream
ctx context.Context
trigger chan struct{}
stream *stream
pipe *container.Pipe[streamIOMsg]
cache [1]streamIOMsg
exception error // once has exception, the stream should not work normally again
// eofFlag == 2 when both parties send trailers
eofFlag int32
// eofCallback will be called when eofFlag == 2
// eofCallback will not be called if stream is not be ended in a normal way
eofCallback func()
fpipe *container.Pipe[*Frame]
fcache [1]*Frame
err error
}

func newStreamIO(ctx context.Context, s *stream) *streamIO {
sio := new(streamIO)
sio.ctx = ctx
sio.trigger = make(chan struct{})
sio.stream = s
sio.fpipe = container.NewPipe[*Frame]()
sio.pipe = container.NewPipe[streamIOMsg]()
return sio
}

func (s *streamIO) setEOFCallback(f func()) {
s.eofCallback = f
}

func (s *streamIO) input(ctx context.Context, f *Frame) {
err := s.fpipe.Write(ctx, f)
func (s *streamIO) input(ctx context.Context, msg streamIOMsg) {
err := s.pipe.Write(ctx, msg)
if err != nil {
klog.Errorf("fpipe write failed: %v", err)
klog.Errorf("pipe write failed: %v", err)
}
}

func (s *streamIO) output(ctx context.Context) (f *Frame, err error) {
if s.err != nil {
return nil, s.err
func (s *streamIO) output(ctx context.Context) (msg streamIOMsg, err error) {
if s.exception != nil {
return msg, s.exception
}
n, err := s.fpipe.Read(ctx, s.fcache[:])

n, err := s.pipe.Read(ctx, s.cache[:])
if err != nil {
if errors.Is(err, container.ErrPipeEOF) {
err = io.EOF
}
s.err = err
return nil, s.err
s.exception = err
return msg, s.exception
}
if n == 0 {
s.err = io.EOF
return nil, s.err
s.exception = io.EOF
return msg, s.exception
}
f = s.fcache[0]
if f.err != nil {
s.err = f.err
return nil, s.err
msg = s.cache[0]
if msg.exception != nil {
s.exception = msg.exception
return msg, s.exception
}
return f, nil
return msg, nil
}

func (s *streamIO) closeRecv() {
s.fpipe.Close()
s.pipe.Close()
if atomic.AddInt32(&s.eofFlag, 1) == 2 && s.eofCallback != nil {
s.eofCallback()
}
Expand All @@ -98,5 +104,5 @@ func (s *streamIO) closeSend() {
}

func (s *streamIO) cancel() {
s.fpipe.Cancel()
s.pipe.Cancel()
}
17 changes: 9 additions & 8 deletions pkg/streamx/provider/ttstream/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (t *transport) readFrame(reader bufiox.Reader) error {
if err != nil {
return err
}
defer recycleFrame(fr)
klog.Debugf("transport[%d] DecodeFrame: fr=%v", t.kind, fr)

switch fr.typ {
Expand Down Expand Up @@ -174,7 +175,7 @@ func (t *transport) readFrame(reader bufiox.Reader) error {
// Data Frame: decode and distribute data
sio, ok := t.loadStreamIO(fr.sid)
if ok {
sio.input(context.Background(), fr)
sio.input(context.Background(), streamIOMsg{payload: fr.payload})
} else {
klog.Errorf("transport[%d] read a unknown stream data: sid=%d", t.kind, fr.sid)
}
Expand Down Expand Up @@ -233,6 +234,7 @@ func (t *transport) loopWrite() (err error) {
if err = EncodeFrame(context.Background(), writer, fr); err != nil {
return err
}
recycleFrame(fr)
if delay >= batchWriteSize || len(t.wchannel) == 0 {
delay = 0
if err = t.conn.Writer().Flush(); err != nil {
Expand Down Expand Up @@ -303,24 +305,23 @@ func (t *transport) streamRecv(ctx context.Context, sid int32, data any) (err er
if !ok {
return io.EOF
}
f, err := sio.output(ctx)
msg, err := sio.output(ctx)
if err != nil {
return err
}
err = DecodePayload(context.Background(), f.payload, data.(thrift.FastCodec))
err = DecodePayload(context.Background(), msg.payload, data.(thrift.FastCodec))
// payload will not be access after decode
mcache.Free(f.payload)
recycleFrame(f)
mcache.Free(msg.payload)
return err
}

func (t *transport) streamCloseRecv(s *stream) error {
func (t *transport) streamCloseRecv(s *stream, exception error) error {
sio, ok := t.loadStreamIO(s.sid)
if !ok {
return fmt.Errorf("stream not found in stream map: sid=%d", s.sid)
}
if s.err != nil {
sio.input(context.Background(), newErrFrame(s.err))
if exception != nil {
sio.input(context.Background(), streamIOMsg{exception: exception})
}
sio.closeRecv()
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamx/provider/ttstream/ttstream_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (si *streamingService) UnaryWithErr(ctx context.Context, req *Request) (*Re
func (si *streamingService) ClientStreamWithErr(ctx context.Context, stream ClientStreamingServer[Request, Response]) (res *Response, err error) {
req, err := stream.Recv(ctx)
if err != nil {
klog.Errorf("Server ClientStreamWithErr Recv failed, err={%v}", err)
klog.Errorf("Server ClientStreamWithErr Recv failed, exception={%v}", err)
return nil, err
}
err = buildErr(req)
Expand All @@ -176,7 +176,7 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques
func (si *streamingService) BidiStreamWithErr(ctx context.Context, stream BidiStreamingServer[Request, Response]) error {
req, err := stream.Recv(ctx)
if err != nil {
klog.Errorf("Server BidiStreamWithErr Recv failed, err={%v}", err)
klog.Errorf("Server BidiStreamWithErr Recv failed, exception={%v}", err)
return err
}
err = buildErr(req)
Expand Down

0 comments on commit 4d23028

Please sign in to comment.