Skip to content

Commit

Permalink
Merge pull request #85 from singchia/feat/server_accept_async
Browse files Browse the repository at this point in the history
server: change accept in goroutine for performance
  • Loading branch information
singchia authored Feb 25, 2024
2 parents f219a10 + 13727a8 commit 3842313
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 30 deletions.
11 changes: 9 additions & 2 deletions application/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type opts struct {
// delegate
dlgt delegate.ApplicationDelegate
// methods
remoteMethods []string
localMethods []*geminio.MethodRPC
remoteMethods []string
remoteMethodCheck bool
localMethods []*geminio.MethodRPC
// callback funcs
acceptStreamFunc func(geminio.Stream)
closedStreamFunc func(geminio.Stream)
Expand Down Expand Up @@ -73,6 +74,12 @@ func OptionWaitRemoteRPCs(methods ...string) EndOption {
}
}

func OptionWithRemoteRPCCheck() EndOption {
return func(end *End) {
end.remoteMethodCheck = true
}
}

func OptionRegisterLocalRPCs(methodRPCs ...*geminio.MethodRPC) EndOption {
return func(end *End) {
end.localMethods = methodRPCs
Expand Down
14 changes: 8 additions & 6 deletions application/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ func (sm *stream) Call(ctx context.Context, method string, req geminio.Request,
return nil, io.EOF
}

// check remote RPC exists
sm.rpcMtx.RLock()
_, ok := sm.remoteRPCs[method]
if !ok {
if sm.opts.remoteMethodCheck {
// check remote RPC exists
sm.rpcMtx.RLock()
_, ok := sm.remoteRPCs[method]
if !ok {
sm.rpcMtx.RUnlock()
return nil, ErrRemoteRPCUnregistered
}
sm.rpcMtx.RUnlock()
return nil, ErrRemoteRPCUnregistered
}
sm.rpcMtx.RUnlock()
// transfer to underlayer packet
pkt := sm.pf.NewRequestPacketWithIDAndSessionID(req.ID(), sm.dg.DialogueID(), []byte(method), req.Data())
if req.Timeout() != 0 {
Expand Down
3 changes: 3 additions & 0 deletions client/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func new(netcn net.Conn, opts ...*EndOptions) (geminio.End, error) {
application.OptionWaitRemoteRPCs(eo.RemoteMethods...),
application.OptionRegisterLocalRPCs(eo.LocalMethods...),
}
if eo.RemoteMethodCheck {
epOpts = append(epOpts, application.OptionWithRemoteRPCCheck())
}
ep, err = application.NewEnd(cn, mp, epOpts...)
if err != nil {
goto ERR
Expand Down
26 changes: 16 additions & 10 deletions client/end_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
)

type EndOptions struct {
Timer timer.Timer
TimerOwner interface{}
PacketFactory packet.PacketFactory
Log log.Logger
Delegate delegate.ClientDelegate
delegate delegate.ClientDelegate
ClientID *uint64
Meta []byte
RemoteMethods []string
LocalMethods []*geminio.MethodRPC
Timer timer.Timer
TimerOwner interface{}
PacketFactory packet.PacketFactory
Log log.Logger
Delegate delegate.ClientDelegate
delegate delegate.ClientDelegate
ClientID *uint64
Meta []byte
RemoteMethods []string
RemoteMethodCheck bool
LocalMethods []*geminio.MethodRPC
}

func (eo *EndOptions) SetTimer(timer timer.Timer) {
Expand Down Expand Up @@ -57,6 +58,10 @@ func (eo *EndOptions) SetWaitRemoteRPCs(methods ...string) {
eo.RemoteMethods = methods
}

func (eo *EndOptions) SetRemoteRPCCheck() {
eo.RemoteMethodCheck = true
}

func (eo *EndOptions) SetRegisterLocalRPCs(methodRPCs ...*geminio.MethodRPC) {
eo.LocalMethods = methodRPCs
}
Expand All @@ -71,6 +76,7 @@ func MergeEndOptions(opts ...*EndOptions) *EndOptions {
if opt == nil {
continue
}
eo.RemoteMethodCheck = opt.RemoteMethodCheck
if opt.Timer != nil {
eo.Timer = opt.Timer
eo.TimerOwner = opt.TimerOwner
Expand Down
1 change: 1 addition & 0 deletions client/end_retry_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func MergeRetryEndOptions(opts ...*RetryEndOptions) *RetryEndOptions {
if opt == nil {
continue
}
eo.RemoteMethodCheck = opt.RemoteMethodCheck
if opt.Timer != nil {
eo.Timer = opt.Timer
eo.TimerOwner = opt.TimerOwner
Expand Down
2 changes: 1 addition & 1 deletion examples/usage/brpc/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func main() {
// the option means all End from server will wait for the rpc registration
opt.SetWaitRemoteRPCs("client-echo")
// pre-register server side method
opt.SetRegisterLocalRPCs(&geminio.MethodRPC{"server-echo", echo})
opt.SetRegisterLocalRPCs(&geminio.MethodRPC{Method: "server-echo", RPC: echo})

ln, err := server.Listen("tcp", "127.0.0.1:8080", opt)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions server/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func new(netcn net.Conn, opts ...*EndOptions) (geminio.End, error) {
if eo.ClosedStreamFunc != nil {
epOpts = append(epOpts, application.OptionClosedStreamFunc(eo.ClosedStreamFunc))
}
if eo.RemoteMethodCheck {
epOpts = append(epOpts, application.OptionWithRemoteRPCCheck())
}
ep, err = application.NewEnd(cn, mp, epOpts...)
if err != nil {
goto ERR
Expand Down
22 changes: 14 additions & 8 deletions server/end_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
)

type EndOptions struct {
Timer timer.Timer
TimerOwner interface{}
PacketFactory packet.PacketFactory
Log log.Logger
Delegate delegate.ServerDelegate
ClientID *uint64
RemoteMethods []string
LocalMethods []*geminio.MethodRPC
Timer timer.Timer
TimerOwner interface{}
PacketFactory packet.PacketFactory
Log log.Logger
Delegate delegate.ServerDelegate
ClientID *uint64
RemoteMethods []string
RemoteMethodCheck bool
LocalMethods []*geminio.MethodRPC
// If set AcceptStreamFunc, the AcceptStream should never be called
AcceptStreamFunc func(geminio.Stream)
ClosedStreamFunc func(geminio.Stream)
Expand Down Expand Up @@ -48,6 +49,10 @@ func (eo *EndOptions) SetWaitRemoteRPCs(methods ...string) {
eo.RemoteMethods = methods
}

func (eo *EndOptions) SetRemoteRPCCheck() {
eo.RemoteMethodCheck = true
}

func (eo *EndOptions) SetRegisterLocalRPCs(methodRPCs ...*geminio.MethodRPC) {
eo.LocalMethods = methodRPCs
}
Expand All @@ -70,6 +75,7 @@ func MergeEndOptions(opts ...*EndOptions) *EndOptions {
if opt == nil {
continue
}
eo.RemoteMethodCheck = opt.RemoteMethodCheck
if opt.Timer != nil {
eo.Timer = opt.Timer
eo.TimerOwner = opt.TimerOwner
Expand Down
19 changes: 16 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,39 @@ type Listener interface {
Addr() net.Addr
}

type ret struct {
end geminio.End
err error
}

type listener struct {
opts []*EndOptions
ln net.Listener
ch chan *ret
}

func Listen(network, address string, opts ...*EndOptions) (Listener, error) {
ln, err := net.Listen(network, address)
if err != nil {
return nil, err
}
return &listener{ln: ln, opts: opts}, nil
return &listener{
ln: ln,
opts: opts,
ch: make(chan *ret, 128)}, nil
}

func (ln *listener) AcceptEnd() (geminio.End, error) {
netconn, err := ln.ln.Accept()
if err != nil {
return nil, err
}
end, err := NewEndWithConn(netconn, ln.opts...)
return end, err
go func() {
end, err := NewEndWithConn(netconn, ln.opts...)
ln.ch <- &ret{end, err}
}()
ret, _ := <-ln.ch
return ret.end, ret.err
}

func (ln *listener) Accept() (net.Conn, error) {
Expand Down
55 changes: 55 additions & 0 deletions test/regression/regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package regression

import (
"context"
"encoding/binary"
"errors"
"sync"
"sync/atomic"
"testing"

"github.com/singchia/geminio"
"github.com/singchia/geminio/client"
"github.com/singchia/geminio/server"
"github.com/singchia/geminio/test"
)

Expand Down Expand Up @@ -66,3 +71,53 @@ func TestMessage(t *testing.T) {

<-done
}

func TestServer(t *testing.T) {
network := "tcp"
address := "127.0.0.1:12345"
srv, err := server.Listen(network, address)
if err != nil {
t.Error(err)
return
}
wg := sync.WaitGroup{}
wg.Add(1)

index, count := uint64(0), uint64(1000)
accepted := make([]uint64, count)
go func() {
for {
end, err := srv.AcceptEnd()
if err != nil {
t.Error(err)
return
}
meta := end.Meta()
id := binary.BigEndian.Uint64(meta)
accepted[id-1] = 1
new := atomic.AddUint64(&index, 1)
if new == count {
wg.Done()
}
}
}()

for i := uint64(0); i < count; i++ {
opt := client.NewEndOptions()
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, i+1)
opt.SetMeta(buf)
_, err := client.NewEnd(network, address, opt)
if err != nil {
t.Error(err)
}
}

wg.Wait()
for _, elem := range accepted {
if elem != 1 {
t.Error("failed end exist")
return
}
}
}

0 comments on commit 3842313

Please sign in to comment.