Skip to content

Commit

Permalink
Fix bug in datagram message due to shared buffer (#34)
Browse files Browse the repository at this point in the history
This bug was introduced by me in 8487663
and I'm sorry for that. The problem is that as the DatagramMessage
channel is buffered we cannot simply use the same buffer for reading all
incoming datagrams, because reading will change the []byte for all
messages still buffered in the channel.

I added some tests that fail on the previous version in the hope of
avoiding this same mistake in the future.

The solution was using a sync.Pool instance to acquire a buffer and
putting it back on the pool only when it was received and processed.
This solution has the same advantages of reusing the buffer without
the risk of changing a buffer still in use. And for some reason it
seems even faster than simply using the same buffer:

benchmark                           old ns/op     new ns/op     delta
BenchmarkDatagramNoFormatting-4     2693          2398          -10.95%

benchmark                           old MB/s     new MB/s     speedup
BenchmarkDatagramNoFormatting-4     17.45        19.59        1.12x

benchmark                           old allocs     new allocs     delta
BenchmarkDatagramNoFormatting-4     4              5              +25.00%

benchmark                           old bytes     new bytes     delta
BenchmarkDatagramNoFormatting-4     368           487           +32.34%
  • Loading branch information
cezarsa authored and mcuadros committed Oct 11, 2016
1 parent e2fcef3 commit 9cf13b7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
10 changes: 8 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ type Server struct {
lastError error
readTimeoutMilliseconds int64
tlsPeerNameFunc TlsPeerNameFunc
datagramPool sync.Pool
}

//NewServer returns a new Server
func NewServer() *Server {
return &Server{tlsPeerNameFunc: defaultTlsPeerName}
return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{
New: func() interface{} {
return make([]byte, 65536)
},
}}
}

//Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
Expand Down Expand Up @@ -319,8 +324,8 @@ func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) {
s.wait.Add(1)
go func() {
defer s.wait.Done()
buf := make([]byte, 65536)
for {
buf := s.datagramPool.Get().([]byte)
n, addr, err := packetconn.ReadFrom(buf)
if err == nil {
// Ignore trailing control characters and NULs
Expand Down Expand Up @@ -366,6 +371,7 @@ func (s *Server) goParseDatagrams() {
} else {
s.parser(msg.message, msg.client, "")
}
s.datagramPool.Put(msg.message[:cap(msg.message)])
}
}
}()
Expand Down
55 changes: 55 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,58 @@ func (s *ServerSuite) TestUDPAutomatic5424Plus6587OctetCount(c *C) {
c.Check(handler.LastMessageLength, Equals, int64(len(exampleRFC5424Syslog)))
c.Check(handler.LastError, IsNil)
}

type handlerSlow struct {
*handlerCounter
contents []string
}

func (s *handlerSlow) Handle(logParts format.LogParts, msgLen int64, err error) {
if len(s.contents) == 0 {
time.Sleep(time.Second)
}
s.contents = append(s.contents, logParts["content"].(string))
s.handlerCounter.Handle(logParts, msgLen, err)
}

func (s *ServerSuite) TestUDPRace(c *C) {
handler := &handlerSlow{handlerCounter: &handlerCounter{expected: 3, done: make(chan struct{})}}
server := NewServer()
server.SetFormat(Automatic)
server.SetHandler(handler)
server.SetTimeout(10)
server.ListenUDP("127.0.0.1:0")
server.Boot()
conn, err := net.Dial("udp", server.connections[0].LocalAddr().String())
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "1"))
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "2"))
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "3"))
c.Assert(err, IsNil)
conn.Close()
<-handler.done
c.Check(handler.contents, DeepEquals, []string{"content1", "content2", "content3"})
}

func (s *ServerSuite) TestTCPRace(c *C) {
handler := &handlerSlow{handlerCounter: &handlerCounter{expected: 3, done: make(chan struct{})}}
server := NewServer()
server.SetFormat(Automatic)
server.SetHandler(handler)
server.SetTimeout(10)
server.ListenTCP("127.0.0.1:0")
server.Boot()
conn, err := net.Dial("tcp", server.listeners[0].Addr().String())
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "1\n"))
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "2\n"))
c.Assert(err, IsNil)
_, err = conn.Write([]byte(exampleSyslog + "3\n"))
c.Assert(err, IsNil)
conn.Close()
<-handler.done
c.Check(handler.contents, DeepEquals, []string{"content1", "content2", "content3"})
}

0 comments on commit 9cf13b7

Please sign in to comment.