Skip to content

Commit

Permalink
perf: using fpipe
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 11, 2024
1 parent 38b0de5 commit 744819a
Showing 1 changed file with 26 additions and 41 deletions.
67 changes: 26 additions & 41 deletions pkg/streamx/provider/ttstream/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ const (
serverTransport int32 = 2

streamCacheSize = 32
frameCacheSize = 32
batchWriteSize = 32
frameCacheSize = 256
)

func isIgnoreError(err error) bool {
Expand All @@ -55,9 +54,8 @@ type transport struct {
conn netpoll.Connection
streams sync.Map // key=streamID val=streamIO
scache []*stream // size is streamCacheSize
spipe *container.Pipe[*stream] // in-coming stream channel
wchannel chan *Frame
closed chan struct{}
spipe *container.Pipe[*stream] // in-coming stream pipe
fpipe *container.Pipe[*Frame] // out-coming frame pipe
closedFlag int32
streamingFlag int32 // flag == 0 means there is no active stream on transport
}
Expand All @@ -67,14 +65,13 @@ func newTransport(kind int32, sinfo *serviceinfo.ServiceInfo, conn netpoll.Conne
// TODO: let it configurable
_ = conn.SetReadTimeout(time.Minute * 10)
t := &transport{
kind: kind,
sinfo: sinfo,
conn: conn,
streams: sync.Map{},
spipe: container.NewPipe[*stream](),
scache: make([]*stream, 0, streamCacheSize),
wchannel: make(chan *Frame, frameCacheSize),
closed: make(chan struct{}),
kind: kind,
sinfo: sinfo,
conn: conn,
streams: sync.Map{},
spipe: container.NewPipe[*stream](),
scache: make([]*stream, 0, streamCacheSize),
fpipe: container.NewPipe[*Frame](),
}
go func() {
err := t.loopRead()
Expand Down Expand Up @@ -106,9 +103,9 @@ func (t *transport) Close() (err error) {
if !atomic.CompareAndSwapInt32(&t.closedFlag, 0, 1) {
return nil
}
close(t.closed)
klog.Debugf("transport[%s] is closing", t.conn.LocalAddr())
t.spipe.Close()
t.fpipe.Close()
t.streams.Range(func(key, value any) bool {
sio := value.(*streamIO)
sio.stream.close()
Expand Down Expand Up @@ -224,52 +221,40 @@ func (t *transport) loopRead() error {
}
}

func (t *transport) loopWrite() (err error) {
func (t *transport) loopWrite() error {
defer func() {
// loop write should help to close connection
_ = t.conn.Close()
}()
writer := newWriterBuffer(t.conn.Writer())
delay := 0
fcache := make([]*Frame, frameCacheSize)
// Important note:
// loopWrite may cannot find stream by sid since it may send trailer and delete sid from streams
for {
select {
case <-t.closed:
return nil
case fr, ok := <-t.wchannel:
if !ok {
// closed
return nil
}
select {
case <-t.closed:
// double check closed
return nil
default:
}

n, err := t.fpipe.Read(context.Background(), fcache)
if err != nil {
return err
}
if n == 0 {
return io.EOF
}
for i := 0; i < n; i++ {
fr := fcache[i]
if err = EncodeFrame(context.Background(), writer, fr); err != nil {
return err
}
recycleFrame(fr)
if delay >= batchWriteSize || len(t.wchannel) == 0 {
delay = 0
if err = t.conn.Writer().Flush(); err != nil {
return err
}
} else {
delay++
}
}
if err = t.conn.Writer().Flush(); err != nil {
return err
}
}
}

// writeFrame is concurrent safe
func (t *transport) writeFrame(sframe streamFrame, meta IntHeader, ftype int32, payload []byte) (err error) {
frame := newFrame(sframe, meta, ftype, payload)
t.wchannel <- frame
return nil
return t.fpipe.Write(context.Background(), frame)
}

func (t *transport) streamSend(ctx context.Context, sid int32, method string, wheader streamx.Header, res any) (err error) {
Expand Down

0 comments on commit 744819a

Please sign in to comment.