Skip to content

Commit

Permalink
fix: stream server close sio pipe when peer close (#1578)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Oct 16, 2024
1 parent fe7f8cd commit dd52e7f
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 237 deletions.
1 change: 1 addition & 0 deletions client/client_streamx.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (kc *kClient) NewStream(ctx context.Context, method string, req any, callOp
// it's an ugly trick but if we don't want to refactor too much,
// this is the only way to compatible with current endpoint design
err = kc.sEps(ctx, req, streamArgs)
kc.opt.TracerCtl.DoFinish(ctx, ri, err)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/streamx/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
其他接口实际上最终是用来去组装了 transpipeline ....
*/

var streamWorkerPool = wpool.New(128, time.Minute)
var streamWorkerPool = wpool.New(128, time.Second)

type svrTransHandlerFactory struct {
provider streamx.ServerProvider
Expand Down
22 changes: 12 additions & 10 deletions pkg/streamx/provider/ttstream/client_provier.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,16 @@ func (c clientProvider) NewStream(ctx context.Context, ri rpcinfo.RPCInfo, callO
// if ctx from server side, we should cancel the stream when server handler already returned
// TODO: this canceling transmit should be configurable
ktx.RegisterCancelCallback(ctx, func() {
sio.stream.cancel()
sio.cancel()
})

cs := newClientStream(sio.stream)
// the END of a client stream means it should send and recv trailer and not hold by user anymore
var ended uint32
sio.setEOFCallback(func() {
// if stream is ended by both parties, put the transport back to pool
sio.stream.close()
if atomic.AddUint32(&ended, 1) == 2 {
if trans.IsActive() {
c.transPool.Put(trans)
}
err = trans.streamDelete(sio.stream.sid)
_ = c.streamFinalize(sio, trans)
}
})
runtime.SetFinalizer(cs, func(cstream *clientStream) {
Expand All @@ -112,11 +108,17 @@ func (c clientProvider) NewStream(ctx context.Context, ri rpcinfo.RPCInfo, callO
_ = cstream.CloseSend(ctx)
// only delete stream when clientStream be finalized
if atomic.AddUint32(&ended, 1) == 2 {
if trans.IsActive() {
c.transPool.Put(trans)
}
err = trans.streamDelete(sio.stream.sid)
_ = c.streamFinalize(sio, trans)
}
})
return cs, err
}

func (c clientProvider) streamFinalize(sio *streamIO, trans *transport) error {
sio.close()
err := trans.streamDelete(sio.stream.sid)
if trans.IsActive() {
c.transPool.Put(trans)
}
return err
}
19 changes: 14 additions & 5 deletions pkg/streamx/provider/ttstream/container/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Pipe[Item any] struct {
func NewPipe[Item any]() *Pipe[Item] {
p := new(Pipe[Item])
p.queue = NewQueue[Item]()
p.trigger = make(chan struct{}, 1)
p.trigger = make(chan struct{})
return p
}

Expand Down Expand Up @@ -86,6 +86,7 @@ READ:
if err != nil {
return 0, err
}
return 0, fmt.Errorf("unknown err")
}
goto READ
}
Expand All @@ -112,11 +113,19 @@ func (p *Pipe[Item]) Write(ctx context.Context, items ...Item) error {
}

func (p *Pipe[Item]) Close() {
atomic.StoreInt32(&p.state, pipeStateClosed)
close(p.trigger)
select {
case <-p.trigger:
default:
atomic.StoreInt32(&p.state, pipeStateClosed)
close(p.trigger)
}
}

func (p *Pipe[Item]) Cancel() {
atomic.StoreInt32(&p.state, pipeStateCanceled)
close(p.trigger)
select {
case <-p.trigger:
default:
atomic.StoreInt32(&p.state, pipeStateCanceled)
close(p.trigger)
}
}
4 changes: 0 additions & 4 deletions pkg/streamx/provider/ttstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,6 @@ func (s *stream) RecvMsg(ctx context.Context, req any) error {
return s.trans.streamRecv(ctx, s.sid, req)
}

func (s *stream) cancel() {
_ = s.trans.streamCancel(s)
}

func newClientStream(s *stream) *clientStream {
cs := &clientStream{stream: s}
return cs
Expand Down
6 changes: 6 additions & 0 deletions pkg/streamx/provider/ttstream/stream_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *streamIO) closeSend() {
}
}

func (s *streamIO) close() {
s.stream.close()
s.pipe.Close()
}

func (s *streamIO) cancel() {
s.pipe.Cancel()
s.stream.close()
}
11 changes: 1 addition & 10 deletions pkg/streamx/provider/ttstream/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (t *transport) Close() (err error) {
t.fpipe.Close()
t.streams.Range(func(key, value any) bool {
sio := value.(*streamIO)
sio.stream.close()
sio.close()
_ = t.streamDelete(sio.stream.sid)
return true
})
Expand Down Expand Up @@ -312,15 +312,6 @@ func (t *transport) streamCloseRecv(s *stream, exception error) error {
return nil
}

func (t *transport) streamCancel(s *stream) (err error) {
sio, ok := t.loadStreamIO(s.sid)
if !ok {
return fmt.Errorf("stream not found in stream map: sid=%d", s.sid)
}
sio.cancel()
return nil
}

func (t *transport) streamDelete(sid int32) (err error) {
// remove stream from transport
_, ok := t.streams.LoadAndDelete(sid)
Expand Down
203 changes: 0 additions & 203 deletions pkg/streamx/provider/ttstream/transport_test.go

This file was deleted.

Loading

0 comments on commit dd52e7f

Please sign in to comment.