Skip to content

Commit

Permalink
feat: support ttheader streaming error handling (#1566)
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima authored Oct 10, 2024
1 parent f4746b8 commit d727d89
Show file tree
Hide file tree
Showing 23 changed files with 491 additions and 74 deletions.
7 changes: 6 additions & 1 deletion pkg/remote/trans/streamx/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ func (t *svrTransHandler) OnStream(ctx context.Context, conn net.Conn, ss stream
reqArgs := streamx.NewStreamReqArgs(nil)
resArgs := streamx.NewStreamResArgs(nil)
serr := t.inkHdlFunc(ctx, reqArgs, resArgs)
ctx, err = t.provider.OnStreamFinish(ctx, ss)
if serr == nil {
if bizErr := ri.Invocation().BizStatusErr(); bizErr != nil {
serr = bizErr
}
}
ctx, err = t.provider.OnStreamFinish(ctx, ss, serr)
if err == nil && serr != nil {
err = serr
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamx/client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/cloudwego/kitex/pkg/rpcinfo"
)

/* Hot it works
/* How it works
clientProvider := xxx.NewClientProvider(xxx.WithXXX(...))
client := {user_gencode}.NewClient({kitex_client}.WithClientProvider(clientProvider))
Expand Down
8 changes: 4 additions & 4 deletions pkg/streamx/provider/jsonrpc/server_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"context"
"net"

"github.com/cloudwego/netpoll"

"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/netpoll"
)

type serverTransCtxKey struct{}
Expand Down Expand Up @@ -70,8 +71,7 @@ func (s serverProvider) OnStream(ctx context.Context, conn net.Conn) (context.Co
return ctx, ss, nil
}

func (s serverProvider) OnStreamFinish(ctx context.Context, ss streamx.ServerStream) (context.Context, error) {
func (s serverProvider) OnStreamFinish(ctx context.Context, ss streamx.ServerStream, err error) (context.Context, error) {
sst := ss.(*serverStream)
err := sst.sendEOF()
return ctx, err
return ctx, sst.sendEOF()
}
3 changes: 2 additions & 1 deletion pkg/streamx/provider/ttstream/client_trans_pool_longconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package ttstream
import (
"time"

"github.com/cloudwego/netpoll"

"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/streamx/provider/ttstream/container"
"github.com/cloudwego/netpoll"
)

var DefaultLongConnConfig = LongConnConfig{
Expand Down
3 changes: 2 additions & 1 deletion pkg/streamx/provider/ttstream/client_trans_pool_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"sync"
"time"

"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/netpoll"
"golang.org/x/sync/singleflight"

"github.com/cloudwego/kitex/pkg/serviceinfo"
)

var _ transPool = (*muxTransPool)(nil)
Expand Down
6 changes: 6 additions & 0 deletions pkg/streamx/provider/ttstream/exception.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package ttstream

type tException interface {
Error() string
TypeId() int32
}
18 changes: 18 additions & 0 deletions pkg/streamx/provider/ttstream/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cloudwego/gopkg/bufiox"
gopkgthrift "github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/ttheader"

"github.com/cloudwego/kitex/pkg/remote/codec/thrift"
"github.com/cloudwego/kitex/pkg/streamx"
)
Expand All @@ -51,6 +52,7 @@ type Frame struct {
meta IntHeader
typ int32
payload []byte
err error
}

func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr *Frame) {
Expand All @@ -67,11 +69,23 @@ func newFrame(sframe streamFrame, meta IntHeader, typ int32, payload []byte) (fr
return fr
}

func newErrFrame(err error) (fr *Frame) {
v := framePool.Get()
if v == nil {
fr = new(Frame)
} else {
fr = v.(*Frame)
}
fr.err = err
return fr
}

func recycleFrame(frame *Frame) {
frame.streamFrame = streamFrame{}
frame.meta = nil
frame.typ = 0
frame.payload = nil
frame.err = nil
framePool.Put(frame)
}

Expand Down Expand Up @@ -180,3 +194,7 @@ func EncodePayload(ctx context.Context, msg any) ([]byte, error) {
func DecodePayload(ctx context.Context, payload []byte, msg any) error {
return thrift.UnmarshalThriftData(ctx, thriftCodec, "", payload, msg)
}

func EncodeException(ctx context.Context, method string, seq int32, ex tException) ([]byte, error) {
return gopkgthrift.MarshalFastMsg(method, gopkgthrift.EXCEPTION, seq, ex.(gopkgthrift.FastCodec))
}
33 changes: 30 additions & 3 deletions pkg/streamx/provider/ttstream/server_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ package ttstream
import (
"context"
"net"
"strconv"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/ttheader"
"github.com/cloudwego/netpoll"

"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/pkg/streamx/provider/ttstream/ktx"
"github.com/cloudwego/netpoll"
"github.com/cloudwego/kitex/pkg/utils"
)

type serverTransCtxKey struct{}
Expand Down Expand Up @@ -86,9 +92,30 @@ func (s serverProvider) OnStream(ctx context.Context, conn net.Conn) (context.Co
return ctx, ss, nil
}

func (s serverProvider) OnStreamFinish(ctx context.Context, ss streamx.ServerStream) (context.Context, error) {
func (s serverProvider) OnStreamFinish(ctx context.Context, ss streamx.ServerStream, err error) (context.Context, error) {
sst := ss.(*serverStream)
_ = sst.close()
var exception tException
if err != nil {
switch err.(type) {
case tException:
exception = err.(tException)
case kerrors.BizStatusErrorIface:
bizErr := err.(kerrors.BizStatusErrorIface)
sst.appendTrailer(
"biz-status", strconv.Itoa(int(bizErr.BizStatusCode())),
"biz-message", bizErr.BizMessage(),
)
if bizErr.BizExtra() != nil {
extra, _ := utils.Map2JSONStr(bizErr.BizExtra())
sst.appendTrailer("biz-extra", extra)
}
default:
exception = thrift.NewApplicationException(remote.InternalError, err.Error())
}
}
if err := sst.close(exception); err != nil {
return nil, err
}

cancelFunc, _ := ctx.Value(serverStreamCancelCtxKey{}).(context.CancelFunc)
if cancelFunc != nil {
Expand Down
48 changes: 40 additions & 8 deletions pkg/streamx/provider/ttstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/ttheader"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/pkg/transmeta"
)

var (
Expand Down Expand Up @@ -72,6 +75,7 @@ type stream struct {
peerEOF int32
headerSig chan int32
trailerSig chan int32
err error

StreamMeta
metaHandler MetaFrameHandler
Expand Down Expand Up @@ -157,12 +161,24 @@ func (s *stream) sendHeader() (err error) {

// readTrailer by client: unblock recv function and return EOF if no unread frame
// readTrailer by server: unblock recv function and return EOF if no unread frame
func (s *stream) readTrailer(tl streamx.Trailer) (err error) {
func (s *stream) readTrailerFrame(fr *Frame) (err error) {
if !atomic.CompareAndSwapInt32(&s.peerEOF, 0, 1) {
return fmt.Errorf("stream read a unexcept trailer")
}

s.trailer = tl
// when server-side returns non-biz error, it will be wrapped as ApplicationException stored in trailer frame payload
if len(fr.payload) > 0 {
_, _, ex := thrift.UnmarshalFastMsg(fr.payload, nil)
s.err = ex.(*thrift.ApplicationException)
} else { // when server-side returns biz error, payload is empty and biz error information is stored in trailer frame header
bizErr, err := transmeta.ParseBizStatusErr(fr.trailer)
if err != nil {
s.err = err
} else if bizErr != nil {
s.err = bizErr
}
}
s.trailer = fr.trailer
select {
case s.trailerSig <- streamSigActive:
default:
Expand All @@ -173,7 +189,8 @@ func (s *stream) readTrailer(tl streamx.Trailer) (err error) {
// if trailer arrived, we should return unblock stream.Header()
default:
}
klog.Debugf("stream[%d] recv trailer: %v", s.sid, tl)

klog.Debugf("stream[%d] recv trailer: %v, err: %v", s.sid, s.trailer, s.err)
return s.trans.streamCloseRecv(s)
}

Expand All @@ -187,7 +204,22 @@ func (s *stream) writeTrailer(tl streamx.Trailer) (err error) {
return nil
}

func (s *stream) sendTrailer() (err error) {
func (s *stream) appendTrailer(kvs ...string) (err error) {
if len(kvs)%2 != 0 {
return fmt.Errorf("got the odd number of input kvs for Trailer: %d", len(kvs))
}
var key string
for i, str := range kvs {
if i%2 == 0 {
key = str
continue
}
s.wtrailer[key] = str
}
return nil
}

func (s *stream) sendTrailer(ctx context.Context, ex tException) (err error) {
if !atomic.CompareAndSwapInt32(&s.selfEOF, 0, 1) {
return nil
}
Expand All @@ -197,7 +229,7 @@ func (s *stream) sendTrailer() (err error) {
return fmt.Errorf("stream trailer already sent")
}
klog.Debugf("transport[%d]-stream[%d] send trialer", s.trans.kind, s.sid)
return s.trans.streamCloseSend(s.sid, s.method, wtrailer)
return s.trans.streamCloseSend(s.sid, s.method, wtrailer, ex)
}

func (s *stream) finished() bool {
Expand Down Expand Up @@ -244,7 +276,7 @@ func (s *clientStream) RecvMsg(ctx context.Context, req any) error {
}

func (s *clientStream) CloseSend(ctx context.Context) error {
return s.sendTrailer()
return s.sendTrailer(ctx, nil)
}

func newServerStream(s *stream) streamx.ServerStream {
Expand Down Expand Up @@ -272,9 +304,9 @@ func (s *serverStream) SendMsg(ctx context.Context, res any) error {

// close will be called after server handler returned
// after close stream cannot be access again
func (s *serverStream) close() error {
func (s *serverStream) close(ex tException) error {
// write loop should help to delete stream
err := s.sendTrailer()
err := s.sendTrailer(context.Background(), ex)
if err != nil {
return err
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/streamx/provider/ttstream/stream_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type streamIO struct {
eofCallback func()
fpipe *container.Pipe[*Frame]
fcache [1]*Frame
err error
}

func newStreamIO(ctx context.Context, s *stream) *streamIO {
Expand All @@ -60,17 +61,27 @@ func (s *streamIO) input(ctx context.Context, f *Frame) {
}

func (s *streamIO) output(ctx context.Context) (f *Frame, err error) {
if s.err != nil {
return nil, s.err
}
n, err := s.fpipe.Read(ctx, s.fcache[:])
if err != nil {
if errors.Is(err, container.ErrPipeEOF) {
return nil, io.EOF
err = io.EOF
}
return nil, err
s.err = err
return nil, s.err
}
if n == 0 {
return nil, io.EOF
s.err = io.EOF
return nil, s.err
}
f = s.fcache[0]
if f.err != nil {
s.err = f.err
return nil, s.err
}
return s.fcache[0], nil
return f, nil
}

func (s *streamIO) closeRecv() {
Expand Down
Loading

0 comments on commit d727d89

Please sign in to comment.