Skip to content

Commit

Permalink
feat: stream event metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 14, 2024
1 parent 4873336 commit fe7f8cd
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 26 deletions.
5 changes: 5 additions & 0 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (kc *kClient) invokeStreamingEndpoint() (endpoint.Endpoint, error) {

// streamx version streaming mw
kc.sxStreamMW = streamx.StreamMiddlewareChain(kc.opt.StreamXOptions.StreamMWs...)
eventHandler := kc.opt.TracerCtl.GetStreamEventHandler()
if eventHandler != nil {
kc.opt.StreamXOptions.StreamRecvMWs = append(kc.opt.StreamXOptions.StreamRecvMWs, streamx.NewStreamRecvStatMiddleware(eventHandler))
kc.opt.StreamXOptions.StreamSendMWs = append(kc.opt.StreamXOptions.StreamSendMWs, streamx.NewStreamSendStatMiddleware(eventHandler))
}
kc.sxStreamRecvMW = streamx.StreamRecvMiddlewareChain(kc.opt.StreamXOptions.StreamRecvMWs...)
kc.sxStreamSendMW = streamx.StreamSendMiddlewareChain(kc.opt.StreamXOptions.StreamSendMWs...)

Expand Down
19 changes: 2 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,14 @@ github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZ
github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk=
github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8=
github.com/cloudwego/frugal v0.2.0/go.mod h1:cpnV6kdRMjN3ylxRo63RNbZ9rBK6oxs70Zk6QZ4Enj4=
github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 h1:hj/AhlEngERp5Tjt864veEvyK6RglXKcXpxkIOSRfug=
github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw=
github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4 h1:SHw9GUBBcAnLWeK2MtPH7O6YQG9Q2ZZ8koD/4alpLvE=
github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
<<<<<<< HEAD
github.com/cloudwego/localsession v0.1.1 h1:tbK7laDVrYfFDXoBXo4uCGMAxU4qmz2dDm8d4BGBnDo=
github.com/cloudwego/localsession v0.1.1/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/netpoll v0.6.4 h1:z/dA4sOTUQof6zZIO4QNnLBXsDFFFEos9OOGloR6kno=
github.com/cloudwego/netpoll v0.6.4/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ=
=======
github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY=
github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
<<<<<<< HEAD
github.com/cloudwego/netpoll v0.6.5-0.20240905095957-e6ec47be2fe0 h1:2aoCxK8fee7LhwWveg3ORVEDBoMtmTY2NuSAtNGpnFI=
github.com/cloudwego/netpoll v0.6.5-0.20240905095957-e6ec47be2fe0/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ=
<<<<<<< HEAD
>>>>>>> 9e44721 (feat: support multi service (#1538))
=======
=======
>>>>>>> 2b0e374 (perf: optimise pipe and queue locker)
github.com/cloudwego/netpoll v0.6.5-0.20240911073319-2ec9568b10cf h1:c/K4XrkloCgZp+En3LjbXtqfr0KQwC85utUvdDm76V4=
github.com/cloudwego/netpoll v0.6.5-0.20240911073319-2ec9568b10cf/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ=
>>>>>>> 968bdfc (chore: fix unit test (#1545))
github.com/cloudwego/runtimex v0.1.0 h1:HG+WxWoj5/CDChDZ7D99ROwvSMkuNXAqt6hnhTTZDiI=
github.com/cloudwego/runtimex v0.1.0/go.mod h1:23vL/HGV0W8nSCHbe084AgEBdDV4rvXenEUMnUNvUd8=
github.com/cloudwego/thriftgo v0.3.17 h1:k0iQe2jEAN1WhPsXWvatwHzoxObUSX2Nw5NqdnywS8k=
Expand Down
5 changes: 0 additions & 5 deletions internal/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/pkg/utils"
"github.com/cloudwego/localsession/backup"
Expand Down Expand Up @@ -80,10 +79,6 @@ type Options struct {
Limit Limit

MWBs []endpoint.MiddlewareBuilder
// streamx
SMWs []streamx.StreamMiddleware
SRecvMWs []streamx.StreamRecvMiddleware
SSendMWs []streamx.StreamSendMiddleware

Bus event.Bus
Events event.Queue
Expand Down
2 changes: 1 addition & 1 deletion internal/stream/stream_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// StreamEventHandler is used to handle stream events
type StreamEventHandler func(ctx context.Context, evt stats.Event, err error)
type StreamEventHandler = func(ctx context.Context, evt stats.Event, err error)

type StreamingConfig struct {
RecvMiddlewareBuilders []endpoint.RecvMiddlewareBuilder
Expand Down
7 changes: 5 additions & 2 deletions pkg/remote/trans/streamx/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"log"
"net"
"runtime/debug"
"time"

"github.com/cloudwego/kitex/internal/wpool"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand All @@ -42,6 +43,8 @@ import (
其他接口实际上最终是用来去组装了 transpipeline ....
*/

var streamWorkerPool = wpool.New(128, time.Minute)

type svrTransHandlerFactory struct {
provider streamx.ServerProvider
}
Expand Down Expand Up @@ -100,7 +103,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
return nerr
}
// stream level goroutine
gofunc.GoFunc(ctx, func() {
streamWorkerPool.GoCtx(ctx, func() {
err := t.OnStream(nctx, conn, ss)
if err != nil && !errors.Is(err, io.EOF) {
klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", nerr)
Expand Down
9 changes: 8 additions & 1 deletion pkg/streamx/client_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package streamx

import "time"
import (
"context"
"time"

"github.com/cloudwego/kitex/pkg/stats"
)

type EventHandler func(ctx context.Context, evt stats.Event, err error)

type ClientOptions struct {
RecvTimeout time.Duration
Expand Down
27 changes: 27 additions & 0 deletions pkg/streamx/stream_middleware_internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package streamx

import (
"context"

"github.com/cloudwego/kitex/pkg/stats"
)

func NewStreamRecvStatMiddleware(ehandler EventHandler) StreamRecvMiddleware {
return func(next StreamRecvEndpoint) StreamRecvEndpoint {
return func(ctx context.Context, stream Stream, res any) (err error) {
err = next(ctx, stream, res)
ehandler(ctx, stats.StreamRecv, err)
return err
}
}
}

func NewStreamSendStatMiddleware(ehandler EventHandler) StreamSendMiddleware {
return func(next StreamSendEndpoint) StreamSendEndpoint {
return func(ctx context.Context, stream Stream, res any) (err error) {
err = next(ctx, stream, res)
ehandler(ctx, stats.StreamSend, err)
return err
}
}
}
12 changes: 12 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ func (s *server) RegisterService(svcInfo *serviceinfo.ServiceInfo, handler inter
}

registerOpts := internal_server.NewRegisterOptions(opts)
// add trace middlewares
ehandler := s.opt.TracerCtl.GetStreamEventHandler()
if ehandler != nil {
registerOpts.StreamRecvMiddlewares = append(
registerOpts.StreamRecvMiddlewares, streamx.NewStreamRecvStatMiddleware(ehandler),
)
registerOpts.StreamSendMiddlewares = append(
registerOpts.StreamSendMiddlewares, streamx.NewStreamSendStatMiddleware(ehandler),
)
}

// register service
if err := s.svcs.addService(svcInfo, handler, registerOpts); err != nil {
panic(err.Error())
}
Expand Down

0 comments on commit fe7f8cd

Please sign in to comment.