From 0ea792fa9643c04a231a3beed26ff54f7ef09ca3 Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 10 Oct 2024 18:11:12 +0800 Subject: [PATCH] perf: recycle frame and rm err in stream io (#1570) --- pkg/streamx/provider/ttstream/frame.go | 22 +------- pkg/streamx/provider/ttstream/stream.go | 17 +++--- pkg/streamx/provider/ttstream/stream_io.go | 56 ++++++++++--------- pkg/streamx/provider/ttstream/transport.go | 17 +++--- .../provider/ttstream/ttstream_server_test.go | 4 +- 5 files changed, 54 insertions(+), 62 deletions(-) diff --git a/pkg/streamx/provider/ttstream/frame.go b/pkg/streamx/provider/ttstream/frame.go index 97a6bb8938..77b34426a3 100644 --- a/pkg/streamx/provider/ttstream/frame.go +++ b/pkg/streamx/provider/ttstream/frame.go @@ -27,7 +27,6 @@ import ( gopkgthrift "github.com/cloudwego/gopkg/protocol/thrift" "github.com/cloudwego/gopkg/protocol/ttheader" - "github.com/cloudwego/kitex/pkg/remote/codec/thrift" "github.com/cloudwego/kitex/pkg/streamx" ) @@ -52,7 +51,6 @@ type Frame struct { meta IntHeader typ int32 payload []byte - err error } func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr *Frame) { @@ -69,23 +67,11 @@ func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr return fr } -func newErrFrame(err error) (fr *Frame) { - v := framePool.Get() - if v == nil { - fr = new(Frame) - } else { - fr = v.(*Frame) - } - fr.err = err - return fr -} - func recycleFrame(frame *Frame) { frame.streamFrame = streamFrame{} frame.meta = nil frame.typ = 0 frame.payload = nil - frame.err = nil framePool.Put(frame) } @@ -184,15 +170,13 @@ func DecodeFrame(ctx context.Context, reader bufiox.Reader) (fr *Frame, err erro return fr, nil } -var thriftCodec = thrift.NewThriftCodec() - func EncodePayload(ctx context.Context, msg any) ([]byte, error) { - payload, err := thrift.MarshalThriftData(ctx, thriftCodec, msg) - return payload, err + payload := gopkgthrift.FastMarshal(msg.(gopkgthrift.FastCodec)) + return payload, nil } func DecodePayload(ctx context.Context, payload []byte, msg any) error { - return thrift.UnmarshalThriftData(ctx, thriftCodec, "", payload, msg) + return gopkgthrift.FastUnmarshal(payload, msg.(gopkgthrift.FastCodec)) } func EncodeException(ctx context.Context, method string, seq int32, ex tException) ([]byte, error) { diff --git a/pkg/streamx/provider/ttstream/stream.go b/pkg/streamx/provider/ttstream/stream.go index c6196e26a2..720e21a21d 100644 --- a/pkg/streamx/provider/ttstream/stream.go +++ b/pkg/streamx/provider/ttstream/stream.go @@ -75,7 +75,6 @@ type stream struct { peerEOF int32 headerSig chan int32 trailerSig chan int32 - err error StreamMeta metaHandler MetaFrameHandler @@ -166,16 +165,18 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) { return fmt.Errorf("stream read a unexcept trailer") } + var exception error // when server-side returns non-biz error, it will be wrapped as ApplicationException stored in trailer frame payload if len(fr.payload) > 0 { - _, _, ex := thrift.UnmarshalFastMsg(fr.payload, nil) - s.err = ex.(*thrift.ApplicationException) - } else { // when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header + // exception is type of (*thrift.ApplicationException) + _, _, exception = thrift.UnmarshalFastMsg(fr.payload, nil) + } else { + // when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header bizErr, err := transmeta.ParseBizStatusErr(fr.trailer) if err != nil { - s.err = err + exception = err } else if bizErr != nil { - s.err = bizErr + exception = bizErr } } s.trailer = fr.trailer @@ -190,8 +191,8 @@ func (s *stream) readTrailerFrame(fr *Frame) (err error) { default: } - klog.Debugf("stream[%d] recv trailer: %v, err: %v", s.sid, s.trailer, s.err) - return s.trans.streamCloseRecv(s) + klog.Debugf("stream[%d] recv trailer: %v, exception: %v", s.sid, s.trailer, exception) + return s.trans.streamCloseRecv(s, exception) } func (s *stream) writeTrailer(tl streamx.Trailer) (err error) { diff --git a/pkg/streamx/provider/ttstream/stream_io.go b/pkg/streamx/provider/ttstream/stream_io.go index 65851ca4dd..83b37264e3 100644 --- a/pkg/streamx/provider/ttstream/stream_io.go +++ b/pkg/streamx/provider/ttstream/stream_io.go @@ -26,18 +26,23 @@ import ( "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream/container" ) +type streamIOMsg struct { + payload []byte + exception error +} + type streamIO struct { - ctx context.Context - trigger chan struct{} - stream *stream + ctx context.Context + trigger chan struct{} + stream *stream + pipe *container.Pipe[streamIOMsg] + cache [1]streamIOMsg + exception error // once has exception, the stream should not work normally again // eofFlag == 2 when both parties send trailers eofFlag int32 // eofCallback will be called when eofFlag == 2 // eofCallback will not be called if stream is not be ended in a normal way eofCallback func() - fpipe *container.Pipe[*Frame] - fcache [1]*Frame - err error } func newStreamIO(ctx context.Context, s *stream) *streamIO { @@ -45,7 +50,7 @@ func newStreamIO(ctx context.Context, s *stream) *streamIO { sio.ctx = ctx sio.trigger = make(chan struct{}) sio.stream = s - sio.fpipe = container.NewPipe[*Frame]() + sio.pipe = container.NewPipe[streamIOMsg]() return sio } @@ -53,39 +58,40 @@ func (s *streamIO) setEOFCallback(f func()) { s.eofCallback = f } -func (s *streamIO) input(ctx context.Context, f *Frame) { - err := s.fpipe.Write(ctx, f) +func (s *streamIO) input(ctx context.Context, msg streamIOMsg) { + err := s.pipe.Write(ctx, msg) if err != nil { - klog.Errorf("fpipe write failed: %v", err) + klog.Errorf("pipe write failed: %v", err) } } -func (s *streamIO) output(ctx context.Context) (f *Frame, err error) { - if s.err != nil { - return nil, s.err +func (s *streamIO) output(ctx context.Context) (msg streamIOMsg, err error) { + if s.exception != nil { + return msg, s.exception } - n, err := s.fpipe.Read(ctx, s.fcache[:]) + + n, err := s.pipe.Read(ctx, s.cache[:]) if err != nil { if errors.Is(err, container.ErrPipeEOF) { err = io.EOF } - s.err = err - return nil, s.err + s.exception = err + return msg, s.exception } if n == 0 { - s.err = io.EOF - return nil, s.err + s.exception = io.EOF + return msg, s.exception } - f = s.fcache[0] - if f.err != nil { - s.err = f.err - return nil, s.err + msg = s.cache[0] + if msg.exception != nil { + s.exception = msg.exception + return msg, s.exception } - return f, nil + return msg, nil } func (s *streamIO) closeRecv() { - s.fpipe.Close() + s.pipe.Close() if atomic.AddInt32(&s.eofFlag, 1) == 2 && s.eofCallback != nil { s.eofCallback() } @@ -98,5 +104,5 @@ func (s *streamIO) closeSend() { } func (s *streamIO) cancel() { - s.fpipe.Cancel() + s.pipe.Cancel() } diff --git a/pkg/streamx/provider/ttstream/transport.go b/pkg/streamx/provider/ttstream/transport.go index af4a2be541..fc7666281a 100644 --- a/pkg/streamx/provider/ttstream/transport.go +++ b/pkg/streamx/provider/ttstream/transport.go @@ -142,6 +142,7 @@ func (t *transport) readFrame(reader bufiox.Reader) error { if err != nil { return err } + defer recycleFrame(fr) klog.Debugf("transport[%d] DecodeFrame: fr=%v", t.kind, fr) switch fr.typ { @@ -174,7 +175,7 @@ func (t *transport) readFrame(reader bufiox.Reader) error { // Data Frame: decode and distribute data sio, ok := t.loadStreamIO(fr.sid) if ok { - sio.input(context.Background(), fr) + sio.input(context.Background(), streamIOMsg{payload: fr.payload}) } else { klog.Errorf("transport[%d] read a unknown stream data: sid=%d", t.kind, fr.sid) } @@ -233,6 +234,7 @@ func (t *transport) loopWrite() (err error) { if err = EncodeFrame(context.Background(), writer, fr); err != nil { return err } + recycleFrame(fr) if delay >= batchWriteSize || len(t.wchannel) == 0 { delay = 0 if err = t.conn.Writer().Flush(); err != nil { @@ -303,24 +305,23 @@ func (t *transport) streamRecv(ctx context.Context, sid int32, data any) (err er if !ok { return io.EOF } - f, err := sio.output(ctx) + msg, err := sio.output(ctx) if err != nil { return err } - err = DecodePayload(context.Background(), f.payload, data.(thrift.FastCodec)) + err = DecodePayload(context.Background(), msg.payload, data.(thrift.FastCodec)) // payload will not be access after decode - mcache.Free(f.payload) - recycleFrame(f) + mcache.Free(msg.payload) return err } -func (t *transport) streamCloseRecv(s *stream) error { +func (t *transport) streamCloseRecv(s *stream, exception error) error { sio, ok := t.loadStreamIO(s.sid) if !ok { return fmt.Errorf("stream not found in stream map: sid=%d", s.sid) } - if s.err != nil { - sio.input(context.Background(), newErrFrame(s.err)) + if exception != nil { + sio.input(context.Background(), streamIOMsg{exception: exception}) } sio.closeRecv() return nil diff --git a/pkg/streamx/provider/ttstream/ttstream_server_test.go b/pkg/streamx/provider/ttstream/ttstream_server_test.go index 3f21f24d9c..3eecb2b665 100644 --- a/pkg/streamx/provider/ttstream/ttstream_server_test.go +++ b/pkg/streamx/provider/ttstream/ttstream_server_test.go @@ -159,7 +159,7 @@ func (si *streamingService) UnaryWithErr(ctx context.Context, req *Request) (*Re func (si *streamingService) ClientStreamWithErr(ctx context.Context, stream ClientStreamingServer[Request, Response]) (res *Response, err error) { req, err := stream.Recv(ctx) if err != nil { - klog.Errorf("Server ClientStreamWithErr Recv failed, err={%v}", err) + klog.Errorf("Server ClientStreamWithErr Recv failed, exception={%v}", err) return nil, err } err = buildErr(req) @@ -176,7 +176,7 @@ func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Reques func (si *streamingService) BidiStreamWithErr(ctx context.Context, stream BidiStreamingServer[Request, Response]) error { req, err := stream.Recv(ctx) if err != nil { - klog.Errorf("Server BidiStreamWithErr Recv failed, err={%v}", err) + klog.Errorf("Server BidiStreamWithErr Recv failed, exception={%v}", err) return err } err = buildErr(req)