Skip to content

Commit

Permalink
Merge branch 'feat/ttstream-v2' into feat/ttstream-v2-error_handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima authored Oct 10, 2024
2 parents aced7b8 + f4746b8 commit 961dda8
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 118 deletions.
35 changes: 28 additions & 7 deletions pkg/streamx/provider/ttstream/client_provier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ttstream
import (
"context"
"runtime"
"sync/atomic"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/gopkg/protocol/ttheader"
Expand Down Expand Up @@ -78,24 +79,44 @@ func (c clientProvider) NewStream(ctx context.Context, ri rpcinfo.RPCInfo, callO
return nil, err
}

s, err := trans.newStream(ctx, method, intHeader, strHeader)
sio, err := trans.newStreamIO(ctx, method, intHeader, strHeader)
if err != nil {
return nil, err
}
s.setRecvTimeout(rconfig.StreamRecvTimeout())
sio.stream.setRecvTimeout(rconfig.StreamRecvTimeout())
// only client can set meta frame handler
s.setMetaFrameHandler(c.metaHandler)
sio.stream.setMetaFrameHandler(c.metaHandler)

// if ctx from server side, we should cancel the stream when server handler already returned
// TODO: this canceling transmit should be configurable
ktx.RegisterCancelCallback(ctx, func() {
s.cancel()
sio.stream.cancel()
})

cs := newClientStream(s)
cs := newClientStream(sio.stream)
// the END of a client stream means it should send and recv trailer and not hold by user anymore
var ended uint32
sio.setEOFCallback(func() {
// if stream is ended by both parties, put the transport back to pool
sio.stream.close()
if atomic.AddUint32(&ended, 1) == 2 {
if trans.IsActive() {
c.transPool.Put(trans)
}
err = trans.streamDelete(sio.stream.sid)
}
})
runtime.SetFinalizer(cs, func(cstream *clientStream) {
_ = cstream.close()
c.transPool.Put(trans)
// it's safe to call CloseSend twice
// we do repeated CloseSend here to ensure stream can be closed normally
_ = cstream.CloseSend(ctx)
// only delete stream when clientStream be finalized
if atomic.AddUint32(&ended, 1) == 2 {
if trans.IsActive() {
c.transPool.Put(trans)
}
err = trans.streamDelete(sio.stream.sid)
}
})
return cs, err
}
19 changes: 10 additions & 9 deletions pkg/streamx/provider/ttstream/client_trans_pool_longconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,24 @@ type longConnTransPool struct {
}

