diff --git a/pkg/streamx/provider/ttstream/container/pipe.go b/pkg/streamx/provider/ttstream/container/pipe.go index 791670261b..08ec9ea727 100644 --- a/pkg/streamx/provider/ttstream/container/pipe.go +++ b/pkg/streamx/provider/ttstream/container/pipe.go @@ -51,7 +51,7 @@ type Pipe[Item any] struct { func NewPipe[Item any]() *Pipe[Item] { p := new(Pipe[Item]) p.queue = NewQueue[Item]() - p.trigger = make(chan struct{}) + p.trigger = make(chan struct{}, 1) return p } @@ -88,7 +88,8 @@ READ: if err != nil { return 0, err } - return 0, fmt.Errorf("unknown err") + // never happen error + return 0, fmt.Errorf("unknown state error") } goto READ } @@ -100,6 +101,7 @@ func (p *Pipe[Item]) Write(ctx context.Context, items ...Item) error { if err != nil { return err } + // never happen error return fmt.Errorf("unknown state error") } diff --git a/pkg/streamx/provider/ttstream/stream.go b/pkg/streamx/provider/ttstream/stream.go index d0de8bf45e..3bdb35dc25 100644 --- a/pkg/streamx/provider/ttstream/stream.go +++ b/pkg/streamx/provider/ttstream/stream.go @@ -206,7 +206,7 @@ func (s *stream) sendTrailer(ctx context.Context, ex tException) (err error) { if wtrailer == nil { return fmt.Errorf("stream trailer already sent") } - klog.Debugf("transport[%d]-stream[%d] send trialer", s.trans.kind, s.sid) + klog.Debugf("transport[%d]-stream[%d] send trailer", s.trans.kind, s.sid) return s.trans.streamCloseSend(s.sid, s.method, wtrailer, ex) }