diff --git a/pkg/streamx/provider/ttstream/transport.go b/pkg/streamx/provider/ttstream/transport.go index 060fe97433..4c8201ef59 100644 --- a/pkg/streamx/provider/ttstream/transport.go +++ b/pkg/streamx/provider/ttstream/transport.go @@ -41,8 +41,7 @@ const ( serverTransport int32 = 2 streamCacheSize = 32 - frameCacheSize = 32 - batchWriteSize = 32 + frameCacheSize = 256 ) func isIgnoreError(err error) bool { @@ -55,9 +54,8 @@ type transport struct { conn netpoll.Connection streams sync.Map // key=streamID val=streamIO scache []*stream // size is streamCacheSize - spipe *container.Pipe[*stream] // in-coming stream channel - wchannel chan *Frame - closed chan struct{} + spipe *container.Pipe[*stream] // in-coming stream pipe + fpipe *container.Pipe[*Frame] // out-coming frame pipe closedFlag int32 streamingFlag int32 // flag == 0 means there is no active stream on transport } @@ -67,14 +65,13 @@ func newTransport(kind int32, sinfo *serviceinfo.ServiceInfo, conn netpoll.Conne // TODO: let it configurable _ = conn.SetReadTimeout(time.Minute * 10) t := &transport{ - kind: kind, - sinfo: sinfo, - conn: conn, - streams: sync.Map{}, - spipe: container.NewPipe[*stream](), - scache: make([]*stream, 0, streamCacheSize), - wchannel: make(chan *Frame, frameCacheSize), - closed: make(chan struct{}), + kind: kind, + sinfo: sinfo, + conn: conn, + streams: sync.Map{}, + spipe: container.NewPipe[*stream](), + scache: make([]*stream, 0, streamCacheSize), + fpipe: container.NewPipe[*Frame](), } go func() { err := t.loopRead() @@ -106,9 +103,9 @@ func (t *transport) Close() (err error) { if !atomic.CompareAndSwapInt32(&t.closedFlag, 0, 1) { return nil } - close(t.closed) klog.Debugf("transport[%s] is closing", t.conn.LocalAddr()) t.spipe.Close() + t.fpipe.Close() t.streams.Range(func(key, value any) bool { sio := value.(*streamIO) sio.stream.close() @@ -224,43 +221,32 @@ func (t *transport) loopRead() error { } } -func (t *transport) loopWrite() (err error) { +func (t *transport) loopWrite() error { defer func() { // loop write should help to close connection _ = t.conn.Close() }() writer := newWriterBuffer(t.conn.Writer()) - delay := 0 + fcache := make([]*Frame, frameCacheSize) // Important note: // loopWrite may cannot find stream by sid since it may send trailer and delete sid from streams for { - select { - case <-t.closed: - return nil - case fr, ok := <-t.wchannel: - if !ok { - // closed - return nil - } - select { - case <-t.closed: - // double check closed - return nil - default: - } - + n, err := t.fpipe.Read(context.Background(), fcache) + if err != nil { + return err + } + if n == 0 { + return io.EOF + } + for i := 0; i < n; i++ { + fr := fcache[i] if err = EncodeFrame(context.Background(), writer, fr); err != nil { return err } recycleFrame(fr) - if delay >= batchWriteSize || len(t.wchannel) == 0 { - delay = 0 - if err = t.conn.Writer().Flush(); err != nil { - return err - } - } else { - delay++ - } + } + if err = t.conn.Writer().Flush(); err != nil { + return err } } } @@ -268,8 +254,7 @@ func (t *transport) loopWrite() (err error) { // writeFrame is concurrent safe func (t *transport) writeFrame(sframe streamFrame, meta IntHeader, ftype int32, payload []byte) (err error) { frame := newFrame(sframe, meta, ftype, payload) - t.wchannel <- frame - return nil + return t.fpipe.Write(context.Background(), frame) } func (t *transport) streamSend(ctx context.Context, sid int32, method string, wheader streamx.Header, res any) (err error) {