Skip to content

Commit

Permalink
Merge pull request #3 from u0x01/master
Browse files Browse the repository at this point in the history
Dep: pkg/errors & go.uber.org/zap
  • Loading branch information
wongoo authored May 23, 2019
2 parents d200327 + 070dca7 commit 50264ef
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 321 deletions.
34 changes: 16 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
)

import (
"github.com/AlexStocks/goext/net"
log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
perrors "github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -131,15 +129,15 @@ func (c *client) dialTCP() Session {
return nil
}
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
return newTCPSession(conn, c)
}

log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
Expand All @@ -163,12 +161,12 @@ func (c *client) dialUDP() Session {
return nil
}
conn, err = net.DialUDP("udp", localAddr, peerAddr)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err != nil {
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
Expand All @@ -178,18 +176,18 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, jerrors.ErrorStack(err))
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
length, err = conn.Read(buf)
if netErr, ok := jerrors.Cause(err).(net.Error); ok && netErr.Timeout() {
if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil
}
if err != nil {
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, jerrors.ErrorStack(err))
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close()
// time.Sleep(connInterval)
<-wheel.After(connInterval)
Expand All @@ -215,8 +213,8 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
Expand All @@ -229,7 +227,7 @@ func (c *client) dialWS() Session {
return ss
}

log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
Expand All @@ -256,7 +254,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, jerrors.ErrorStack(err)))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err))
}

var cert tls.Certificate
Expand All @@ -278,7 +276,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %s\n", jerrors.ErrorStack(err)))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err))
}
for _, root = range roots {
certPool.AddCert(root)
Expand All @@ -294,7 +292,7 @@ func (c *client) dialWSS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
Expand All @@ -308,7 +306,7 @@ func (c *client) dialWSS() Session {
return ss
}

log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
Expand Down Expand Up @@ -399,7 +397,7 @@ func (c *client) reConnect() {
// c.Unlock()
for {
if c.IsClosed() {
log.Warn("client{peer:%s} goroutine exit now.", c.addr)
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
break
}

Expand Down
59 changes: 29 additions & 30 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@ import (
)

import (
log "github.com/dubbogo/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
perrors "github.com/pkg/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

var (
launchTime = time.Now()

// ErrInvalidConnection = errors.New("connection has been closed.")
// ErrInvalidConnection = perrors.New("connection has been closed.")
)

/////////////////////////////////////////
Expand Down Expand Up @@ -193,10 +192,10 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
defer t.lock.Unlock()
n, err = t.flusher.Write(p)
if err != nil {
return n, jerrors.Trace(err)
return n, perrors.WithStack(err)
}
if err := t.flusher.Flush(); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}

return n, nil
Expand Down Expand Up @@ -243,16 +242,16 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
currentTime = time.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
t.rLastDeadline = currentTime
}
}

length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length))
return length, jerrors.Trace(err)
return length, perrors.WithStack(err)
//return length, err
}

Expand All @@ -267,7 +266,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
)

if p, ok = pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
Expand All @@ -276,7 +275,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime = time.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wTimeout >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
t.wLastDeadline = currentTime
}
Expand All @@ -285,8 +284,8 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
return length, jerrors.Trace(err)
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
return length, perrors.WithStack(err)
//return length, err
}

Expand All @@ -299,7 +298,7 @@ func (t *gettyTCPConn) close(waitSec int) {
if t.conn != nil {
if writer, ok := t.writer.(*snappy.Writer); ok {
if err := writer.Close(); err != nil {
log.Error("snappy.Writer.Close() = error{%s}", jerrors.ErrorStack(err))
log.Errorf("snappy.Writer.Close() = error:%+v", err)
}
}
t.conn.(*net.TCPConn).SetLinger(waitSec)
Expand Down Expand Up @@ -333,7 +332,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error {
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return jerrors.Trace(err4)
return perrors.WithStack(err4)
}
return nil
}
Expand Down Expand Up @@ -393,20 +392,20 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
currentTime = time.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, jerrors.Trace(err)
return 0, nil, perrors.WithStack(err)
}
u.rLastDeadline = currentTime
}
}

length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debug("ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}", length, addr, err)
log.Debugf("ReadFromUDP() = {length:%d, peerAddr:%s, error:%v}", length, addr, err)
if err == nil {
atomic.AddUint32(&u.readBytes, uint32(length))
}

//return length, addr, err
return length, addr, jerrors.Trace(err)
return length, addr, perrors.WithStack(err)
}

// write udp packet, @ctx should be of type UDPContext
Expand All @@ -422,10 +421,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
)

if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, jerrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
return 0, perrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
}
if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
return 0, perrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
}
if u.ss.EndPoint().EndPointType() == UDP_ENDPOINT {
peerAddr = ctx.PeerAddr
Expand All @@ -441,7 +440,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
currentTime = time.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
u.wLastDeadline = currentTime
}
Expand All @@ -450,9 +449,9 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
}
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)

return length, jerrors.Trace(err)
return length, perrors.WithStack(err)
//return length, err
}

Expand Down Expand Up @@ -529,7 +528,7 @@ func (w *gettyWSConn) handlePing(message string) error {
w.UpdateActive()
}

return jerrors.Trace(err)
return perrors.WithStack(err)
}

func (w *gettyWSConn) handlePong(string) error {
Expand All @@ -546,11 +545,11 @@ func (w *gettyWSConn) read() ([]byte, error) {
w.incReadPkgNum()
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warn("websocket unexpected close error: %v", e)
log.Warnf("websocket unexpected close error: %v", e)
}
}

return b, jerrors.Trace(e)
return b, perrors.WithStack(e)
//return b, e
}

Expand All @@ -567,7 +566,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
currentTime = time.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wTimeout >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil {
return jerrors.Trace(err)
return perrors.WithStack(err)
}
w.wLastDeadline = currentTime
}
Expand All @@ -585,25 +584,25 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
)

if p, ok = pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}

w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
}
return len(p), jerrors.Trace(err)
return len(p), perrors.WithStack(err)
//return len(p), err
}

func (w *gettyWSConn) writePing() error {
w.updateWriteDeadline()
return jerrors.Trace(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
return perrors.WithStack(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
}

func (w *gettyWSConn) writePong(message []byte) error {
w.updateWriteDeadline()
return jerrors.Trace(w.conn.WriteMessage(websocket.PongMessage, message))
return perrors.WithStack(w.conn.WriteMessage(websocket.PongMessage, message))
}

// close websocket connection
Expand Down
9 changes: 5 additions & 4 deletions getty.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ package getty

import (
"compress/flate"
"errors"
"net"
"time"

perrors "github.com/pkg/errors"
)

// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
Expand Down Expand Up @@ -119,9 +120,9 @@ type Connection interface {
/////////////////////////////////////////

var (
ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked")
ErrNullPeerAddr = errors.New("peer address is nil")
ErrSessionClosed = perrors.New("session Already Closed")
ErrSessionBlocked = perrors.New("session Full Blocked")
ErrNullPeerAddr = perrors.New("peer address is nil")
)

type Session interface {
Expand Down
Loading

0 comments on commit 50264ef

Please sign in to comment.