Skip to content

Commit

Permalink
fix(netpoll): timeout when calling Next
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaost committed Oct 16, 2024
1 parent 811d888 commit 7f9c460
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
30 changes: 28 additions & 2 deletions pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package netpoll

import (
"errors"
"io"
"sync"

"github.com/cloudwego/netpoll"
Expand All @@ -35,6 +36,7 @@ func init() {
func NewReaderByteBuffer(r netpoll.Reader) remote.ByteBuffer {
bytebuf := bytebufPool.Get().(*netpollByteBuffer)
bytebuf.reader = r
bytebuf.rd, _ = r.(io.Reader)
bytebuf.status = remote.BitReadable
bytebuf.readSize = 0
return bytebuf
Expand All @@ -53,6 +55,7 @@ func NewReaderWriterByteBuffer(rw netpoll.ReadWriter) remote.ByteBuffer {
bytebuf := bytebufPool.Get().(*netpollByteBuffer)
bytebuf.writer = rw
bytebuf.reader = rw
bytebuf.rd, _ = rw.(io.Reader)
bytebuf.status = remote.BitWritable | remote.BitReadable
return bytebuf
}
Expand All @@ -66,6 +69,8 @@ type netpollByteBuffer struct {
reader netpoll.Reader
status int
readSize int

rd io.Reader // from reader for Read
}

var _ remote.ByteBuffer = &netpollByteBuffer{}
Expand Down Expand Up @@ -107,11 +112,32 @@ func (b *netpollByteBuffer) ReadableLen() (n int) {
}

// Read implement io.Reader
func (b *netpollByteBuffer) Read(p []byte) (n int, err error) {
func (b *netpollByteBuffer) Read(p []byte) (int, error) {
if b.status&remote.BitReadable == 0 {
return -1, errors.New("unreadable buffer, cannot support Read")
}
rb, err := b.reader.Next(len(p))
if b.rd != nil {
// use io.Reader if implemented, it works for netpoll connection.
// but for netpoll.Reader, it doesn't guarantee that
// the underlying implementation has Read method of io.Reader
return b.rd.Read(p)
}

// make sure we have at least one byte to read,
// or Next call may block till timeout
m := b.reader.Len()
if m == 0 {
_, err := b.reader.Peek(1)
if err != nil {
return 0, err
}
m = b.reader.Len() // must >= 1
}
n := len(p)
if n > m {
n = m
}
rb, err := b.reader.Next(n)
b.readSize += len(rb)
return copy(p, rb), err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/remote/trans/netpoll/bytebuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ func TestByteBuffer(t *testing.T) {
test.Assert(t, err == nil)
}

func TestByteBuffer_Read(t *testing.T) {
const teststr = "testing"
buf := &bytes.Buffer{}
buf.WriteString(teststr)
r := NewReaderByteBuffer(netpoll.NewReader(buf))
b := make([]byte, len(teststr)+1)
n, err := r.Read(b)
test.Assert(t, err == nil, err)
test.Assert(t, n == len(teststr), n)
test.Assert(t, string(b[:n]) == teststr)
}

// TestWriterBuffer test writerbytebufferr return writedirect err
func TestWriterBuffer(t *testing.T) {
// 1. prepare mock data
Expand Down

0 comments on commit 7f9c460

Please sign in to comment.