Skip to content

Commit

Permalink
chore: fix server recv/send count
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 17, 2024
1 parent 059c67c commit 4e639ca
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
go-version: ${{ matrix.go }}
cache: false # don't use cache for self-hosted runners
- name: Unit Test
run: go test -race -covermode=atomic ./...
run: go test -v -race -covermode=atomic ./...

codegen-test:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/streamx/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
streamWorkerPool.GoCtx(ctx, func() {
err := t.OnStream(nctx, conn, ss)
if err != nil && !errors.Is(err, io.EOF) {
klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", nerr)
klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", err)
}
})
}
Expand Down
37 changes: 16 additions & 21 deletions pkg/streamx/streamx_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ func TestStreamingBasic(t *testing.T) {
err = next(ctx, stream, res)
if err == nil {
atomic.AddInt32(&serverRecvCount, 1)
} else {
log.Printf("server recv middleware err=%v", err)
}
return err
}
Expand All @@ -113,8 +111,6 @@ func TestStreamingBasic(t *testing.T) {
err = next(ctx, stream, req)
if err == nil {
atomic.AddInt32(&serverSendCount, 1)
} else {
log.Printf("server send middleware err=%v", err)
}
return err
}
Expand Down Expand Up @@ -243,12 +239,12 @@ func TestStreamingBasic(t *testing.T) {
test.Assert(t, err == nil, err)
test.Assert(t, req.Type == res.Type, res.Type)
test.Assert(t, req.Message == res.Message, res.Message)
test.Assert(t, serverRecvCount == 1, serverRecvCount)
test.Assert(t, serverSendCount == 1, serverSendCount)
atomic.AddInt32(&serverStreamCount, -1)
waitServerStreamDone()
serverRecvCount = 0
serverSendCount = 0
test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(1))
test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(1))
atomic.StoreInt32(&serverRecvCount, 0)
atomic.StoreInt32(&serverSendCount, 0)

// client stream
round := 5
Expand All @@ -267,12 +263,12 @@ func TestStreamingBasic(t *testing.T) {
test.Assert(t, res.Message == "ClientStream", res.Message)
atomic.AddInt32(&serverStreamCount, -1)
waitServerStreamDone()
test.DeepEqual(t, serverRecvCount, int32(round))
test.Assert(t, serverSendCount == 1, serverSendCount)
testHeaderAndTrailer(t, cs)
test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(round))
test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(1))
atomic.StoreInt32(&serverRecvCount, 0)
atomic.StoreInt32(&serverSendCount, 0)
cs = nil
serverRecvCount = 0
serverSendCount = 0
runtime.GC()

// server stream
Expand All @@ -295,12 +291,12 @@ func TestStreamingBasic(t *testing.T) {
test.Assert(t, err == nil, err)
atomic.AddInt32(&serverStreamCount, -1)
waitServerStreamDone()
test.Assert(t, serverRecvCount == 1, serverRecvCount)
test.Assert(t, serverSendCount == int32(received), serverSendCount, received)
testHeaderAndTrailer(t, ss)
test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(1))
test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(received))
atomic.StoreInt32(&serverRecvCount, 0)
atomic.StoreInt32(&serverSendCount, 0)
ss = nil
serverRecvCount = 0
serverSendCount = 0
runtime.GC()

// bidi stream
Expand Down Expand Up @@ -331,7 +327,6 @@ func TestStreamingBasic(t *testing.T) {
i := 0
for {
res, err := bs.Recv(ctx)
t.Log(res, err)
if errors.Is(err, io.EOF) {
break
}
Expand All @@ -345,10 +340,10 @@ func TestStreamingBasic(t *testing.T) {
}()
}
waitServerStreamDone()
test.Assert(t, serverRecvCount == int32(concurrent*round), serverRecvCount)
test.Assert(t, serverSendCount == int32(concurrent*round), serverSendCount)
serverRecvCount = 0
serverSendCount = 0
test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(concurrent*round))
test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(concurrent*round))
atomic.StoreInt32(&serverRecvCount, 0)
atomic.StoreInt32(&serverSendCount, 0)
runtime.GC()

t.Logf("=== UnaryWithErr normalErr ===")
Expand Down

0 comments on commit 4e639ca

Please sign in to comment.