Skip to content

Commit

Permalink
pkg/bpf - downsize flow_id to 32 bits, use Hasher sync.Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-mo committed Jan 28, 2020
1 parent ad437fd commit 8940173
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 25 deletions.
4 changes: 2 additions & 2 deletions internal/sinks/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *InfluxSink) push(e bpf.Event) {

// Create a point and add to batch.
tags := map[string]string{
"flow_id": strconv.FormatUint(e.FlowID, 10),
"flow_id": strconv.FormatUint(uint64(e.FlowID), 10),
"src_addr": e.SrcAddr.String(),
"dst_addr": e.DstAddr.String(),
"dst_port": strconv.FormatUint(uint64(e.DstPort), 10),
Expand All @@ -164,7 +164,7 @@ func (s *InfluxSink) push(e bpf.Event) {
// generally available. Only send signed ints for now until this is more widely deployed.
fields := map[string]interface{}{
// Include flow_id in both fields and tags so it can be used in both aggregations and selections.
"flow_id": strconv.FormatUint(e.FlowID, 10),
"flow_id": strconv.FormatUint(uint64(e.FlowID), 10),
"bytes_orig": int64(e.BytesOrig),
"bytes_ret": int64(e.BytesRet),
"bytes_total": int64(e.BytesOrig + e.BytesRet),
Expand Down
37 changes: 28 additions & 9 deletions pkg/bpf/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@ package bpf
import (
"encoding/binary"
"fmt"
"hash"
"net"
"sync"
"unsafe"

"lukechampine.com/blake3"
)

var hashPool = sync.Pool{
New: func() interface{} {
// Output size is 32 bits.
return blake3.New(4, nil)
},
}

// EventLength is the length of the struct sent by BPF.
const EventLength = 104

// Event is an accounting event delivered to userspace from the Probe.
type Event struct {
Start uint64 `json:"start"` // epoch timestamp of flow start
Timestamp uint64 `json:"timestamp"` // ktime of event, relative to machine boot time
FlowID uint64 `json:"flow_id"`
FlowID uint32 `json:"flow_id"`
Connmark uint32 `json:"connmark"`
SrcAddr net.IP `json:"src_addr"`
DstAddr net.IP `json:"dst_addr"`
Expand Down Expand Up @@ -74,13 +83,18 @@ func (e *Event) unmarshalBinary(b []byte) error {

e.NetNS = *(*uint32)(unsafe.Pointer(&b[92]))

// Generate and set the Event's FlowID.
e.FlowID = e.hashFlow()

return nil
}

// hashFlow appends the Event's source and destination address,
// ports, protocol and connection ID, calculates the hash,
// sets the Event's FlowID and resets the Hasher.
func (e *Event) hashFlow(h hash.Hash) {
// hashFlow calculates a flow hash base on the the Event's
// source and destination address, ports, protocol and connection ID.
func (e *Event) hashFlow() uint32 {

// Get a Hasher from the pool.
h := hashPool.Get().(*blake3.Hasher)

// Source/Destination Address.
_, _ = h.Write(e.SrcAddr)
Expand All @@ -105,11 +119,16 @@ func (e *Event) hashFlow(h hash.Hash) {
binary.BigEndian.PutUint32(b, e.connectionID)
_, _ = h.Write(b)

// Calculate the hash and reset the Hasher.
// Calculate the hash.
// Shift one position to the right to fit the FlowID into a
// (signed) long, eg. in elasticsearch.
e.FlowID = binary.LittleEndian.Uint64(h.Sum(nil)) >> 1
// signed integer field, eg. in elasticsearch.
out := binary.LittleEndian.Uint32(h.Sum(nil)) >> 1

// Reset and return the Hasher to the pool.
h.Reset()
hashPool.Put(h)

return out
}

// String returns a readable string representation of the Event.
Expand Down
7 changes: 1 addition & 6 deletions pkg/bpf/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"lukechampine.com/blake3"
)

func TestHashFlow(t *testing.T) {

h := blake3.New(8, nil)

e := Event{
SrcAddr: net.ParseIP("1.2.3.4"),
DstAddr: net.ParseIP("5.6.7.8"),
Expand All @@ -21,7 +18,5 @@ func TestHashFlow(t *testing.T) {
connectionID: 11111111,
}

e.hashFlow(h)

assert.Equal(t, uint64(0x557151a9a4846ab8), e.FlowID)
assert.Equal(t, uint32(0x24846ab8), e.hashFlow())
}
8 changes: 0 additions & 8 deletions pkg/bpf/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/pkg/errors"
"github.com/ti-mo/gobpf/elf"
"lukechampine.com/blake3"

"github.com/ti-mo/conntracct/pkg/kernel"
)
Expand Down Expand Up @@ -201,10 +200,6 @@ func (ap *Probe) perfWorker() {
var eb []byte
var ok, update bool

// Initialize a hasher local to the goroutine. Output size is 64 bits.
// The hasher is re-used between invocations of hashFlow and is not thread-safe.
h := blake3.New(8, nil)

for {
select {
case eb, ok = <-ap.perfUpdateChan:
Expand All @@ -225,9 +220,6 @@ func (ap *Probe) perfWorker() {
ap.sendError(errors.Wrap(err, "error unmarshaling Event byte array"))
}

// Generate and set the Event's FlowID.
ae.hashFlow(h)

// Fanout to all registered consumers.
ap.fanoutEvent(ae, update)
}
Expand Down

0 comments on commit 8940173

Please sign in to comment.