diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1127c83340..9eb99a227b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -48,7 +48,7 @@ jobs: go-version: ${{ matrix.go }} cache: false # don't use cache for self-hosted runners - name: Unit Test - run: go test -race -covermode=atomic ./... + run: go test -v -race -covermode=atomic ./... codegen-test: runs-on: ubuntu-latest diff --git a/pkg/remote/trans/streamx/server_handler.go b/pkg/remote/trans/streamx/server_handler.go index 73490ad261..ede3ec40bb 100644 --- a/pkg/remote/trans/streamx/server_handler.go +++ b/pkg/remote/trans/streamx/server_handler.go @@ -107,7 +107,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error { streamWorkerPool.GoCtx(ctx, func() { err := t.OnStream(nctx, conn, ss) if err != nil && !errors.Is(err, io.EOF) { - klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", nerr) + klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", err) } }) } diff --git a/pkg/streamx/streamx_user_test.go b/pkg/streamx/streamx_user_test.go index 646f8d49b7..e5e97c2444 100644 --- a/pkg/streamx/streamx_user_test.go +++ b/pkg/streamx/streamx_user_test.go @@ -102,8 +102,6 @@ func TestStreamingBasic(t *testing.T) { err = next(ctx, stream, res) if err == nil { atomic.AddInt32(&serverRecvCount, 1) - } else { - log.Printf("server recv middleware err=%v", err) } return err } @@ -113,8 +111,6 @@ func TestStreamingBasic(t *testing.T) { err = next(ctx, stream, req) if err == nil { atomic.AddInt32(&serverSendCount, 1) - } else { - log.Printf("server send middleware err=%v", err) } return err } @@ -243,12 +239,12 @@ func TestStreamingBasic(t *testing.T) { test.Assert(t, err == nil, err) test.Assert(t, req.Type == res.Type, res.Type) test.Assert(t, req.Message == res.Message, res.Message) - test.Assert(t, serverRecvCount == 1, serverRecvCount) - test.Assert(t, serverSendCount == 1, serverSendCount) atomic.AddInt32(&serverStreamCount, -1) waitServerStreamDone() - serverRecvCount = 0 - serverSendCount = 0 + test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(1)) + test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(1)) + atomic.StoreInt32(&serverRecvCount, 0) + atomic.StoreInt32(&serverSendCount, 0) // client stream round := 5 @@ -267,12 +263,12 @@ func TestStreamingBasic(t *testing.T) { test.Assert(t, res.Message == "ClientStream", res.Message) atomic.AddInt32(&serverStreamCount, -1) waitServerStreamDone() - test.DeepEqual(t, serverRecvCount, int32(round)) - test.Assert(t, serverSendCount == 1, serverSendCount) testHeaderAndTrailer(t, cs) + test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(round)) + test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(1)) + atomic.StoreInt32(&serverRecvCount, 0) + atomic.StoreInt32(&serverSendCount, 0) cs = nil - serverRecvCount = 0 - serverSendCount = 0 runtime.GC() // server stream @@ -295,12 +291,12 @@ func TestStreamingBasic(t *testing.T) { test.Assert(t, err == nil, err) atomic.AddInt32(&serverStreamCount, -1) waitServerStreamDone() - test.Assert(t, serverRecvCount == 1, serverRecvCount) - test.Assert(t, serverSendCount == int32(received), serverSendCount, received) testHeaderAndTrailer(t, ss) + test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(1)) + test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(received)) + atomic.StoreInt32(&serverRecvCount, 0) + atomic.StoreInt32(&serverSendCount, 0) ss = nil - serverRecvCount = 0 - serverSendCount = 0 runtime.GC() // bidi stream @@ -331,7 +327,6 @@ func TestStreamingBasic(t *testing.T) { i := 0 for { res, err := bs.Recv(ctx) - t.Log(res, err) if errors.Is(err, io.EOF) { break } @@ -345,10 +340,10 @@ func TestStreamingBasic(t *testing.T) { }() } waitServerStreamDone() - test.Assert(t, serverRecvCount == int32(concurrent*round), serverRecvCount) - test.Assert(t, serverSendCount == int32(concurrent*round), serverSendCount) - serverRecvCount = 0 - serverSendCount = 0 + test.DeepEqual(t, atomic.LoadInt32(&serverRecvCount), int32(concurrent*round)) + test.DeepEqual(t, atomic.LoadInt32(&serverSendCount), int32(concurrent*round)) + atomic.StoreInt32(&serverRecvCount, 0) + atomic.StoreInt32(&serverSendCount, 0) runtime.GC() t.Logf("=== UnaryWithErr normalErr ===")