Skip to content

Commit

Permalink
fix: pipe dead lock
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 17, 2024
1 parent 2def2a6 commit dd671fb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 4 additions & 2 deletions pkg/streamx/provider/ttstream/container/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Pipe[Item any] struct {
func NewPipe[Item any]() *Pipe[Item] {
p := new(Pipe[Item])
p.queue = NewQueue[Item]()
p.trigger = make(chan struct{})
p.trigger = make(chan struct{}, 1)
return p
}

Expand Down Expand Up @@ -88,7 +88,8 @@ READ:
if err != nil {
return 0, err
}
return 0, fmt.Errorf("unknown err")
// never happen error
return 0, fmt.Errorf("unknown state error")
}
goto READ
}
Expand All @@ -100,6 +101,7 @@ func (p *Pipe[Item]) Write(ctx context.Context, items ...Item) error {
if err != nil {
return err
}
// never happen error
return fmt.Errorf("unknown state error")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/streamx/provider/ttstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *stream) sendTrailer(ctx context.Context, ex tException) (err error) {
if wtrailer == nil {
return fmt.Errorf("stream trailer already sent")
}
klog.Debugf("transport[%d]-stream[%d] send trialer", s.trans.kind, s.sid)
klog.Debugf("transport[%d]-stream[%d] send trailer", s.trans.kind, s.sid)
return s.trans.streamCloseSend(s.sid, s.method, wtrailer, ex)
}

Expand Down

0 comments on commit dd671fb

Please sign in to comment.