diff --git a/pkg/remote/trans/nphttp2/grpc/controlbuf.go b/pkg/remote/trans/nphttp2/grpc/controlbuf.go index 7e3f8bce45..20996c0935 100644 --- a/pkg/remote/trans/nphttp2/grpc/controlbuf.go +++ b/pkg/remote/trans/nphttp2/grpc/controlbuf.go @@ -830,10 +830,12 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error { // to be sent and stream has some stream-level flow control. func (l *loopyWriter) processData() (bool, error) { if l.sendQuota == 0 { + klog.Infof("connection quota is 0, connection quota: %d, stream window: %d", l.sendQuota, l.oiws) return true, nil } str := l.activeStreams.dequeue() // Remove the first stream. if str == nil { + klog.Infof("stream is nil, connection quota: %d, stream window: %d", l.sendQuota, l.oiws) return true, nil } dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. @@ -868,6 +870,7 @@ func (l *loopyWriter) processData() (bool, error) { maxSize := http2MaxFrameLen if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. str.state = waitingOnStreamQuota + klog.CtxInfof(str.wq.ctx, "stream flow control, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding) return false, nil } else if maxSize > strQuota { maxSize = strQuota @@ -896,7 +899,9 @@ func (l *loopyWriter) processData() (bool, error) { size := hSize + dSize // Now that outgoing flow controls are checked we can replenish str's write quota - str.wq.replenish(size) + if stillBlock := str.wq.replenish(size); stillBlock { + klog.CtxInfof(str.wq.ctx, "stream still blocked, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding) + } var endStream bool // If this is the last data message on this stream and all of it can be written in this iteration. if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size { @@ -929,6 +934,7 @@ func (l *loopyWriter) processData() (bool, error) { return false, err } } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. + klog.CtxInfof(str.wq.ctx, "ran out ot stream quota, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding) str.state = waitingOnStreamQuota } else { // Otherwise add it back to the list of active streams. l.activeStreams.enqueue(str) diff --git a/pkg/remote/trans/nphttp2/grpc/flowcontrol.go b/pkg/remote/trans/nphttp2/grpc/flowcontrol.go index d3833d88e3..ec98206cb8 100644 --- a/pkg/remote/trans/nphttp2/grpc/flowcontrol.go +++ b/pkg/remote/trans/nphttp2/grpc/flowcontrol.go @@ -93,7 +93,7 @@ type ctxWriteQuota struct { // replenish is called by loopyWriter to give quota back to. // It is implemented as a field so that it can be updated // by tests. - replenish func(n int) + replenish func(n int) bool } func newCtxWriteQuota(ctx context.Context, sz int32, done <-chan struct{}) *ctxWriteQuota { @@ -124,17 +124,23 @@ func (w *ctxWriteQuota) get(sz int32) error { } } -func (w *ctxWriteQuota) realReplenish(n int) { +func (w *ctxWriteQuota) realReplenish(n int) bool { sz := int32(n) a := atomic.AddInt32(&w.quota, sz) b := a - sz - if b <= 0 && a > 0 { - select { - case w.ch <- struct{}{}: - klog.CtxInfof(w.ctx, "replenish quota success, curQuota: %d, oriQuota: %d", a, b) - default: + if b <= 0 { + if a > 0 { + select { + case w.ch <- struct{}{}: + klog.CtxInfof(w.ctx, "replenish quota success, curQuota: %d, oriQuota: %d", a, b) + default: + } + } else { + klog.CtxInfof(w.ctx, "replenish quota failed, curQuota: %d, oriQuota: %d", a, b) + return false } } + return true } type trInFlow struct {