Skip to content

Commit

Permalink
online: add writeQuota log
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 26, 2024
1 parent 8526b3a commit 6272718
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
6 changes: 3 additions & 3 deletions pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type cbItem interface {
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
wq *ctxWriteQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }
Expand All @@ -115,7 +115,7 @@ type headerFrame struct {
endStream bool // Valid on server side.
initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
wq *ctxWriteQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
Expand Down Expand Up @@ -217,7 +217,7 @@ type outStream struct {
state outStreamState
itl *itemList
bytesOutStanding int
wq *writeQuota
wq *ctxWriteQuota

next *outStream
prev *outStream
Expand Down
58 changes: 58 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package grpc

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/klog"
)

// writeQuota is a soft limit on the amount of data a stream can
Expand Down Expand Up @@ -79,6 +82,61 @@ func (w *writeQuota) realReplenish(n int) {
}
}

type ctxWriteQuota struct {
ctx context.Context
quota int32
// get waits on read from when quota goes less than or equal to zero.
// replenish writes on it when quota goes positive again.
ch chan struct{}
// done is triggered in error case.
done <-chan struct{}
// replenish is called by loopyWriter to give quota back to.
// It is implemented as a field so that it can be updated
// by tests.
replenish func(n int)
}

func newCtxWriteQuota(ctx context.Context, sz int32, done <-chan struct{}) *ctxWriteQuota {
w := &ctxWriteQuota{
ctx: ctx,
quota: sz,
ch: make(chan struct{}, 1),
done: done,
}
w.replenish = w.realReplenish
return w
}

func (w *ctxWriteQuota) get(sz int32) error {
for {
curSize := atomic.LoadInt32(&w.quota)
if curSize > 0 {
atomic.AddInt32(&w.quota, -sz)
return nil
}
klog.CtxInfof(w.ctx, "get quota failed, curSize: %d, request sz: %d", curSize, sz)
select {
case <-w.ch:
continue
case <-w.done:
return errStreamDone
}
}
}

func (w *ctxWriteQuota) realReplenish(n int) {
sz := int32(n)
a := atomic.AddInt32(&w.quota, sz)
b := a - sz
if b <= 0 && a > 0 {
select {
case w.ch <- struct{}{}:
klog.CtxInfof(w.ctx, "replenish quota success, curQuota: %d, oriQuota: %d", a, b)
default:
}
}
}

type trInFlow struct {
limit uint32
unacked uint32
Expand Down
4 changes: 2 additions & 2 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func fillStreamFields(s *Stream) {
fields = allocateStreamFields()
}
s.buf = fields.recvBuffer
s.wq = fields.writeQuota
s.wq = newCtxWriteQuota(s.ctx, defaultWriteQuota, nil)
s.wq.done = s.done
}

Expand All @@ -305,7 +305,7 @@ func preallocateForStream() {
func allocateStreamFields() preAllocatedStreamFields {
return preAllocatedStreamFields{
recvBuffer: newRecvBuffer(),
writeQuota: newWriteQuota(defaultWriteQuota, nil),
//writeQuota: newWriteQuota(defaultWriteQuota, nil),
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (t *http2Server) operateHeaders(frame *grpcframe.MetaHeadersFrame, handle f
}
s.ctx = traceCtx(s.ctx, s.method)
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.wq = newCtxWriteQuota(s.ctx, defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
Expand Down
4 changes: 2 additions & 2 deletions pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ type Stream struct {
buf *recvBuffer
trReader io.Reader
fc *inFlow
wq *writeQuota
wq *ctxWriteQuota

// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
Expand Down Expand Up @@ -501,7 +501,7 @@ func CreateStream(id uint32, requestRead func(i int)) *Stream {
id: id,
buf: recvBuffer,
trReader: trReader,
wq: newWriteQuota(defaultWriteQuota, nil),
wq: newCtxWriteQuota(context.Background(), defaultWriteQuota, nil),
requestRead: requestRead,
hdrMu: sync.Mutex{},
}
Expand Down

0 comments on commit 6272718

Please sign in to comment.