Skip to content

Commit

Permalink
add more specified log
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Oct 9, 2024
1 parent 6272718 commit 4c90600
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
8 changes: 7 additions & 1 deletion pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,12 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
// to be sent and stream has some stream-level flow control.
func (l *loopyWriter) processData() (bool, error) {
if l.sendQuota == 0 {
klog.Infof("connection quota is 0, connection quota: %d, stream window: %d", l.sendQuota, l.oiws)
return true, nil
}
str := l.activeStreams.dequeue() // Remove the first stream.
if str == nil {
klog.Infof("stream is nil, connection quota: %d, stream window: %d", l.sendQuota, l.oiws)
return true, nil
}
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
Expand Down Expand Up @@ -868,6 +870,7 @@ func (l *loopyWriter) processData() (bool, error) {
maxSize := http2MaxFrameLen
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
str.state = waitingOnStreamQuota
klog.CtxInfof(str.wq.ctx, "stream flow control, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding)
return false, nil
} else if maxSize > strQuota {
maxSize = strQuota
Expand Down Expand Up @@ -896,7 +899,9 @@ func (l *loopyWriter) processData() (bool, error) {
size := hSize + dSize

// Now that outgoing flow controls are checked we can replenish str's write quota
str.wq.replenish(size)
if stillBlock := str.wq.replenish(size); stillBlock {
klog.CtxInfof(str.wq.ctx, "stream still blocked, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding)
}
var endStream bool
// If this is the last data message on this stream and all of it can be written in this iteration.
if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
Expand Down Expand Up @@ -929,6 +934,7 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
klog.CtxInfof(str.wq.ctx, "ran out ot stream quota, connection quota: %d, stream window: %d, bytesOutstanding: %d", l.sendQuota, l.oiws, str.bytesOutStanding)
str.state = waitingOnStreamQuota
} else { // Otherwise add it back to the list of active streams.
l.activeStreams.enqueue(str)
Expand Down
20 changes: 13 additions & 7 deletions pkg/remote/trans/nphttp2/grpc/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type ctxWriteQuota struct {
// replenish is called by loopyWriter to give quota back to.
// It is implemented as a field so that it can be updated
// by tests.
replenish func(n int)
replenish func(n int) bool
}

func newCtxWriteQuota(ctx context.Context, sz int32, done <-chan struct{}) *ctxWriteQuota {
Expand Down Expand Up @@ -124,17 +124,23 @@ func (w *ctxWriteQuota) get(sz int32) error {
}
}

func (w *ctxWriteQuota) realReplenish(n int) {
func (w *ctxWriteQuota) realReplenish(n int) bool {
sz := int32(n)
a := atomic.AddInt32(&w.quota, sz)
b := a - sz
if b <= 0 && a > 0 {
select {
case w.ch <- struct{}{}:
klog.CtxInfof(w.ctx, "replenish quota success, curQuota: %d, oriQuota: %d", a, b)
default:
if b <= 0 {
if a > 0 {
select {
case w.ch <- struct{}{}:
klog.CtxInfof(w.ctx, "replenish quota success, curQuota: %d, oriQuota: %d", a, b)
default:
}
} else {
klog.CtxInfof(w.ctx, "replenish quota failed, curQuota: %d, oriQuota: %d", a, b)
return false
}
}
return true
}

type trInFlow struct {
Expand Down

0 comments on commit 4c90600

Please sign in to comment.