func (c *longConnTransPool) Get(sinfo *serviceinfo.ServiceInfo, network string, addr string) (trans *transport, err error) {
o := c.transPool.Pop(addr)
if o != nil {
return o.(*transport), nil
for {
o := c.transPool.Pop(addr)
if o == nil {
break
}
trans = o.(*transport)
if trans.IsActive() {
return trans, nil
}
}

// create new connection
conn, err := netpoll.DialConnection(network, addr, time.Second)

Check failure on line 65 in pkg/streamx/provider/ttstream/client_trans_pool_longconn.go

View workflow job for this annotation

GitHub Actions / windows-test

undefined: netpoll.DialConnection
if err != nil {
return nil, err
}
// create new transport
trans = newTransport(clientTransport, sinfo, conn)
_ = conn.AddCloseCallback(func(connection netpoll.Connection) error {
_ = trans.Close()
return nil
})
runtime.SetFinalizer(trans, func(t *transport) { t.Close() })
// create new transport
return trans, nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/streamx/provider/ttstream/container/object_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ func (s *ObjectPool) cleaning() {

now := time.Now()
s.L.Lock()
objSize := len(s.objects)
// update cleanInternal
objSize := 0
for _, stk := range s.objects {
objSize += stk.Size()
}
cleanInternal = time.Second + time.Duration(objSize)*time.Millisecond*10
if cleanInternal > time.Second*10 {
cleanInternal = time.Second * 10
}
// clean objects
for key, stk := range s.objects {
deleted := 0
var oldest *time.Time
Expand All @@ -97,7 +102,7 @@ func (s *ObjectPool) cleaning() {
return true, true
})
if oldest != nil {
klog.Infof("object[%s] pool delete %d objects", key, deleted)
klog.Infof("object[%s] pool deleted %d objects, oldest=%s", key, deleted, oldest.String())
}
}
s.L.Unlock()
Expand Down
63 changes: 41 additions & 22 deletions pkg/streamx/provider/ttstream/container/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (s *Stack[ValueType]) Size() (size int) {
func (s *Stack[ValueType]) RangeDelete(checking func(v ValueType) (deleteNode bool, continueRange bool)) {
// Stop the world!
s.L.Lock()
// range from the stack bottom(oldest item)
node := s.head
deleteNode := false
continueRange := true
Expand All @@ -56,18 +57,19 @@ func (s *Stack[ValueType]) RangeDelete(checking func(v ValueType) (deleteNode bo
last := node.last
next := node.next
// modify last node
if last == nil {
// cur node is head node
s.head = next
} else {
if last != nil {
// change last next ptr
last.next = next
}
// modify next node
if next != nil {
next.last = last
}
if node == s.tail {
// modify link list
if s.head == node {
s.head = next
}
if s.tail == node {
s.tail = last
}
node = node.next
Expand All @@ -76,48 +78,64 @@ func (s *Stack[ValueType]) RangeDelete(checking func(v ValueType) (deleteNode bo
s.L.Unlock()
}

func (s *Stack[ValueType]) Pop() (value ValueType, ok bool) {
var node *doubleLinkNode[ValueType]
s.L.Lock()
func (s *Stack[ValueType]) pop() (node *doubleLinkNode[ValueType]) {
if s.tail == nil {
s.L.Unlock()
return value, false
return nil
}
node = s.tail
if node.last == nil {
// first node
// if node is the only node in the list, clear the whole linklist
s.head = nil
s.tail = nil
} else {
node.last.next = nil
// if node is not the only node in the list, only modify the list's tail
s.tail = node.last
s.tail.next = nil
}
s.size--
return node
}

func (s *Stack[ValueType]) Pop() (value ValueType, ok bool) {
s.L.Lock()
node := s.pop()
s.L.Unlock()
if node == nil {
return value, false
}

value = node.val
node.reset()
s.nodePool.Put(node)
return value, true
}

func (s *Stack[ValueType]) PopBottom() (value ValueType, ok bool) {
var node *doubleLinkNode[ValueType]
s.L.Lock()
func (s *Stack[ValueType]) popBottom() (node *doubleLinkNode[ValueType]) {
if s.head == nil {
s.L.Unlock()
return value, false
return nil
}
node = s.head
s.head = s.head.next
if s.head != nil {
s.head.last = nil
}
if s.tail == node {
if node.next == nil {
// if node is the only node in the list, clear the whole linklist
s.head = nil
s.tail = nil
} else {
// if node is not the only node in the list, only modify the list's head
s.head = s.head.next
s.head.last = nil
}
s.size--
return node
}

func (s *Stack[ValueType]) PopBottom() (value ValueType, ok bool) {
s.L.Lock()
node := s.popBottom()
s.L.Unlock()
if node == nil {
return value, false
}

value = node.val
node.reset()
s.nodePool.Put(node)
Expand All @@ -142,6 +160,7 @@ func (s *Stack[ValueType]) Push(value ValueType) {
s.head = node
s.tail = node
} else {
// not first node
node.last = s.tail
s.tail.next = node
s.tail = node
Expand Down
5 changes: 4 additions & 1 deletion pkg/streamx/provider/ttstream/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/cloudwego/kitex/pkg/streamx"
)

var ErrInvalidStreamKind = errors.New("invalid stream kind")
var (
ErrInvalidStreamKind = errors.New("invalid stream kind")
ErrClosedStream = errors.New("stream is closed")
)

// only for meta frame handler
type IntHeader map[uint16]string
Expand Down
Loading

0 comments on commit 961dda8

Please sign in to comment.